LCOV - code coverage report
Current view: top level - svm - message_queue.h (source / functions) Hit Total Coverage
Test: coverage-filtered.info Lines: 30 33 90.9 %
Date: 2023-07-05 22:20:52 Functions: 9 9 100.0 %

          Line data    Source code
       1             : /*
       2             :  * Copyright (c) 2018-2019 Cisco and/or its affiliates.
       3             :  * Licensed under the Apache License, Version 2.0 (the "License");
       4             :  * you may not use this file except in compliance with the License.
       5             :  * You may obtain a copy of the License at:
       6             :  *
       7             :  *     http://www.apache.org/licenses/LICENSE-2.0
       8             :  *
       9             :  * Unless required by applicable law or agreed to in writing, software
      10             :  * distributed under the License is distributed on an "AS IS" BASIS,
      11             :  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
      12             :  * See the License for the specific language governing permissions and
      13             :  * limitations under the License.
      14             :  */
      15             : /**
      16             :  * @file
      17             :  * @brief Unidirectional shared-memory multi-ring message queue
      18             :  */
      19             : 
      20             : #ifndef SRC_SVM_MESSAGE_QUEUE_H_
      21             : #define SRC_SVM_MESSAGE_QUEUE_H_
      22             : 
      23             : #include <vppinfra/clib.h>
      24             : #include <vppinfra/error.h>
      25             : #include <vppinfra/lock.h>
      26             : #include <svm/queue.h>
      27             : 
      28             : typedef struct svm_msg_q_shr_queue_
      29             : {
      30             :   pthread_mutex_t mutex;  /* 8 bytes */
      31             :   pthread_cond_t condvar; /* 8 bytes */
      32             :   u32 head;
      33             :   u32 tail;
      34             :   volatile u32 cursize;
      35             :   u32 maxsize;
      36             :   u32 elsize;
      37             :   u32 pad;
      38             :   u8 data[0];
      39             : } svm_msg_q_shared_queue_t;
      40             : 
      41             : typedef struct svm_msg_q_queue_
      42             : {
      43             :   svm_msg_q_shared_queue_t *shr; /**< pointer to shared queue */
      44             :   int evtfd;                     /**< producer/consumer eventfd */
      45             :   clib_spinlock_t lock;          /**< private lock for multi-producer */
      46             : } svm_msg_q_queue_t;
      47             : 
      48             : typedef struct svm_msg_q_ring_shared_
      49             : {
      50             :   volatile u32 cursize;                 /**< current size of the ring */
      51             :   u32 nitems;                           /**< max size of the ring */
      52             :   volatile u32 head;                    /**< current head (for dequeue) */
      53             :   volatile u32 tail;                    /**< current tail (for enqueue) */
      54             :   u32 elsize;                           /**< size of an element */
      55             :   u8 data[0];                           /**< chunk of memory for msg data */
      56             : } svm_msg_q_ring_shared_t;
      57             : 
      58             : typedef struct svm_msg_q_ring_
      59             : {
      60             :   u32 nitems;                   /**< max size of the ring */
      61             :   u32 elsize;                   /**< size of an element */
      62             :   svm_msg_q_ring_shared_t *shr; /**< ring in shared memory */
      63             : } __clib_packed svm_msg_q_ring_t;
      64             : 
      65             : typedef struct svm_msg_q_shared_
      66             : {
      67             :   u32 n_rings;                   /**< number of rings after q */
      68             :   u32 pad;                       /**< 8 byte alignment for q */
      69             :   svm_msg_q_shared_queue_t q[0]; /**< queue for exchanging messages */
      70             : } __clib_packed svm_msg_q_shared_t;
      71             : 
      72             : typedef struct svm_msg_q_
      73             : {
      74             :   svm_msg_q_queue_t q;                  /**< queue for exchanging messages */
      75             :   svm_msg_q_ring_t *rings;              /**< rings with message data*/
      76             : } __clib_packed svm_msg_q_t;
      77             : 
      78             : typedef struct svm_msg_q_ring_cfg_
      79             : {
      80             :   u32 nitems;
      81             :   u32 elsize;
      82             :   void *data;
      83             : } svm_msg_q_ring_cfg_t;
      84             : 
      85             : typedef struct svm_msg_q_cfg_
      86             : {
      87             :   int consumer_pid;                     /**< pid of msg consumer */
      88             :   u32 q_nitems;                         /**< msg queue size (not rings) */
      89             :   u32 n_rings;                          /**< number of msg rings */
      90             :   svm_msg_q_ring_cfg_t *ring_cfgs;      /**< array of ring cfgs */
      91             : } svm_msg_q_cfg_t;
      92             : 
      93             : typedef union
      94             : {
      95             :   struct
      96             :   {
      97             :     u32 ring_index;                     /**< ring index, could be u8 */
      98             :     u32 elt_index;                      /**< index in ring */
      99             :   };
     100             :   u64 as_u64;
     101             : } svm_msg_q_msg_t;
     102             : 
     103             : #define SVM_MQ_INVALID_MSG { .as_u64 = ~0 }
     104             : 
     105             : typedef enum svm_msg_q_wait_type_
     106             : {
     107             :   SVM_MQ_WAIT_EMPTY,
     108             :   SVM_MQ_WAIT_FULL
     109             : } svm_msg_q_wait_type_t;
     110             : 
     111             : /**
     112             :  * Allocate message queue
     113             :  *
     114             :  * Allocates a message queue on the heap. Based on the configuration options,
     115             :  * apart from the message queue this also allocates (one or multiple)
     116             :  * shared-memory rings for the messages.
     117             :  *
     118             :  * @param cfg           configuration options: queue len, consumer pid,
     119             :  *                      ring configs
     120             :  * @return              message queue
     121             :  */
     122             : svm_msg_q_shared_t *svm_msg_q_alloc (svm_msg_q_cfg_t *cfg);
     123             : svm_msg_q_shared_t *svm_msg_q_init (void *base, svm_msg_q_cfg_t *cfg);
     124             : uword svm_msg_q_size_to_alloc (svm_msg_q_cfg_t *cfg);
     125             : 
     126             : void svm_msg_q_attach (svm_msg_q_t *mq, void *smq_base);
     127             : 
     128             : /**
     129             :  * Cleanup mq's private data
     130             :  */
     131             : void svm_msg_q_cleanup (svm_msg_q_t *mq);
     132             : 
     133             : /**
     134             :  * Free message queue
     135             :  *
     136             :  * @param mq            message queue to be freed
     137             :  */
     138             : void svm_msg_q_free (svm_msg_q_t * mq);
     139             : 
     140             : /**
     141             :  * Allocate message buffer
     142             :  *
     143             :  * Message is allocated on the first available ring capable of holding
     144             :  * the requested number of bytes.
     145             :  *
     146             :  * @param mq            message queue
     147             :  * @param nbytes        number of bytes needed for message
     148             :  * @return              message structure pointing to the ring and position
     149             :  *                      allocated
     150             :  */
     151             : svm_msg_q_msg_t svm_msg_q_alloc_msg (svm_msg_q_t * mq, u32 nbytes);
     152             : 
     153             : /**
     154             :  * Allocate message buffer on ring
     155             :  *
     156             :  * Message is allocated on the requested ring. The caller MUST check that
     157             :  * the ring is not full.
     158             :  *
     159             :  * @param mq            message queue
     160             :  * @param ring_index    ring on which the allocation should occur
     161             :  * @return              message structure pointing to the ring and position
     162             :  *                      allocated
     163             :  */
     164             : svm_msg_q_msg_t svm_msg_q_alloc_msg_w_ring (svm_msg_q_t * mq, u32 ring_index);
     165             : 
     166             : /**
     167             :  * Lock message queue and allocate message buffer on ring
     168             :  *
     169             :  * This should be used when multiple writers/readers are expected to
     170             :  * compete for the rings/queue. Message should be enqueued by calling
     171             :  * @ref svm_msg_q_add_w_lock and the caller MUST unlock the queue once
     172             :  * the message is enqueued.
     173             :  *
     174             :  * @param mq            message queue
     175             :  * @param ring_index    ring on which the allocation should occur
     176             :  * @param noblock       flag that indicates if request should block
     177             :  * @param msg           pointer to message to be filled in
     178             :  * @return              0 on success, negative number otherwise
     179             :  */
     180             : int svm_msg_q_lock_and_alloc_msg_w_ring (svm_msg_q_t * mq, u32 ring_index,
     181             :                                          u8 noblock, svm_msg_q_msg_t * msg);
     182             : 
     183             : /**
     184             :  * Free message buffer
     185             :  *
     186             :  * Marks message buffer on ring as free.
     187             :  *
     188             :  * @param mq            message queue
     189             :  * @param msg           message to be freed
     190             :  */
     191             : void svm_msg_q_free_msg (svm_msg_q_t * mq, svm_msg_q_msg_t * msg);
     192             : 
     193             : /**
     194             :  * Producer enqueue one message to queue
     195             :  *
     196             :  * Prior to calling this, the producer should've obtained a message buffer
     197             :  * from one of the rings by calling @ref svm_msg_q_alloc_msg.
     198             :  *
     199             :  * @param mq            message queue
     200             :  * @param msg           message (pointer to ring position) to be enqueued
     201             :  * @param nowait        flag to indicate if request is blocking or not
     202             :  * @return              success status
     203             :  */
     204             : int svm_msg_q_add (svm_msg_q_t * mq, svm_msg_q_msg_t * msg, int nowait);
     205             : 
     206             : /**
     207             :  * Producer enqueue one message to queue with mutex held
     208             :  *
     209             :  * Prior to calling this, the producer should've obtained a message buffer
     210             :  * from one of the rings by calling @ref svm_msg_q_alloc_msg. It assumes
     211             :  * the queue mutex is held.
     212             :  *
     213             :  * @param mq            message queue
     214             :  * @param msg           message (pointer to ring position) to be enqueued
     215             :  * @return              success status
     216             :  */
     217             : void svm_msg_q_add_and_unlock (svm_msg_q_t * mq, svm_msg_q_msg_t * msg);
     218             : 
     219             : /**
     220             :  * Consumer dequeue one message from queue
     221             :  *
     222             :  * This returns the message pointing to the data in the message rings.
     223             :  * Should only be used in single consumer scenarios as no locks are grabbed.
     224             :  * The consumer is expected to call @ref svm_msg_q_free_msg once it
     225             :  * finishes processing/copies the message data.
     226             :  *
     227             :  * @param mq            message queue
     228             :  * @param msg           pointer to structure where message is to be received
     229             :  * @param cond          flag that indicates if request should block or not
     230             :  * @param time          time to wait if condition is SVM_Q_TIMEDWAIT
     231             :  * @return              success status
     232             :  */
     233             : int svm_msg_q_sub (svm_msg_q_t * mq, svm_msg_q_msg_t * msg,
     234             :                    svm_q_conditional_wait_t cond, u32 time);
     235             : 
     236             : /**
     237             :  * Consumer dequeue one message from queue
     238             :  *
     239             :  * Returns the message pointing to the data in the message rings. Should only
     240             :  * be used in single consumer scenarios as no locks are grabbed. The consumer
     241             :  * is expected to call @ref svm_msg_q_free_msg once it finishes
     242             :  * processing/copies the message data.
     243             :  *
     244             :  * @param mq            message queue
     245             :  * @param elem          pointer to structure where message is to be received
     246             :  * @return              success status
     247             :  */
     248             : int svm_msg_q_sub_raw (svm_msg_q_t *mq, svm_msg_q_msg_t *elem);
     249             : 
     250             : /**
     251             :  * Consumer dequeue multiple messages from queue
     252             :  *
     253             :  * Returns the message pointing to the data in the message rings. Should only
     254             :  * be used in single consumer scenarios as no locks are grabbed. The consumer
     255             :  * is expected to call @ref svm_msg_q_free_msg once it finishes
     256             :  * processing/copies the message data.
     257             :  *
     258             :  * @param mq            message queue
     259             :  * @param msg_buf       pointer to array of messages to be received
     260             :  * @param n_msgs        length of msg_buf array
     261             :  * @return              number of messages dequeued
     262             :  */
     263             : int svm_msg_q_sub_raw_batch (svm_msg_q_t *mq, svm_msg_q_msg_t *msg_buf,
     264             :                              u32 n_msgs);
     265             : 
     266             : /**
     267             :  * Get data for message in queue
     268             :  *
     269             :  * @param mq            message queue
     270             :  * @param msg           message for which the data is requested
     271             :  * @return              pointer to data
     272             :  */
     273             : void *svm_msg_q_msg_data (svm_msg_q_t * mq, svm_msg_q_msg_t * msg);
     274             : 
     275             : /**
     276             :  * Get message queue ring
     277             :  *
     278             :  * @param mq            message queue
     279             :  * @param ring_index    index of ring
     280             :  * @return              pointer to ring
     281             :  */
     282             : svm_msg_q_ring_t *svm_msg_q_ring (svm_msg_q_t * mq, u32 ring_index);
     283             : 
     284             : /**
     285             :  * Set event fd for queue
     286             :  *
     287             :  * If set, queue will exclusively use eventfds for signaling. Moreover,
     288             :  * afterwards, the queue should only be used in non-blocking mode. Waiting
     289             :  * for events should be done externally using something like epoll.
     290             :  *
     291             :  * @param mq            message queue
     292             :  * @param fd            consumer eventfd
     293             :  */
     294             : void svm_msg_q_set_eventfd (svm_msg_q_t *mq, int fd);
     295             : 
     296             : /**
     297             :  * Allocate event fd for queue
     298             :  */
     299             : int svm_msg_q_alloc_eventfd (svm_msg_q_t *mq);
     300             : 
     301             : /**
     302             :  * Format message queue, shows msg count for each ring
     303             :  */
     304             : u8 *format_svm_msg_q (u8 *s, va_list *args);
     305             : 
     306             : /**
     307             :  * Check length of message queue
     308             :  */
     309             : static inline u32
     310   224722144 : svm_msg_q_size (svm_msg_q_t *mq)
     311             : {
     312   224722144 :   return clib_atomic_load_relax_n (&mq->q.shr->cursize);
     313             : }
     314             : 
     315             : /**
     316             :  * Check if message queue is full
     317             :  */
     318             : static inline u8
     319      266753 : svm_msg_q_is_full (svm_msg_q_t * mq)
     320             : {
     321      266753 :   return (svm_msg_q_size (mq) == mq->q.shr->maxsize);
     322             : }
     323             : 
     324             : static inline u8
     325      266746 : svm_msg_q_ring_is_full (svm_msg_q_t * mq, u32 ring_index)
     326             : {
     327      266746 :   svm_msg_q_ring_t *ring = vec_elt_at_index (mq->rings, ring_index);
     328      266746 :   return (clib_atomic_load_relax_n (&ring->shr->cursize) >= ring->nitems);
     329             : }
     330             : 
     331             : static inline u8
     332      266740 : svm_msg_q_or_ring_is_full (svm_msg_q_t *mq, u32 ring_index)
     333             : {
     334      266740 :   return (svm_msg_q_is_full (mq) || svm_msg_q_ring_is_full (mq, ring_index));
     335             : }
     336             : 
     337             : /**
     338             :  * Check if message queue is empty
     339             :  */
     340             : static inline u8
     341   149165924 : svm_msg_q_is_empty (svm_msg_q_t * mq)
     342             : {
     343   149165924 :   return (svm_msg_q_size (mq) == 0);
     344             : }
     345             : 
     346             : /**
     347             :  * Check if message is invalid
     348             :  */
     349             : static inline u8
     350             : svm_msg_q_msg_is_invalid (svm_msg_q_msg_t * msg)
     351             : {
     352             :   return (msg->as_u64 == (u64) ~ 0);
     353             : }
     354             : 
     355             : /**
     356             :  * Try locking message queue
     357             :  */
     358             : static inline int
     359      116733 : svm_msg_q_try_lock (svm_msg_q_t * mq)
     360             : {
     361      116733 :   if (mq->q.evtfd == -1)
     362             :     {
     363      116733 :       int rv = pthread_mutex_trylock (&mq->q.shr->mutex);
     364      116733 :       if (PREDICT_FALSE (rv == EOWNERDEAD))
     365           0 :         rv = pthread_mutex_consistent (&mq->q.shr->mutex);
     366      116733 :       return rv;
     367             :     }
     368             :   else
     369             :     {
     370           0 :       return !clib_spinlock_trylock (&mq->q.lock);
     371             :     }
     372             : }
     373             : 
     374             : /**
     375             :  * Lock, or block trying, the message queue
     376             :  */
     377             : static inline int
     378      150079 : svm_msg_q_lock (svm_msg_q_t * mq)
     379             : {
     380      150079 :   if (mq->q.evtfd == -1)
     381             :     {
     382      150051 :       int rv = pthread_mutex_lock (&mq->q.shr->mutex);
     383      150051 :       if (PREDICT_FALSE (rv == EOWNERDEAD))
     384           0 :         rv = pthread_mutex_consistent (&mq->q.shr->mutex);
     385      150051 :       return rv;
     386             :     }
     387             :   else
     388             :     {
     389          28 :       clib_spinlock_lock (&mq->q.lock);
     390          28 :       return 0;
     391             :     }
     392             : }
     393             : 
     394             : /**
     395             :  * Unlock message queue
     396             :  */
     397             : static inline void
     398      267783 : svm_msg_q_unlock (svm_msg_q_t * mq)
     399             : {
     400      267783 :   if (mq->q.evtfd == -1)
     401             :     {
     402      267755 :       pthread_mutex_unlock (&mq->q.shr->mutex);
     403             :     }
     404             :   else
     405             :     {
     406          28 :       clib_spinlock_unlock (&mq->q.lock);
     407             :     }
     408      267783 : }
     409             : 
     410             : /**
     411             :  * Wait for message queue event
     412             :  *
     413             :  * When eventfds are not configured, the shared memory mutex is locked
     414             :  * before waiting on the condvar. Typically called by consumers.
     415             :  */
     416             : int svm_msg_q_wait (svm_msg_q_t *mq, svm_msg_q_wait_type_t type);
     417             : 
     418             : /**
     419             :  * Wait for message queue event as producer
     420             :  *
     421             :  * Similar to @ref svm_msg_q_wait but lock (mutex or spinlock) must
     422             :  * be held. Should only be called by producers.
     423             :  */
     424             : int svm_msg_q_wait_prod (svm_msg_q_t *mq);
     425             : 
     426             : /**
     427             :  * Wait for message queue or ring event as producer
     428             :  *
     429             :  * Similar to @ref svm_msg_q_wait but lock (mutex or spinlock) must
     430             :  * be held. Should only be called by producers.
     431             :  */
     432             : int svm_msg_q_or_ring_wait_prod (svm_msg_q_t *mq, u32 ring_index);
     433             : 
     434             : /**
     435             :  * Timed wait for message queue event
     436             :  *
     437             :  * Must be called with mutex held.
     438             :  *
     439             :  * @param mq            message queue
     440             :  * @param timeout       time in seconds
     441             :  */
     442             : int svm_msg_q_timedwait (svm_msg_q_t *mq, double timeout);
     443             : 
     444             : static inline int
     445          30 : svm_msg_q_get_eventfd (svm_msg_q_t *mq)
     446             : {
     447          30 :   return mq->q.evtfd;
     448             : }
     449             : 
     450             : #endif /* SRC_SVM_MESSAGE_QUEUE_H_ */
     451             : 
     452             : /*
     453             :  * fd.io coding-style-patch-verification: ON
     454             :  *
     455             :  * Local Variables:
     456             :  * eval: (c-set-style "gnu")
     457             :  * End:
     458             :  */

Generated by: LCOV version 1.14