Line data Source code
1 : /*
2 : *------------------------------------------------------------------
3 : * svm_queue.c - unidirectional shared-memory queues
4 : *
5 : * Copyright (c) 2009-2019 Cisco and/or its affiliates.
6 : * Licensed under the Apache License, Version 2.0 (the "License");
7 : * you may not use this file except in compliance with the License.
8 : * You may obtain a copy of the License at:
9 : *
10 : * http://www.apache.org/licenses/LICENSE-2.0
11 : *
12 : * Unless required by applicable law or agreed to in writing, software
13 : * distributed under the License is distributed on an "AS IS" BASIS,
14 : * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 : * See the License for the specific language governing permissions and
16 : * limitations under the License.
17 : *------------------------------------------------------------------
18 : */
19 :
20 :
21 : #include <stdio.h>
22 : #include <stdlib.h>
23 : #include <string.h>
24 : #include <pthread.h>
25 : #include <vppinfra/mem.h>
26 : #include <vppinfra/format.h>
27 : #include <vppinfra/cache.h>
28 : #include <svm/queue.h>
29 : #include <vppinfra/time.h>
30 : #include <vppinfra/lock.h>
31 :
32 : svm_queue_t *
33 4254 : svm_queue_init (void *base, int nels, int elsize)
34 : {
35 : svm_queue_t *q;
36 : pthread_mutexattr_t attr;
37 : pthread_condattr_t cattr;
38 :
39 4254 : q = (svm_queue_t *) base;
40 4254 : clib_memset (q, 0, sizeof (*q));
41 :
42 4254 : q->elsize = elsize;
43 4254 : q->maxsize = nels;
44 4254 : q->producer_evtfd = -1;
45 4254 : q->consumer_evtfd = -1;
46 :
47 4254 : clib_memset (&attr, 0, sizeof (attr));
48 4254 : clib_memset (&cattr, 0, sizeof (cattr));
49 :
50 4254 : if (pthread_mutexattr_init (&attr))
51 0 : clib_unix_warning ("mutexattr_init");
52 4254 : if (pthread_mutexattr_setpshared (&attr, PTHREAD_PROCESS_SHARED))
53 0 : clib_unix_warning ("pthread_mutexattr_setpshared");
54 4254 : if (pthread_mutexattr_setrobust (&attr, PTHREAD_MUTEX_ROBUST))
55 0 : clib_unix_warning ("setrobust");
56 4254 : if (pthread_mutex_init (&q->mutex, &attr))
57 0 : clib_unix_warning ("mutex_init");
58 4254 : if (pthread_mutexattr_destroy (&attr))
59 0 : clib_unix_warning ("mutexattr_destroy");
60 4254 : if (pthread_condattr_init (&cattr))
61 0 : clib_unix_warning ("condattr_init");
62 : /* prints funny-looking messages in the Linux target */
63 4254 : if (pthread_condattr_setpshared (&cattr, PTHREAD_PROCESS_SHARED))
64 0 : clib_unix_warning ("condattr_setpshared");
65 4254 : if (pthread_cond_init (&q->condvar, &cattr))
66 0 : clib_unix_warning ("cond_init1");
67 4254 : if (pthread_condattr_destroy (&cattr))
68 0 : clib_unix_warning ("cond_init2");
69 :
70 4254 : return (q);
71 : }
72 :
73 : svm_queue_t *
74 4254 : svm_queue_alloc_and_init (int nels, int elsize, int consumer_pid)
75 : {
76 : svm_queue_t *q;
77 :
78 4254 : q = clib_mem_alloc_aligned (sizeof (svm_queue_t)
79 4254 : + nels * elsize, CLIB_CACHE_LINE_BYTES);
80 4254 : clib_memset (q, 0, sizeof (*q));
81 4254 : q = svm_queue_init (q, nels, elsize);
82 4254 : q->consumer_pid = consumer_pid;
83 :
84 4254 : return q;
85 : }
86 :
87 : /*
88 : * svm_queue_free
89 : */
90 : void
91 47 : svm_queue_free (svm_queue_t * q)
92 : {
93 47 : (void) pthread_mutex_destroy (&q->mutex);
94 47 : (void) pthread_cond_destroy (&q->condvar);
95 47 : clib_mem_free (q);
96 47 : }
97 :
98 : void
99 537442 : svm_queue_lock (svm_queue_t * q)
100 : {
101 537442 : int rv = pthread_mutex_lock (&q->mutex);
102 537442 : if (PREDICT_FALSE (rv == EOWNERDEAD))
103 0 : pthread_mutex_consistent (&q->mutex);
104 537442 : }
105 :
106 : static int
107 938054 : svm_queue_trylock (svm_queue_t * q)
108 : {
109 938054 : int rv = pthread_mutex_trylock (&q->mutex);
110 938054 : if (PREDICT_FALSE (rv == EOWNERDEAD))
111 0 : rv = pthread_mutex_consistent (&q->mutex);
112 938054 : return rv;
113 : }
114 :
115 : void
116 1475020 : svm_queue_unlock (svm_queue_t * q)
117 : {
118 1475020 : pthread_mutex_unlock (&q->mutex);
119 1475020 : }
120 :
121 : int
122 0 : svm_queue_is_full (svm_queue_t * q)
123 : {
124 0 : return q->cursize == q->maxsize;
125 : }
126 :
127 : static inline void
128 1770 : svm_queue_send_signal_inline (svm_queue_t * q, u8 is_prod)
129 : {
130 1770 : if (q->producer_evtfd == -1)
131 : {
132 1770 : (void) pthread_cond_broadcast (&q->condvar);
133 : }
134 : else
135 : {
136 : int __clib_unused rv, fd;
137 0 : u64 data = 1;
138 0 : ASSERT (q->consumer_evtfd > 0 && q->producer_evtfd > 0);
139 0 : fd = is_prod ? q->producer_evtfd : q->consumer_evtfd;
140 0 : rv = write (fd, &data, sizeof (data));
141 0 : if (PREDICT_FALSE (rv < 0))
142 0 : clib_unix_warning ("signal write on %d returned %d", fd, rv);
143 : }
144 1770 : }
145 :
146 : void
147 0 : svm_queue_send_signal (svm_queue_t * q, u8 is_prod)
148 : {
149 0 : svm_queue_send_signal_inline (q, is_prod);
150 0 : }
151 :
152 : static inline void
153 368 : svm_queue_wait_inline (svm_queue_t * q)
154 : {
155 368 : if (q->producer_evtfd == -1)
156 : {
157 368 : pthread_cond_wait (&q->condvar, &q->mutex);
158 : }
159 : else
160 : {
161 : /* Fake a wait for event. We could use epoll but that would mean
162 : * using yet another fd. Should do for now */
163 0 : u32 cursize = q->cursize;
164 0 : svm_queue_unlock (q);
165 0 : while (q->cursize == cursize)
166 0 : CLIB_PAUSE ();
167 0 : svm_queue_lock (q);
168 : }
169 368 : }
170 :
171 : void
172 0 : svm_queue_wait (svm_queue_t * q)
173 : {
174 0 : svm_queue_wait_inline (q);
175 0 : }
176 :
177 : static inline int
178 28 : svm_queue_timedwait_inline (svm_queue_t * q, double timeout)
179 : {
180 : struct timespec ts;
181 28 : ts.tv_sec = unix_time_now () + (u32) timeout;
182 28 : ts.tv_nsec = (timeout - (u32) timeout) * 1e9;
183 :
184 28 : if (q->producer_evtfd == -1)
185 : {
186 28 : return pthread_cond_timedwait (&q->condvar, &q->mutex, &ts);
187 : }
188 : else
189 : {
190 0 : double max_time = unix_time_now () + timeout;
191 0 : u32 cursize = q->cursize;
192 : int rv;
193 :
194 0 : svm_queue_unlock (q);
195 0 : while (q->cursize == cursize && unix_time_now () < max_time)
196 0 : CLIB_PAUSE ();
197 0 : rv = unix_time_now () < max_time ? 0 : ETIMEDOUT;
198 0 : svm_queue_lock (q);
199 0 : return rv;
200 : }
201 : }
202 :
203 : int
204 0 : svm_queue_timedwait (svm_queue_t * q, double timeout)
205 : {
206 0 : return svm_queue_timedwait_inline (q, timeout);
207 : }
208 :
209 : /*
210 : * svm_queue_add_nolock
211 : */
212 : int
213 0 : svm_queue_add_nolock (svm_queue_t * q, u8 * elem)
214 : {
215 : i8 *tailp;
216 0 : int need_broadcast = 0;
217 :
218 0 : if (PREDICT_FALSE (q->cursize == q->maxsize))
219 : {
220 0 : while (q->cursize == q->maxsize)
221 0 : svm_queue_wait_inline (q);
222 : }
223 :
224 0 : tailp = (i8 *) (&q->data[0] + q->elsize * q->tail);
225 0 : clib_memcpy_fast (tailp, elem, q->elsize);
226 :
227 0 : q->tail++;
228 0 : q->cursize++;
229 :
230 0 : need_broadcast = (q->cursize == 1);
231 :
232 0 : if (q->tail == q->maxsize)
233 0 : q->tail = 0;
234 :
235 0 : if (need_broadcast)
236 0 : svm_queue_send_signal_inline (q, 1);
237 0 : return 0;
238 : }
239 :
240 : void
241 0 : svm_queue_add_raw (svm_queue_t * q, u8 * elem)
242 : {
243 : i8 *tailp;
244 :
245 0 : tailp = (i8 *) (&q->data[0] + q->elsize * q->tail);
246 0 : clib_memcpy_fast (tailp, elem, q->elsize);
247 :
248 0 : q->tail = (q->tail + 1) % q->maxsize;
249 0 : q->cursize++;
250 :
251 0 : if (q->cursize == 1)
252 0 : svm_queue_send_signal_inline (q, 1);
253 0 : }
254 :
255 :
256 : /*
257 : * svm_queue_add
258 : */
259 : int
260 60465 : svm_queue_add (svm_queue_t * q, u8 * elem, int nowait)
261 : {
262 : i8 *tailp;
263 60465 : int need_broadcast = 0;
264 :
265 60465 : if (nowait)
266 : {
267 : /* zero on success */
268 20 : if (svm_queue_trylock (q))
269 : {
270 0 : return (-1);
271 : }
272 : }
273 : else
274 60445 : svm_queue_lock (q);
275 :
276 60465 : if (PREDICT_FALSE (q->cursize == q->maxsize))
277 : {
278 0 : if (nowait)
279 : {
280 0 : svm_queue_unlock (q);
281 0 : return (-2);
282 : }
283 0 : while (q->cursize == q->maxsize)
284 0 : svm_queue_wait_inline (q);
285 : }
286 :
287 60465 : tailp = (i8 *) (&q->data[0] + q->elsize * q->tail);
288 60465 : clib_memcpy_fast (tailp, elem, q->elsize);
289 :
290 60465 : q->tail++;
291 60465 : q->cursize++;
292 :
293 60465 : need_broadcast = (q->cursize == 1);
294 :
295 60465 : if (q->tail == q->maxsize)
296 32 : q->tail = 0;
297 :
298 60465 : if (need_broadcast)
299 1634 : svm_queue_send_signal_inline (q, 1);
300 :
301 60465 : svm_queue_unlock (q);
302 :
303 60465 : return 0;
304 : }
305 :
306 : /*
307 : * svm_queue_add2
308 : */
309 : int
310 136 : svm_queue_add2 (svm_queue_t * q, u8 * elem, u8 * elem2, int nowait)
311 : {
312 : i8 *tailp;
313 136 : int need_broadcast = 0;
314 :
315 136 : if (nowait)
316 : {
317 : /* zero on success */
318 3 : if (svm_queue_trylock (q))
319 : {
320 0 : return (-1);
321 : }
322 : }
323 : else
324 133 : svm_queue_lock (q);
325 :
326 136 : if (PREDICT_FALSE (q->cursize + 1 == q->maxsize))
327 : {
328 0 : if (nowait)
329 : {
330 0 : svm_queue_unlock (q);
331 0 : return (-2);
332 : }
333 0 : while (q->cursize + 1 == q->maxsize)
334 0 : svm_queue_wait_inline (q);
335 : }
336 :
337 136 : tailp = (i8 *) (&q->data[0] + q->elsize * q->tail);
338 136 : clib_memcpy_fast (tailp, elem, q->elsize);
339 :
340 136 : q->tail++;
341 136 : q->cursize++;
342 :
343 136 : if (q->tail == q->maxsize)
344 0 : q->tail = 0;
345 :
346 136 : need_broadcast = (q->cursize == 1);
347 :
348 136 : tailp = (i8 *) (&q->data[0] + q->elsize * q->tail);
349 136 : clib_memcpy_fast (tailp, elem2, q->elsize);
350 :
351 136 : q->tail++;
352 136 : q->cursize++;
353 :
354 136 : if (q->tail == q->maxsize)
355 0 : q->tail = 0;
356 :
357 136 : if (need_broadcast)
358 136 : svm_queue_send_signal_inline (q, 1);
359 :
360 136 : svm_queue_unlock (q);
361 :
362 136 : return 0;
363 : }
364 :
365 : /*
366 : * svm_queue_sub
367 : */
368 : int
369 939227 : svm_queue_sub (svm_queue_t * q, u8 * elem, svm_q_conditional_wait_t cond,
370 : u32 time)
371 : {
372 : i8 *headp;
373 939227 : int need_broadcast = 0;
374 939227 : int rc = 0;
375 :
376 939227 : if (cond == SVM_Q_NOWAIT)
377 : {
378 : /* zero on success */
379 938031 : if (svm_queue_trylock (q))
380 : {
381 475 : return (-1);
382 : }
383 : }
384 : else
385 1196 : svm_queue_lock (q);
386 :
387 938752 : if (PREDICT_FALSE (q->cursize == 0))
388 : {
389 937818 : if (cond == SVM_Q_NOWAIT)
390 : {
391 937422 : svm_queue_unlock (q);
392 937422 : return (-2);
393 : }
394 396 : else if (cond == SVM_Q_TIMEDWAIT)
395 : {
396 56 : while (q->cursize == 0 && rc == 0)
397 28 : rc = svm_queue_timedwait_inline (q, time);
398 :
399 28 : if (rc == ETIMEDOUT)
400 : {
401 0 : svm_queue_unlock (q);
402 0 : return ETIMEDOUT;
403 : }
404 : }
405 : else
406 : {
407 736 : while (q->cursize == 0)
408 368 : svm_queue_wait_inline (q);
409 : }
410 : }
411 :
412 1330 : headp = (i8 *) (&q->data[0] + q->elsize * q->head);
413 1330 : clib_memcpy_fast (elem, headp, q->elsize);
414 :
415 1330 : q->head++;
416 : /* $$$$ JFC shouldn't this be == 0? */
417 1330 : if (q->cursize == q->maxsize)
418 0 : need_broadcast = 1;
419 :
420 1330 : q->cursize--;
421 :
422 1330 : if (q->head == q->maxsize)
423 32 : q->head = 0;
424 :
425 1330 : if (need_broadcast)
426 0 : svm_queue_send_signal_inline (q, 0);
427 :
428 1330 : svm_queue_unlock (q);
429 :
430 1330 : return 0;
431 : }
432 :
433 : int
434 475668 : svm_queue_sub2 (svm_queue_t * q, u8 * elem)
435 : {
436 : int need_broadcast;
437 : i8 *headp;
438 :
439 475668 : svm_queue_lock (q);
440 475668 : if (q->cursize == 0)
441 : {
442 416261 : svm_queue_unlock (q);
443 416261 : return -1;
444 : }
445 :
446 59407 : headp = (i8 *) (&q->data[0] + q->elsize * q->head);
447 59407 : clib_memcpy_fast (elem, headp, q->elsize);
448 :
449 59407 : q->head++;
450 59407 : need_broadcast = (q->cursize == q->maxsize / 2);
451 59407 : q->cursize--;
452 :
453 59407 : if (PREDICT_FALSE (q->head == q->maxsize))
454 0 : q->head = 0;
455 59407 : svm_queue_unlock (q);
456 :
457 59407 : if (need_broadcast)
458 0 : svm_queue_send_signal_inline (q, 0);
459 :
460 59407 : return 0;
461 : }
462 :
463 : int
464 0 : svm_queue_sub_raw (svm_queue_t * q, u8 * elem)
465 : {
466 : int need_broadcast;
467 : i8 *headp;
468 :
469 0 : if (PREDICT_FALSE (q->cursize == 0))
470 : {
471 0 : while (q->cursize == 0)
472 : ;
473 : }
474 :
475 0 : headp = (i8 *) (&q->data[0] + q->elsize * q->head);
476 0 : clib_memcpy_fast (elem, headp, q->elsize);
477 :
478 0 : need_broadcast = q->cursize == q->maxsize;
479 :
480 0 : q->head = (q->head + 1) % q->maxsize;
481 0 : q->cursize--;
482 :
483 0 : if (PREDICT_FALSE (need_broadcast))
484 0 : svm_queue_send_signal_inline (q, 0);
485 :
486 0 : return 0;
487 : }
488 :
489 : void
490 0 : svm_queue_set_producer_event_fd (svm_queue_t * q, int fd)
491 : {
492 0 : q->producer_evtfd = fd;
493 0 : }
494 :
495 : void
496 0 : svm_queue_set_consumer_event_fd (svm_queue_t * q, int fd)
497 : {
498 0 : q->consumer_evtfd = fd;
499 0 : }
500 :
501 : /*
502 : * fd.io coding-style-patch-verification: ON
503 : *
504 : * Local Variables:
505 : * eval: (c-set-style "gnu")
506 : * End:
507 : */
|