LCOV - code coverage report
Current view: top level - svm - queue.c (source / functions) Hit Total Coverage
Test: coverage-filtered.info Lines: 134 234 57.3 %
Date: 2023-10-26 01:39:38 Functions: 13 22 59.1 %

          Line data    Source code
       1             : /*
       2             :  *------------------------------------------------------------------
       3             :  * svm_queue.c - unidirectional shared-memory queues
       4             :  *
       5             :  * Copyright (c) 2009-2019 Cisco and/or its affiliates.
       6             :  * Licensed under the Apache License, Version 2.0 (the "License");
       7             :  * you may not use this file except in compliance with the License.
       8             :  * You may obtain a copy of the License at:
       9             :  *
      10             :  *     http://www.apache.org/licenses/LICENSE-2.0
      11             :  *
      12             :  * Unless required by applicable law or agreed to in writing, software
      13             :  * distributed under the License is distributed on an "AS IS" BASIS,
      14             :  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
      15             :  * See the License for the specific language governing permissions and
      16             :  * limitations under the License.
      17             :  *------------------------------------------------------------------
      18             :  */
      19             : 
      20             : 
      21             : #include <stdio.h>
      22             : #include <stdlib.h>
      23             : #include <string.h>
      24             : #include <pthread.h>
      25             : #include <vppinfra/mem.h>
      26             : #include <vppinfra/format.h>
      27             : #include <vppinfra/cache.h>
      28             : #include <svm/queue.h>
      29             : #include <vppinfra/time.h>
      30             : #include <vppinfra/lock.h>
      31             : 
      32             : svm_queue_t *
      33        4373 : svm_queue_init (void *base, int nels, int elsize)
      34             : {
      35             :   svm_queue_t *q;
      36             :   pthread_mutexattr_t attr;
      37             :   pthread_condattr_t cattr;
      38             : 
      39        4373 :   q = (svm_queue_t *) base;
      40        4373 :   clib_memset (q, 0, sizeof (*q));
      41             : 
      42        4373 :   q->elsize = elsize;
      43        4373 :   q->maxsize = nels;
      44        4373 :   q->producer_evtfd = -1;
      45        4373 :   q->consumer_evtfd = -1;
      46             : 
      47        4373 :   clib_memset (&attr, 0, sizeof (attr));
      48        4373 :   clib_memset (&cattr, 0, sizeof (cattr));
      49             : 
      50        4373 :   if (pthread_mutexattr_init (&attr))
      51           0 :     clib_unix_warning ("mutexattr_init");
      52        4373 :   if (pthread_mutexattr_setpshared (&attr, PTHREAD_PROCESS_SHARED))
      53           0 :     clib_unix_warning ("pthread_mutexattr_setpshared");
      54        4373 :   if (pthread_mutexattr_setrobust (&attr, PTHREAD_MUTEX_ROBUST))
      55           0 :     clib_unix_warning ("setrobust");
      56        4373 :   if (pthread_mutex_init (&q->mutex, &attr))
      57           0 :     clib_unix_warning ("mutex_init");
      58        4373 :   if (pthread_mutexattr_destroy (&attr))
      59           0 :     clib_unix_warning ("mutexattr_destroy");
      60        4373 :   if (pthread_condattr_init (&cattr))
      61           0 :     clib_unix_warning ("condattr_init");
      62             :   /* prints funny-looking messages in the Linux target */
      63        4373 :   if (pthread_condattr_setpshared (&cattr, PTHREAD_PROCESS_SHARED))
      64           0 :     clib_unix_warning ("condattr_setpshared");
      65        4373 :   if (pthread_cond_init (&q->condvar, &cattr))
      66           0 :     clib_unix_warning ("cond_init1");
      67        4373 :   if (pthread_condattr_destroy (&cattr))
      68           0 :     clib_unix_warning ("cond_init2");
      69             : 
      70        4373 :   return (q);
      71             : }
      72             : 
      73             : svm_queue_t *
      74        4373 : svm_queue_alloc_and_init (int nels, int elsize, int consumer_pid)
      75             : {
      76             :   svm_queue_t *q;
      77             : 
      78        4373 :   q = clib_mem_alloc_aligned (sizeof (svm_queue_t)
      79        4373 :                               + nels * elsize, CLIB_CACHE_LINE_BYTES);
      80        4373 :   clib_memset (q, 0, sizeof (*q));
      81        4373 :   q = svm_queue_init (q, nels, elsize);
      82        4373 :   q->consumer_pid = consumer_pid;
      83             : 
      84        4373 :   return q;
      85             : }
      86             : 
      87             : /*
      88             :  * svm_queue_free
      89             :  */
      90             : void
      91          47 : svm_queue_free (svm_queue_t * q)
      92             : {
      93          47 :   (void) pthread_mutex_destroy (&q->mutex);
      94          47 :   (void) pthread_cond_destroy (&q->condvar);
      95          47 :   clib_mem_free (q);
      96          47 : }
      97             : 
      98             : void
      99      560385 : svm_queue_lock (svm_queue_t * q)
     100             : {
     101      560385 :   int rv = pthread_mutex_lock (&q->mutex);
     102      560385 :   if (PREDICT_FALSE (rv == EOWNERDEAD))
     103           0 :     pthread_mutex_consistent (&q->mutex);
     104      560385 : }
     105             : 
     106             : static int
     107     1125820 : svm_queue_trylock (svm_queue_t * q)
     108             : {
     109     1125820 :   int rv = pthread_mutex_trylock (&q->mutex);
     110     1125820 :   if (PREDICT_FALSE (rv == EOWNERDEAD))
     111           0 :     rv = pthread_mutex_consistent (&q->mutex);
     112     1125820 :   return rv;
     113             : }
     114             : 
     115             : void
     116     1685630 : svm_queue_unlock (svm_queue_t * q)
     117             : {
     118     1685630 :   pthread_mutex_unlock (&q->mutex);
     119     1685630 : }
     120             : 
     121             : int
     122           0 : svm_queue_is_full (svm_queue_t * q)
     123             : {
     124           0 :   return q->cursize == q->maxsize;
     125             : }
     126             : 
     127             : static inline void
     128        1843 : svm_queue_send_signal_inline (svm_queue_t * q, u8 is_prod)
     129             : {
     130        1843 :   if (q->producer_evtfd == -1)
     131             :     {
     132        1843 :       (void) pthread_cond_broadcast (&q->condvar);
     133             :     }
     134             :   else
     135             :     {
     136             :       int __clib_unused rv, fd;
     137           0 :       u64 data = 1;
     138           0 :       ASSERT (q->consumer_evtfd > 0 && q->producer_evtfd > 0);
     139           0 :       fd = is_prod ? q->producer_evtfd : q->consumer_evtfd;
     140           0 :       rv = write (fd, &data, sizeof (data));
     141           0 :       if (PREDICT_FALSE (rv < 0))
     142           0 :         clib_unix_warning ("signal write on %d returned %d", fd, rv);
     143             :     }
     144        1843 : }
     145             : 
     146             : void
     147           0 : svm_queue_send_signal (svm_queue_t * q, u8 is_prod)
     148             : {
     149           0 :   svm_queue_send_signal_inline (q, is_prod);
     150           0 : }
     151             : 
     152             : static inline void
     153         375 : svm_queue_wait_inline (svm_queue_t * q)
     154             : {
     155         375 :   if (q->producer_evtfd == -1)
     156             :     {
     157         375 :       pthread_cond_wait (&q->condvar, &q->mutex);
     158             :     }
     159             :   else
     160             :     {
     161             :       /* Fake a wait for event. We could use epoll but that would mean
     162             :        * using yet another fd. Should do for now */
     163           0 :       u32 cursize = q->cursize;
     164           0 :       svm_queue_unlock (q);
     165           0 :       while (q->cursize == cursize)
     166           0 :         CLIB_PAUSE ();
     167           0 :       svm_queue_lock (q);
     168             :     }
     169         375 : }
     170             : 
     171             : void
     172           0 : svm_queue_wait (svm_queue_t * q)
     173             : {
     174           0 :   svm_queue_wait_inline (q);
     175           0 : }
     176             : 
     177             : static inline int
     178          37 : svm_queue_timedwait_inline (svm_queue_t * q, double timeout)
     179             : {
     180             :   struct timespec ts;
     181          37 :   ts.tv_sec = unix_time_now () + (u32) timeout;
     182          37 :   ts.tv_nsec = (timeout - (u32) timeout) * 1e9;
     183             : 
     184          37 :   if (q->producer_evtfd == -1)
     185             :     {
     186          37 :       return pthread_cond_timedwait (&q->condvar, &q->mutex, &ts);
     187             :     }
     188             :   else
     189             :     {
     190           0 :       double max_time = unix_time_now () + timeout;
     191           0 :       u32 cursize = q->cursize;
     192             :       int rv;
     193             : 
     194           0 :       svm_queue_unlock (q);
     195           0 :       while (q->cursize == cursize && unix_time_now () < max_time)
     196           0 :         CLIB_PAUSE ();
     197           0 :       rv = unix_time_now () < max_time ? 0 : ETIMEDOUT;
     198           0 :       svm_queue_lock (q);
     199           0 :       return rv;
     200             :     }
     201             : }
     202             : 
     203             : int
     204           0 : svm_queue_timedwait (svm_queue_t * q, double timeout)
     205             : {
     206           0 :   return svm_queue_timedwait_inline (q, timeout);
     207             : }
     208             : 
     209             : /*
     210             :  * svm_queue_add_nolock
     211             :  */
     212             : int
     213           0 : svm_queue_add_nolock (svm_queue_t * q, u8 * elem)
     214             : {
     215             :   i8 *tailp;
     216           0 :   int need_broadcast = 0;
     217             : 
     218           0 :   if (PREDICT_FALSE (q->cursize == q->maxsize))
     219             :     {
     220           0 :       while (q->cursize == q->maxsize)
     221           0 :         svm_queue_wait_inline (q);
     222             :     }
     223             : 
     224           0 :   tailp = (i8 *) (&q->data[0] + q->elsize * q->tail);
     225           0 :   clib_memcpy_fast (tailp, elem, q->elsize);
     226             : 
     227           0 :   q->tail++;
     228           0 :   q->cursize++;
     229             : 
     230           0 :   need_broadcast = (q->cursize == 1);
     231             : 
     232           0 :   if (q->tail == q->maxsize)
     233           0 :     q->tail = 0;
     234             : 
     235           0 :   if (need_broadcast)
     236           0 :     svm_queue_send_signal_inline (q, 1);
     237           0 :   return 0;
     238             : }
     239             : 
     240             : void
     241           0 : svm_queue_add_raw (svm_queue_t * q, u8 * elem)
     242             : {
     243             :   i8 *tailp;
     244             : 
     245           0 :   tailp = (i8 *) (&q->data[0] + q->elsize * q->tail);
     246           0 :   clib_memcpy_fast (tailp, elem, q->elsize);
     247             : 
     248           0 :   q->tail = (q->tail + 1) % q->maxsize;
     249           0 :   q->cursize++;
     250             : 
     251           0 :   if (q->cursize == 1)
     252           0 :     svm_queue_send_signal_inline (q, 1);
     253           0 : }
     254             : 
     255             : 
     256             : /*
     257             :  * svm_queue_add
     258             :  */
     259             : int
     260       63967 : svm_queue_add (svm_queue_t * q, u8 * elem, int nowait)
     261             : {
     262             :   i8 *tailp;
     263       63967 :   int need_broadcast = 0;
     264             : 
     265       63967 :   if (nowait)
     266             :     {
     267             :       /* zero on success */
     268          22 :       if (svm_queue_trylock (q))
     269             :         {
     270           0 :           return (-1);
     271             :         }
     272             :     }
     273             :   else
     274       63945 :     svm_queue_lock (q);
     275             : 
     276       63967 :   if (PREDICT_FALSE (q->cursize == q->maxsize))
     277             :     {
     278           0 :       if (nowait)
     279             :         {
     280           0 :           svm_queue_unlock (q);
     281           0 :           return (-2);
     282             :         }
     283           0 :       while (q->cursize == q->maxsize)
     284           0 :         svm_queue_wait_inline (q);
     285             :     }
     286             : 
     287       63967 :   tailp = (i8 *) (&q->data[0] + q->elsize * q->tail);
     288       63967 :   clib_memcpy_fast (tailp, elem, q->elsize);
     289             : 
     290       63967 :   q->tail++;
     291       63967 :   q->cursize++;
     292             : 
     293       63967 :   need_broadcast = (q->cursize == 1);
     294             : 
     295       63967 :   if (q->tail == q->maxsize)
     296          33 :     q->tail = 0;
     297             : 
     298       63967 :   if (need_broadcast)
     299        1707 :     svm_queue_send_signal_inline (q, 1);
     300             : 
     301       63967 :   svm_queue_unlock (q);
     302             : 
     303       63967 :   return 0;
     304             : }
     305             : 
     306             : /*
     307             :  * svm_queue_add2
     308             :  */
     309             : int
     310         136 : svm_queue_add2 (svm_queue_t * q, u8 * elem, u8 * elem2, int nowait)
     311             : {
     312             :   i8 *tailp;
     313         136 :   int need_broadcast = 0;
     314             : 
     315         136 :   if (nowait)
     316             :     {
     317             :       /* zero on success */
     318           3 :       if (svm_queue_trylock (q))
     319             :         {
     320           0 :           return (-1);
     321             :         }
     322             :     }
     323             :   else
     324         133 :     svm_queue_lock (q);
     325             : 
     326         136 :   if (PREDICT_FALSE (q->cursize + 1 == q->maxsize))
     327             :     {
     328           0 :       if (nowait)
     329             :         {
     330           0 :           svm_queue_unlock (q);
     331           0 :           return (-2);
     332             :         }
     333           0 :       while (q->cursize + 1 == q->maxsize)
     334           0 :         svm_queue_wait_inline (q);
     335             :     }
     336             : 
     337         136 :   tailp = (i8 *) (&q->data[0] + q->elsize * q->tail);
     338         136 :   clib_memcpy_fast (tailp, elem, q->elsize);
     339             : 
     340         136 :   q->tail++;
     341         136 :   q->cursize++;
     342             : 
     343         136 :   if (q->tail == q->maxsize)
     344           0 :     q->tail = 0;
     345             : 
     346         136 :   need_broadcast = (q->cursize == 1);
     347             : 
     348         136 :   tailp = (i8 *) (&q->data[0] + q->elsize * q->tail);
     349         136 :   clib_memcpy_fast (tailp, elem2, q->elsize);
     350             : 
     351         136 :   q->tail++;
     352         136 :   q->cursize++;
     353             : 
     354         136 :   if (q->tail == q->maxsize)
     355           0 :     q->tail = 0;
     356             : 
     357         136 :   if (need_broadcast)
     358         136 :     svm_queue_send_signal_inline (q, 1);
     359             : 
     360         136 :   svm_queue_unlock (q);
     361             : 
     362         136 :   return 0;
     363             : }
     364             : 
     365             : /*
     366             :  * svm_queue_sub
     367             :  */
     368             : int
     369     1127010 : svm_queue_sub (svm_queue_t * q, u8 * elem, svm_q_conditional_wait_t cond,
     370             :                u32 time)
     371             : {
     372             :   i8 *headp;
     373     1127010 :   int need_broadcast = 0;
     374     1127010 :   int rc = 0;
     375             : 
     376     1127010 :   if (cond == SVM_Q_NOWAIT)
     377             :     {
     378             :       /* zero on success */
     379     1125790 :       if (svm_queue_trylock (q))
     380             :         {
     381         577 :           return (-1);
     382             :         }
     383             :     }
     384             :   else
     385        1218 :     svm_queue_lock (q);
     386             : 
     387     1126440 :   if (PREDICT_FALSE (q->cursize == 0))
     388             :     {
     389     1125500 :       if (cond == SVM_Q_NOWAIT)
     390             :         {
     391     1125080 :           svm_queue_unlock (q);
     392     1125080 :           return (-2);
     393             :         }
     394         412 :       else if (cond == SVM_Q_TIMEDWAIT)
     395             :         {
     396          74 :           while (q->cursize == 0 && rc == 0)
     397          37 :             rc = svm_queue_timedwait_inline (q, time);
     398             : 
     399          37 :           if (rc == ETIMEDOUT)
     400             :             {
     401           0 :               svm_queue_unlock (q);
     402           0 :               return ETIMEDOUT;
     403             :             }
     404             :         }
     405             :       else
     406             :         {
     407         750 :           while (q->cursize == 0)
     408         375 :             svm_queue_wait_inline (q);
     409             :         }
     410             :     }
     411             : 
     412        1352 :   headp = (i8 *) (&q->data[0] + q->elsize * q->head);
     413        1352 :   clib_memcpy_fast (elem, headp, q->elsize);
     414             : 
     415        1352 :   q->head++;
     416             :   /* $$$$ JFC shouldn't this be == 0? */
     417        1352 :   if (q->cursize == q->maxsize)
     418           0 :     need_broadcast = 1;
     419             : 
     420        1352 :   q->cursize--;
     421             : 
     422        1352 :   if (q->head == q->maxsize)
     423          33 :     q->head = 0;
     424             : 
     425        1352 :   if (need_broadcast)
     426           0 :     svm_queue_send_signal_inline (q, 0);
     427             : 
     428        1352 :   svm_queue_unlock (q);
     429             : 
     430        1352 :   return 0;
     431             : }
     432             : 
     433             : int
     434      495089 : svm_queue_sub2 (svm_queue_t * q, u8 * elem)
     435             : {
     436             :   int need_broadcast;
     437             :   i8 *headp;
     438             : 
     439      495089 :   svm_queue_lock (q);
     440      495089 :   if (q->cursize == 0)
     441             :     {
     442      432204 :       svm_queue_unlock (q);
     443      432204 :       return -1;
     444             :     }
     445             : 
     446       62885 :   headp = (i8 *) (&q->data[0] + q->elsize * q->head);
     447       62885 :   clib_memcpy_fast (elem, headp, q->elsize);
     448             : 
     449       62885 :   q->head++;
     450       62885 :   need_broadcast = (q->cursize == q->maxsize / 2);
     451       62885 :   q->cursize--;
     452             : 
     453       62885 :   if (PREDICT_FALSE (q->head == q->maxsize))
     454           0 :     q->head = 0;
     455       62885 :   svm_queue_unlock (q);
     456             : 
     457       62885 :   if (need_broadcast)
     458           0 :     svm_queue_send_signal_inline (q, 0);
     459             : 
     460       62885 :   return 0;
     461             : }
     462             : 
     463             : int
     464           0 : svm_queue_sub_raw (svm_queue_t * q, u8 * elem)
     465             : {
     466             :   int need_broadcast;
     467             :   i8 *headp;
     468             : 
     469           0 :   if (PREDICT_FALSE (q->cursize == 0))
     470             :     {
     471           0 :       while (q->cursize == 0)
     472             :         ;
     473             :     }
     474             : 
     475           0 :   headp = (i8 *) (&q->data[0] + q->elsize * q->head);
     476           0 :   clib_memcpy_fast (elem, headp, q->elsize);
     477             : 
     478           0 :   need_broadcast = q->cursize == q->maxsize;
     479             : 
     480           0 :   q->head = (q->head + 1) % q->maxsize;
     481           0 :   q->cursize--;
     482             : 
     483           0 :   if (PREDICT_FALSE (need_broadcast))
     484           0 :     svm_queue_send_signal_inline (q, 0);
     485             : 
     486           0 :   return 0;
     487             : }
     488             : 
     489             : void
     490           0 : svm_queue_set_producer_event_fd (svm_queue_t * q, int fd)
     491             : {
     492           0 :   q->producer_evtfd = fd;
     493           0 : }
     494             : 
     495             : void
     496           0 : svm_queue_set_consumer_event_fd (svm_queue_t * q, int fd)
     497             : {
     498           0 :   q->consumer_evtfd = fd;
     499           0 : }
     500             : 
     501             : /*
     502             :  * fd.io coding-style-patch-verification: ON
     503             :  *
     504             :  * Local Variables:
     505             :  * eval: (c-set-style "gnu")
     506             :  * End:
     507             :  */

Generated by: LCOV version 1.14