LCOV - code coverage report
Current view: top level - svm - message_queue.h (source / functions) Hit Total Coverage
Test: coverage-filtered.info Lines: 30 33 90.9 %
Date: 2023-10-26 01:39:38 Functions: 9 9 100.0 %

          Line data    Source code
       1             : /*
       2             :  * Copyright (c) 2018-2019 Cisco and/or its affiliates.
       3             :  * Licensed under the Apache License, Version 2.0 (the "License");
       4             :  * you may not use this file except in compliance with the License.
       5             :  * You may obtain a copy of the License at:
       6             :  *
       7             :  *     http://www.apache.org/licenses/LICENSE-2.0
       8             :  *
       9             :  * Unless required by applicable law or agreed to in writing, software
      10             :  * distributed under the License is distributed on an "AS IS" BASIS,
      11             :  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
      12             :  * See the License for the specific language governing permissions and
      13             :  * limitations under the License.
      14             :  */
      15             : /**
      16             :  * @file
      17             :  * @brief Unidirectional shared-memory multi-ring message queue
      18             :  */
      19             : 
      20             : #ifndef SRC_SVM_MESSAGE_QUEUE_H_
      21             : #define SRC_SVM_MESSAGE_QUEUE_H_
      22             : 
      23             : #include <vppinfra/clib.h>
      24             : #include <vppinfra/error.h>
      25             : #include <vppinfra/lock.h>
      26             : #include <svm/queue.h>
      27             : 
      28             : typedef struct svm_msg_q_shr_queue_
      29             : {
      30             :   pthread_mutex_t mutex;  /* 8 bytes */
      31             :   pthread_cond_t condvar; /* 8 bytes */
      32             :   u32 head;
      33             :   u32 tail;
      34             :   volatile u32 cursize;
      35             :   u32 maxsize;
      36             :   u32 elsize;
      37             :   u32 pad;
      38             :   u8 data[0];
      39             : } svm_msg_q_shared_queue_t;
      40             : 
      41             : typedef struct svm_msg_q_queue_
      42             : {
      43             :   svm_msg_q_shared_queue_t *shr; /**< pointer to shared queue */
      44             :   int evtfd;                     /**< producer/consumer eventfd */
      45             :   clib_spinlock_t lock;          /**< private lock for multi-producer */
      46             : } svm_msg_q_queue_t;
      47             : 
      48             : typedef struct svm_msg_q_ring_shared_
      49             : {
      50             :   volatile u32 cursize;                 /**< current size of the ring */
      51             :   u32 nitems;                           /**< max size of the ring */
      52             :   volatile u32 head;                    /**< current head (for dequeue) */
      53             :   volatile u32 tail;                    /**< current tail (for enqueue) */
      54             :   u32 elsize;                           /**< size of an element */
      55             :   u8 data[0];                           /**< chunk of memory for msg data */
      56             : } svm_msg_q_ring_shared_t;
      57             : 
      58             : typedef struct svm_msg_q_ring_
      59             : {
      60             :   u32 nitems;                   /**< max size of the ring */
      61             :   u32 elsize;                   /**< size of an element */
      62             :   svm_msg_q_ring_shared_t *shr; /**< ring in shared memory */
      63             : } __clib_packed svm_msg_q_ring_t;
      64             : 
      65             : typedef struct svm_msg_q_shared_
      66             : {
      67             :   u32 n_rings;                   /**< number of rings after q */
      68             :   u32 pad;                       /**< 8 byte alignment for q */
      69             :   svm_msg_q_shared_queue_t q[0]; /**< queue for exchanging messages */
      70             : } __clib_packed svm_msg_q_shared_t;
      71             : 
      72             : typedef struct svm_msg_q_
      73             : {
      74             :   svm_msg_q_queue_t q;                  /**< queue for exchanging messages */
      75             :   svm_msg_q_ring_t *rings;              /**< rings with message data*/
      76             : } __clib_packed svm_msg_q_t;
      77             : 
      78             : typedef struct svm_msg_q_ring_cfg_
      79             : {
      80             :   u32 nitems;
      81             :   u32 elsize;
      82             :   void *data;
      83             : } svm_msg_q_ring_cfg_t;
      84             : 
      85             : typedef struct svm_msg_q_cfg_
      86             : {
      87             :   int consumer_pid;                     /**< pid of msg consumer */
      88             :   u32 q_nitems;                         /**< msg queue size (not rings) */
      89             :   u32 n_rings;                          /**< number of msg rings */
      90             :   svm_msg_q_ring_cfg_t *ring_cfgs;      /**< array of ring cfgs */
      91             : } svm_msg_q_cfg_t;
      92             : 
      93             : typedef union
      94             : {
      95             :   struct
      96             :   {
      97             :     u32 ring_index;                     /**< ring index, could be u8 */
      98             :     u32 elt_index;                      /**< index in ring */
      99             :   };
     100             :   u64 as_u64;
     101             : } svm_msg_q_msg_t;
     102             : 
     103             : #define SVM_MQ_INVALID_MSG { .as_u64 = ~0 }
     104             : 
     105             : typedef enum svm_msg_q_wait_type_
     106             : {
     107             :   SVM_MQ_WAIT_EMPTY,
     108             :   SVM_MQ_WAIT_FULL
     109             : } svm_msg_q_wait_type_t;
     110             : 
     111             : /**
     112             :  * Allocate message queue
     113             :  *
     114             :  * Allocates a message queue on the heap. Based on the configuration options,
     115             :  * apart from the message queue this also allocates (one or multiple)
     116             :  * shared-memory rings for the messages.
     117             :  *
     118             :  * @param cfg           configuration options: queue len, consumer pid,
     119             :  *                      ring configs
     120             :  * @return              message queue
     121             :  */
     122             : svm_msg_q_shared_t *svm_msg_q_alloc (svm_msg_q_cfg_t *cfg);
     123             : svm_msg_q_shared_t *svm_msg_q_init (void *base, svm_msg_q_cfg_t *cfg);
     124             : uword svm_msg_q_size_to_alloc (svm_msg_q_cfg_t *cfg);
     125             : 
     126             : void svm_msg_q_attach (svm_msg_q_t *mq, void *smq_base);
     127             : 
     128             : /**
     129             :  * Cleanup mq's private data
     130             :  */
     131             : void svm_msg_q_cleanup (svm_msg_q_t *mq);
     132             : 
     133             : /**
     134             :  * Free message queue
     135             :  *
     136             :  * @param mq            message queue to be freed
     137             :  */
     138             : void svm_msg_q_free (svm_msg_q_t * mq);
     139             : 
     140             : /**
     141             :  * Allocate message buffer
     142             :  *
     143             :  * Message is allocated on the first available ring capable of holding
     144             :  * the requested number of bytes.
     145             :  *
     146             :  * @param mq            message queue
     147             :  * @param nbytes        number of bytes needed for message
     148             :  * @return              message structure pointing to the ring and position
     149             :  *                      allocated
     150             :  */
     151             : svm_msg_q_msg_t svm_msg_q_alloc_msg (svm_msg_q_t * mq, u32 nbytes);
     152             : 
     153             : /**
     154             :  * Allocate message buffer on ring
     155             :  *
     156             :  * Message is allocated on the requested ring. The caller MUST check that
     157             :  * the ring is not full.
     158             :  *
     159             :  * @param mq            message queue
     160             :  * @param ring_index    ring on which the allocation should occur
     161             :  * @return              message structure pointing to the ring and position
     162             :  *                      allocated
     163             :  */
     164             : svm_msg_q_msg_t svm_msg_q_alloc_msg_w_ring (svm_msg_q_t * mq, u32 ring_index);
     165             : 
     166             : /**
     167             :  * Lock message queue and allocate message buffer on ring
     168             :  *
     169             :  * This should be used when multiple writers/readers are expected to
     170             :  * compete for the rings/queue. Message should be enqueued by calling
     171             :  * @ref svm_msg_q_add_w_lock and the caller MUST unlock the queue once
     172             :  * the message is enqueued.
     173             :  *
     174             :  * @param mq            message queue
     175             :  * @param ring_index    ring on which the allocation should occur
     176             :  * @param noblock       flag that indicates if request should block
     177             :  * @param msg           pointer to message to be filled in
     178             :  * @return              0 on success, negative number otherwise
     179             :  */
     180             : int svm_msg_q_lock_and_alloc_msg_w_ring (svm_msg_q_t * mq, u32 ring_index,
     181             :                                          u8 noblock, svm_msg_q_msg_t * msg);
     182             : 
     183             : /**
     184             :  * Free message buffer
     185             :  *
     186             :  * Marks message buffer on ring as free.
     187             :  *
     188             :  * @param mq            message queue
     189             :  * @param msg           message to be freed
     190             :  */
     191             : void svm_msg_q_free_msg (svm_msg_q_t * mq, svm_msg_q_msg_t * msg);
     192             : 
     193             : /**
     194             :  * Producer enqueue one message to queue
     195             :  *
     196             :  * Must be called with mq locked. Prior to calling this, the producer should've
     197             :  * obtained a message buffer from one of the rings.
     198             :  *
     199             :  * @param mq            message queue
     200             :  * @param msg           message to be enqueued
     201             :  */
     202             : void svm_msg_q_add_raw (svm_msg_q_t *mq, svm_msg_q_msg_t *msg);
     203             : 
     204             : /**
     205             :  * Producer enqueue one message to queue
     206             :  *
     207             :  * Prior to calling this, the producer should've obtained a message buffer
     208             :  * from one of the rings by calling @ref svm_msg_q_alloc_msg.
     209             :  *
     210             :  * @param mq            message queue
     211             :  * @param msg           message (pointer to ring position) to be enqueued
     212             :  * @param nowait        flag to indicate if request is blocking or not
     213             :  * @return              success status
     214             :  */
     215             : int svm_msg_q_add (svm_msg_q_t * mq, svm_msg_q_msg_t * msg, int nowait);
     216             : 
     217             : /**
     218             :  * Producer enqueue one message to queue with mutex held
     219             :  *
     220             :  * Prior to calling this, the producer should've obtained a message buffer
     221             :  * from one of the rings by calling @ref svm_msg_q_alloc_msg. It assumes
     222             :  * the queue mutex is held.
     223             :  *
     224             :  * @param mq            message queue
     225             :  * @param msg           message (pointer to ring position) to be enqueued
     226             :  * No value is returned.
     227             :  */
     228             : void svm_msg_q_add_and_unlock (svm_msg_q_t * mq, svm_msg_q_msg_t * msg);
     229             : 
     230             : /**
     231             :  * Consumer dequeue one message from queue
     232             :  *
     233             :  * This returns the message pointing to the data in the message rings.
     234             :  * Should only be used in single consumer scenarios as no locks are grabbed.
     235             :  * The consumer is expected to call @ref svm_msg_q_free_msg once it
     236             :  * finishes processing/copies the message data.
     237             :  *
     238             :  * @param mq            message queue
     239             :  * @param msg           pointer to structure where message is to be received
     240             :  * @param cond          flag that indicates if request should block or not
     241             :  * @param time          time to wait if condition is SVM_Q_TIMEDWAIT
     242             :  * @return              success status
     243             :  */
     244             : int svm_msg_q_sub (svm_msg_q_t * mq, svm_msg_q_msg_t * msg,
     245             :                    svm_q_conditional_wait_t cond, u32 time);
     246             : 
     247             : /**
     248             :  * Consumer dequeue one message from queue
     249             :  *
     250             :  * Returns the message pointing to the data in the message rings. Should only
     251             :  * be used in single consumer scenarios as no locks are grabbed. The consumer
     252             :  * is expected to call @ref svm_msg_q_free_msg once it finishes
     253             :  * processing/copies the message data.
     254             :  *
     255             :  * @param mq            message queue
     256             :  * @param elem          pointer to structure where message is to be received
     257             :  * @return              success status
     258             :  */
     259             : int svm_msg_q_sub_raw (svm_msg_q_t *mq, svm_msg_q_msg_t *elem);
     260             : 
     261             : /**
     262             :  * Consumer dequeue multiple messages from queue
     263             :  *
     264             :  * Returns the message pointing to the data in the message rings. Should only
     265             :  * be used in single consumer scenarios as no locks are grabbed. The consumer
     266             :  * is expected to call @ref svm_msg_q_free_msg once it finishes
     267             :  * processing/copies the message data.
     268             :  *
     269             :  * @param mq            message queue
     270             :  * @param msg_buf       pointer to array of messages to be received
     271             :  * @param n_msgs        length of msg_buf array
     272             :  * @return              number of messages dequeued
     273             :  */
     274             : int svm_msg_q_sub_raw_batch (svm_msg_q_t *mq, svm_msg_q_msg_t *msg_buf,
     275             :                              u32 n_msgs);
     276             : 
     277             : /**
     278             :  * Get data for message in queue
     279             :  *
     280             :  * @param mq            message queue
     281             :  * @param msg           message for which the data is requested
     282             :  * @return              pointer to data
     283             :  */
     284             : void *svm_msg_q_msg_data (svm_msg_q_t * mq, svm_msg_q_msg_t * msg);
     285             : 
     286             : /**
     287             :  * Get message queue ring
     288             :  *
     289             :  * @param mq            message queue
     290             :  * @param ring_index    index of ring
     291             :  * @return              pointer to ring
     292             :  */
     293             : svm_msg_q_ring_t *svm_msg_q_ring (svm_msg_q_t * mq, u32 ring_index);
     294             : 
     295             : /**
     296             :  * Set event fd for queue
     297             :  *
     298             :  * If set, queue will exclusively use eventfds for signaling. Moreover,
     299             :  * afterwards, the queue should only be used in non-blocking mode. Waiting
     300             :  * for events should be done externally using something like epoll.
     301             :  *
     302             :  * @param mq            message queue
     303             :  * @param fd            consumer eventfd
     304             :  */
     305             : void svm_msg_q_set_eventfd (svm_msg_q_t *mq, int fd);
     306             : 
     307             : /**
     308             :  * Allocate event fd for queue
     309             :  */
     310             : int svm_msg_q_alloc_eventfd (svm_msg_q_t *mq);
     311             : 
     312             : /**
     313             :  * Format message queue, shows msg count for each ring
     314             :  */
     315             : u8 *format_svm_msg_q (u8 *s, va_list *args);
     316             : 
     317             : /**
     318             :  * Check length of message queue
     319             :  */
     320             : static inline u32
     321   227916967 : svm_msg_q_size (svm_msg_q_t *mq)
     322             : {
     323   227916967 :   return clib_atomic_load_relax_n (&mq->q.shr->cursize);
     324             : }
     325             : 
     326             : /**
     327             :  * Check if message queue is full
     328             :  */
     329             : static inline u8
     330      369297 : svm_msg_q_is_full (svm_msg_q_t * mq)
     331             : {
     332      369297 :   return (svm_msg_q_size (mq) == mq->q.shr->maxsize);
     333             : }
     334             : 
     335             : static inline u8
     336      369290 : svm_msg_q_ring_is_full (svm_msg_q_t * mq, u32 ring_index)
     337             : {
     338      369290 :   svm_msg_q_ring_t *ring = vec_elt_at_index (mq->rings, ring_index);
     339      369290 :   return (clib_atomic_load_relax_n (&ring->shr->cursize) >= ring->nitems);
     340             : }
     341             : 
     342             : static inline u8
     343      369284 : svm_msg_q_or_ring_is_full (svm_msg_q_t *mq, u32 ring_index)
     344             : {
     345      369284 :   return (svm_msg_q_is_full (mq) || svm_msg_q_ring_is_full (mq, ring_index));
     346             : }
     347             : 
     348             : /**
     349             :  * Check if message queue is empty
     350             :  */
     351             : static inline u8
     352   153617986 : svm_msg_q_is_empty (svm_msg_q_t * mq)
     353             : {
     354   153617986 :   return (svm_msg_q_size (mq) == 0);
     355             : }
     356             : 
     357             : /**
     358             :  * Check if message is invalid
     359             :  */
     360             : static inline u8
     361             : svm_msg_q_msg_is_invalid (svm_msg_q_msg_t * msg)
     362             : {
     363             :   return (msg->as_u64 == (u64) ~ 0);
     364             : }
     365             : 
     366             : /**
     367             :  * Try locking message queue
     368             :  */
     369             : static inline int
     370      188334 : svm_msg_q_try_lock (svm_msg_q_t * mq)
     371             : {
     372      188334 :   if (mq->q.evtfd == -1)
     373             :     {
     374      188334 :       int rv = pthread_mutex_trylock (&mq->q.shr->mutex);
     375      188334 :       if (PREDICT_FALSE (rv == EOWNERDEAD))
     376           0 :         rv = pthread_mutex_consistent (&mq->q.shr->mutex);
     377      188334 :       return rv;
     378             :     }
     379             :   else
     380             :     {
     381           0 :       return !clib_spinlock_trylock (&mq->q.lock);
     382             :     }
     383             : }
     384             : 
     385             : /**
     386             :  * Lock, or block trying, the message queue
     387             :  */
     388             : static inline int
     389      167689 : svm_msg_q_lock (svm_msg_q_t * mq)
     390             : {
     391      167689 :   if (mq->q.evtfd == -1)
     392             :     {
     393      167658 :       int rv = pthread_mutex_lock (&mq->q.shr->mutex);
     394      167658 :       if (PREDICT_FALSE (rv == EOWNERDEAD))
     395           0 :         rv = pthread_mutex_consistent (&mq->q.shr->mutex);
     396      167658 :       return rv;
     397             :     }
     398             :   else
     399             :     {
     400          31 :       clib_spinlock_lock (&mq->q.lock);
     401          31 :       return 0;
     402             :     }
     403             : }
     404             : 
     405             : /**
     406             :  * Unlock message queue
     407             :  */
     408             : static inline void
     409      357047 : svm_msg_q_unlock (svm_msg_q_t * mq)
     410             : {
     411      357047 :   if (mq->q.evtfd == -1)
     412             :     {
     413      357016 :       pthread_mutex_unlock (&mq->q.shr->mutex);
     414             :     }
     415             :   else
     416             :     {
     417          31 :       clib_spinlock_unlock (&mq->q.lock);
     418             :     }
     419      357047 : }
     420             : 
     421             : /**
     422             :  * Wait for message queue event
     423             :  *
     424             :  * When eventfds are not configured, the shared memory mutex is locked
     425             :  * before waiting on the condvar. Typically called by consumers.
     426             :  */
     427             : int svm_msg_q_wait (svm_msg_q_t *mq, svm_msg_q_wait_type_t type);
     428             : 
     429             : /**
     430             :  * Wait for message queue event as producer
     431             :  *
     432             :  * Similar to @ref svm_msg_q_wait but lock (mutex or spinlock) must
     433             :  * be held. Should only be called by producers.
     434             :  */
     435             : int svm_msg_q_wait_prod (svm_msg_q_t *mq);
     436             : 
     437             : /**
     438             :  * Wait for message queue or ring event as producer
     439             :  *
     440             :  * Similar to @ref svm_msg_q_wait but lock (mutex or spinlock) must
     441             :  * be held. Should only be called by producers.
     442             :  */
     443             : int svm_msg_q_or_ring_wait_prod (svm_msg_q_t *mq, u32 ring_index);
     444             : 
     445             : /**
     446             :  * Timed wait for message queue event
     447             :  *
     448             :  * Must be called with mutex held.
     449             :  *
     450             :  * @param mq            message queue
     451             :  * @param timeout       time in seconds
     452             :  */
     453             : int svm_msg_q_timedwait (svm_msg_q_t *mq, double timeout);
     454             : 
     455             : static inline int
     456          32 : svm_msg_q_get_eventfd (svm_msg_q_t *mq)
     457             : {
     458          32 :   return mq->q.evtfd;
     459             : }
     460             : 
     461             : #endif /* SRC_SVM_MESSAGE_QUEUE_H_ */
     462             : 
     463             : /*
     464             :  * fd.io coding-style-patch-verification: ON
     465             :  *
     466             :  * Local Variables:
     467             :  * eval: (c-set-style "gnu")
     468             :  * End:
     469             :  */

Generated by: LCOV version 1.14