LCOV - code coverage report
Current view: top level - vnet/session - session_node.c (source / functions) Hit Total Coverage
Test: coverage-filtered.info Lines: 702 1096 64.1 %
Date: 2023-07-05 22:20:52 Functions: 51 65 78.5 %

          Line data    Source code
       1             : /*
       2             :  * Copyright (c) 2017-2019 Cisco and/or its affiliates.
       3             :  * Licensed under the Apache License, Version 2.0 (the "License");
       4             :  * you may not use this file except in compliance with the License.
       5             :  * You may obtain a copy of the License at:
       6             :  *
       7             :  *     http://www.apache.org/licenses/LICENSE-2.0
       8             :  *
       9             :  * Unless required by applicable law or agreed to in writing, software
      10             :  * distributed under the License is distributed on an "AS IS" BASIS,
      11             :  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
      12             :  * See the License for the specific language governing permissions and
      13             :  * limitations under the License.
      14             :  */
      15             : 
      16             : #include <math.h>
      17             : #include <vlib/vlib.h>
      18             : #include <vnet/vnet.h>
      19             : #include <vppinfra/elog.h>
      20             : #include <vnet/session/transport.h>
      21             : #include <vnet/session/session.h>
      22             : #include <vnet/session/application.h>
      23             : #include <vnet/session/application_interface.h>
      24             : #include <vnet/session/application_local.h>
      25             : #include <vnet/session/session_debug.h>
      26             : #include <svm/queue.h>
      27             : #include <sys/timerfd.h>
      28             : 
      29             : static inline void
      30           4 : session_wrk_send_evt_to_main (session_worker_t *wrk, session_evt_elt_t *elt)
      31             : {
      32             :   session_evt_elt_t *he;
      33             :   uword thread_index;
      34             :   u8 is_empty;
      35             : 
      36           4 :   thread_index = wrk->vm->thread_index;
      37           4 :   he = clib_llist_elt (wrk->event_elts, wrk->evts_pending_main);
      38           4 :   is_empty = clib_llist_is_empty (wrk->event_elts, evt_list, he);
      39           4 :   clib_llist_add_tail (wrk->event_elts, evt_list, elt, he);
      40           4 :   if (is_empty)
      41           4 :     session_send_rpc_evt_to_thread (0, session_wrk_handle_evts_main_rpc,
      42             :                                     uword_to_pointer (thread_index, void *));
      43           4 : }
      44             : 
      45             : #define app_check_thread_and_barrier(_wrk, _elt)                              \
      46             :   if (!vlib_thread_is_main_w_barrier ())                                      \
      47             :     {                                                                         \
      48             :       session_wrk_send_evt_to_main (wrk, elt);                                \
      49             :       return;                                                                 \
      50             :     }
      51             : 
      52             : static void
      53          30 : session_wrk_timerfd_update (session_worker_t *wrk, u64 time_ns)
      54             : {
      55             :   struct itimerspec its;
      56             : 
      57          30 :   its.it_value.tv_sec = 0;
      58          30 :   its.it_value.tv_nsec = time_ns;
      59          30 :   its.it_interval.tv_sec = 0;
      60          30 :   its.it_interval.tv_nsec = its.it_value.tv_nsec;
      61             : 
      62          30 :   if (timerfd_settime (wrk->timerfd, 0, &its, NULL) == -1)
      63           0 :     clib_warning ("timerfd_settime");
      64          30 : }
      65             : 
      66             : always_inline u64
      67          30 : session_wrk_tfd_timeout (session_wrk_state_t state, u32 thread_index)
      68             : {
      69          30 :   if (state == SESSION_WRK_INTERRUPT)
      70          16 :     return thread_index ? 1e6 : vlib_num_workers () ? 5e8 : 1e6;
      71          14 :   else if (state == SESSION_WRK_IDLE)
      72           4 :     return thread_index ? 1e8 : vlib_num_workers () ? 5e8 : 1e8;
      73             :   else
      74          10 :     return 0;
      75             : }
      76             : 
      77             : static inline void
      78          30 : session_wrk_set_state (session_worker_t *wrk, session_wrk_state_t state)
      79             : {
      80             :   u64 time_ns;
      81             : 
      82          30 :   wrk->state = state;
      83          30 :   if (wrk->timerfd == -1)
      84           0 :     return;
      85          30 :   time_ns = session_wrk_tfd_timeout (state, wrk->vm->thread_index);
      86          30 :   session_wrk_timerfd_update (wrk, time_ns);
      87             : }
      88             : 
      89             : static transport_endpt_ext_cfg_t *
      90          12 : session_mq_get_ext_config (application_t *app, uword offset)
      91             : {
      92             :   svm_fifo_chunk_t *c;
      93             :   fifo_segment_t *fs;
      94             : 
      95          12 :   fs = application_get_rx_mqs_segment (app);
      96          12 :   c = fs_chunk_ptr (fs->h, offset);
      97          12 :   return (transport_endpt_ext_cfg_t *) c->data;
      98             : }
      99             : 
     100             : static void
     101          12 : session_mq_free_ext_config (application_t *app, uword offset)
     102             : {
     103             :   svm_fifo_chunk_t *c;
     104             :   fifo_segment_t *fs;
     105             : 
     106          12 :   fs = application_get_rx_mqs_segment (app);
     107          12 :   c = fs_chunk_ptr (fs->h, offset);
     108          12 :   fifo_segment_collect_chunk (fs, 0 /* only one slice */, c);
     109          12 : }
     110             : 
     111             : static void
     112          45 : session_mq_listen_handler (session_worker_t *wrk, session_evt_elt_t *elt)
     113             : {
     114          45 :   vnet_listen_args_t _a, *a = &_a;
     115             :   session_listen_msg_t *mp;
     116             :   app_worker_t *app_wrk;
     117             :   application_t *app;
     118             :   int rv;
     119             : 
     120          45 :   app_check_thread_and_barrier (wrk, elt);
     121             : 
     122          43 :   mp = session_evt_ctrl_data (wrk, elt);
     123          43 :   app = application_lookup (mp->client_index);
     124          43 :   if (!app)
     125           0 :     return;
     126             : 
     127          43 :   clib_memset (a, 0, sizeof (*a));
     128          43 :   a->sep.is_ip4 = mp->is_ip4;
     129          43 :   ip_copy (&a->sep.ip, &mp->ip, mp->is_ip4);
     130          43 :   a->sep.port = mp->port;
     131          43 :   a->sep.fib_index = mp->vrf;
     132          43 :   a->sep.sw_if_index = ENDPOINT_INVALID_INDEX;
     133          43 :   a->sep.transport_proto = mp->proto;
     134          43 :   a->app_index = app->app_index;
     135          43 :   a->wrk_map_index = mp->wrk_index;
     136          43 :   a->sep_ext.transport_flags = mp->flags;
     137             : 
     138          43 :   if (mp->ext_config)
     139           5 :     a->sep_ext.ext_cfg = session_mq_get_ext_config (app, mp->ext_config);
     140             : 
     141          43 :   if ((rv = vnet_listen (a)))
     142           0 :     session_worker_stat_error_inc (wrk, rv, 1);
     143             : 
     144          43 :   app_wrk = application_get_worker (app, mp->wrk_index);
     145          43 :   mq_send_session_bound_cb (app_wrk->wrk_index, mp->context, a->handle, rv);
     146             : 
     147          43 :   if (mp->ext_config)
     148           5 :     session_mq_free_ext_config (app, mp->ext_config);
     149             : }
     150             : 
     151             : static void
     152           0 : session_mq_listen_uri_handler (session_worker_t *wrk, session_evt_elt_t *elt)
     153             : {
     154           0 :   vnet_listen_args_t _a, *a = &_a;
     155             :   session_listen_uri_msg_t *mp;
     156             :   app_worker_t *app_wrk;
     157             :   application_t *app;
     158             :   int rv;
     159             : 
     160           0 :   app_check_thread_and_barrier (wrk, elt);
     161             : 
     162           0 :   mp = session_evt_ctrl_data (wrk, elt);
     163           0 :   app = application_lookup (mp->client_index);
     164           0 :   if (!app)
     165           0 :     return;
     166             : 
     167           0 :   clib_memset (a, 0, sizeof (*a));
     168           0 :   a->uri = (char *) mp->uri;
     169           0 :   a->app_index = app->app_index;
     170           0 :   rv = vnet_bind_uri (a);
     171             : 
     172           0 :   app_wrk = application_get_worker (app, 0);
     173           0 :   mq_send_session_bound_cb (app_wrk->wrk_index, mp->context, a->handle, rv);
     174             : }
     175             : 
     176             : static void
     177          51 : session_mq_connect_one (session_connect_msg_t *mp)
     178             : {
     179          51 :   vnet_connect_args_t _a, *a = &_a;
     180             :   app_worker_t *app_wrk;
     181             :   session_worker_t *wrk;
     182             :   application_t *app;
     183             :   int rv;
     184             : 
     185          51 :   app = application_lookup (mp->client_index);
     186          51 :   if (!app)
     187           0 :     return;
     188             : 
     189          51 :   clib_memset (a, 0, sizeof (*a));
     190          51 :   a->sep.is_ip4 = mp->is_ip4;
     191          51 :   clib_memcpy_fast (&a->sep.ip, &mp->ip, sizeof (mp->ip));
     192          51 :   a->sep.port = mp->port;
     193          51 :   a->sep.transport_proto = mp->proto;
     194          51 :   a->sep.peer.fib_index = mp->vrf;
     195          51 :   a->sep.dscp = mp->dscp;
     196          51 :   clib_memcpy_fast (&a->sep.peer.ip, &mp->lcl_ip, sizeof (mp->lcl_ip));
     197          51 :   if (mp->is_ip4)
     198             :     {
     199          47 :       ip46_address_mask_ip4 (&a->sep.ip);
     200          47 :       ip46_address_mask_ip4 (&a->sep.peer.ip);
     201             :     }
     202          51 :   a->sep.peer.port = mp->lcl_port;
     203          51 :   a->sep.peer.sw_if_index = ENDPOINT_INVALID_INDEX;
     204          51 :   a->sep_ext.parent_handle = mp->parent_handle;
     205          51 :   a->sep_ext.transport_flags = mp->flags;
     206          51 :   a->api_context = mp->context;
     207          51 :   a->app_index = app->app_index;
     208          51 :   a->wrk_map_index = mp->wrk_index;
     209             : 
     210          51 :   if (mp->ext_config)
     211           7 :     a->sep_ext.ext_cfg = session_mq_get_ext_config (app, mp->ext_config);
     212             : 
     213          51 :   if ((rv = vnet_connect (a)))
     214             :     {
     215           0 :       wrk = session_main_get_worker (vlib_get_thread_index ());
     216           0 :       session_worker_stat_error_inc (wrk, rv, 1);
     217           0 :       app_wrk = application_get_worker (app, mp->wrk_index);
     218           0 :       mq_send_session_connected_cb (app_wrk->wrk_index, mp->context, 0, rv);
     219             :     }
     220             : 
     221          51 :   if (mp->ext_config)
     222           7 :     session_mq_free_ext_config (app, mp->ext_config);
     223             : }
     224             : 
     225             : static void
     226          51 : session_mq_handle_connects_rpc (void *arg)
     227             : {
     228          51 :   u32 max_connects = 32, n_connects = 0;
     229             :   session_evt_elt_t *he, *elt, *next;
     230             :   session_worker_t *fwrk;
     231             : 
     232          51 :   ASSERT (session_vlib_thread_is_cl_thread ());
     233             : 
     234             :   /* Pending connects on linked list pertaining to first worker */
     235          51 :   fwrk = session_main_get_worker (transport_cl_thread ());
     236          51 :   if (!fwrk->n_pending_connects)
     237           0 :     return;
     238             : 
     239          51 :   he = clib_llist_elt (fwrk->event_elts, fwrk->pending_connects);
     240          51 :   elt = clib_llist_next (fwrk->event_elts, evt_list, he);
     241             : 
     242             :   /* Avoid holding the worker for too long */
     243         102 :   while (n_connects < max_connects && elt != he)
     244             :     {
     245          51 :       next = clib_llist_next (fwrk->event_elts, evt_list, elt);
     246          51 :       clib_llist_remove (fwrk->event_elts, evt_list, elt);
     247          51 :       session_mq_connect_one (session_evt_ctrl_data (fwrk, elt));
     248          51 :       session_evt_ctrl_data_free (fwrk, elt);
     249          51 :       clib_llist_put (fwrk->event_elts, elt);
     250          51 :       elt = next;
     251          51 :       n_connects += 1;
     252             :     }
     253             : 
     254             :   /* Decrement with worker barrier */
     255          51 :   fwrk->n_pending_connects -= n_connects;
     256          51 :   if (fwrk->n_pending_connects > 0)
     257             :     {
     258           0 :       session_send_rpc_evt_to_thread_force (fwrk->vm->thread_index,
     259             :                                             session_mq_handle_connects_rpc, 0);
     260             :     }
     261             : }
     262             : 
     263             : static void
     264          51 : session_mq_connect_handler (session_worker_t *wrk, session_evt_elt_t *elt)
     265             : {
     266          51 :   u32 thread_index = wrk - session_main.wrk;
     267             :   session_evt_elt_t *he;
     268             : 
     269          51 :   if (PREDICT_FALSE (thread_index > transport_cl_thread ()))
     270             :     {
     271           0 :       clib_warning ("Connect on wrong thread. Dropping");
     272           0 :       return;
     273             :     }
     274             : 
     275             :   /* If on worker, check if main has any pending messages. Avoids reordering
     276             :    * with other control messages that need to be handled by main
     277             :    */
     278          51 :   if (thread_index)
     279             :     {
     280           4 :       he = clib_llist_elt (wrk->event_elts, wrk->evts_pending_main);
     281             : 
     282             :       /* Events pending on main, postpone to avoid reordering */
     283           4 :       if (!clib_llist_is_empty (wrk->event_elts, evt_list, he))
     284             :         {
     285           0 :           clib_llist_add_tail (wrk->event_elts, evt_list, elt, he);
     286           0 :           return;
     287             :         }
     288             :     }
     289             : 
     290             :   /* Add to pending list to be handled by first worker */
     291          51 :   he = clib_llist_elt (wrk->event_elts, wrk->pending_connects);
     292          51 :   clib_llist_add_tail (wrk->event_elts, evt_list, elt, he);
     293             : 
     294             :   /* Decremented with worker barrier */
     295          51 :   wrk->n_pending_connects += 1;
     296          51 :   if (wrk->n_pending_connects == 1)
     297             :     {
     298          51 :       session_send_rpc_evt_to_thread_force (thread_index,
     299             :                                             session_mq_handle_connects_rpc, 0);
     300             :     }
     301             : }
     302             : 
     303             : static void
     304           0 : session_mq_connect_uri_handler (session_worker_t *wrk, session_evt_elt_t *elt)
     305             : {
     306           0 :   vnet_connect_args_t _a, *a = &_a;
     307             :   session_connect_uri_msg_t *mp;
     308             :   app_worker_t *app_wrk;
     309             :   application_t *app;
     310             :   int rv;
     311             : 
     312           0 :   app_check_thread_and_barrier (wrk, elt);
     313             : 
     314           0 :   mp = session_evt_ctrl_data (wrk, elt);
     315           0 :   app = application_lookup (mp->client_index);
     316           0 :   if (!app)
     317           0 :     return;
     318             : 
     319           0 :   clib_memset (a, 0, sizeof (*a));
     320           0 :   a->uri = (char *) mp->uri;
     321           0 :   a->api_context = mp->context;
     322           0 :   a->app_index = app->app_index;
     323           0 :   if ((rv = vnet_connect_uri (a)))
     324             :     {
     325           0 :       session_worker_stat_error_inc (wrk, rv, 1);
     326           0 :       app_wrk = application_get_worker (app, 0 /* default wrk only */ );
     327           0 :       mq_send_session_connected_cb (app_wrk->wrk_index, mp->context, 0, rv);
     328             :     }
     329             : }
     330             : 
     331             : static void
     332           0 : session_mq_shutdown_handler (void *data)
     333             : {
     334           0 :   session_shutdown_msg_t *mp = (session_shutdown_msg_t *) data;
     335           0 :   vnet_shutdown_args_t _a, *a = &_a;
     336             :   application_t *app;
     337             : 
     338           0 :   app = application_lookup (mp->client_index);
     339           0 :   if (!app)
     340           0 :     return;
     341             : 
     342           0 :   a->app_index = app->app_index;
     343           0 :   a->handle = mp->handle;
     344           0 :   vnet_shutdown_session (a);
     345             : }
     346             : 
     347             : static void
     348          61 : session_mq_disconnect_handler (void *data)
     349             : {
     350          61 :   session_disconnect_msg_t *mp = (session_disconnect_msg_t *) data;
     351          61 :   vnet_disconnect_args_t _a, *a = &_a;
     352             :   application_t *app;
     353             : 
     354          61 :   app = application_lookup (mp->client_index);
     355          61 :   if (!app)
     356           0 :     return;
     357             : 
     358          61 :   a->app_index = app->app_index;
     359          61 :   a->handle = mp->handle;
     360          61 :   vnet_disconnect_session (a);
     361             : }
     362             : 
     363             : static void
     364          24 : app_mq_detach_handler (session_worker_t *wrk, session_evt_elt_t *elt)
     365             : {
     366          24 :   vnet_app_detach_args_t _a, *a = &_a;
     367             :   session_app_detach_msg_t *mp;
     368             :   application_t *app;
     369             : 
     370          26 :   app_check_thread_and_barrier (wrk, elt);
     371             : 
     372          24 :   mp = session_evt_ctrl_data (wrk, elt);
     373          24 :   app = application_lookup (mp->client_index);
     374          24 :   if (!app)
     375           2 :     return;
     376             : 
     377          22 :   a->app_index = app->app_index;
     378          22 :   a->api_client_index = mp->client_index;
     379          22 :   vnet_application_detach (a);
     380             : }
     381             : 
     382             : static void
     383          27 : session_mq_unlisten_handler (session_worker_t *wrk, session_evt_elt_t *elt)
     384             : {
     385          27 :   vnet_unlisten_args_t _a, *a = &_a;
     386             :   session_unlisten_msg_t *mp;
     387             :   app_worker_t *app_wrk;
     388             :   session_handle_t sh;
     389             :   application_t *app;
     390             :   int rv;
     391             : 
     392          28 :   app_check_thread_and_barrier (wrk, elt);
     393             : 
     394          25 :   mp = session_evt_ctrl_data (wrk, elt);
     395          25 :   sh = mp->handle;
     396             : 
     397          25 :   app = application_lookup (mp->client_index);
     398          25 :   if (!app)
     399           1 :     return;
     400             : 
     401          24 :   clib_memset (a, 0, sizeof (*a));
     402          24 :   a->app_index = app->app_index;
     403          24 :   a->handle = sh;
     404          24 :   a->wrk_map_index = mp->wrk_index;
     405             : 
     406          24 :   if ((rv = vnet_unlisten (a)))
     407           0 :     session_worker_stat_error_inc (wrk, rv, 1);
     408             : 
     409          24 :   app_wrk = application_get_worker (app, a->wrk_map_index);
     410          24 :   if (!app_wrk)
     411           0 :     return;
     412             : 
     413          24 :   mq_send_unlisten_reply (app_wrk, sh, mp->context, rv);
     414             : }
     415             : 
     416             : static void
     417          47 : session_mq_accepted_reply_handler (session_worker_t *wrk,
     418             :                                    session_evt_elt_t *elt)
     419             : {
     420          47 :   vnet_disconnect_args_t _a = { 0 }, *a = &_a;
     421             :   session_accepted_reply_msg_t *mp;
     422             :   session_state_t old_state;
     423             :   app_worker_t *app_wrk;
     424             :   session_t *s;
     425             : 
     426          47 :   mp = session_evt_ctrl_data (wrk, elt);
     427             : 
     428             :   /* Mail this back from the main thread. We're not polling in main
     429             :    * thread so we're using other workers for notifications. */
     430          47 :   if (session_thread_from_handle (mp->handle) == 0 && vlib_num_workers () &&
     431           0 :       vlib_get_thread_index () != 0)
     432             :     {
     433           0 :       session_wrk_send_evt_to_main (wrk, elt);
     434          12 :       return;
     435             :     }
     436             : 
     437          47 :   s = session_get_from_handle_if_valid (mp->handle);
     438          47 :   if (!s)
     439           0 :     return;
     440             : 
     441          47 :   app_wrk = app_worker_get (s->app_wrk_index);
     442          47 :   if (app_wrk->app_index != mp->context)
     443             :     {
     444           0 :       clib_warning ("app doesn't own session");
     445           0 :       return;
     446             :     }
     447             : 
     448             :   /* Server isn't interested, disconnect the session */
     449          47 :   if (mp->retval)
     450             :     {
     451           0 :       a->app_index = mp->context;
     452           0 :       a->handle = mp->handle;
     453           0 :       vnet_disconnect_session (a);
     454           0 :       return;
     455             :     }
     456             : 
     457             :   /* Special handling for cut-through sessions */
     458          47 :   if (!session_has_transport (s))
     459             :     {
     460          12 :       session_set_state (s, SESSION_STATE_READY);
     461          12 :       ct_session_connect_notify (s, SESSION_E_NONE);
     462          12 :       return;
     463             :     }
     464             : 
     465          35 :   old_state = s->session_state;
     466          35 :   session_set_state (s, SESSION_STATE_READY);
     467             : 
     468          35 :   if (!svm_fifo_is_empty_prod (s->rx_fifo))
     469           2 :     app_worker_lock_and_send_event (app_wrk, s, SESSION_IO_EVT_RX);
     470             : 
     471             :   /* Closed while waiting for app to reply. Resend disconnect */
     472          35 :   if (old_state >= SESSION_STATE_TRANSPORT_CLOSING)
     473             :     {
     474           0 :       app_worker_close_notify (app_wrk, s);
     475           0 :       session_set_state (s, old_state);
     476           0 :       return;
     477             :     }
     478             : }
     479             : 
     480             : static void
     481           4 : session_mq_reset_reply_handler (void *data)
     482             : {
     483           4 :   vnet_disconnect_args_t _a = { 0 }, *a = &_a;
     484             :   session_reset_reply_msg_t *mp;
     485             :   app_worker_t *app_wrk;
     486             :   session_t *s;
     487             :   application_t *app;
     488             :   u32 index, thread_index;
     489             : 
     490           4 :   mp = (session_reset_reply_msg_t *) data;
     491           4 :   app = application_lookup (mp->context);
     492           4 :   if (!app)
     493           0 :     return;
     494             : 
     495           4 :   session_parse_handle (mp->handle, &index, &thread_index);
     496           4 :   s = session_get_if_valid (index, thread_index);
     497             : 
     498             :   /* No session or not the right session */
     499           4 :   if (!s || s->session_state < SESSION_STATE_TRANSPORT_CLOSING)
     500           0 :     return;
     501             : 
     502           4 :   app_wrk = app_worker_get (s->app_wrk_index);
     503           4 :   if (!app_wrk || app_wrk->app_index != app->app_index)
     504             :     {
     505           0 :       clib_warning ("App %u does not own handle 0x%lx!", app->app_index,
     506             :                     mp->handle);
     507           0 :       return;
     508             :     }
     509             : 
     510             :   /* Client objected to resetting the session, log and continue */
     511           4 :   if (mp->retval)
     512             :     {
     513           0 :       clib_warning ("client retval %d", mp->retval);
     514           0 :       return;
     515             :     }
     516             : 
     517             :   /* This comes as a response to a reset, transport only waiting for
     518             :    * confirmation to remove connection state, no need to disconnect */
     519           4 :   a->handle = mp->handle;
     520           4 :   a->app_index = app->app_index;
     521           4 :   vnet_disconnect_session (a);
     522             : }
     523             : 
     524             : static void
     525           0 : session_mq_disconnected_handler (void *data)
     526             : {
     527             :   session_disconnected_reply_msg_t *rmp;
     528           0 :   vnet_disconnect_args_t _a, *a = &_a;
     529           0 :   svm_msg_q_msg_t _msg, *msg = &_msg;
     530             :   session_disconnected_msg_t *mp;
     531             :   app_worker_t *app_wrk;
     532             :   session_event_t *evt;
     533             :   session_t *s;
     534             :   application_t *app;
     535           0 :   int rv = 0;
     536             : 
     537           0 :   mp = (session_disconnected_msg_t *) data;
     538           0 :   if (!(s = session_get_from_handle_if_valid (mp->handle)))
     539             :     {
     540           0 :       clib_warning ("could not disconnect handle %llu", mp->handle);
     541           0 :       return;
     542             :     }
     543           0 :   app_wrk = app_worker_get (s->app_wrk_index);
     544           0 :   app = application_lookup (mp->client_index);
     545           0 :   if (!(app_wrk && app && app->app_index == app_wrk->app_index))
     546             :     {
     547           0 :       clib_warning ("could not disconnect session: %llu app: %u",
     548             :                     mp->handle, mp->client_index);
     549           0 :       return;
     550             :     }
     551             : 
     552           0 :   a->handle = mp->handle;
     553           0 :   a->app_index = app_wrk->wrk_index;
     554           0 :   rv = vnet_disconnect_session (a);
     555             : 
     556           0 :   svm_msg_q_lock_and_alloc_msg_w_ring (app_wrk->event_queue,
     557             :                                        SESSION_MQ_CTRL_EVT_RING,
     558             :                                        SVM_Q_WAIT, msg);
     559           0 :   evt = svm_msg_q_msg_data (app_wrk->event_queue, msg);
     560           0 :   clib_memset (evt, 0, sizeof (*evt));
     561           0 :   evt->event_type = SESSION_CTRL_EVT_DISCONNECTED_REPLY;
     562           0 :   rmp = (session_disconnected_reply_msg_t *) evt->data;
     563           0 :   rmp->handle = mp->handle;
     564           0 :   rmp->context = mp->context;
     565           0 :   rmp->retval = rv;
     566           0 :   svm_msg_q_add_and_unlock (app_wrk->event_queue, msg);
     567             : }
     568             : 
     569             : static void
     570          13 : session_mq_disconnected_reply_handler (void *data)
     571             : {
     572             :   session_disconnected_reply_msg_t *mp;
     573          13 :   vnet_disconnect_args_t _a, *a = &_a;
     574             :   application_t *app;
     575             : 
     576          13 :   mp = (session_disconnected_reply_msg_t *) data;
     577             : 
     578             :   /* Client objected to disconnecting the session, log and continue */
     579          13 :   if (mp->retval)
     580             :     {
     581           0 :       clib_warning ("client retval %d", mp->retval);
     582           0 :       return;
     583             :     }
     584             : 
     585             :   /* Disconnect has been confirmed. Confirm close to transport */
     586          13 :   app = application_lookup (mp->context);
     587          13 :   if (app)
     588             :     {
     589          13 :       a->handle = mp->handle;
     590          13 :       a->app_index = app->app_index;
     591          13 :       vnet_disconnect_session (a);
     592             :     }
     593             : }
     594             : 
     595             : static void
     596           0 : session_mq_worker_update_handler (void *data)
     597             : {
     598           0 :   session_worker_update_msg_t *mp = (session_worker_update_msg_t *) data;
     599             :   session_worker_update_reply_msg_t *rmp;
     600           0 :   svm_msg_q_msg_t _msg, *msg = &_msg;
     601             :   app_worker_t *app_wrk;
     602             :   u32 owner_app_wrk_map;
     603             :   session_event_t *evt;
     604             :   session_t *s;
     605             :   application_t *app;
     606             :   int rv;
     607             : 
     608           0 :   app = application_lookup (mp->client_index);
     609           0 :   if (!app)
     610           0 :     return;
     611           0 :   if (!(s = session_get_from_handle_if_valid (mp->handle)))
     612             :     {
     613           0 :       clib_warning ("invalid handle %llu", mp->handle);
     614           0 :       return;
     615             :     }
     616           0 :   app_wrk = app_worker_get (s->app_wrk_index);
     617           0 :   if (app_wrk->app_index != app->app_index)
     618             :     {
     619           0 :       clib_warning ("app %u does not own session %llu", app->app_index,
     620             :                     mp->handle);
     621           0 :       return;
     622             :     }
     623           0 :   owner_app_wrk_map = app_wrk->wrk_map_index;
     624           0 :   app_wrk = application_get_worker (app, mp->wrk_index);
     625             : 
     626             :   /* This needs to come from the new owner */
     627           0 :   if (mp->req_wrk_index == owner_app_wrk_map)
     628             :     {
     629             :       session_req_worker_update_msg_t *wump;
     630             : 
     631           0 :       svm_msg_q_lock_and_alloc_msg_w_ring (app_wrk->event_queue,
     632             :                                            SESSION_MQ_CTRL_EVT_RING,
     633             :                                            SVM_Q_WAIT, msg);
     634           0 :       evt = svm_msg_q_msg_data (app_wrk->event_queue, msg);
     635           0 :       clib_memset (evt, 0, sizeof (*evt));
     636           0 :       evt->event_type = SESSION_CTRL_EVT_REQ_WORKER_UPDATE;
     637           0 :       wump = (session_req_worker_update_msg_t *) evt->data;
     638           0 :       wump->session_handle = mp->handle;
     639           0 :       svm_msg_q_add_and_unlock (app_wrk->event_queue, msg);
     640           0 :       return;
     641             :     }
     642             : 
     643           0 :   rv = app_worker_own_session (app_wrk, s);
     644           0 :   if (rv)
     645           0 :     session_stat_error_inc (rv, 1);
     646             : 
     647             :   /*
     648             :    * Send reply
     649             :    */
     650           0 :   svm_msg_q_lock_and_alloc_msg_w_ring (app_wrk->event_queue,
     651             :                                        SESSION_MQ_CTRL_EVT_RING,
     652             :                                        SVM_Q_WAIT, msg);
     653           0 :   evt = svm_msg_q_msg_data (app_wrk->event_queue, msg);
     654           0 :   clib_memset (evt, 0, sizeof (*evt));
     655           0 :   evt->event_type = SESSION_CTRL_EVT_WORKER_UPDATE_REPLY;
     656           0 :   rmp = (session_worker_update_reply_msg_t *) evt->data;
     657           0 :   rmp->handle = mp->handle;
     658           0 :   if (s->rx_fifo)
     659           0 :     rmp->rx_fifo = fifo_segment_fifo_offset (s->rx_fifo);
     660           0 :   if (s->tx_fifo)
     661           0 :     rmp->tx_fifo = fifo_segment_fifo_offset (s->tx_fifo);
     662           0 :   rmp->segment_handle = session_segment_handle (s);
     663           0 :   svm_msg_q_add_and_unlock (app_wrk->event_queue, msg);
     664             : 
     665             :   /*
     666             :    * Retransmit messages that may have been lost
     667             :    */
     668           0 :   if (s->tx_fifo && !svm_fifo_is_empty (s->tx_fifo))
     669           0 :     session_send_io_evt_to_thread (s->tx_fifo, SESSION_IO_EVT_TX);
     670             : 
     671           0 :   if (s->rx_fifo && !svm_fifo_is_empty (s->rx_fifo))
     672           0 :     app_worker_lock_and_send_event (app_wrk, s, SESSION_IO_EVT_RX);
     673             : 
     674           0 :   if (s->session_state >= SESSION_STATE_TRANSPORT_CLOSING)
     675           0 :     app_worker_close_notify (app_wrk, s);
     676             : }
     677             : 
     678             : static void
     679           0 : session_mq_app_wrk_rpc_handler (void *data)
     680             : {
     681           0 :   session_app_wrk_rpc_msg_t *mp = (session_app_wrk_rpc_msg_t *) data;
     682           0 :   svm_msg_q_msg_t _msg, *msg = &_msg;
     683             :   session_app_wrk_rpc_msg_t *rmp;
     684             :   app_worker_t *app_wrk;
     685             :   session_event_t *evt;
     686             :   application_t *app;
     687             : 
     688           0 :   app = application_lookup (mp->client_index);
     689           0 :   if (!app)
     690           0 :     return;
     691             : 
     692           0 :   app_wrk = application_get_worker (app, mp->wrk_index);
     693             : 
     694           0 :   svm_msg_q_lock_and_alloc_msg_w_ring (app_wrk->event_queue,
     695             :                                        SESSION_MQ_CTRL_EVT_RING, SVM_Q_WAIT,
     696             :                                        msg);
     697           0 :   evt = svm_msg_q_msg_data (app_wrk->event_queue, msg);
     698           0 :   clib_memset (evt, 0, sizeof (*evt));
     699           0 :   evt->event_type = SESSION_CTRL_EVT_APP_WRK_RPC;
     700           0 :   rmp = (session_app_wrk_rpc_msg_t *) evt->data;
     701           0 :   clib_memcpy (rmp->data, mp->data, sizeof (mp->data));
     702           0 :   svm_msg_q_add_and_unlock (app_wrk->event_queue, msg);
     703             : }
     704             : 
     705             : static void
     706           5 : session_mq_transport_attr_handler (void *data)
     707             : {
     708           5 :   session_transport_attr_msg_t *mp = (session_transport_attr_msg_t *) data;
     709             :   session_transport_attr_reply_msg_t *rmp;
     710           5 :   svm_msg_q_msg_t _msg, *msg = &_msg;
     711             :   app_worker_t *app_wrk;
     712             :   session_event_t *evt;
     713             :   application_t *app;
     714             :   session_t *s;
     715             :   int rv;
     716             : 
     717           5 :   app = application_lookup (mp->client_index);
     718           5 :   if (!app)
     719           0 :     return;
     720             : 
     721           5 :   if (!(s = session_get_from_handle_if_valid (mp->handle)))
     722             :     {
     723           0 :       clib_warning ("invalid handle %llu", mp->handle);
     724           0 :       return;
     725             :     }
     726           5 :   app_wrk = app_worker_get (s->app_wrk_index);
     727           5 :   if (app_wrk->app_index != app->app_index)
     728             :     {
     729           0 :       clib_warning ("app %u does not own session %llu", app->app_index,
     730             :                     mp->handle);
     731           0 :       return;
     732             :     }
     733             : 
     734           5 :   rv = session_transport_attribute (s, mp->is_get, &mp->attr);
     735             : 
     736           5 :   svm_msg_q_lock_and_alloc_msg_w_ring (
     737             :     app_wrk->event_queue, SESSION_MQ_CTRL_EVT_RING, SVM_Q_WAIT, msg);
     738           5 :   evt = svm_msg_q_msg_data (app_wrk->event_queue, msg);
     739           5 :   clib_memset (evt, 0, sizeof (*evt));
     740           5 :   evt->event_type = SESSION_CTRL_EVT_TRANSPORT_ATTR_REPLY;
     741           5 :   rmp = (session_transport_attr_reply_msg_t *) evt->data;
     742           5 :   rmp->handle = mp->handle;
     743           5 :   rmp->retval = rv;
     744           5 :   rmp->is_get = mp->is_get;
     745           5 :   if (!rv && mp->is_get)
     746           3 :     rmp->attr = mp->attr;
     747           5 :   svm_msg_q_add_and_unlock (app_wrk->event_queue, msg);
     748             : }
     749             : 
     750             : void
     751           4 : session_wrk_handle_evts_main_rpc (void *args)
     752             : {
     753           4 :   vlib_main_t *vm = vlib_get_main ();
     754             :   clib_llist_index_t ei, next_ei;
     755             :   session_evt_elt_t *he, *elt;
     756             :   session_worker_t *fwrk;
     757             :   u32 thread_index;
     758             : 
     759           4 :   vlib_worker_thread_barrier_sync (vm);
     760             : 
     761           4 :   thread_index = pointer_to_uword (args);
     762           4 :   fwrk = session_main_get_worker (thread_index);
     763             : 
     764           4 :   he = clib_llist_elt (fwrk->event_elts, fwrk->evts_pending_main);
     765           4 :   ei = clib_llist_next_index (he, evt_list);
     766             : 
     767           8 :   while (ei != fwrk->evts_pending_main)
     768             :     {
     769           4 :       elt = clib_llist_elt (fwrk->event_elts, ei);
     770           4 :       next_ei = clib_llist_next_index (elt, evt_list);
     771           4 :       clib_llist_remove (fwrk->event_elts, evt_list, elt);
     772           4 :       switch (elt->evt.event_type)
     773             :         {
     774           2 :         case SESSION_CTRL_EVT_LISTEN:
     775           2 :           session_mq_listen_handler (fwrk, elt);
     776           2 :           break;
     777           2 :         case SESSION_CTRL_EVT_UNLISTEN:
     778           2 :           session_mq_unlisten_handler (fwrk, elt);
     779           2 :           break;
     780           0 :         case SESSION_CTRL_EVT_APP_DETACH:
     781           0 :           app_mq_detach_handler (fwrk, elt);
     782           0 :           break;
     783           0 :         case SESSION_CTRL_EVT_CONNECT_URI:
     784           0 :           session_mq_connect_uri_handler (fwrk, elt);
     785           0 :           break;
     786           0 :         case SESSION_CTRL_EVT_ACCEPTED_REPLY:
     787           0 :           session_mq_accepted_reply_handler (fwrk, elt);
     788           0 :           break;
     789           0 :         case SESSION_CTRL_EVT_CONNECT:
     790           0 :           session_mq_connect_handler (fwrk, elt);
     791           0 :           break;
     792           0 :         default:
     793           0 :           clib_warning ("unhandled %u", elt->evt.event_type);
     794           0 :           ALWAYS_ASSERT (0);
     795           0 :           break;
     796             :         }
     797             : 
     798             :       /* Regrab element in case pool moved */
     799           4 :       elt = clib_llist_elt (fwrk->event_elts, ei);
     800           4 :       if (!clib_llist_elt_is_linked (elt, evt_list))
     801             :         {
     802           4 :           session_evt_ctrl_data_free (fwrk, elt);
     803           4 :           clib_llist_put (fwrk->event_elts, elt);
     804             :         }
     805           4 :       ei = next_ei;
     806             :     }
     807             : 
     808           4 :   vlib_worker_thread_barrier_release (vm);
     809           4 : }
     810             : 
     811             : vlib_node_registration_t session_queue_node;
     812             : 
     813             : typedef struct
     814             : {
     815             :   u32 session_index;
     816             :   u32 server_thread_index;
     817             : } session_queue_trace_t;
     818             : 
     819             : /* packet trace format function */
     820             : static u8 *
     821           0 : format_session_queue_trace (u8 * s, va_list * args)
     822             : {
     823           0 :   CLIB_UNUSED (vlib_main_t * vm) = va_arg (*args, vlib_main_t *);
     824           0 :   CLIB_UNUSED (vlib_node_t * node) = va_arg (*args, vlib_node_t *);
     825           0 :   session_queue_trace_t *t = va_arg (*args, session_queue_trace_t *);
     826             : 
     827           0 :   s = format (s, "session index %d thread index %d",
     828             :               t->session_index, t->server_thread_index);
     829           0 :   return s;
     830             : }
     831             : 
     832             : #define foreach_session_queue_error                                           \
     833             :   _ (TX, tx, INFO, "Packets transmitted")                                     \
     834             :   _ (TIMER, timer, INFO, "Timer events")                                      \
     835             :   _ (NO_BUFFER, no_buffer, ERROR, "Out of buffers")
     836             : 
     837             : typedef enum
     838             : {
     839             : #define _(f, n, s, d) SESSION_QUEUE_ERROR_##f,
     840             :   foreach_session_queue_error
     841             : #undef _
     842             :     SESSION_QUEUE_N_ERROR,
     843             : } session_queue_error_t;
     844             : 
     845             : static vlib_error_desc_t session_error_counters[] = {
     846             : #define _(f, n, s, d) { #n, d, VL_COUNTER_SEVERITY_##s },
     847             :   foreach_session_queue_error
     848             : #undef _
     849             : };
     850             : 
     851             : enum
     852             : {
     853             :   SESSION_TX_NO_BUFFERS = -2,
     854             :   SESSION_TX_NO_DATA,
     855             :   SESSION_TX_OK
     856             : };
     857             : 
     858             : static void
     859           0 : session_tx_trace_frame (vlib_main_t *vm, vlib_node_runtime_t *node,
     860             :                         u32 next_index, vlib_buffer_t **bufs, u16 n_segs,
     861             :                         session_t *s, u32 n_trace)
     862             : {
     863           0 :   vlib_buffer_t **b = bufs;
     864             : 
     865           0 :   while (n_trace && n_segs)
     866             :     {
     867           0 :       if (PREDICT_TRUE (vlib_trace_buffer (vm, node, next_index, b[0],
     868             :                                            1 /* follow_chain */)))
     869             :         {
     870             :           session_queue_trace_t *t =
     871           0 :             vlib_add_trace (vm, node, b[0], sizeof (*t));
     872           0 :           t->session_index = s->session_index;
     873           0 :           t->server_thread_index = s->thread_index;
     874           0 :           n_trace--;
     875             :         }
     876           0 :       b++;
     877           0 :       n_segs--;
     878             :     }
     879           0 :   vlib_set_trace_count (vm, node, n_trace);
     880           0 : }
     881             : 
     882             : always_inline int
     883           0 : session_tx_fill_dma_transfers (session_worker_t *wrk,
     884             :                                session_tx_context_t *ctx, vlib_buffer_t *b)
     885             : {
     886           0 :   vlib_main_t *vm = wrk->vm;
     887             :   u32 len_to_deq;
     888           0 :   u8 *data0 = NULL;
     889             :   int n_bytes_read, len_write;
     890             :   svm_fifo_seg_t data_fs[2];
     891             : 
     892           0 :   u32 n_segs = 2;
     893           0 :   u16 n_transfers = 0;
     894             :   /*
     895             :    * Start with the first buffer in chain
     896             :    */
     897           0 :   b->error = 0;
     898           0 :   b->flags = VNET_BUFFER_F_LOCALLY_ORIGINATED;
     899           0 :   b->current_data = 0;
     900           0 :   data0 = vlib_buffer_make_headroom (b, TRANSPORT_MAX_HDRS_LEN);
     901           0 :   len_to_deq = clib_min (ctx->left_to_snd, ctx->deq_per_first_buf);
     902             : 
     903           0 :   n_bytes_read = svm_fifo_segments (ctx->s->tx_fifo, ctx->sp.tx_offset,
     904             :                                     data_fs, &n_segs, len_to_deq);
     905             : 
     906           0 :   len_write = n_bytes_read;
     907           0 :   ASSERT (n_bytes_read == len_to_deq);
     908             : 
     909           0 :   while (n_bytes_read)
     910             :     {
     911           0 :       wrk->batch_num++;
     912           0 :       vlib_dma_batch_add (vm, wrk->batch, data0, data_fs[n_transfers].data,
     913             :                           data_fs[n_transfers].len);
     914           0 :       data0 += data_fs[n_transfers].len;
     915           0 :       n_bytes_read -= data_fs[n_transfers].len;
     916           0 :       n_transfers++;
     917             :     }
     918           0 :   return len_write;
     919             : }
     920             : 
     921             : always_inline int
     922           0 : session_tx_fill_dma_transfers_tail (session_worker_t *wrk,
     923             :                                     session_tx_context_t *ctx,
     924             :                                     vlib_buffer_t *b, u32 len_to_deq, u8 *data)
     925             : {
     926           0 :   vlib_main_t *vm = wrk->vm;
     927             :   int n_bytes_read, len_write;
     928             :   svm_fifo_seg_t data_fs[2];
     929           0 :   u32 n_segs = 2;
     930           0 :   u16 n_transfers = 0;
     931             : 
     932           0 :   n_bytes_read = svm_fifo_segments (ctx->s->tx_fifo, ctx->sp.tx_offset,
     933             :                                     data_fs, &n_segs, len_to_deq);
     934             : 
     935           0 :   len_write = n_bytes_read;
     936             : 
     937           0 :   ASSERT (n_bytes_read == len_to_deq);
     938             : 
     939           0 :   while (n_bytes_read)
     940             :     {
     941           0 :       wrk->batch_num++;
     942           0 :       vlib_dma_batch_add (vm, wrk->batch, data, data_fs[n_transfers].data,
     943             :                           data_fs[n_transfers].len);
     944           0 :       data += data_fs[n_transfers].len;
     945           0 :       n_bytes_read -= data_fs[n_transfers].len;
     946           0 :       n_transfers++;
     947             :     }
     948             : 
     949           0 :   return len_write;
     950             : }
     951             : 
     952             : always_inline int
     953      996032 : session_tx_copy_data (session_worker_t *wrk, session_tx_context_t *ctx,
     954             :                       vlib_buffer_t *b, u32 len_to_deq, u8 *data0)
     955             : {
     956             :   int n_bytes_read;
     957      996032 :   if (PREDICT_TRUE (!wrk->dma_enabled))
     958             :     n_bytes_read =
     959      996032 :       svm_fifo_peek (ctx->s->tx_fifo, ctx->sp.tx_offset, len_to_deq, data0);
     960             :   else
     961           0 :     n_bytes_read = session_tx_fill_dma_transfers (wrk, ctx, b);
     962      996032 :   return n_bytes_read;
     963             : }
     964             : 
     965             : always_inline int
     966           0 : session_tx_copy_data_tail (session_worker_t *wrk, session_tx_context_t *ctx,
     967             :                            vlib_buffer_t *b, u32 len_to_deq, u8 *data)
     968             : {
     969             :   int n_bytes_read;
     970           0 :   if (PREDICT_TRUE (!wrk->dma_enabled))
     971             :     n_bytes_read =
     972           0 :       svm_fifo_peek (ctx->s->tx_fifo, ctx->sp.tx_offset, len_to_deq, data);
     973             :   else
     974             :     n_bytes_read =
     975           0 :       session_tx_fill_dma_transfers_tail (wrk, ctx, b, len_to_deq, data);
     976           0 :   return n_bytes_read;
     977             : }
     978             : 
     979             : always_inline void
     980           0 : session_tx_fifo_chain_tail (session_worker_t *wrk, session_tx_context_t *ctx,
     981             :                             vlib_buffer_t *b, u16 *n_bufs, u8 peek_data)
     982             : {
     983           0 :   vlib_main_t *vm = wrk->vm;
     984             :   vlib_buffer_t *chain_b, *prev_b;
     985             :   u32 chain_bi0, to_deq, left_from_seg;
     986             :   int len_to_deq, n_bytes_read;
     987             :   u8 *data, j;
     988             : 
     989           0 :   b->flags |= VLIB_BUFFER_TOTAL_LENGTH_VALID;
     990           0 :   b->total_length_not_including_first_buffer = 0;
     991             : 
     992           0 :   chain_b = b;
     993           0 :   left_from_seg = clib_min (ctx->sp.snd_mss - b->current_length,
     994             :                             ctx->left_to_snd);
     995           0 :   to_deq = left_from_seg;
     996           0 :   for (j = 1; j < ctx->n_bufs_per_seg; j++)
     997             :     {
     998           0 :       prev_b = chain_b;
     999           0 :       len_to_deq = clib_min (to_deq, ctx->deq_per_buf);
    1000             : 
    1001           0 :       *n_bufs -= 1;
    1002           0 :       chain_bi0 = ctx->tx_buffers[*n_bufs];
    1003           0 :       chain_b = vlib_get_buffer (vm, chain_bi0);
    1004           0 :       chain_b->current_data = 0;
    1005           0 :       data = vlib_buffer_get_current (chain_b);
    1006           0 :       if (peek_data)
    1007             :         {
    1008             :           n_bytes_read =
    1009           0 :             session_tx_copy_data_tail (wrk, ctx, b, len_to_deq, data);
    1010           0 :           ctx->sp.tx_offset += n_bytes_read;
    1011             :         }
    1012             :       else
    1013             :         {
    1014           0 :           if (ctx->transport_vft->transport_options.tx_type ==
    1015             :               TRANSPORT_TX_DGRAM)
    1016             :             {
    1017           0 :               svm_fifo_t *f = ctx->s->tx_fifo;
    1018           0 :               session_dgram_hdr_t *hdr = &ctx->hdr;
    1019             :               u16 deq_now;
    1020             :               u32 offset;
    1021             : 
    1022           0 :               deq_now = clib_min (hdr->data_length - hdr->data_offset,
    1023             :                                   len_to_deq);
    1024           0 :               offset = hdr->data_offset + SESSION_CONN_HDR_LEN;
    1025           0 :               n_bytes_read = svm_fifo_peek (f, offset, deq_now, data);
    1026           0 :               ASSERT (n_bytes_read > 0);
    1027             : 
    1028           0 :               hdr->data_offset += n_bytes_read;
    1029           0 :               if (hdr->data_offset == hdr->data_length)
    1030             :                 {
    1031           0 :                   offset = hdr->data_length + SESSION_CONN_HDR_LEN;
    1032           0 :                   svm_fifo_dequeue_drop (f, offset);
    1033           0 :                   if (ctx->left_to_snd > n_bytes_read)
    1034           0 :                     svm_fifo_peek (ctx->s->tx_fifo, 0, sizeof (ctx->hdr),
    1035           0 :                                    (u8 *) & ctx->hdr);
    1036             :                 }
    1037           0 :               else if (ctx->left_to_snd == n_bytes_read)
    1038           0 :                 svm_fifo_overwrite_head (ctx->s->tx_fifo, (u8 *) & ctx->hdr,
    1039             :                                          sizeof (session_dgram_pre_hdr_t));
    1040             :             }
    1041             :           else
    1042           0 :             n_bytes_read = svm_fifo_dequeue (ctx->s->tx_fifo,
    1043             :                                              len_to_deq, data);
    1044             :         }
    1045           0 :       ASSERT (n_bytes_read == len_to_deq);
    1046           0 :       chain_b->current_length = n_bytes_read;
    1047           0 :       b->total_length_not_including_first_buffer += chain_b->current_length;
    1048             : 
    1049             :       /* update previous buffer */
    1050           0 :       prev_b->next_buffer = chain_bi0;
    1051           0 :       prev_b->flags |= VLIB_BUFFER_NEXT_PRESENT;
    1052             : 
    1053             :       /* update current buffer */
    1054           0 :       chain_b->next_buffer = 0;
    1055             : 
    1056           0 :       to_deq -= n_bytes_read;
    1057           0 :       if (to_deq == 0)
    1058           0 :         break;
    1059             :     }
    1060           0 :   ASSERT (to_deq == 0
    1061             :           && b->total_length_not_including_first_buffer == left_from_seg);
    1062           0 :   ctx->left_to_snd -= left_from_seg;
    1063           0 : }
    1064             : 
    1065             : always_inline void
    1066     1054200 : session_tx_fill_buffer (session_worker_t *wrk, session_tx_context_t *ctx,
    1067             :                         vlib_buffer_t *b, u16 *n_bufs, u8 peek_data)
    1068             : {
    1069             :   u32 len_to_deq;
    1070             :   u8 *data0;
    1071             :   int n_bytes_read;
    1072             :   /*
    1073             :    * Start with the first buffer in chain
    1074             :    */
    1075     1054200 :   b->error = 0;
    1076     1054200 :   b->flags = VNET_BUFFER_F_LOCALLY_ORIGINATED;
    1077     1054200 :   b->current_data = 0;
    1078             : 
    1079     1054200 :   data0 = vlib_buffer_make_headroom (b, TRANSPORT_MAX_HDRS_LEN);
    1080     1054200 :   len_to_deq = clib_min (ctx->left_to_snd, ctx->deq_per_first_buf);
    1081             : 
    1082     1054200 :   if (peek_data)
    1083             :     {
    1084      996032 :       n_bytes_read = session_tx_copy_data (wrk, ctx, b, len_to_deq, data0);
    1085      996032 :       ASSERT (n_bytes_read > 0);
    1086             :       /* Keep track of progress locally, transport is also supposed to
    1087             :        * increment it independently when pushing the header */
    1088      996032 :       ctx->sp.tx_offset += n_bytes_read;
    1089             :     }
    1090             :   else
    1091             :     {
    1092       58172 :       if (ctx->transport_vft->transport_options.tx_type == TRANSPORT_TX_DGRAM)
    1093             :         {
    1094       58172 :           session_dgram_hdr_t *hdr = &ctx->hdr;
    1095       58172 :           svm_fifo_t *f = ctx->s->tx_fifo;
    1096             :           u16 deq_now;
    1097             :           u32 offset;
    1098             : 
    1099       58172 :           ASSERT (hdr->data_length > hdr->data_offset);
    1100       58172 :           deq_now = clib_min (hdr->data_length - hdr->data_offset,
    1101             :                               len_to_deq);
    1102       58172 :           offset = hdr->data_offset + SESSION_CONN_HDR_LEN;
    1103       58172 :           n_bytes_read = svm_fifo_peek (f, offset, deq_now, data0);
    1104       58172 :           ASSERT (n_bytes_read > 0);
    1105             : 
    1106       58172 :           if (transport_connection_is_cless (ctx->tc))
    1107             :             {
    1108           0 :               clib_memcpy_fast (data0 - sizeof (session_dgram_hdr_t), hdr,
    1109             :                                 sizeof (*hdr));
    1110             :             }
    1111       58172 :           hdr->data_offset += n_bytes_read;
    1112       58172 :           if (hdr->data_offset == hdr->data_length)
    1113             :             {
    1114       58172 :               offset = hdr->data_length + SESSION_CONN_HDR_LEN;
    1115       58172 :               svm_fifo_dequeue_drop (f, offset);
    1116       58172 :               if (ctx->left_to_snd > n_bytes_read)
    1117       41745 :                 svm_fifo_peek (ctx->s->tx_fifo, 0, sizeof (ctx->hdr),
    1118       41745 :                                (u8 *) & ctx->hdr);
    1119             :             }
    1120           0 :           else if (ctx->left_to_snd == n_bytes_read)
    1121           0 :             svm_fifo_overwrite_head (ctx->s->tx_fifo, (u8 *) & ctx->hdr,
    1122             :                                      sizeof (session_dgram_pre_hdr_t));
    1123             :         }
    1124             :       else
    1125             :         {
    1126           0 :           n_bytes_read = svm_fifo_dequeue (ctx->s->tx_fifo,
    1127             :                                            len_to_deq, data0);
    1128           0 :           ASSERT (n_bytes_read > 0);
    1129             :         }
    1130             :     }
    1131             : 
    1132     1054200 :   b->current_length = n_bytes_read;
    1133     1054200 :   ctx->left_to_snd -= n_bytes_read;
    1134             : 
    1135             :   /*
    1136             :    * Fill in the remaining buffers in the chain, if any
    1137             :    */
    1138     1054200 :   if (PREDICT_FALSE (ctx->n_bufs_per_seg > 1 && ctx->left_to_snd))
    1139           0 :     session_tx_fifo_chain_tail (wrk, ctx, b, n_bufs, peek_data);
    1140     1054200 : }
    1141             : 
    1142             : always_inline u8
    1143       88189 : session_tx_not_ready (session_t * s, u8 peek_data)
    1144             : {
    1145       88189 :   if (peek_data)
    1146             :     {
    1147       70963 :       if (PREDICT_TRUE (s->session_state == SESSION_STATE_READY))
    1148       70381 :         return 0;
    1149             :       /* Can retransmit for closed sessions but can't send new data if
    1150             :        * session is not ready or closed */
    1151         582 :       else if (s->session_state < SESSION_STATE_READY)
    1152             :         {
    1153             :           /* Allow accepting session to send custom packets.
    1154             :            * For instance, tcp want to send acks in established, but
    1155             :            * the app has not called accept() yet */
    1156           0 :           if (s->session_state == SESSION_STATE_ACCEPTING &&
    1157           0 :               (s->flags & SESSION_F_CUSTOM_TX))
    1158           0 :             return 0;
    1159           0 :           return 1;
    1160             :         }
    1161         582 :       else if (s->session_state >= SESSION_STATE_TRANSPORT_CLOSED)
    1162             :         {
    1163             :           /* Allow closed transports to still send custom packets.
    1164             :            * For instance, tcp may want to send acks in time-wait. */
    1165         131 :           if (s->session_state != SESSION_STATE_TRANSPORT_DELETED
    1166         131 :               && (s->flags & SESSION_F_CUSTOM_TX))
    1167         129 :             return 0;
    1168           2 :           return 2;
    1169             :         }
    1170             :     }
    1171             :   else
    1172             :     {
    1173       17226 :       if (s->session_state == SESSION_STATE_TRANSPORT_DELETED)
    1174           0 :         return 2;
    1175             :     }
    1176       17677 :   return 0;
    1177             : }
    1178             : 
    1179             : always_inline transport_connection_t *
    1180       88187 : session_tx_get_transport (session_tx_context_t * ctx, u8 peek_data)
    1181             : {
    1182       88187 :   if (peek_data)
    1183             :     {
    1184       70961 :       return ctx->transport_vft->get_connection (ctx->s->connection_index,
    1185       70961 :                                                  ctx->s->thread_index);
    1186             :     }
    1187             :   else
    1188             :     {
    1189       17226 :       if (ctx->s->session_state == SESSION_STATE_LISTENING)
    1190           0 :         return ctx->transport_vft->get_listener (ctx->s->connection_index);
    1191             :       else
    1192             :         {
    1193       17226 :           return ctx->transport_vft->get_connection (ctx->s->connection_index,
    1194       17226 :                                                      ctx->s->thread_index);
    1195             :         }
    1196             :     }
    1197             : }
    1198             : 
    /* Work out how many bytes may be sent for ctx->s in this dispatch and how
     * many vlib buffers that needs.
     *
     * vm        - used only to read the default buffer data size.
     * ctx       - tx context; reads ctx->s, ctx->sp and ctx->transport_vft.
     * max_segs  - upper bound on the number of segments for this event.
     * peek_data - non-zero: peek path, data starts at ctx->sp.tx_offset in the
     *             tx fifo; zero: dequeue path (datagram headers are parsed when
     *             the transport's tx_type is TRANSPORT_TX_DGRAM).
     *
     * Outputs written to ctx: max_dequeue, max_len_to_snd, n_segs_per_evt,
     * n_bufs_per_seg, n_bufs_needed, deq_per_buf and deq_per_first_buf. On the
     * datagram path ctx->hdr is filled from the fifo and ctx->sp.snd_mss may be
     * lowered. Every early return sets ctx->max_len_to_snd to 0 and leaves the
     * segment/buffer counters untouched, so callers must test max_len_to_snd
     * before using them.
     */
    1199             : always_inline void
    1200       86571 : session_tx_set_dequeue_params (vlib_main_t * vm, session_tx_context_t * ctx,
    1201             :                                u32 max_segs, u8 peek_data)
    1202             : {
    1203             :   u32 n_bytes_per_buf, n_bytes_per_seg;
    1204             : 
    1205       86571 :   n_bytes_per_buf = vlib_buffer_get_default_data_size (vm);
    1206       86571 :   ctx->max_dequeue = svm_fifo_max_dequeue_cons (ctx->s->tx_fifo);
    1207             : 
    1208       86571 :   if (peek_data)
    1209             :     {
    1210             :       /* Offset in tx fifo from where to peek data */
    1211       69345 :       if (PREDICT_FALSE (ctx->sp.tx_offset >= ctx->max_dequeue))
    1212             :         {
    1213       27192 :           ctx->max_len_to_snd = 0;
    1214       27192 :           return;
    1215             :         }
    1216       42153 :       ctx->max_dequeue -= ctx->sp.tx_offset;
    1217             :     }
    1218             :   else
    1219             :     {
    1220       17226 :       if (ctx->transport_vft->transport_options.tx_type == TRANSPORT_TX_DGRAM)
    1221             :         {
    1222             :           u32 len, chain_limit;
    1223             : 
                                 /* Need strictly more than one header's worth of bytes */
    1224       17226 :           if (ctx->max_dequeue <= sizeof (ctx->hdr))
    1225             :             {
    1226         799 :               ctx->max_len_to_snd = 0;
    1227         799 :               return;
    1228             :             }
    1229             : 
    1230       16427 :           svm_fifo_peek (ctx->s->tx_fifo, 0, sizeof (ctx->hdr),
    1231       16427 :                          (u8 *) & ctx->hdr);
    1232             :           /* Zero length dgrams not supported */
    1233       16427 :           if (PREDICT_FALSE (ctx->hdr.data_length == 0))
    1234             :             {
                                     /* Drop only the header bytes from the fifo and send nothing */
    1235           0 :               svm_fifo_dequeue_drop (ctx->s->tx_fifo, sizeof (ctx->hdr));
    1236           0 :               ctx->max_len_to_snd = 0;
    1237           0 :               return;
    1238             :             }
    1239             :           /* We cannot be sure apps have not enqueued incomplete dgrams */
    1240       16427 :           if (PREDICT_FALSE (ctx->max_dequeue <
    1241             :                              ctx->hdr.data_length + sizeof (ctx->hdr)))
    1242             :             {
    1243           0 :               ctx->max_len_to_snd = 0;
    1244           0 :               return;
    1245             :             }
    1246       16427 :           ASSERT (ctx->hdr.data_length > ctx->hdr.data_offset);
                                 /* Payload bytes of the first dgram that are still unsent */
    1247       16427 :           len = ctx->hdr.data_length - ctx->hdr.data_offset;
    1248             : 
    1249       16427 :           if (ctx->hdr.gso_size)
    1250             :             {
    1251           0 :               ctx->sp.snd_mss = clib_min (ctx->sp.snd_mss, ctx->hdr.gso_size);
    1252             :             }
    1253             : 
    1254             :           /* Process multiple dgrams if smaller than min (buf_space, mss).
    1255             :            * This avoids handling multiple dgrams if they require buffer
    1256             :            * chains */
    1257       16427 :           chain_limit = clib_min (n_bytes_per_buf - TRANSPORT_MAX_HDRS_LEN,
    1258             :                                   ctx->sp.snd_mss);
    1259       16427 :           if (ctx->hdr.data_length <= chain_limit)
    1260             :             {
    1261             :               u32 first_dgram_len, dgram_len, offset, max_offset;
    1262             :               session_dgram_hdr_t hdr;
    1263             : 
                                     /* Each batched dgram becomes one segment of exactly len bytes */
    1264       16427 :               ctx->sp.snd_mss = clib_min (ctx->sp.snd_mss, len);
    1265       16427 :               offset = ctx->hdr.data_length + sizeof (session_dgram_hdr_t);
    1266       16427 :               first_dgram_len = len;
                                     /* Look ahead over at most 16 KiB (16 << 10) of queued data */
    1267       16427 :               max_offset = clib_min (ctx->max_dequeue, 16 << 10);
    1268             : 
                                     /* Accumulate following dgrams while each is fully enqueued and
                                      * carries the same unsent payload length as the first one */
    1269       58483 :               while (offset < max_offset)
    1270             :                 {
    1271       45556 :                   svm_fifo_peek (ctx->s->tx_fifo, offset, sizeof (ctx->hdr),
    1272             :                                  (u8 *) & hdr);
    1273       45556 :                   dgram_len = hdr.data_length - hdr.data_offset;
    1274       45556 :                   if (offset + sizeof (hdr) + hdr.data_length >
    1275       45556 :                         ctx->max_dequeue ||
    1276             :                       first_dgram_len != dgram_len)
    1277             :                     break;
    1278             :                   /* Assert here to allow test above with zero length dgrams */
    1279       42056 :                   ASSERT (hdr.data_length > hdr.data_offset);
    1280       42056 :                   len += dgram_len;
    1281       42056 :                   offset += sizeof (hdr) + hdr.data_length;
    1282             :                 }
    1283             :             }
    1284             : 
                                 /* From here on max_dequeue counts payload bytes only (no headers) */
    1285       16427 :           ctx->max_dequeue = len;
    1286             :         }
    1287             :     }
    1288       58580 :   ASSERT (ctx->max_dequeue > 0);
    1289             : 
    1290             :   /* Ensure we're not writing more than transport window allows */
    1291       58580 :   if (ctx->max_dequeue < ctx->sp.snd_space)
    1292             :     {
    1293             :       /* Constrained by tx queue. Try to send only fully formed segments */
    1294       20028 :       ctx->max_len_to_snd = (ctx->max_dequeue > ctx->sp.snd_mss) ?
    1295       20028 :         (ctx->max_dequeue - (ctx->max_dequeue % ctx->sp.snd_mss)) :
    1296             :         ctx->max_dequeue;
    1297             :       /* TODO Nagle ? */
    1298             :     }
    1299             :   else
    1300             :     {
    1301             :       /* Expectation is that sp.snd_space is already a multiple of snd_mss */
    1302       38552 :       ctx->max_len_to_snd = ctx->sp.snd_space;
    1303             :     }
    1304             : 
    1305             :   /* Check if we're tx constrained by the node */
    1306       58580 :   ctx->n_segs_per_evt = ceil ((f64) ctx->max_len_to_snd / ctx->sp.snd_mss);
    1307       58580 :   if (ctx->n_segs_per_evt > max_segs)
    1308             :     {
    1309         587 :       ctx->n_segs_per_evt = max_segs;
    1310         587 :       ctx->max_len_to_snd = max_segs * ctx->sp.snd_mss;
    1311             :     }
    1312             : 
    1313       58580 :   ASSERT (n_bytes_per_buf > TRANSPORT_MAX_HDRS_LEN);
                         /* Buffer accounting: every segment reserves TRANSPORT_MAX_HDRS_LEN bytes
                          * on top of its payload; with several segments the last one may be
                          * shorter, so it is sized separately */
    1314       58580 :   if (ctx->n_segs_per_evt > 1)
    1315             :     {
    1316             :       u32 n_bytes_last_seg, n_bufs_last_seg;
    1317             : 
    1318       29253 :       n_bytes_per_seg = TRANSPORT_MAX_HDRS_LEN + ctx->sp.snd_mss;
    1319       29253 :       n_bytes_last_seg = TRANSPORT_MAX_HDRS_LEN + ctx->max_len_to_snd
    1320       29253 :         - ((ctx->n_segs_per_evt - 1) * ctx->sp.snd_mss);
    1321       29253 :       ctx->n_bufs_per_seg = ceil ((f64) n_bytes_per_seg / n_bytes_per_buf);
    1322       29253 :       n_bufs_last_seg = ceil ((f64) n_bytes_last_seg / n_bytes_per_buf);
    1323       29253 :       ctx->n_bufs_needed = ((ctx->n_segs_per_evt - 1) * ctx->n_bufs_per_seg)
    1324       29253 :         + n_bufs_last_seg;
    1325             :     }
    1326             :   else
    1327             :     {
    1328       29327 :       n_bytes_per_seg = TRANSPORT_MAX_HDRS_LEN + ctx->max_len_to_snd;
    1329       29327 :       ctx->n_bufs_per_seg = ceil ((f64) n_bytes_per_seg / n_bytes_per_buf);
    1330       29327 :       ctx->n_bufs_needed = ctx->n_bufs_per_seg;
    1331             :     }
    1332             : 
                         /* Per-buffer copy limits; the first buffer of a segment loses
                          * TRANSPORT_MAX_HDRS_LEN bytes of room */
    1333       58580 :   ctx->deq_per_buf = clib_min (ctx->sp.snd_mss, n_bytes_per_buf);
    1334       58580 :   ctx->deq_per_first_buf = clib_min (ctx->sp.snd_mss,
    1335             :                                      n_bytes_per_buf -
    1336             :                                      TRANSPORT_MAX_HDRS_LEN);
    1337             : }
    1338             : 
    /* Decide whether a session whose tx pass just finished must be run again.
     *
     * Clears the tx fifo event first. If the fifo then reports more dequeueable
     * bytes than ctx->sp.tx_offset, the event is set again and, when
     * svm_fifo_set_event returns non-zero, elt is put back via
     * session_evt_add_head_old. Otherwise the transport connection ctx->tc is
     * descheduled.
     *
     * NOTE(review): the unset / re-check / set order looks like it guards
     * against an enqueue racing with this check - confirm against svm_fifo.
     */
    1339             : always_inline void
    1340       49347 : session_tx_maybe_reschedule (session_worker_t * wrk,
    1341             :                              session_tx_context_t * ctx,
    1342             :                              session_evt_elt_t * elt)
    1343             : {
    1344       49347 :   session_t *s = ctx->s;
    1345             : 
    1346       49347 :   svm_fifo_unset_event (s->tx_fifo);
    1347       49347 :   if (svm_fifo_max_dequeue_cons (s->tx_fifo) > ctx->sp.tx_offset)
    1348             :     {
    1349        5830 :       if (svm_fifo_set_event (s->tx_fifo))
    1350        5828 :         session_evt_add_head_old (wrk, elt);
    1351             :     }
    1352             :   else
    1353             :     {
    1354       43517 :       transport_connection_deschedule (ctx->tc);
    1355             :     }
    1356       49347 : }
    1357             : 
    /* Record buffer index bi together with its next node index.
     *
     * With wrk->dma_enabled clear (the predicted case) both values are appended
     * to the worker's pending_tx_buffers / pending_tx_nexts vectors. Otherwise
     * they are appended to the same-named vectors of the dma transfer at
     * wrk->dma_trans[wrk->trans_tail]. The two vectors are always grown
     * together, so entry i of one matches entry i of the other.
     */
    1358             : always_inline void
    1359     1054200 : session_tx_add_pending_buffer (session_worker_t *wrk, u32 bi, u32 next_index)
    1360             : {
    1361     1054200 :   if (PREDICT_TRUE (!wrk->dma_enabled))
    1362             :     {
    1363     1054200 :       vec_add1 (wrk->pending_tx_buffers, bi);
    1364     1054200 :       vec_add1 (wrk->pending_tx_nexts, next_index);
    1365             :     }
    1366             :   else
    1367             :     {
    1368           0 :       session_dma_transfer *dma_transfer = &wrk->dma_trans[wrk->trans_tail];
    1369           0 :       vec_add1 (dma_transfer->pending_tx_buffers, bi);
    1370           0 :       vec_add1 (dma_transfer->pending_tx_nexts, next_index);
    1371             :     }
    1372     1054200 : }
    1373             : 
    /* Common tx worker for one session event: move data from the session's tx
     * fifo into vlib buffers and queue them for the transport's output node.
     *
     * wrk          - session worker; wrk->ctx.s must already point at the session.
     * node         - node runtime, used for the error counter and tracing.
     * elt          - the event element being serviced; may be re-queued here.
     * n_tx_packets - in/out count of packets produced so far in this dispatch;
     *                increased by custom-tx packets and by the segments built.
     * peek_data    - non-zero for the peek path, zero for the dequeue path.
     *
     * Order of operations: readiness check, transport lookup, optional flush,
     * optional custom tx, send-parameter and pacer checks, dequeue sizing,
     * buffer allocation, buffer fill, header push, rescheduling and, on the
     * dequeue path only, the dequeue notification check.
     *
     * Returns SESSION_TX_NO_DATA when nothing could be sent,
     * SESSION_TX_NO_BUFFERS when buffer allocation fell short, and
     * SESSION_TX_OK otherwise.
     */
    1374             : always_inline int
    1375       88189 : session_tx_fifo_read_and_snd_i (session_worker_t * wrk,
    1376             :                                 vlib_node_runtime_t * node,
    1377             :                                 session_evt_elt_t * elt,
    1378             :                                 int *n_tx_packets, u8 peek_data)
    1379             : {
    1380             :   u32 n_trace, n_left, pbi, next_index, max_burst;
    1381       88189 :   session_tx_context_t *ctx = &wrk->ctx;
    1382       88189 :   session_main_t *smm = &session_main;
    1383       88189 :   session_event_t *e = &elt->evt;
    1384       88189 :   vlib_main_t *vm = wrk->vm;
    1385             :   transport_proto_t tp;
    1386             :   vlib_buffer_t *pb;
    1387             :   u16 n_bufs, rv;
    1388             : 
                         /* Non-zero rv: not ready. Values below 2 re-queue the event as old,
                          * any other non-zero value drops it without re-queueing */
    1389       88189 :   if (PREDICT_FALSE ((rv = session_tx_not_ready (ctx->s, peek_data))))
    1390             :     {
    1391           2 :       if (rv < 2)
    1392           0 :         session_evt_add_old (wrk, elt);
    1393           2 :       return SESSION_TX_NO_DATA;
    1394             :     }
    1395             : 
    1396       88187 :   next_index = smm->session_type_to_next[ctx->s->session_type];
                         /* Room left in this dispatch, in packets */
    1397       88187 :   max_burst = SESSION_NODE_FRAME_SIZE - *n_tx_packets;
    1398             : 
    1399       88187 :   tp = session_get_transport_proto (ctx->s);
    1400       88187 :   ctx->transport_vft = transport_protocol_get_vft (tp);
    1401       88187 :   ctx->tc = session_tx_get_transport (ctx, peek_data);
    1402             : 
                         /* A flush request calls the transport's flush_data hook, if it has
                          * one, and is then downgraded to a plain tx event */
    1403       88187 :   if (PREDICT_FALSE (e->event_type == SESSION_IO_EVT_TX_FLUSH))
    1404             :     {
    1405         459 :       if (ctx->transport_vft->flush_data)
    1406          79 :         ctx->transport_vft->flush_data (ctx->tc);
    1407         459 :       e->event_type = SESSION_IO_EVT_TX;
    1408             :     }
    1409             : 
                         /* Custom tx requested: clear the flag, let the transport send up to
                          * max_burst packets, then stop if the session closed, or re-queue if
                          * the burst is used up or the flag was set again meanwhile */
    1410       88187 :   if (ctx->s->flags & SESSION_F_CUSTOM_TX)
    1411             :     {
    1412             :       u32 n_custom_tx;
    1413       41252 :       ctx->s->flags &= ~SESSION_F_CUSTOM_TX;
    1414       41252 :       ctx->sp.max_burst_size = max_burst;
    1415       41252 :       n_custom_tx = ctx->transport_vft->custom_tx (ctx->tc, &ctx->sp);
    1416       41252 :       *n_tx_packets += n_custom_tx;
    1417       41252 :       if (PREDICT_FALSE
    1418             :           (ctx->s->session_state >= SESSION_STATE_TRANSPORT_CLOSED))
    1419         129 :         return SESSION_TX_OK;
    1420       41123 :       max_burst -= n_custom_tx;
    1421       41123 :       if (!max_burst || (ctx->s->flags & SESSION_F_CUSTOM_TX))
    1422             :         {
    1423           0 :           session_evt_add_old (wrk, elt);
    1424           0 :           return SESSION_TX_OK;
    1425             :         }
    1426             :     }
    1427             : 
    1428             :   /* Connection previously descheduled because it had no data to send.
    1429             :    * Clear descheduled flag and reset pacer if in use */
    1430       88058 :   if (transport_connection_is_descheduled (ctx->tc))
    1431       15663 :     transport_connection_clear_descheduled (ctx->tc);
    1432             : 
    1433       88058 :   transport_connection_snd_params (ctx->tc, &ctx->sp);
    1434             : 
    1435       88058 :   if (!ctx->sp.snd_space)
    1436             :     {
    1437             :       /* If the deschedule flag was set, remove session from scheduler.
    1438             :        * Transport is responsible for rescheduling this session. */
    1439        1026 :       if (ctx->sp.flags & TRANSPORT_SND_F_DESCHED)
    1440        1026 :         transport_connection_deschedule (ctx->tc);
    1441             :       /* Request to postpone the session, e.g., zero-wnd and transport
    1442             :        * is not currently probing */
    1443           0 :       else if (ctx->sp.flags & TRANSPORT_SND_F_POSTPONE)
    1444           0 :         session_evt_add_old (wrk, elt);
    1445             :       /* This flow queue is "empty" so it should be re-evaluated before
    1446             :        * the ones that have data to send. */
    1447             :       else
    1448           0 :         session_evt_add_head_old (wrk, elt);
    1449             : 
    1450        1026 :       return SESSION_TX_NO_DATA;
    1451             :     }
    1452             : 
                         /* Paced connections: back off if the pacer grants less than
                          * TRANSPORT_PACER_MIN_BURST, otherwise clamp snd_space to the grant
                          * and round it down to whole segments when at least one fits */
    1453       87032 :   if (transport_connection_is_tx_paced (ctx->tc))
    1454             :     {
    1455       69806 :       u32 snd_space = transport_connection_tx_pacer_burst (ctx->tc);
    1456       69806 :       if (snd_space < TRANSPORT_PACER_MIN_BURST)
    1457             :         {
    1458         461 :           session_evt_add_head_old (wrk, elt);
    1459         461 :           return SESSION_TX_NO_DATA;
    1460             :         }
    1461       69345 :       snd_space = clib_min (ctx->sp.snd_space, snd_space);
    1462       69345 :       ctx->sp.snd_space = snd_space >= ctx->sp.snd_mss ?
    1463       69345 :         snd_space - snd_space % ctx->sp.snd_mss : snd_space;
    1464             :     }
    1465             : 
    1466             :   /* Check how much we can pull. */
    1467       86571 :   session_tx_set_dequeue_params (vm, ctx, max_burst, peek_data);
    1468             : 
    1469       86571 :   if (PREDICT_FALSE (!ctx->max_len_to_snd))
    1470             :     {
    1471       27991 :       transport_connection_tx_pacer_reset_bucket (ctx->tc, 0);
    1472       27991 :       session_tx_maybe_reschedule (wrk, ctx, elt);
    1473       27991 :       return SESSION_TX_NO_DATA;
    1474             :     }
    1475             : 
    1476       58580 :   vec_validate_aligned (ctx->tx_buffers, ctx->n_bufs_needed - 1,
    1477             :                         CLIB_CACHE_LINE_BYTES);
    1478       58580 :   n_bufs = vlib_buffer_alloc (vm, ctx->tx_buffers, ctx->n_bufs_needed);
                         /* All-or-nothing allocation: give back a partial allocation, retry
                          * the event later and count the failure */
    1479       58580 :   if (PREDICT_FALSE (n_bufs < ctx->n_bufs_needed))
    1480             :     {
    1481           0 :       if (n_bufs)
    1482           0 :         vlib_buffer_free (vm, ctx->tx_buffers, n_bufs);
    1483           0 :       session_evt_add_head_old (wrk, elt);
    1484           0 :       vlib_node_increment_counter (wrk->vm, node->node_index,
    1485             :                                    SESSION_QUEUE_ERROR_NO_BUFFER, 1);
    1486           0 :       return SESSION_TX_NO_BUFFERS;
    1487             :     }
    1488             : 
    1489       58580 :   if (transport_connection_is_tx_paced (ctx->tc))
    1490       42153 :     transport_connection_tx_pacer_update_bytes (ctx->tc, ctx->max_len_to_snd);
    1491             : 
    1492       58580 :   ctx->left_to_snd = ctx->max_len_to_snd;
    1493       58580 :   n_left = ctx->n_segs_per_evt;
    1494             : 
    1495       58580 :   vec_validate (ctx->transport_pending_bufs, n_left);
    1496             : 
                         /* Two segments per iteration while at least four remain. Buffers are
                          * taken from the tail of tx_buffers; n_bufs is also passed by pointer
                          * to session_tx_fill_buffer - presumably so it can take extra buffers
                          * for chained segments, confirm in that function. The prefetched
                          * entries are the ones a following iteration pops if it does not. */
    1497      528566 :   while (n_left >= 4)
    1498             :     {
    1499             :       vlib_buffer_t *b0, *b1;
    1500             :       u32 bi0, bi1;
    1501             : 
    1502      469986 :       pbi = ctx->tx_buffers[n_bufs - 3];
    1503      469986 :       pb = vlib_get_buffer (vm, pbi);
    1504      469986 :       vlib_prefetch_buffer_header (pb, STORE);
    1505      469986 :       pbi = ctx->tx_buffers[n_bufs - 4];
    1506      469986 :       pb = vlib_get_buffer (vm, pbi);
    1507      469986 :       vlib_prefetch_buffer_header (pb, STORE);
    1508             : 
    1509      469986 :       bi0 = ctx->tx_buffers[--n_bufs];
    1510      469986 :       bi1 = ctx->tx_buffers[--n_bufs];
    1511             : 
    1512      469986 :       b0 = vlib_get_buffer (vm, bi0);
    1513      469986 :       b1 = vlib_get_buffer (vm, bi1);
    1514             : 
    1515      469986 :       session_tx_fill_buffer (wrk, ctx, b0, &n_bufs, peek_data);
    1516      469986 :       session_tx_fill_buffer (wrk, ctx, b1, &n_bufs, peek_data);
    1517             : 
    1518      469986 :       ctx->transport_pending_bufs[ctx->n_segs_per_evt - n_left] = b0;
    1519      469986 :       ctx->transport_pending_bufs[ctx->n_segs_per_evt - n_left + 1] = b1;
    1520      469986 :       n_left -= 2;
    1521             : 
    1522      469986 :       session_tx_add_pending_buffer (wrk, bi0, next_index);
    1523      469986 :       session_tx_add_pending_buffer (wrk, bi1, next_index);
    1524             :     }
                         /* Remaining segments, one per iteration */
    1525      172812 :   while (n_left)
    1526             :     {
    1527             :       vlib_buffer_t *b0;
    1528             :       u32 bi0;
    1529             : 
    1530      114232 :       if (n_left > 1)
    1531             :         {
    1532       55652 :           pbi = ctx->tx_buffers[n_bufs - 2];
    1533       55652 :           pb = vlib_get_buffer (vm, pbi);
    1534       55652 :           vlib_prefetch_buffer_header (pb, STORE);
    1535             :         }
    1536             : 
    1537      114232 :       bi0 = ctx->tx_buffers[--n_bufs];
    1538      114232 :       b0 = vlib_get_buffer (vm, bi0);
    1539      114232 :       session_tx_fill_buffer (wrk, ctx, b0, &n_bufs, peek_data);
    1540             : 
    1541      114232 :       ctx->transport_pending_bufs[ctx->n_segs_per_evt - n_left] = b0;
    1542      114232 :       n_left -= 1;
    1543             : 
    1544      114232 :       session_tx_add_pending_buffer (wrk, bi0, next_index);
    1545             :     }
    1546             : 
    1547             :   /* Ask transport to push headers */
    1548       58580 :   ctx->transport_vft->push_header (ctx->tc, ctx->transport_pending_bufs,
    1549       58580 :                                    ctx->n_segs_per_evt);
    1550             : 
    1551       58580 :   if (PREDICT_FALSE ((n_trace = vlib_get_trace_count (vm, node)) > 0))
    1552           0 :     session_tx_trace_frame (vm, node, next_index, ctx->transport_pending_bufs,
    1553           0 :                             ctx->n_segs_per_evt, ctx->s, n_trace);
    1554             : 
                         /* Return any allocated buffers that were not consumed above */
    1555       58580 :   if (PREDICT_FALSE (n_bufs))
    1556           0 :     vlib_buffer_free (vm, ctx->tx_buffers, n_bufs);
    1557             : 
    1558       58580 :   *n_tx_packets += ctx->n_segs_per_evt;
    1559             : 
    1560             :   SESSION_EVT (SESSION_EVT_DEQ, ctx->s, ctx->max_len_to_snd, ctx->max_dequeue,
    1561             :                ctx->s->tx_fifo->shr->has_event, wrk->last_vlib_time);
    1562             : 
    1563       58580 :   ASSERT (ctx->left_to_snd == 0);
    1564             : 
    1565             :   /* If we couldn't dequeue all bytes reschedule as old flow. Otherwise,
    1566             :    * check if application enqueued more data and reschedule accordingly */
    1567       58580 :   if (ctx->max_len_to_snd < ctx->max_dequeue)
    1568       37224 :     session_evt_add_old (wrk, elt);
    1569             :   else
    1570       21356 :     session_tx_maybe_reschedule (wrk, ctx, elt);
    1571             : 
                         /* Dequeue path only: account for the bytes removed from the fifo (one
                          * SESSION_CONN_HDR_LEN per segment is added for datagram transports)
                          * and notify the app if the fifo asks for a dequeue notification */
    1572       58580 :   if (!peek_data)
    1573             :     {
    1574       16427 :       u32 n_dequeued = ctx->max_len_to_snd;
    1575       16427 :       if (ctx->transport_vft->transport_options.tx_type == TRANSPORT_TX_DGRAM)
    1576       16427 :         n_dequeued += ctx->n_segs_per_evt * SESSION_CONN_HDR_LEN;
    1577       16427 :       if (svm_fifo_needs_deq_ntf (ctx->s->tx_fifo, n_dequeued))
    1578          50 :         session_dequeue_notify (ctx->s);
    1579             :     }
    1580       58580 :   return SESSION_TX_OK;
    1581             : }
    1582             : 
    /* Tx handler for the peek path: forwards to
     * session_tx_fifo_read_and_snd_i with peek_data = 1 and returns its result
     * unchanged. */
    1583             : int
    1584       70963 : session_tx_fifo_peek_and_snd (session_worker_t * wrk,
    1585             :                               vlib_node_runtime_t * node,
    1586             :                               session_evt_elt_t * e, int *n_tx_packets)
    1587             : {
    1588       70963 :   return session_tx_fifo_read_and_snd_i (wrk, node, e, n_tx_packets, 1);
    1589             : }
    1590             : 
    /* Tx handler for the dequeue path: forwards to
     * session_tx_fifo_read_and_snd_i with peek_data = 0 and returns its result
     * unchanged. */
    1591             : int
    1592       17226 : session_tx_fifo_dequeue_and_snd (session_worker_t * wrk,
    1593             :                                  vlib_node_runtime_t * node,
    1594             :                                  session_evt_elt_t * e, int *n_tx_packets)
    1595             : {
    1596       17226 :   return session_tx_fifo_read_and_snd_i (wrk, node, e, n_tx_packets, 0);
    1597             : }
    1598             : 
    /* Tx handler that hands the whole send to the transport through
     * transport_custom_tx instead of building buffers in this file.
     *
     * wrk          - session worker; the session is taken from wrk->ctx.s and
     *                the send parameters from wrk->ctx.sp.
     * node         - not used in this function.
     * elt          - event element; re-queued here when more work remains.
     * n_tx_packets - in/out count of packets produced in this dispatch.
     *
     * Returns 0 without doing anything when the session state is at or past
     * SESSION_STATE_TRANSPORT_CLOSED, otherwise the packet count reported by
     * transport_custom_tx.
     */
    1599             : int
    1600     1476360 : session_tx_fifo_dequeue_internal (session_worker_t * wrk,
    1601             :                                   vlib_node_runtime_t * node,
    1602             :                                   session_evt_elt_t * elt, int *n_tx_packets)
    1603             : {
    1604     1476360 :   transport_send_params_t *sp = &wrk->ctx.sp;
    1605     1476360 :   session_t *s = wrk->ctx.s;
    1606             :   u32 n_packets;
    1607             : 
    1608     1476360 :   if (PREDICT_FALSE (s->session_state >= SESSION_STATE_TRANSPORT_CLOSED))
    1609           2 :     return 0;
    1610             : 
    1611             :   /* Clear custom-tx flag used to request reschedule for tx */
    1612     1476360 :   s->flags &= ~SESSION_F_CUSTOM_TX;
    1613             : 
    1614     1476360 :   sp->flags = 0;
    1615     1476360 :   sp->bytes_dequeued = 0;
                         /* Burst is limited by both the room left in this dispatch and
                          * TRANSPORT_PACER_MAX_BURST_PKTS */
    1616     1476360 :   sp->max_burst_size = clib_min (SESSION_NODE_FRAME_SIZE - *n_tx_packets,
    1617             :                                  TRANSPORT_PACER_MAX_BURST_PKTS);
    1618             : 
    1619     1476360 :   n_packets = transport_custom_tx (session_get_transport_proto (s), s, sp);
    1620     1476360 :   *n_tx_packets += n_packets;
    1621             : 
                         /* Flag set again during the call: re-queue as old. Otherwise, unless
                          * TRANSPORT_SND_F_DESCHED was set, re-arm the fifo event and re-queue
                          * at the head if data is still queued. With DESCHED set and no flag,
                          * the event is not re-queued here. */
    1622     1476360 :   if (s->flags & SESSION_F_CUSTOM_TX)
    1623             :     {
    1624     1284960 :       session_evt_add_old (wrk, elt);
    1625             :     }
    1626      191398 :   else if (!(sp->flags & TRANSPORT_SND_F_DESCHED))
    1627             :     {
    1628       66511 :       svm_fifo_unset_event (s->tx_fifo);
    1629       66511 :       if (svm_fifo_max_dequeue_cons (s->tx_fifo))
    1630       66483 :         if (svm_fifo_set_event (s->tx_fifo))
    1631       66483 :           session_evt_add_head_old (wrk, elt);
    1632             :     }
    1633             : 
    1634     1477040 :   if (sp->bytes_dequeued &&
    1635         679 :       svm_fifo_needs_deq_ntf (s->tx_fifo, sp->bytes_dequeued))
    1636           8 :     session_dequeue_notify (s);
    1637             : 
    1638     1476360 :   return n_packets;
    1639             : }
    1640             : 
    /* Map an event to its session in this worker's session pool.
     *
     * Returns 0 when e->session_index refers to a free pool slot; callers must
     * check for that. Otherwise asserts session_is_valid for the index on
     * wrk->vm->thread_index and returns the pool element.
     */
    1641             : always_inline session_t *
    1642     1575310 : session_event_get_session (session_worker_t * wrk, session_event_t * e)
    1643             : {
    1644     1575310 :   if (PREDICT_FALSE (pool_is_free_index (wrk->sessions, e->session_index)))
    1645          18 :     return 0;
    1646             : 
    1647     1575290 :   ASSERT (session_is_valid (e->session_index, wrk->vm->thread_index));
    1648     1575290 :   return pool_elt_at_index (wrk->sessions, e->session_index);
    1649             : }
    1650             : 
    /* Dispatch one control event to its handler, then release the element.
     *
     * The element's index in wrk->event_elts is saved before the handler runs,
     * because elt is looked up again from that index afterwards (a handler may
     * cause the pool to move). RPC events call the stored function pointer with
     * its argument; half-close, close and reset look the session up by handle
     * and do nothing if it is no longer valid; the remaining types go to their
     * session_mq_* / app_mq_* handlers. Unknown types only log a warning.
     *
     * After dispatch, an element that is not linked into evt_list is returned
     * to the pool. For event types >= SESSION_CTRL_EVT_BOUND its ctrl data is
     * freed first - presumably those are the types that carry separately stored
     * ctrl data; confirm against the event type enum.
     */
    1651             : always_inline void
    1652         892 : session_event_dispatch_ctrl (session_worker_t * wrk, session_evt_elt_t * elt)
    1653             : {
    1654             :   clib_llist_index_t ei;
    1655             :   void (*fp) (void *);
    1656             :   session_event_t *e;
    1657             :   session_t *s;
    1658             : 
    1659         892 :   ei = clib_llist_entry_index (wrk->event_elts, elt);
    1660         892 :   e = &elt->evt;
    1661             : 
    1662         892 :   switch (e->event_type)
    1663             :     {
    1664         148 :     case SESSION_CTRL_EVT_RPC:
    1665         148 :       fp = e->rpc_args.fp;
    1666         148 :       (*fp) (e->rpc_args.arg);
    1667         148 :       break;
    1668           0 :     case SESSION_CTRL_EVT_HALF_CLOSE:
    1669           0 :       s = session_get_from_handle_if_valid (e->session_handle);
    1670           0 :       if (PREDICT_FALSE (!s))
    1671           0 :         break;
    1672           0 :       session_transport_half_close (s);
    1673           0 :       break;
    1674         471 :     case SESSION_CTRL_EVT_CLOSE:
    1675         471 :       s = session_get_from_handle_if_valid (e->session_handle);
    1676         471 :       if (PREDICT_FALSE (!s))
    1677           0 :         break;
    1678         471 :       session_transport_close (s);
    1679         471 :       break;
    1680           0 :     case SESSION_CTRL_EVT_RESET:
    1681           0 :       s = session_get_from_handle_if_valid (e->session_handle);
    1682           0 :       if (PREDICT_FALSE (!s))
    1683           0 :         break;
    1684           0 :       session_transport_reset (s);
    1685           0 :       break;
    1686          43 :     case SESSION_CTRL_EVT_LISTEN:
    1687          43 :       session_mq_listen_handler (wrk, elt);
    1688          43 :       break;
    1689           0 :     case SESSION_CTRL_EVT_LISTEN_URI:
    1690           0 :       session_mq_listen_uri_handler (wrk, elt);
    1691           0 :       break;
    1692          25 :     case SESSION_CTRL_EVT_UNLISTEN:
    1693          25 :       session_mq_unlisten_handler (wrk, elt);
    1694          25 :       break;
    1695          51 :     case SESSION_CTRL_EVT_CONNECT:
    1696          51 :       session_mq_connect_handler (wrk, elt);
    1697          51 :       break;
    1698           0 :     case SESSION_CTRL_EVT_CONNECT_URI:
    1699           0 :       session_mq_connect_uri_handler (wrk, elt);
    1700           0 :       break;
    1701           0 :     case SESSION_CTRL_EVT_SHUTDOWN:
    1702           0 :       session_mq_shutdown_handler (session_evt_ctrl_data (wrk, elt));
    1703           0 :       break;
    1704          61 :     case SESSION_CTRL_EVT_DISCONNECT:
    1705          61 :       session_mq_disconnect_handler (session_evt_ctrl_data (wrk, elt));
    1706          61 :       break;
    1707           0 :     case SESSION_CTRL_EVT_DISCONNECTED:
    1708           0 :       session_mq_disconnected_handler (session_evt_ctrl_data (wrk, elt));
    1709           0 :       break;
    1710          47 :     case SESSION_CTRL_EVT_ACCEPTED_REPLY:
    1711          47 :       session_mq_accepted_reply_handler (wrk, elt);
    1712          47 :       break;
    1713          13 :     case SESSION_CTRL_EVT_DISCONNECTED_REPLY:
    1714          13 :       session_mq_disconnected_reply_handler (session_evt_ctrl_data (wrk,
    1715             :                                                                     elt));
    1716          13 :       break;
    1717           4 :     case SESSION_CTRL_EVT_RESET_REPLY:
    1718           4 :       session_mq_reset_reply_handler (session_evt_ctrl_data (wrk, elt));
    1719           4 :       break;
    1720           0 :     case SESSION_CTRL_EVT_WORKER_UPDATE:
    1721           0 :       session_mq_worker_update_handler (session_evt_ctrl_data (wrk, elt));
    1722           0 :       break;
    1723          24 :     case SESSION_CTRL_EVT_APP_DETACH:
    1724          24 :       app_mq_detach_handler (wrk, elt);
    1725          24 :       break;
    1726           0 :     case SESSION_CTRL_EVT_APP_WRK_RPC:
    1727           0 :       session_mq_app_wrk_rpc_handler (session_evt_ctrl_data (wrk, elt));
    1728           0 :       break;
    1729           5 :     case SESSION_CTRL_EVT_TRANSPORT_ATTR:
    1730           5 :       session_mq_transport_attr_handler (session_evt_ctrl_data (wrk, elt));
    1731           5 :       break;
    1732           0 :     default:
    1733           0 :       clib_warning ("unhandled event type %d", e->event_type);
    1734             :     }
    1735             : 
    1736             :   /* Regrab elements in case pool moved */
    1737         892 :   elt = clib_llist_elt (wrk->event_elts, ei);
                         /* An element still linked into evt_list was re-queued by its handler
                          * and must not be released here */
    1738         892 :   if (!clib_llist_elt_is_linked (elt, evt_list))
    1739             :     {
    1740         837 :       e = &elt->evt;
    1741         837 :       if (e->event_type >= SESSION_CTRL_EVT_BOUND)
    1742         218 :         session_evt_ctrl_data_free (wrk, elt);
    1743         837 :       clib_llist_put (wrk->event_elts, elt);
    1744             :     }
    1745             :   SESSION_EVT (SESSION_EVT_COUNTS, CNT_CTRL_EVTS, 1, wrk);
    1746         892 : }
    1747             : /* Dispatch one io event element (TX, TX_FLUSH, RX, BUILTIN_RX, TX_MAIN); n_tx_packets is passed through to the tx function. */
    1748             : always_inline void
    1749     1575310 : session_event_dispatch_io (session_worker_t * wrk, vlib_node_runtime_t * node,
    1750             :                            session_evt_elt_t * elt, int *n_tx_packets)
    1751             : {
    1752     1575310 :   session_main_t *smm = &session_main;
    1753             :   app_worker_t *app_wrk;
    1754             :   clib_llist_index_t ei;
    1755             :   session_event_t *e;
    1756             :   session_t *s;
    1757             : 
    1758     1575310 :   ei = clib_llist_entry_index (wrk->event_elts, elt); /* keep the index; elt is re-derived from it after dispatch */
    1759     1575310 :   e = &elt->evt;
    1760             : 
    1761     1575310 :   switch (e->event_type)
    1762             :     {
    1763     1564570 :     case SESSION_IO_EVT_TX_FLUSH:
    1764             :     case SESSION_IO_EVT_TX:
    1765     1564570 :       s = session_event_get_session (wrk, e);
    1766     1564570 :       if (PREDICT_FALSE (!s)) /* no session for this event, nothing to send */
    1767          18 :         break;
    1768     1564550 :       CLIB_PREFETCH (s->tx_fifo, sizeof (*(s->tx_fifo)), LOAD);
    1769     1564550 :       wrk->ctx.s = s;
    1770             :       /* Spray packets in per session type frames, since they go to
    1771             :        * different nodes */
    1772     1564550 :       (smm->session_tx_fns[s->session_type]) (wrk, node, elt, n_tx_packets);
    1773     1564550 :       break;
    1774         592 :     case SESSION_IO_EVT_RX:
    1775         592 :       s = session_event_get_session (wrk, e);
    1776         592 :       if (!s)
    1777           0 :         break;
    1778         592 :       transport_app_rx_evt (session_get_transport_proto (s),
    1779         592 :                             s->connection_index, s->thread_index);
    1780         592 :       break;
    1781       10150 :     case SESSION_IO_EVT_BUILTIN_RX:
    1782       10150 :       s = session_event_get_session (wrk, e);
    1783       10150 :       if (PREDICT_FALSE (!s || s->session_state >= SESSION_STATE_CLOSING)) /* skip missing sessions and states at or past CLOSING */
    1784             :         break;
    1785       10150 :       svm_fifo_unset_event (s->rx_fifo);
    1786       10150 :       app_wrk = app_worker_get (s->app_wrk_index);
    1787       10150 :       app_worker_builtin_rx (app_wrk, s);
    1788       10150 :       break;
    1789           0 :     case SESSION_IO_EVT_TX_MAIN:
    1790           0 :       s = session_get_if_valid (e->session_index, 0 /* main thread */);
    1791           0 :       if (PREDICT_FALSE (!s))
    1792           0 :         break;
    1793           0 :       wrk->ctx.s = s;
    1794           0 :       if (PREDICT_TRUE (s != 0)) /* NOTE(review): s was already checked non-null above, so this test is always true */
    1795           0 :         (smm->session_tx_fns[s->session_type]) (wrk, node, elt, n_tx_packets);
    1796           0 :       break;
    1797           0 :     default:
    1798           0 :       clib_warning ("unhandled event type %d", e->event_type);
    1799             :     }
    1800             : 
    1801           0 :   SESSION_EVT (SESSION_EVT_IO_EVT_COUNTS, e->event_type, 1, wrk);
    1802             : 
    1803             :   /* Regrab elements in case pool moved */
    1804     1575310 :   elt = clib_llist_elt (wrk->event_elts, ei);
    1805     1575310 :   if (!clib_llist_elt_is_linked (elt, evt_list)) /* return the element to the pool only if it is not linked into an evt_list */
    1806      180353 :     clib_llist_put (wrk->event_elts, elt);
    1807     1575310 : }
    1808             : /* Message size table indexed by SESSION_CTRL_EVT_* value; one entry per item of foreach_session_ctrl_evt. */
    1809             : /* *INDENT-OFF* */
    1810             : static const u32 session_evt_msg_sizes[] = {
    1811             : #define _(symc, sym)                                                    \
    1812             :   [SESSION_CTRL_EVT_ ## symc] = sizeof (session_ ## sym ##_msg_t),
    1813             :   foreach_session_ctrl_evt
    1814             : #undef _
    1815             : };
    1816             : /* *INDENT-ON* */
    1817             : /* Call every function in smm->update_time_fns with the given time and thread index. */
    1818             : always_inline void
    1819    75007800 : session_update_time_subscribers (session_main_t *smm, clib_time_type_t now,
    1820             :                                  u32 thread_index)
    1821             : {
    1822             :   session_update_time_fn *fn;
    1823             : 
    1824   158744000 :   vec_foreach (fn, smm->update_time_fns)
    1825    83653500 :     (*fn) (now, thread_index);
    1826    75065900 : }
    1827             : /* Copy evt into a newly allocated worker event element: ctrl element for types >= SESSION_CTRL_EVT_RPC, new io element otherwise. */
    1828             : always_inline void
    1829      150090 : session_evt_add_to_list (session_worker_t * wrk, session_event_t * evt)
    1830             : {
    1831             :   session_evt_elt_t *elt;
    1832             : 
    1833      150090 :   if (evt->event_type >= SESSION_CTRL_EVT_RPC)
    1834             :     {
    1835         421 :       elt = session_evt_alloc_ctrl (wrk);
    1836         421 :       if (evt->event_type >= SESSION_CTRL_EVT_BOUND) /* payload is copied into separately allocated ctrl data */
    1837             :         {
    1838         273 :           elt->evt.ctrl_data_index = session_evt_ctrl_data_alloc (wrk);
    1839         273 :           elt->evt.event_type = evt->event_type;
    1840         273 :           clib_memcpy_fast (session_evt_ctrl_data (wrk, elt), evt->data,
    1841         273 :                             session_evt_msg_sizes[evt->event_type]); /* NOTE(review): table indexed by event_type with no bounds check here - confirm senders are trusted */
    1842             :         }
    1843             :       else
    1844             :         {
    1845             :           /* Internal control events fit into io events footprint */
    1846         148 :           clib_memcpy_fast (&elt->evt, evt, sizeof (elt->evt));
    1847             :         }
    1848             :     }
    1849             :   else
    1850             :     {
    1851      149669 :       elt = session_evt_alloc_new (wrk);
    1852      149669 :       clib_memcpy_fast (&elt->evt, evt, sizeof (elt->evt));
    1853             :     }
    1854      150090 : }
    1855             : /* Pass the worker's pending tx buffers and their next indices to vlib_buffer_enqueue_to_next_vec, then reset both vectors. */
    1856             : static void
    1857       40965 : session_flush_pending_tx_buffers (session_worker_t * wrk,
    1858             :                                   vlib_node_runtime_t * node)
    1859             : {
    1860       40965 :   vlib_buffer_enqueue_to_next_vec (wrk->vm, node, &wrk->pending_tx_buffers,
    1861             :                                    &wrk->pending_tx_nexts,
    1862       40965 :                                    vec_len (wrk->pending_tx_nexts));
    1863       40965 :   vec_reset_length (wrk->pending_tx_buffers);
    1864       40965 :   vec_reset_length (wrk->pending_tx_nexts);
    1865       40965 : }
    1866             : /* Move the messages found in mq at entry onto the worker's event lists; returns the number of messages dequeued. */
    1867             : int
    1868    74953100 : session_wrk_handle_mq (session_worker_t *wrk, svm_msg_q_t *mq)
    1869             : {
    1870    74953100 :   svm_msg_q_msg_t _msg, *msg = &_msg;
    1871    74953100 :   u32 i, n_to_dequeue = 0;
    1872             :   session_event_t *evt;
    1873             : 
    1874    74953100 :   n_to_dequeue = svm_msg_q_size (mq); /* size is sampled once; later arrivals wait for the next call */
    1875    75094700 :   for (i = 0; i < n_to_dequeue; i++)
    1876             :     {
    1877      150090 :       svm_msg_q_sub_raw (mq, msg);
    1878      150090 :       evt = svm_msg_q_msg_data (mq, msg);
    1879      150090 :       session_evt_add_to_list (wrk, evt); /* event is copied, so the message can be freed right after */
    1880      150090 :       svm_msg_q_free_msg (mq, msg);
    1881             :     }
    1882             : 
    1883    74944600 :   return n_to_dequeue;
    1884             : }
    1885             : /* Adaptive mode: switch the worker between POLLING, INTERRUPT and IDLE based on event element count and last vectors per main loop. */
    1886             : static void
    1887      584917 : session_wrk_update_state (session_worker_t *wrk)
    1888             : {
    1889      584917 :   vlib_main_t *vm = wrk->vm;
    1890             : 
    1891      584917 :   if (wrk->state == SESSION_WRK_POLLING)
    1892             :     {
    1893      580178 :       if (clib_llist_elts (wrk->event_elts) == 5 && /* NOTE(review): 5 presumably counts the always-allocated list heads, i.e. no queued events - confirm */
    1894          12 :           vlib_last_vectors_per_main_loop (vm) < 1)
    1895             :         {
    1896          12 :           session_wrk_set_state (wrk, SESSION_WRK_INTERRUPT);
    1897          12 :           vlib_node_set_state (vm, session_queue_node.index,
    1898             :                                VLIB_NODE_STATE_INTERRUPT);
    1899             :         }
    1900             :     }
    1901        4751 :   else if (wrk->state == SESSION_WRK_INTERRUPT)
    1902             :     {
    1903        9484 :       if (clib_llist_elts (wrk->event_elts) > 5 || /* more than 5 elements or more than 1 vector: go back to polling */
    1904        4737 :           vlib_last_vectors_per_main_loop (vm) > 1)
    1905             :         {
    1906          10 :           session_wrk_set_state (wrk, SESSION_WRK_POLLING);
    1907          10 :           vlib_node_set_state (vm, session_queue_node.index,
    1908             :                                VLIB_NODE_STATE_POLLING);
    1909             :         }
    1910        4737 :       else if (PREDICT_FALSE (!pool_elts (wrk->sessions))) /* no sessions on this worker */
    1911             :         {
    1912           4 :           session_wrk_set_state (wrk, SESSION_WRK_IDLE);
    1913             :         }
    1914             :     }
    1915             :   else /* any state other than POLLING or INTERRUPT */
    1916             :     {
    1917           4 :       if (clib_llist_elts (wrk->event_elts))
    1918             :         {
    1919           4 :           session_wrk_set_state (wrk, SESSION_WRK_INTERRUPT);
    1920             :         }
    1921             :     }
    1922      584917 : }
    1923             : /* Session queue node function: update time, drain the worker mq, dispatch ctrl, new io and old io events, flush tx; returns n_tx_packets. */
    1924             : static uword
    1925    75004100 : session_queue_node_fn (vlib_main_t * vm, vlib_node_runtime_t * node,
    1926             :                        vlib_frame_t * frame)
    1927             : {
    1928    75004100 :   u32 thread_index = vm->thread_index, __clib_unused n_evts;
    1929             :   session_evt_elt_t *elt, *ctrl_he, *new_he, *old_he;
    1930    75004100 :   session_main_t *smm = vnet_get_session_main ();
    1931    74999100 :   session_worker_t *wrk = &smm->wrk[thread_index];
    1932             :   clib_llist_index_t ei, next_ei, old_ti;
    1933             :   int n_tx_packets;
    1934             : 
    1935             :   SESSION_EVT (SESSION_EVT_DISPATCH_START, wrk);
    1936             : 
    1937    74999100 :   session_wrk_update_time (wrk, vlib_time_now (vm));
    1938             : 
    1939             :   /*
    1940             :    *  Update transport time
    1941             :    */
    1942    75008300 :   session_update_time_subscribers (smm, wrk->last_vlib_time, thread_index);
    1943    75036100 :   n_tx_packets = vec_len (wrk->pending_tx_buffers); /* start the count from buffers already pending on the worker */
    1944             :   SESSION_EVT (SESSION_EVT_DSP_CNTRS, UPDATE_TIME, wrk);
    1945             : 
    1946    75036100 :   if (PREDICT_FALSE (wrk->dma_enabled))
    1947             :     {
    1948           0 :       if (wrk->trans_head == ((wrk->trans_tail + 1) & (wrk->trans_size - 1))) /* NOTE(review): looks like a ring-full check that assumes trans_size is a power of two - confirm */
    1949           0 :         return 0;
    1950           0 :       wrk->batch = vlib_dma_batch_new (vm, wrk->config_index);
    1951           0 :       if (!wrk->batch)
    1952           0 :         return 0;
    1953             :     }
    1954             : 
    1955             :   /*
    1956             :    *  Dequeue new internal mq events
    1957             :    */
    1958             : 
    1959    75036100 :   n_evts = session_wrk_handle_mq (wrk, wrk->vpp_event_queue);
    1960             :   SESSION_EVT (SESSION_EVT_DSP_CNTRS, MQ_DEQ, wrk, n_evts);
    1961             : 
    1962             :   /*
    1963             :    * Handle control events
    1964             :    */
    1965             : 
    1966    74937900 :   ei = wrk->ctrl_head;
    1967    74937900 :   ctrl_he = clib_llist_elt (wrk->event_elts, ei);
    1968    74879400 :   next_ei = clib_llist_next_index (ctrl_he, evt_list);
    1969    74879400 :   old_ti = clib_llist_prev_index (ctrl_he, evt_list); /* tail at entry; equals the head index when the list is empty */
    1970    74879400 :   while (ei != old_ti)
    1971             :     {
    1972         892 :       ei = next_ei;
    1973         892 :       elt = clib_llist_elt (wrk->event_elts, next_ei);
    1974         892 :       next_ei = clib_llist_next_index (elt, evt_list); /* read the next index before the element is unlinked */
    1975         892 :       clib_llist_remove (wrk->event_elts, evt_list, elt);
    1976         892 :       session_event_dispatch_ctrl (wrk, elt);
    1977             :     }
    1978             : 
    1979             :   SESSION_EVT (SESSION_EVT_DSP_CNTRS, CTRL_EVTS, wrk);
    1980             : 
    1981             :   /*
    1982             :    * Handle the new io events.
    1983             :    */
    1984             : 
    1985    74878600 :   new_he = clib_llist_elt (wrk->event_elts, wrk->new_head);
    1986    74863900 :   old_he = clib_llist_elt (wrk->event_elts, wrk->old_head);
    1987    74866400 :   old_ti = clib_llist_prev_index (old_he, evt_list); /* tail of the old list, captured before new events are dispatched */
    1988             : 
    1989    74866400 :   ei = clib_llist_next_index (new_he, evt_list);
    1990    75046700 :   while (ei != wrk->new_head && n_tx_packets < SESSION_NODE_FRAME_SIZE) /* stop at the list head or once the frame size is reached */
    1991             :     {
    1992      180353 :       elt = clib_llist_elt (wrk->event_elts, ei);
    1993      180353 :       ei = clib_llist_next_index (elt, evt_list);
    1994      180353 :       clib_llist_remove (wrk->event_elts, evt_list, elt);
    1995      180353 :       session_event_dispatch_io (wrk, node, elt, &n_tx_packets);
    1996             :     }
    1997             : 
    1998             :   SESSION_EVT (SESSION_EVT_DSP_CNTRS, NEW_IO_EVTS, wrk);
    1999             : 
    2000             :   /*
    2001             :    * Handle the old io events, if we had any prior to processing the new ones
    2002             :    */
    2003             : 
    2004    74866400 :   if (old_ti != wrk->old_head)
    2005             :     {
    2006     1365270 :       old_he = clib_llist_elt (wrk->event_elts, wrk->old_head); /* re-fetched after the dispatches above */
    2007     1365270 :       ei = clib_llist_next_index (old_he, evt_list);
    2008             : 
    2009     1395420 :       while (n_tx_packets < SESSION_NODE_FRAME_SIZE)
    2010             :         {
    2011     1394960 :           elt = clib_llist_elt (wrk->event_elts, ei);
    2012     1394960 :           next_ei = clib_llist_next_index (elt, evt_list);
    2013     1394960 :           clib_llist_remove (wrk->event_elts, evt_list, elt);
    2014             : 
    2015     1394960 :           session_event_dispatch_io (wrk, node, elt, &n_tx_packets);
    2016             : 
    2017     1394960 :           if (ei == old_ti) /* reached the tail captured before dispatch; stop here */
    2018     1364800 :             break;
    2019             : 
    2020       30153 :           ei = next_ei;
    2021             :         };
    2022             :     }
    2023             : 
    2024    74866400 :   if (PREDICT_FALSE (wrk->dma_enabled))
    2025             :     {
    2026           0 :       if (wrk->batch_num)
    2027             :         {
    2028           0 :           vlib_dma_batch_set_cookie (vm, wrk->batch, wrk->trans_tail);
    2029           0 :           wrk->batch_num = 0;
    2030           0 :           wrk->trans_tail++;
    2031           0 :           if (wrk->trans_tail == wrk->trans_size) /* wrap the tail index */
    2032           0 :             wrk->trans_tail = 0;
    2033             :         }
    2034             : 
    2035           0 :       vlib_dma_batch_submit (vm, wrk->batch);
    2036             :     }
    2037             : 
    2038             :   SESSION_EVT (SESSION_EVT_DSP_CNTRS, OLD_IO_EVTS, wrk);
    2039             : 
    2040    74866400 :   if (vec_len (wrk->pending_tx_buffers))
    2041       40965 :     session_flush_pending_tx_buffers (wrk, node);
    2042             : 
    2043    74866400 :   vlib_node_increment_counter (vm, session_queue_node.index,
    2044             :                                SESSION_QUEUE_ERROR_TX, n_tx_packets);
    2045             : 
    2046             :   SESSION_EVT (SESSION_EVT_DISPATCH_END, wrk, n_tx_packets);
    2047             : 
    2048    74875100 :   if (wrk->flags & SESSION_WRK_F_ADAPTIVE) /* state is re-evaluated only for workers with adaptive mode set */
    2049      584917 :     session_wrk_update_state (wrk);
    2050             : 
    2051    74875300 :   return n_tx_packets;
    2052             : }
    2053             : /* Registers session_queue_node_fn as input node "session-queue"; the node is registered in DISABLED state. */
    2054             : /* *INDENT-OFF* */
    2055      178120 : VLIB_REGISTER_NODE (session_queue_node) = {
    2056             :   .function = session_queue_node_fn,
    2057             :   .flags = VLIB_NODE_FLAG_TRACE_SUPPORTED,
    2058             :   .name = "session-queue",
    2059             :   .format_trace = format_session_queue_trace,
    2060             :   .type = VLIB_NODE_TYPE_INPUT,
    2061             :   .n_errors = SESSION_QUEUE_N_ERROR,
    2062             :   .error_counters = session_error_counters,
    2063             :   .state = VLIB_NODE_STATE_DISABLED,
    2064             : };
    2065             : /* *INDENT-ON* */
    2066             : /* Read handler for the worker timerfd: marks the session queue node interrupt pending and reads the fd; always returns 0. */
    2067             : static clib_error_t *
    2068        4700 : session_wrk_tfd_read_ready (clib_file_t *cf)
    2069             : {
    2070        4700 :   session_worker_t *wrk = session_main_get_worker (cf->private_data); /* private_data is used as the worker index */
    2071             :   u64 buf;
    2072             :   int rv;
    2073             : 
    2074        4700 :   vlib_node_set_interrupt_pending (wrk->vm, session_queue_node.index);
    2075        4700 :   rv = read (wrk->timerfd, &buf, sizeof (buf)); /* value read into buf is not used afterwards */
    2076        4700 :   if (rv < 0 && errno != EAGAIN) /* EAGAIN is not reported */
    2077           0 :     clib_unix_warning ("failed");
    2078        4700 :   return 0;
    2079             : }
    2080             : /* Write handler for the worker timerfd: does nothing and returns 0. */
    2081             : static clib_error_t *
    2082           0 : session_wrk_tfd_write_ready (clib_file_t *cf)
    2083             : {
    2084           0 :   return 0;
    2085             : }
    2086             : /* Create a non-blocking CLOCK_MONOTONIC timerfd for the worker, register it with file_main and set SESSION_WRK_F_ADAPTIVE. */
    2087             : void
    2088           2 : session_wrk_enable_adaptive_mode (session_worker_t *wrk)
    2089             : {
    2090           2 :   u32 thread_index = wrk->vm->thread_index;
    2091           2 :   clib_file_t template = { 0 };
    2092             : 
    2093           2 :   if ((wrk->timerfd = timerfd_create (CLOCK_MONOTONIC, TFD_NONBLOCK)) < 0)
    2094           0 :     clib_warning ("timerfd_create"); /* NOTE(review): only warns; the negative fd is still registered and the flag set below - confirm intended */
    2095             : 
    2096           2 :   template.read_function = session_wrk_tfd_read_ready;
    2097           2 :   template.write_function = session_wrk_tfd_write_ready;
    2098           2 :   template.file_descriptor = wrk->timerfd;
    2099           2 :   template.private_data = thread_index; /* read back by session_wrk_tfd_read_ready to find the worker */
    2100           2 :   template.polling_thread_index = thread_index;
    2101           2 :   template.description = format (0, "session-wrk-tfd-%u", thread_index);
    2102             : 
    2103           2 :   wrk->timerfd_file = clib_file_add (&file_main, &template);
    2104           2 :   wrk->flags |= SESSION_WRK_F_ADAPTIVE;
    2105           2 : }
    2106             : /* Main loop exit function: with two or more threads, disables the session nodes while holding the worker barrier; returns 0. */
    2107             : static clib_error_t *
    2108         559 : session_queue_exit (vlib_main_t * vm)
    2109             : {
    2110         559 :   if (vlib_get_n_threads () < 2) /* fewer than two threads: nothing to shut off */
    2111         523 :     return 0;
    2112             : 
    2113             :   /*
    2114             :    * Shut off (especially) worker-thread session nodes.
    2115             :    * Otherwise, vpp can crash as the main thread unmaps the
    2116             :    * API segment.
    2117             :    */
    2118          36 :   vlib_worker_thread_barrier_sync (vm);
    2119          36 :   session_node_enable_disable (0 /* is_enable */ );
    2120          36 :   vlib_worker_thread_barrier_release (vm);
    2121          36 :   return 0;
    2122             : }
    2123             : 
    2124        2237 : VLIB_MAIN_LOOP_EXIT_FUNCTION (session_queue_exit);
    2125             : /* Run session_queue_node_fn on vm using the session queue node's runtime and a null frame; returns its result. */
    2126             : static uword
    2127          10 : session_queue_run_on_main (vlib_main_t * vm)
    2128             : {
    2129             :   vlib_node_runtime_t *node;
    2130             : 
    2131          10 :   node = vlib_node_get_runtime (vm, session_queue_node.index);
    2132          10 :   return session_queue_node_fn (vm, node, 0);
    2133             : }
    2134             : /* Process node: waits for an event or the timeout, then runs the session queue on main or, on STOP, disables this node. */
    2135             : static uword
    2136          21 : session_queue_process (vlib_main_t * vm, vlib_node_runtime_t * rt,
    2137             :                        vlib_frame_t * f)
    2138             : {
    2139          21 :   uword *event_data = 0;
    2140          21 :   f64 timeout = 1.0;
    2141             :   uword event_type;
    2142             : 
    2143             :   while (1)
    2144             :     {
    2145          31 :       vlib_process_wait_for_event_or_clock (vm, timeout);
    2146          10 :       event_type = vlib_process_get_events (vm, (uword **) & event_data);
    2147             : 
    2148          10 :       switch (event_type) /* no default case: other event types are ignored */
    2149             :         {
    2150           0 :         case SESSION_Q_PROCESS_RUN_ON_MAIN:
    2151             :           /* Run session queue node on main thread */
    2152           0 :           session_queue_run_on_main (vm);
    2153           0 :           break;
    2154           0 :         case SESSION_Q_PROCESS_STOP:
    2155           0 :           vlib_node_set_state (vm, session_queue_process_node.index,
    2156             :                                VLIB_NODE_STATE_DISABLED);
    2157           0 :           timeout = 100000.0; /* much longer wait is used once the node has been disabled */
    2158           0 :           break;
    2159          10 :         case ~0:
    2160             :           /* Timed out. Run on main to ensure all events are handled */
    2161          10 :           session_queue_run_on_main (vm);
    2162          10 :           break;
    2163             :         }
    2164          10 :       vec_reset_length (event_data);
    2165             :     }
    2166             :   return 0; /* not reached: the loop above has no exit */
    2167             : }
    2168             : /* Registers session_queue_process as process node "session-queue-process"; the node is registered in DISABLED state. */
    2169             : /* *INDENT-OFF* */
    2170      178120 : VLIB_REGISTER_NODE (session_queue_process_node) =
    2171             : {
    2172             :   .function = session_queue_process,
    2173             :   .type = VLIB_NODE_TYPE_PROCESS,
    2174             :   .name = "session-queue-process",
    2175             :   .state = VLIB_NODE_STATE_DISABLED,
    2176             : };
    2177             : /* *INDENT-ON* */
    2178             : /* Pre-input node function: returns 0 if worker 0 has no vpp_event_queue, otherwise runs session_queue_node_fn. */
    2179             : static_always_inline uword
    2180           0 : session_queue_pre_input_inline (vlib_main_t * vm, vlib_node_runtime_t * node,
    2181             :                                 vlib_frame_t * frame)
    2182             : {
    2183           0 :   session_main_t *sm = &session_main;
    2184           0 :   if (!sm->wrk[0].vpp_event_queue)
    2185           0 :     return 0;
    2186           0 :   node = vlib_node_get_runtime (vm, session_queue_node.index); /* replace the node argument with the session queue node's runtime */
    2187           0 :   return session_queue_node_fn (vm, node, frame);
    2188             : }
    2189             : /* Registers session_queue_pre_input_inline as pre-input node "session-queue-main"; the node is registered in DISABLED state. */
    2190             : /* *INDENT-OFF* */
    2191      178120 : VLIB_REGISTER_NODE (session_queue_pre_input_node) =
    2192             : {
    2193             :   .function = session_queue_pre_input_inline,
    2194             :   .type = VLIB_NODE_TYPE_PRE_INPUT,
    2195             :   .name = "session-queue-main",
    2196             :   .state = VLIB_NODE_STATE_DISABLED,
    2197             : };
    2198             : /* *INDENT-ON* */
    2199             : 
    2200             : /*
    2201             :  * fd.io coding-style-patch-verification: ON
    2202             :  *
    2203             :  * Local Variables:
    2204             :  * eval: (c-set-style "gnu")
    2205             :  * End:
    2206             :  */

Generated by: LCOV version 1.14