Line data Source code
1 : /*
2 : * Copyright (c) 2018-2019 Cisco and/or its affiliates.
3 : * Licensed under the Apache License, Version 2.0 (the "License");
4 : * you may not use this file except in compliance with the License.
5 : * You may obtain a copy of the License at:
6 : *
7 : * http://www.apache.org/licenses/LICENSE-2.0
8 : *
9 : * Unless required by applicable law or agreed to in writing, software
10 : * distributed under the License is distributed on an "AS IS" BASIS,
11 : * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 : * See the License for the specific language governing permissions and
13 : * limitations under the License.
14 : */
15 : /**
16 : * @file
17 : * @brief Unidirectional shared-memory multi-ring message queue
18 : */
19 :
20 : #ifndef SRC_SVM_MESSAGE_QUEUE_H_
21 : #define SRC_SVM_MESSAGE_QUEUE_H_
22 :
23 : #include <vppinfra/clib.h>
24 : #include <vppinfra/error.h>
25 : #include <vppinfra/lock.h>
26 : #include <svm/queue.h>
27 :
28 : typedef struct svm_msg_q_shr_queue_
29 : {
30 : pthread_mutex_t mutex; /* 8 bytes */
31 : pthread_cond_t condvar; /* 8 bytes */
32 : u32 head;
33 : u32 tail;
34 : volatile u32 cursize;
35 : u32 maxsize;
36 : u32 elsize;
37 : u32 pad;
38 : u8 data[0];
39 : } svm_msg_q_shared_queue_t;
40 :
41 : typedef struct svm_msg_q_queue_
42 : {
43 : svm_msg_q_shared_queue_t *shr; /**< pointer to shared queue */
44 : int evtfd; /**< producer/consumer eventfd */
45 : clib_spinlock_t lock; /**< private lock for multi-producer */
46 : } svm_msg_q_queue_t;
47 :
48 : typedef struct svm_msg_q_ring_shared_
49 : {
50 : volatile u32 cursize; /**< current size of the ring */
51 : u32 nitems; /**< max size of the ring */
52 : volatile u32 head; /**< current head (for dequeue) */
53 : volatile u32 tail; /**< current tail (for enqueue) */
54 : u32 elsize; /**< size of an element */
55 : u8 data[0]; /**< chunk of memory for msg data */
56 : } svm_msg_q_ring_shared_t;
57 :
58 : typedef struct svm_msg_q_ring_
59 : {
60 : u32 nitems; /**< max size of the ring */
61 : u32 elsize; /**< size of an element */
62 : svm_msg_q_ring_shared_t *shr; /**< ring in shared memory */
63 : } __clib_packed svm_msg_q_ring_t;
64 :
65 : typedef struct svm_msg_q_shared_
66 : {
67 : u32 n_rings; /**< number of rings after q */
68 : u32 pad; /**< 8 byte alignment for q */
69 : svm_msg_q_shared_queue_t q[0]; /**< queue for exchanging messages */
70 : } __clib_packed svm_msg_q_shared_t;
71 :
72 : typedef struct svm_msg_q_
73 : {
74 : svm_msg_q_queue_t q; /**< queue for exchanging messages */
75 : svm_msg_q_ring_t *rings; /**< rings with message data*/
76 : } __clib_packed svm_msg_q_t;
77 :
78 : typedef struct svm_msg_q_ring_cfg_
79 : {
80 : u32 nitems;
81 : u32 elsize;
82 : void *data;
83 : } svm_msg_q_ring_cfg_t;
84 :
85 : typedef struct svm_msg_q_cfg_
86 : {
87 : int consumer_pid; /**< pid of msg consumer */
88 : u32 q_nitems; /**< msg queue size (not rings) */
89 : u32 n_rings; /**< number of msg rings */
90 : svm_msg_q_ring_cfg_t *ring_cfgs; /**< array of ring cfgs */
91 : } svm_msg_q_cfg_t;
92 :
93 : typedef union
94 : {
95 : struct
96 : {
97 : u32 ring_index; /**< ring index, could be u8 */
98 : u32 elt_index; /**< index in ring */
99 : };
100 : u64 as_u64;
101 : } svm_msg_q_msg_t;
102 :
/** Initializer for a message handle with all 64 bits set (invalid marker) */
#define SVM_MQ_INVALID_MSG { .as_u64 = (u64) ~0 }
104 :
/**
 * Kind of event passed to @ref svm_msg_q_wait
 */
typedef enum svm_msg_q_wait_type_
{
  SVM_MQ_WAIT_EMPTY = 0, /* NOTE(review): presumably "queue is empty" -
                            confirm against svm_msg_q_wait */
  SVM_MQ_WAIT_FULL = 1   /* NOTE(review): presumably "queue is full" -
                            confirm against svm_msg_q_wait */
} svm_msg_q_wait_type_t;
110 :
111 : /**
112 : * Allocate message queue
113 : *
114 : * Allocates a message queue on the heap. Based on the configuration options,
115 : * apart from the message queue this also allocates (one or multiple)
116 : * shared-memory rings for the messages.
117 : *
118 : * @param cfg configuration options: queue len, consumer pid,
119 : * ring configs
120 : * @return message queue
121 : */
122 : svm_msg_q_shared_t *svm_msg_q_alloc (svm_msg_q_cfg_t *cfg);
123 : svm_msg_q_shared_t *svm_msg_q_init (void *base, svm_msg_q_cfg_t *cfg);
124 : uword svm_msg_q_size_to_alloc (svm_msg_q_cfg_t *cfg);
125 :
126 : void svm_msg_q_attach (svm_msg_q_t *mq, void *smq_base);
127 :
128 : /**
129 : * Cleanup mq's private data
130 : */
131 : void svm_msg_q_cleanup (svm_msg_q_t *mq);
132 :
133 : /**
134 : * Free message queue
135 : *
136 : * @param mq message queue to be freed
137 : */
138 : void svm_msg_q_free (svm_msg_q_t * mq);
139 :
140 : /**
141 : * Allocate message buffer
142 : *
143 : * Message is allocated on the first available ring capable of holding
144 : * the requested number of bytes.
145 : *
146 : * @param mq message queue
147 : * @param nbytes number of bytes needed for message
148 : * @return message structure pointing to the ring and position
149 : * allocated
150 : */
151 : svm_msg_q_msg_t svm_msg_q_alloc_msg (svm_msg_q_t * mq, u32 nbytes);
152 :
153 : /**
154 : * Allocate message buffer on ring
155 : *
156 : * Message is allocated on the requested ring. The caller MUST check that
157 : * the ring is not full.
158 : *
159 : * @param mq message queue
160 : * @param ring_index ring on which the allocation should occur
161 : * @return message structure pointing to the ring and position
162 : * allocated
163 : */
164 : svm_msg_q_msg_t svm_msg_q_alloc_msg_w_ring (svm_msg_q_t * mq, u32 ring_index);
165 :
166 : /**
167 : * Lock message queue and allocate message buffer on ring
168 : *
169 : * This should be used when multiple writers/readers are expected to
170 : * compete for the rings/queue. Message should be enqueued by calling
171 : * @ref svm_msg_q_add_w_lock and the caller MUST unlock the queue once
172 : * the message is enqueued.
173 : *
174 : * @param mq message queue
175 : * @param ring_index ring on which the allocation should occur
176 : * @param noblock flag that indicates if request should block
177 : * @param msg pointer to message to be filled in
178 : * @return 0 on success, negative number otherwise
179 : */
180 : int svm_msg_q_lock_and_alloc_msg_w_ring (svm_msg_q_t * mq, u32 ring_index,
181 : u8 noblock, svm_msg_q_msg_t * msg);
182 :
183 : /**
184 : * Free message buffer
185 : *
186 : * Marks message buffer on ring as free.
187 : *
188 : * @param mq message queue
189 : * @param msg message to be freed
190 : */
191 : void svm_msg_q_free_msg (svm_msg_q_t * mq, svm_msg_q_msg_t * msg);
192 :
193 : /**
194 : * Producer enqueue one message to queue
195 : *
196 : * Prior to calling this, the producer should've obtained a message buffer
197 : * from one of the rings by calling @ref svm_msg_q_alloc_msg.
198 : *
199 : * @param mq message queue
200 : * @param msg message (pointer to ring position) to be enqueued
201 : * @param nowait flag to indicate if request is blocking or not
202 : * @return success status
203 : */
204 : int svm_msg_q_add (svm_msg_q_t * mq, svm_msg_q_msg_t * msg, int nowait);
205 :
206 : /**
207 : * Producer enqueue one message to queue with mutex held
208 : *
209 : * Prior to calling this, the producer should've obtained a message buffer
210 : * from one of the rings by calling @ref svm_msg_q_alloc_msg. It assumes
211 : * the queue mutex is held.
212 : *
213 : * @param mq message queue
214 : * @param msg message (pointer to ring position) to be enqueued
215 : * @note declared void, so no status is returned to the caller
216 : */
217 : void svm_msg_q_add_and_unlock (svm_msg_q_t * mq, svm_msg_q_msg_t * msg);
218 :
219 : /**
220 : * Consumer dequeue one message from queue
221 : *
222 : * This returns the message pointing to the data in the message rings.
223 : * Should only be used in single consumer scenarios as no locks are grabbed.
224 : * The consumer is expected to call @ref svm_msg_q_free_msg once it
225 : * finishes processing/copies the message data.
226 : *
227 : * @param mq message queue
228 : * @param msg pointer to structure where message is to be received
229 : * @param cond flag that indicates if request should block or not
230 : * @param time time to wait if condition is SVM_Q_TIMEDWAIT
231 : * @return success status
232 : */
233 : int svm_msg_q_sub (svm_msg_q_t * mq, svm_msg_q_msg_t * msg,
234 : svm_q_conditional_wait_t cond, u32 time);
235 :
236 : /**
237 : * Consumer dequeue one message from queue
238 : *
239 : * Returns the message pointing to the data in the message rings. Should only
240 : * be used in single consumer scenarios as no locks are grabbed. The consumer
241 : * is expected to call @ref svm_msg_q_free_msg once it finishes
242 : * processing/copies the message data.
243 : *
244 : * @param mq message queue
245 : * @param elem pointer to structure where message is to be received
246 : * @return success status
247 : */
248 : int svm_msg_q_sub_raw (svm_msg_q_t *mq, svm_msg_q_msg_t *elem);
249 :
250 : /**
251 : * Consumer dequeue multiple messages from queue
252 : *
253 : * Returns the message pointing to the data in the message rings. Should only
254 : * be used in single consumer scenarios as no locks are grabbed. The consumer
255 : * is expected to call @ref svm_msg_q_free_msg once it finishes
256 : * processing/copies the message data.
257 : *
258 : * @param mq message queue
259 : * @param msg_buf pointer to array where messages are to be received
260 : * @param n_msgs length of msg_buf array
261 : * @return number of messages dequeued
262 : */
263 : int svm_msg_q_sub_raw_batch (svm_msg_q_t *mq, svm_msg_q_msg_t *msg_buf,
264 : u32 n_msgs);
265 :
266 : /**
267 : * Get data for message in queue
268 : *
269 : * @param mq message queue
270 : * @param msg message for which the data is requested
271 : * @return pointer to data
272 : */
273 : void *svm_msg_q_msg_data (svm_msg_q_t * mq, svm_msg_q_msg_t * msg);
274 :
275 : /**
276 : * Get message queue ring
277 : *
278 : * @param mq message queue
279 : * @param ring_index index of ring
280 : * @return pointer to ring
281 : */
282 : svm_msg_q_ring_t *svm_msg_q_ring (svm_msg_q_t * mq, u32 ring_index);
283 :
284 : /**
285 : * Set event fd for queue
286 : *
287 : * If set, queue will exclusively use eventfds for signaling. Moreover,
288 : * afterwards, the queue should only be used in non-blocking mode. Waiting
289 : * for events should be done externally using something like epoll.
290 : *
291 : * @param mq message queue
292 : * @param fd consumer eventfd
293 : */
294 : void svm_msg_q_set_eventfd (svm_msg_q_t *mq, int fd);
295 :
296 : /**
297 : * Allocate event fd for queue
298 : */
299 : int svm_msg_q_alloc_eventfd (svm_msg_q_t *mq);
300 :
301 : /**
302 : * Format message queue, shows msg count for each ring
303 : */
304 : u8 *format_svm_msg_q (u8 *s, va_list *args);
305 :
306 : /**
307 : * Check length of message queue
308 : */
309 : static inline u32
310 224722144 : svm_msg_q_size (svm_msg_q_t *mq)
311 : {
312 224722144 : return clib_atomic_load_relax_n (&mq->q.shr->cursize);
313 : }
314 :
315 : /**
316 : * Check if message queue is full
317 : */
318 : static inline u8
319 266753 : svm_msg_q_is_full (svm_msg_q_t * mq)
320 : {
321 266753 : return (svm_msg_q_size (mq) == mq->q.shr->maxsize);
322 : }
323 :
324 : static inline u8
325 266746 : svm_msg_q_ring_is_full (svm_msg_q_t * mq, u32 ring_index)
326 : {
327 266746 : svm_msg_q_ring_t *ring = vec_elt_at_index (mq->rings, ring_index);
328 266746 : return (clib_atomic_load_relax_n (&ring->shr->cursize) >= ring->nitems);
329 : }
330 :
331 : static inline u8
332 266740 : svm_msg_q_or_ring_is_full (svm_msg_q_t *mq, u32 ring_index)
333 : {
334 266740 : return (svm_msg_q_is_full (mq) || svm_msg_q_ring_is_full (mq, ring_index));
335 : }
336 :
337 : /**
338 : * Check if message queue is empty
339 : */
340 : static inline u8
341 149165924 : svm_msg_q_is_empty (svm_msg_q_t * mq)
342 : {
343 149165924 : return (svm_msg_q_size (mq) == 0);
344 : }
345 :
346 : /**
347 : * Check if message is invalid
348 : */
349 : static inline u8
350 : svm_msg_q_msg_is_invalid (svm_msg_q_msg_t * msg)
351 : {
352 : return (msg->as_u64 == (u64) ~ 0);
353 : }
354 :
355 : /**
356 : * Try locking message queue
357 : */
358 : static inline int
359 116733 : svm_msg_q_try_lock (svm_msg_q_t * mq)
360 : {
361 116733 : if (mq->q.evtfd == -1)
362 : {
363 116733 : int rv = pthread_mutex_trylock (&mq->q.shr->mutex);
364 116733 : if (PREDICT_FALSE (rv == EOWNERDEAD))
365 0 : rv = pthread_mutex_consistent (&mq->q.shr->mutex);
366 116733 : return rv;
367 : }
368 : else
369 : {
370 0 : return !clib_spinlock_trylock (&mq->q.lock);
371 : }
372 : }
373 :
374 : /**
375 : * Lock, or block trying, the message queue
376 : */
377 : static inline int
378 150079 : svm_msg_q_lock (svm_msg_q_t * mq)
379 : {
380 150079 : if (mq->q.evtfd == -1)
381 : {
382 150051 : int rv = pthread_mutex_lock (&mq->q.shr->mutex);
383 150051 : if (PREDICT_FALSE (rv == EOWNERDEAD))
384 0 : rv = pthread_mutex_consistent (&mq->q.shr->mutex);
385 150051 : return rv;
386 : }
387 : else
388 : {
389 28 : clib_spinlock_lock (&mq->q.lock);
390 28 : return 0;
391 : }
392 : }
393 :
394 : /**
395 : * Unlock message queue
396 : */
397 : static inline void
398 267783 : svm_msg_q_unlock (svm_msg_q_t * mq)
399 : {
400 267783 : if (mq->q.evtfd == -1)
401 : {
402 267755 : pthread_mutex_unlock (&mq->q.shr->mutex);
403 : }
404 : else
405 : {
406 28 : clib_spinlock_unlock (&mq->q.lock);
407 : }
408 267783 : }
409 :
410 : /**
411 : * Wait for message queue event
412 : *
413 : * When eventfds are not configured, the shared memory mutex is locked
414 : * before waiting on the condvar. Typically called by consumers.
415 : */
416 : int svm_msg_q_wait (svm_msg_q_t *mq, svm_msg_q_wait_type_t type);
417 :
418 : /**
419 : * Wait for message queue event as producer
420 : *
421 : * Similar to @ref svm_msg_q_wait but lock (mutex or spinlock) must
422 : * be held. Should only be called by producers.
423 : */
424 : int svm_msg_q_wait_prod (svm_msg_q_t *mq);
425 :
426 : /**
427 : * Wait for message queue or ring event as producer
428 : *
429 : * Similar to @ref svm_msg_q_wait but lock (mutex or spinlock) must
430 : * be held. Should only be called by producers.
431 : */
432 : int svm_msg_q_or_ring_wait_prod (svm_msg_q_t *mq, u32 ring_index);
433 :
434 : /**
435 : * Timed wait for message queue event
436 : *
437 : * Must be called with mutex held.
438 : *
439 : * @param mq message queue
440 : * @param timeout time in seconds
441 : */
442 : int svm_msg_q_timedwait (svm_msg_q_t *mq, double timeout);
443 :
444 : static inline int
445 30 : svm_msg_q_get_eventfd (svm_msg_q_t *mq)
446 : {
447 30 : return mq->q.evtfd;
448 : }
449 :
450 : #endif /* SRC_SVM_MESSAGE_QUEUE_H_ */
451 :
452 : /*
453 : * fd.io coding-style-patch-verification: ON
454 : *
455 : * Local Variables:
456 : * eval: (c-set-style "gnu")
457 : * End:
458 : */
|