FancySafeBot 0.0.1
A safe robotics library
Loading...
Searching...
No Matches
fsb_queue.h
1#pragma once
2
3#include <cstdlib>
4#include <cstdint>
5#include <array>
6#include <pthread.h>
7
8#include "fsb_thread.h"
9#include "fsb_circular_buffer.h"
10
11namespace fsb
12{
13
/**
 * @brief Status codes returned by Queue operations.
 */
enum class QueueStatus
{
    /** Operation completed successfully. */
    SUCCESS,
    /** The queue's mutex / condition variable were not initialized. */
    UNINITIALIZED,
    /** Push was rejected because the underlying buffer reported it is full. */
    FULL,
    /** ForcePush reported that adding the value overwrote existing data. */
    OVERWRITE,
    /** PopWait gave up because the condition-variable wait timed out. */
    TIMEOUT,
    /** Locking failed or the underlying buffer reported another failure. */
    ERROR
};
31
36template <typename QueueType, size_t QueueSize> class Queue
37{
38public:
39 Queue();
40 ~Queue();
41
42 // Delete copy constructor and copy assignment operator
43 Queue(const Queue&) = delete;
44 Queue& operator=(const Queue&) = delete;
45
46 // Delete move constructor and move assignment operator
47 Queue(Queue&&) = delete;
48 Queue& operator=(Queue&&) = delete;
49
56 QueueStatus Push(QueueType push_value);
57
64 QueueStatus ForcePush(QueueType push_value);
65
72 QueueStatus Pop(QueueType& popped_value);
73
81 QueueStatus PopAll(std::array<QueueType, QueueSize>& popped_values, size_t& num_popped);
82
91 QueueStatus PopWait(
92 std::array<QueueType, QueueSize>& popped_values, size_t& num_popped,
93 const struct timespec& timeout);
94
99 QueueStatus Reset();
100
101private:
102 bool m_initialized = false;
103 pthread_mutex_t m_mutex = {};
104 pthread_cond_t m_cond_var = {};
106};
107
108// ===============================
109// Queue Implementation
110// ===============================
111
112template <typename QueueType, size_t QueueSize> inline Queue<QueueType, QueueSize>::Queue()
113{
114 if (mutex_initialize(m_mutex) == LockStatus::SUCCESS)
115 {
116 if (condvar_initialize(m_cond_var) == LockStatus::SUCCESS)
117 {
118 m_initialized = true;
119 }
120 else
121 {
122 mutex_destroy(m_mutex);
123 }
124 }
125}
126
127template <typename QueueType, size_t QueueSize> Queue<QueueType, QueueSize>::~Queue()
128{
129 if (m_initialized)
130 {
131 mutex_destroy(m_mutex);
132 condvar_destroy(m_cond_var);
133 m_initialized = false;
134 }
135}
136
137template <typename QueueType, size_t QueueSize>
138inline QueueStatus Queue<QueueType, QueueSize>::Push(const QueueType push_value)
139{
140 if (!m_initialized)
141 {
142 return QueueStatus::UNINITIALIZED;
143 }
144 if (mutex_lock(m_mutex) != LockStatus::SUCCESS)
145 {
146 return QueueStatus::ERROR;
147 }
148 CircularBufferStatus buf_status = m_buffer.push(push_value);
149 mutex_unlock(m_mutex);
150 condvar_signal(m_cond_var);
151 if (buf_status == CircularBufferStatus::FULL)
152 {
153 return QueueStatus::FULL;
154 }
155 else if (buf_status != CircularBufferStatus::SUCCESS)
156 {
157 return QueueStatus::ERROR;
158 }
159 return QueueStatus::SUCCESS;
160}
161
162template <typename QueueType, size_t QueueSize>
163inline QueueStatus Queue<QueueType, QueueSize>::ForcePush(QueueType push_value)
164{
165 if (!m_initialized)
166 {
167 return QueueStatus::UNINITIALIZED;
168 }
169 if (mutex_lock(m_mutex) != LockStatus::SUCCESS)
170 {
171 return QueueStatus::ERROR;
172 }
173 CircularBufferStatus buf_status = m_buffer.force_push(push_value);
174 mutex_unlock(m_mutex);
175 condvar_signal(m_cond_var);
176 if (buf_status == CircularBufferStatus::OVERWRITE)
177 {
178 return QueueStatus::OVERWRITE;
179 }
180 else if (buf_status != CircularBufferStatus::SUCCESS)
181 {
182 return QueueStatus::ERROR;
183 }
184 return QueueStatus::SUCCESS;
185}
186
187template <typename QueueType, size_t QueueSize>
189 std::array<QueueType, QueueSize>& popped_values, size_t& num_popped)
190{
191 if (!m_initialized)
192 {
193 return QueueStatus::UNINITIALIZED;
194 }
195 if (mutex_lock(m_mutex) != LockStatus::SUCCESS)
196 {
197 return QueueStatus::ERROR;
198 }
199 CircularBufferStatus buf_status = m_buffer.pop_all(popped_values, num_popped);
200 mutex_unlock(m_mutex);
201 return (
202 buf_status == CircularBufferStatus::SUCCESS ? QueueStatus::SUCCESS : QueueStatus::ERROR);
203}
204
205template <typename QueueType, size_t QueueSize>
207 std::array<QueueType, QueueSize>& popped_values, size_t& num_popped,
208 const struct timespec& timeout)
209{
210 num_popped = 0;
211 if (!m_initialized)
212 {
213 return QueueStatus::UNINITIALIZED;
214 }
215 if (mutex_lock(m_mutex) != LockStatus::SUCCESS)
216 {
217 return QueueStatus::ERROR;
218 }
219 // pop all values
220 m_buffer.pop_all(popped_values, num_popped);
221 if (num_popped > 0)
222 {
223 // if we already have data, return it immediately
224 mutex_unlock(m_mutex);
225 return QueueStatus::SUCCESS;
226 }
227 // wait for new data
228 const LockStatus cv_status = condvar_wait_timeout(m_cond_var, m_mutex, timeout);
229 QueueStatus result = QueueStatus::SUCCESS;
230 if (cv_status != LockStatus::SUCCESS)
231 {
232 result = (cv_status == LockStatus::TIMEOUT ? QueueStatus::TIMEOUT : QueueStatus::ERROR);
233 }
234 else
235 {
236 // pop all values
237 m_buffer.pop_all(popped_values, num_popped);
238 }
239 mutex_unlock(m_mutex);
240 return result;
241}
242
243template <typename QueueType, size_t QueueSize>
244inline QueueStatus Queue<QueueType, QueueSize>::Pop(QueueType& popped_value)
245{
246 if (!m_initialized)
247 {
248 return QueueStatus::UNINITIALIZED;
249 }
250 if (mutex_lock(m_mutex) != LockStatus::SUCCESS)
251 {
252 return QueueStatus::ERROR;
253 }
254 CircularBufferStatus buf_status = m_buffer.pop(popped_value);
255 mutex_unlock(m_mutex);
256 return (
257 buf_status == CircularBufferStatus::SUCCESS ? QueueStatus::SUCCESS : QueueStatus::ERROR);
258}
259
260template <typename QueueType, size_t QueueSize>
262{
263 if (!m_initialized)
264 {
265 return QueueStatus::UNINITIALIZED;
266 }
267 if (mutex_lock(m_mutex) != LockStatus::SUCCESS)
268 {
269 return QueueStatus::ERROR;
270 }
271 m_buffer.Reset();
272 mutex_unlock(m_mutex);
273 return QueueStatus::SUCCESS;
274}
275
276// template<typename QueueType, size_t QueueSize>
277// inline QueueStatus Queue<QueueType, QueueSize>::Push(const QueueType push_value)
278// {
279// QueueStatus status = QueueStatus::SUCCESS;
280// {
281// std::lock_guard<std::mutex> lock(m_mutex);
282// CircularBufferStatus buf_status = m_buffer.Push(push_value);
283// if (buf_status == CircularBufferStatus::FULL) {
284// status = QueueStatus::FULL;
285// } else if (buf_status != CircularBufferStatus::SUCCESS) {
286// status = QueueStatus::ERROR;
287// }
288// }
289// m_cond_var.notify_all();
290// return status;
291// }
292
293// template<typename QueueType, size_t QueueSize>
294// inline QueueStatus Queue<QueueType, QueueSize>::ForcePush(QueueType push_value)
295// {
296// QueueStatus status = QueueStatus::SUCCESS;
297// {
298// std::lock_guard<std::mutex> lock(m_mutex);
299// CircularBufferStatus buf_status = m_buffer.ForcePush(push_value);
300// if (buf_status == CircularBufferStatus::OVERWRITE) {
301// status = QueueStatus::OVERWRITE;
302// } else if (buf_status != CircularBufferStatus::SUCCESS) {
303// status = QueueStatus::ERROR;
304// }
305// }
306// m_cond_var.notify_all();
307// return status;
308// }
309
310// template<typename QueueType, size_t QueueSize>
311// inline QueueStatus Queue<QueueType, QueueSize>::PopAll(std::array<QueueType, QueueSize>
312// &popped_values,
313// size_t& num_popped, const struct timespec& timeout)
314// {
315// std::unique_lock<std::mutex> lock(m_mutex);
316// std::cv_status status = m_cond_var.wait_for(lock, std::chrono::nanoseconds(timeout.tv_nsec) +
317// std::chrono::seconds(timeout.tv_sec), [this] { return !m_buffer.Empty(); }); if (status ==
318// std::cv_status::timeout)
319// {
320// return QueueStatus::TIMEOUT;
321// }
322// // pop all values
323// num_popped = 0;
324// while (!m_buffer.Empty())
325// {
326// QueueType value;
327// if (m_buffer.Pop(value) == QueueStatus::SUCCESS)
328// {
329// popped_values[num_popped++] = value;
330// }
331// }
332// return QueueStatus::SUCCESS;
333// }
334
335// template<typename QueueType, size_t QueueSize>
336// inline QueueStatus Queue<QueueType, QueueSize>::Pop(QueueType &popped_value)
337// {
338// std::lock_guard<std::mutex> lock(m_mutex);
339// CircularBufferStatus buf_status = m_buffer.Pop(popped_value);
340// return buf_status == CircularBufferStatus::SUCCESS ? QueueStatus::SUCCESS :
341// QueueStatus::ERROR;
342// }
343
344// template<typename QueueType, size_t QueueSize>
345// inline void Queue<QueueType, QueueSize>::Reset()
346// {
347// std::lock_guard<std::mutex> lock(m_mutex);
348// m_buffer.Reset();
349// }
350
351} // namespace fsb
Circular Buffer.
Definition fsb_circular_buffer.h:44
Queue.
Definition fsb_queue.h:37
CircularBufferStatus
Definition fsb_circular_buffer.h:19
@ OVERWRITE
Adding to buffer overwrote existing data.
@ FULL
Operation failed, buffer is full.
@ SUCCESS
Successful operation.
QueueStatus PopWait(std::array< QueueType, QueueSize > &popped_values, size_t &num_popped, const struct timespec &timeout)
Wait for a new value to be added to the buffer, then pop all values.
Definition fsb_queue.h:206
QueueStatus ForcePush(QueueType push_value)
Add value to buffer and overwrite oldest value if buffer is full.
Definition fsb_queue.h:163
QueueStatus PopAll(std::array< QueueType, QueueSize > &popped_values, size_t &num_popped)
Pop all values currently in the buffer without waiting.
Definition fsb_queue.h:188
QueueStatus Reset()
Reset buffer to empty state.
Definition fsb_queue.h:261
QueueStatus Push(QueueType push_value)
Add value to buffer if there is space available.
Definition fsb_queue.h:138
QueueStatus Pop(QueueType &popped_value)
Get oldest value from buffer.
Definition fsb_queue.h:244