Line data Source code
1 : /*
2 : * Copyright (c) 2018-2019 Cisco and/or its affiliates.
3 : * Licensed under the Apache License, Version 2.0 (the "License");
4 : * you may not use this file except in compliance with the License.
5 : * You may obtain a copy of the License at:
6 : *
7 : * http://www.apache.org/licenses/LICENSE-2.0
8 : *
9 : * Unless required by applicable law or agreed to in writing, software
10 : * distributed under the License is distributed on an "AS IS" BASIS,
11 : * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 : * See the License for the specific language governing permissions and
13 : * limitations under the License.
14 : */
15 : /**
16 : * @file
17 : * @brief Unidirectional shared-memory multi-ring message queue
18 : */
19 :
20 : #ifndef SRC_SVM_MESSAGE_QUEUE_H_
21 : #define SRC_SVM_MESSAGE_QUEUE_H_
22 :
23 : #include <vppinfra/clib.h>
24 : #include <vppinfra/error.h>
25 : #include <vppinfra/lock.h>
26 : #include <svm/queue.h>
27 :
28 : typedef struct svm_msg_q_shr_queue_
29 : {
30 : pthread_mutex_t mutex; /* 8 bytes */
31 : pthread_cond_t condvar; /* 8 bytes */
32 : u32 head;
33 : u32 tail;
34 : volatile u32 cursize;
35 : u32 maxsize;
36 : u32 elsize;
37 : u32 pad;
38 : u8 data[0];
39 : } svm_msg_q_shared_queue_t;
40 :
41 : typedef struct svm_msg_q_queue_
42 : {
43 : svm_msg_q_shared_queue_t *shr; /**< pointer to shared queue */
44 : int evtfd; /**< producer/consumer eventfd */
45 : clib_spinlock_t lock; /**< private lock for multi-producer */
46 : } svm_msg_q_queue_t;
47 :
48 : typedef struct svm_msg_q_ring_shared_
49 : {
50 : volatile u32 cursize; /**< current size of the ring */
51 : u32 nitems; /**< max size of the ring */
52 : volatile u32 head; /**< current head (for dequeue) */
53 : volatile u32 tail; /**< current tail (for enqueue) */
54 : u32 elsize; /**< size of an element */
55 : u8 data[0]; /**< chunk of memory for msg data */
56 : } svm_msg_q_ring_shared_t;
57 :
58 : typedef struct svm_msg_q_ring_
59 : {
60 : u32 nitems; /**< max size of the ring */
61 : u32 elsize; /**< size of an element */
62 : svm_msg_q_ring_shared_t *shr; /**< ring in shared memory */
63 : } __clib_packed svm_msg_q_ring_t;
64 :
65 : typedef struct svm_msg_q_shared_
66 : {
67 : u32 n_rings; /**< number of rings after q */
68 : u32 pad; /**< 8 byte alignment for q */
69 : svm_msg_q_shared_queue_t q[0]; /**< queue for exchanging messages */
70 : } __clib_packed svm_msg_q_shared_t;
71 :
72 : typedef struct svm_msg_q_
73 : {
74 : svm_msg_q_queue_t q; /**< queue for exchanging messages */
75 : svm_msg_q_ring_t *rings; /**< rings with message data*/
76 : } __clib_packed svm_msg_q_t;
77 :
78 : typedef struct svm_msg_q_ring_cfg_
79 : {
80 : u32 nitems;
81 : u32 elsize;
82 : void *data;
83 : } svm_msg_q_ring_cfg_t;
84 :
85 : typedef struct svm_msg_q_cfg_
86 : {
87 : int consumer_pid; /**< pid of msg consumer */
88 : u32 q_nitems; /**< msg queue size (not rings) */
89 : u32 n_rings; /**< number of msg rings */
90 : svm_msg_q_ring_cfg_t *ring_cfgs; /**< array of ring cfgs */
91 : } svm_msg_q_cfg_t;
92 :
93 : typedef union
94 : {
95 : struct
96 : {
97 : u32 ring_index; /**< ring index, could be u8 */
98 : u32 elt_index; /**< index in ring */
99 : };
100 : u64 as_u64;
101 : } svm_msg_q_msg_t;
102 :
/** Initializer for an invalid message descriptor: all bits of as_u64 set */
#define SVM_MQ_INVALID_MSG { .as_u64 = (u64) ~0 }
104 :
/**
 * Kind of queue condition passed to svm_msg_q_wait.
 *
 * NOTE(review): presumably names the queue state the caller waits on
 * (empty vs. full) - confirm against the svm_msg_q_wait implementation.
 */
typedef enum svm_msg_q_wait_type_
{
  SVM_MQ_WAIT_EMPTY = 0,
  SVM_MQ_WAIT_FULL = 1,
} svm_msg_q_wait_type_t;
110 :
111 : /**
112 : * Allocate message queue
113 : *
114 : * Allocates a message queue on the heap. Based on the configuration options,
115 : * apart from the message queue this also allocates (one or multiple)
116 : * shared-memory rings for the messages.
117 : *
118 : * @param cfg configuration options: queue len, consumer pid,
119 : * ring configs
120 : * @return message queue
121 : */
122 : svm_msg_q_shared_t *svm_msg_q_alloc (svm_msg_q_cfg_t *cfg);
123 : svm_msg_q_shared_t *svm_msg_q_init (void *base, svm_msg_q_cfg_t *cfg);
124 : uword svm_msg_q_size_to_alloc (svm_msg_q_cfg_t *cfg);
125 :
126 : void svm_msg_q_attach (svm_msg_q_t *mq, void *smq_base);
127 :
128 : /**
129 : * Cleanup mq's private data
130 : */
131 : void svm_msg_q_cleanup (svm_msg_q_t *mq);
132 :
133 : /**
134 : * Free message queue
135 : *
136 : * @param mq message queue to be freed
137 : */
138 : void svm_msg_q_free (svm_msg_q_t * mq);
139 :
140 : /**
141 : * Allocate message buffer
142 : *
143 : * Message is allocated on the first available ring capable of holding
144 : * the requested number of bytes.
145 : *
146 : * @param mq message queue
147 : * @param nbytes number of bytes needed for message
148 : * @return message structure pointing to the ring and position
149 : * allocated
150 : */
151 : svm_msg_q_msg_t svm_msg_q_alloc_msg (svm_msg_q_t * mq, u32 nbytes);
152 :
153 : /**
154 : * Allocate message buffer on ring
155 : *
156 : * Message is allocated on the requested ring. The caller MUST check that
157 : * the ring is not full.
158 : *
159 : * @param mq message queue
160 : * @param ring_index ring on which the allocation should occur
161 : * @return message structure pointing to the ring and position
162 : * allocated
163 : */
164 : svm_msg_q_msg_t svm_msg_q_alloc_msg_w_ring (svm_msg_q_t * mq, u32 ring_index);
165 :
166 : /**
167 : * Lock message queue and allocate message buffer on ring
168 : *
169 : * This should be used when multiple writers/readers are expected to
170 : * compete for the rings/queue. The message should then be enqueued with
171 : * the queue still locked, e.g. via @ref svm_msg_q_add_and_unlock, and the
172 : * caller MUST ensure the queue is unlocked once the message is enqueued.
173 : *
174 : * @param mq message queue
175 : * @param ring_index ring on which the allocation should occur
176 : * @param noblock flag that indicates if request should block
177 : * @param msg pointer to message to be filled in
178 : * @return 0 on success, negative number otherwise
179 : */
180 : int svm_msg_q_lock_and_alloc_msg_w_ring (svm_msg_q_t * mq, u32 ring_index,
181 : u8 noblock, svm_msg_q_msg_t * msg);
182 :
183 : /**
184 : * Free message buffer
185 : *
186 : * Marks message buffer on ring as free.
187 : *
188 : * @param mq message queue
189 : * @param msg message to be freed
190 : */
191 : void svm_msg_q_free_msg (svm_msg_q_t * mq, svm_msg_q_msg_t * msg);
192 :
193 : /**
194 : * Producer enqueue one message to queue
195 : *
196 : * Must be called with mq locked. Prior to calling this, the producer should've
197 : * obtained a message buffer from one of the rings.
198 : *
199 : * @param mq message queue
200 : * @param msg message to be enqueued
201 : */
202 : void svm_msg_q_add_raw (svm_msg_q_t *mq, svm_msg_q_msg_t *msg);
203 :
204 : /**
205 : * Producer enqueue one message to queue
206 : *
207 : * Prior to calling this, the producer should've obtained a message buffer
208 : * from one of the rings by calling @ref svm_msg_q_alloc_msg.
209 : *
210 : * @param mq message queue
211 : * @param msg message (pointer to ring position) to be enqueued
212 : * @param nowait flag to indicate if request is blocking or not
213 : * @return success status
214 : */
215 : int svm_msg_q_add (svm_msg_q_t * mq, svm_msg_q_msg_t * msg, int nowait);
216 :
217 : /**
218 : * Producer enqueue one message to queue with mutex held
219 : *
220 : * Prior to calling this, the producer should've obtained a message buffer
221 : * from one of the rings by calling @ref svm_msg_q_alloc_msg. It assumes
222 : * the queue mutex is held.
223 : *
224 : * @param mq message queue
225 : * @param msg message (pointer to ring position) to be enqueued
227 : */
228 : void svm_msg_q_add_and_unlock (svm_msg_q_t * mq, svm_msg_q_msg_t * msg);
229 :
230 : /**
231 : * Consumer dequeue one message from queue
232 : *
233 : * This returns the message pointing to the data in the message rings.
234 : * Should only be used in single consumer scenarios as no locks are grabbed.
235 : * The consumer is expected to call @ref svm_msg_q_free_msg once it
236 : * finishes processing/copies the message data.
237 : *
238 : * @param mq message queue
239 : * @param msg pointer to structure where message is to be received
240 : * @param cond flag that indicates if request should block or not
241 : * @param time time to wait if condition is SVM_Q_TIMEDWAIT
242 : * @return success status
243 : */
244 : int svm_msg_q_sub (svm_msg_q_t * mq, svm_msg_q_msg_t * msg,
245 : svm_q_conditional_wait_t cond, u32 time);
246 :
247 : /**
248 : * Consumer dequeue one message from queue
249 : *
250 : * Returns the message pointing to the data in the message rings. Should only
251 : * be used in single consumer scenarios as no locks are grabbed. The consumer
252 : * is expected to call @ref svm_msg_q_free_msg once it finishes
253 : * processing/copies the message data.
254 : *
255 : * @param mq message queue
256 : * @param elem pointer to structure where message is to be received
257 : * @return success status
258 : */
259 : int svm_msg_q_sub_raw (svm_msg_q_t *mq, svm_msg_q_msg_t *elem);
260 :
261 : /**
262 : * Consumer dequeue multiple messages from queue
263 : *
264 : * Returns the message pointing to the data in the message rings. Should only
265 : * be used in single consumer scenarios as no locks are grabbed. The consumer
266 : * is expected to call @ref svm_msg_q_free_msg once it finishes
267 : * processing/copies the message data.
268 : *
269 : * @param mq message queue
270 : * @param msg_buf pointer to array of messages to be received
271 : * @param n_msgs length of msg_buf array
272 : * @return number of messages dequeued
273 : */
274 : int svm_msg_q_sub_raw_batch (svm_msg_q_t *mq, svm_msg_q_msg_t *msg_buf,
275 : u32 n_msgs);
276 :
277 : /**
278 : * Get data for message in queue
279 : *
280 : * @param mq message queue
281 : * @param msg message for which the data is requested
282 : * @return pointer to data
283 : */
284 : void *svm_msg_q_msg_data (svm_msg_q_t * mq, svm_msg_q_msg_t * msg);
285 :
286 : /**
287 : * Get message queue ring
288 : *
289 : * @param mq message queue
290 : * @param ring_index index of ring
291 : * @return pointer to ring
292 : */
293 : svm_msg_q_ring_t *svm_msg_q_ring (svm_msg_q_t * mq, u32 ring_index);
294 :
295 : /**
296 : * Set event fd for queue
297 : *
298 : * If set, queue will exclusively use eventfds for signaling. Moreover,
299 : * afterwards, the queue should only be used in non-blocking mode. Waiting
300 : * for events should be done externally using something like epoll.
301 : *
302 : * @param mq message queue
303 : * @param fd consumer eventfd
304 : */
305 : void svm_msg_q_set_eventfd (svm_msg_q_t *mq, int fd);
306 :
307 : /**
308 : * Allocate event fd for queue
309 : */
310 : int svm_msg_q_alloc_eventfd (svm_msg_q_t *mq);
311 :
312 : /**
313 : * Format message queue, shows msg count for each ring
314 : */
315 : u8 *format_svm_msg_q (u8 *s, va_list *args);
316 :
317 : /**
318 : * Check length of message queue
319 : */
320 : static inline u32
321 227916967 : svm_msg_q_size (svm_msg_q_t *mq)
322 : {
323 227916967 : return clib_atomic_load_relax_n (&mq->q.shr->cursize);
324 : }
325 :
326 : /**
327 : * Check if message queue is full
328 : */
329 : static inline u8
330 369297 : svm_msg_q_is_full (svm_msg_q_t * mq)
331 : {
332 369297 : return (svm_msg_q_size (mq) == mq->q.shr->maxsize);
333 : }
334 :
335 : static inline u8
336 369290 : svm_msg_q_ring_is_full (svm_msg_q_t * mq, u32 ring_index)
337 : {
338 369290 : svm_msg_q_ring_t *ring = vec_elt_at_index (mq->rings, ring_index);
339 369290 : return (clib_atomic_load_relax_n (&ring->shr->cursize) >= ring->nitems);
340 : }
341 :
342 : static inline u8
343 369284 : svm_msg_q_or_ring_is_full (svm_msg_q_t *mq, u32 ring_index)
344 : {
345 369284 : return (svm_msg_q_is_full (mq) || svm_msg_q_ring_is_full (mq, ring_index));
346 : }
347 :
348 : /**
349 : * Check if message queue is empty
350 : */
351 : static inline u8
352 153617986 : svm_msg_q_is_empty (svm_msg_q_t * mq)
353 : {
354 153617986 : return (svm_msg_q_size (mq) == 0);
355 : }
356 :
357 : /**
358 : * Check if message is invalid
359 : */
360 : static inline u8
361 : svm_msg_q_msg_is_invalid (svm_msg_q_msg_t * msg)
362 : {
363 : return (msg->as_u64 == (u64) ~ 0);
364 : }
365 :
366 : /**
367 : * Try locking message queue
368 : */
369 : static inline int
370 188334 : svm_msg_q_try_lock (svm_msg_q_t * mq)
371 : {
372 188334 : if (mq->q.evtfd == -1)
373 : {
374 188334 : int rv = pthread_mutex_trylock (&mq->q.shr->mutex);
375 188334 : if (PREDICT_FALSE (rv == EOWNERDEAD))
376 0 : rv = pthread_mutex_consistent (&mq->q.shr->mutex);
377 188334 : return rv;
378 : }
379 : else
380 : {
381 0 : return !clib_spinlock_trylock (&mq->q.lock);
382 : }
383 : }
384 :
385 : /**
386 : * Lock, or block trying, the message queue
387 : */
388 : static inline int
389 167689 : svm_msg_q_lock (svm_msg_q_t * mq)
390 : {
391 167689 : if (mq->q.evtfd == -1)
392 : {
393 167658 : int rv = pthread_mutex_lock (&mq->q.shr->mutex);
394 167658 : if (PREDICT_FALSE (rv == EOWNERDEAD))
395 0 : rv = pthread_mutex_consistent (&mq->q.shr->mutex);
396 167658 : return rv;
397 : }
398 : else
399 : {
400 31 : clib_spinlock_lock (&mq->q.lock);
401 31 : return 0;
402 : }
403 : }
404 :
405 : /**
406 : * Unlock message queue
407 : */
408 : static inline void
409 357047 : svm_msg_q_unlock (svm_msg_q_t * mq)
410 : {
411 357047 : if (mq->q.evtfd == -1)
412 : {
413 357016 : pthread_mutex_unlock (&mq->q.shr->mutex);
414 : }
415 : else
416 : {
417 31 : clib_spinlock_unlock (&mq->q.lock);
418 : }
419 357047 : }
420 :
421 : /**
422 : * Wait for message queue event
423 : *
424 : * When eventfds are not configured, the shared memory mutex is locked
425 : * before waiting on the condvar. Typically called by consumers.
426 : */
427 : int svm_msg_q_wait (svm_msg_q_t *mq, svm_msg_q_wait_type_t type);
428 :
429 : /**
430 : * Wait for message queue event as producer
431 : *
432 : * Similar to @ref svm_msg_q_wait but lock (mutex or spinlock) must
433 : * be held. Should only be called by producers.
434 : */
435 : int svm_msg_q_wait_prod (svm_msg_q_t *mq);
436 :
437 : /**
438 : * Wait for message queue or ring event as producer
439 : *
440 : * Similar to @ref svm_msg_q_wait but lock (mutex or spinlock) must
441 : * be held. Should only be called by producers.
442 : */
443 : int svm_msg_q_or_ring_wait_prod (svm_msg_q_t *mq, u32 ring_index);
444 :
445 : /**
446 : * Timed wait for message queue event
447 : *
448 : * Must be called with mutex held.
449 : *
450 : * @param mq message queue
451 : * @param timeout time in seconds
452 : */
453 : int svm_msg_q_timedwait (svm_msg_q_t *mq, double timeout);
454 :
455 : static inline int
456 32 : svm_msg_q_get_eventfd (svm_msg_q_t *mq)
457 : {
458 32 : return mq->q.evtfd;
459 : }
460 :
461 : #endif /* SRC_SVM_MESSAGE_QUEUE_H_ */
462 :
463 : /*
464 : * fd.io coding-style-patch-verification: ON
465 : *
466 : * Local Variables:
467 : * eval: (c-set-style "gnu")
468 : * End:
469 : */
|