LCOV - code coverage report
Current view: top level - vnet/session - session_node.c (source / functions) Hit Total Coverage
Test: coverage-filtered.info Lines: 706 1100 64.2 %
Date: 2023-10-26 01:39:38 Functions: 51 65 78.5 %

          Line data    Source code
       1             : /*
       2             :  * Copyright (c) 2017-2019 Cisco and/or its affiliates.
       3             :  * Licensed under the Apache License, Version 2.0 (the "License");
       4             :  * you may not use this file except in compliance with the License.
       5             :  * You may obtain a copy of the License at:
       6             :  *
       7             :  *     http://www.apache.org/licenses/LICENSE-2.0
       8             :  *
       9             :  * Unless required by applicable law or agreed to in writing, software
      10             :  * distributed under the License is distributed on an "AS IS" BASIS,
      11             :  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
      12             :  * See the License for the specific language governing permissions and
      13             :  * limitations under the License.
      14             :  */
      15             : 
      16             : #include <math.h>
      17             : #include <vlib/vlib.h>
      18             : #include <vnet/vnet.h>
      19             : #include <vppinfra/elog.h>
      20             : #include <vnet/session/transport.h>
      21             : #include <vnet/session/session.h>
      22             : #include <vnet/session/application.h>
      23             : #include <vnet/session/application_interface.h>
      24             : #include <vnet/session/application_local.h>
      25             : #include <vnet/session/session_debug.h>
      26             : #include <svm/queue.h>
      27             : #include <sys/timerfd.h>
      28             : 
      29             : static inline void
      30           4 : session_wrk_send_evt_to_main (session_worker_t *wrk, session_evt_elt_t *elt)
      31             : {
      32             :   session_evt_elt_t *he;
      33             :   uword thread_index;
      34             :   u8 is_empty;
      35             : 
      36           4 :   thread_index = wrk->vm->thread_index;
      37           4 :   he = clib_llist_elt (wrk->event_elts, wrk->evts_pending_main);
      38           4 :   is_empty = clib_llist_is_empty (wrk->event_elts, evt_list, he);
      39           4 :   clib_llist_add_tail (wrk->event_elts, evt_list, elt, he);
      40           4 :   if (is_empty)
      41           4 :     session_send_rpc_evt_to_thread (0, session_wrk_handle_evts_main_rpc,
      42             :                                     uword_to_pointer (thread_index, void *));
      43           4 : }
      44             : 
      45             : #define app_check_thread_and_barrier(_wrk, _elt)                              \
      46             :   if (!vlib_thread_is_main_w_barrier ())                                      \
      47             :     {                                                                         \
      48             :       session_wrk_send_evt_to_main (wrk, elt);                                \
      49             :       return;                                                                 \
      50             :     }
      51             : 
      52             : static void
      53          28 : session_wrk_timerfd_update (session_worker_t *wrk, u64 time_ns)
      54             : {
      55             :   struct itimerspec its;
      56             : 
      57          28 :   its.it_value.tv_sec = 0;
      58          28 :   its.it_value.tv_nsec = time_ns;
      59          28 :   its.it_interval.tv_sec = 0;
      60          28 :   its.it_interval.tv_nsec = its.it_value.tv_nsec;
      61             : 
      62          28 :   if (timerfd_settime (wrk->timerfd, 0, &its, NULL) == -1)
      63           0 :     clib_warning ("timerfd_settime");
      64          28 : }
      65             : 
      66             : always_inline u64
      67          28 : session_wrk_tfd_timeout (session_wrk_state_t state, u32 thread_index)
      68             : {
      69          28 :   if (state == SESSION_WRK_INTERRUPT)
      70          15 :     return thread_index ? 1e6 : vlib_num_workers () ? 5e8 : 1e6;
      71          13 :   else if (state == SESSION_WRK_IDLE)
      72           4 :     return thread_index ? 1e8 : vlib_num_workers () ? 5e8 : 1e8;
      73             :   else
      74           9 :     return 0;
      75             : }
      76             : 
      77             : static inline void
      78          28 : session_wrk_set_state (session_worker_t *wrk, session_wrk_state_t state)
      79             : {
      80             :   u64 time_ns;
      81             : 
      82          28 :   wrk->state = state;
      83          28 :   if (wrk->timerfd == -1)
      84           0 :     return;
      85          28 :   time_ns = session_wrk_tfd_timeout (state, wrk->vm->thread_index);
      86          28 :   session_wrk_timerfd_update (wrk, time_ns);
      87             : }
      88             : 
      89             : static transport_endpt_ext_cfg_t *
      90          12 : session_mq_get_ext_config (application_t *app, uword offset)
      91             : {
      92             :   svm_fifo_chunk_t *c;
      93             :   fifo_segment_t *fs;
      94             : 
      95          12 :   fs = application_get_rx_mqs_segment (app);
      96          12 :   c = fs_chunk_ptr (fs->h, offset);
      97          12 :   return (transport_endpt_ext_cfg_t *) c->data;
      98             : }
      99             : 
     100             : static void
     101          12 : session_mq_free_ext_config (application_t *app, uword offset)
     102             : {
     103             :   svm_fifo_chunk_t *c;
     104             :   fifo_segment_t *fs;
     105             : 
     106          12 :   fs = application_get_rx_mqs_segment (app);
     107          12 :   c = fs_chunk_ptr (fs->h, offset);
     108          12 :   fifo_segment_collect_chunk (fs, 0 /* only one slice */, c);
     109          12 : }
     110             : 
     111             : static void
     112          45 : session_mq_listen_handler (session_worker_t *wrk, session_evt_elt_t *elt)
     113             : {
     114          45 :   vnet_listen_args_t _a, *a = &_a;
     115             :   session_listen_msg_t *mp;
     116             :   app_worker_t *app_wrk;
     117             :   application_t *app;
     118             :   int rv;
     119             : 
     120          45 :   app_check_thread_and_barrier (wrk, elt);
     121             : 
     122          43 :   mp = session_evt_ctrl_data (wrk, elt);
     123          43 :   app = application_lookup (mp->client_index);
     124          43 :   if (!app)
     125           0 :     return;
     126             : 
     127          43 :   clib_memset (a, 0, sizeof (*a));
     128          43 :   a->sep.is_ip4 = mp->is_ip4;
     129          43 :   ip_copy (&a->sep.ip, &mp->ip, mp->is_ip4);
     130          43 :   a->sep.port = mp->port;
     131          43 :   a->sep.fib_index = mp->vrf;
     132          43 :   a->sep.sw_if_index = ENDPOINT_INVALID_INDEX;
     133          43 :   a->sep.transport_proto = mp->proto;
     134          43 :   a->app_index = app->app_index;
     135          43 :   a->wrk_map_index = mp->wrk_index;
     136          43 :   a->sep_ext.transport_flags = mp->flags;
     137             : 
     138          43 :   if (mp->ext_config)
     139           5 :     a->sep_ext.ext_cfg = session_mq_get_ext_config (app, mp->ext_config);
     140             : 
     141          43 :   if ((rv = vnet_listen (a)))
     142           0 :     session_worker_stat_error_inc (wrk, rv, 1);
     143             : 
     144          43 :   app_wrk = application_get_worker (app, mp->wrk_index);
     145          43 :   app_worker_listened_notify (app_wrk, a->handle, mp->context, rv);
     146             : 
     147          43 :   if (mp->ext_config)
     148           5 :     session_mq_free_ext_config (app, mp->ext_config);
     149             : 
     150             :   /* Make sure events are flushed before releasing barrier, to avoid
     151             :    * potential race with accept. */
     152          43 :   app_wrk_flush_wrk_events (app_wrk, 0);
     153             : }
     154             : 
     155             : static void
     156           0 : session_mq_listen_uri_handler (session_worker_t *wrk, session_evt_elt_t *elt)
     157             : {
     158           0 :   vnet_listen_args_t _a, *a = &_a;
     159             :   session_listen_uri_msg_t *mp;
     160             :   app_worker_t *app_wrk;
     161             :   application_t *app;
     162             :   int rv;
     163             : 
     164           0 :   app_check_thread_and_barrier (wrk, elt);
     165             : 
     166           0 :   mp = session_evt_ctrl_data (wrk, elt);
     167           0 :   app = application_lookup (mp->client_index);
     168           0 :   if (!app)
     169           0 :     return;
     170             : 
     171           0 :   clib_memset (a, 0, sizeof (*a));
     172           0 :   a->uri = (char *) mp->uri;
     173           0 :   a->app_index = app->app_index;
     174           0 :   rv = vnet_bind_uri (a);
     175             : 
     176           0 :   app_wrk = application_get_worker (app, 0);
     177           0 :   app_worker_listened_notify (app_wrk, a->handle, mp->context, rv);
     178           0 :   app_wrk_flush_wrk_events (app_wrk, 0);
     179             : }
     180             : 
     181             : static void
     182          51 : session_mq_connect_one (session_connect_msg_t *mp)
     183             : {
     184          51 :   vnet_connect_args_t _a, *a = &_a;
     185             :   app_worker_t *app_wrk;
     186             :   session_worker_t *wrk;
     187             :   application_t *app;
     188             :   int rv;
     189             : 
     190          51 :   app = application_lookup (mp->client_index);
     191          51 :   if (!app)
     192           0 :     return;
     193             : 
     194          51 :   clib_memset (a, 0, sizeof (*a));
     195          51 :   a->sep.is_ip4 = mp->is_ip4;
     196          51 :   clib_memcpy_fast (&a->sep.ip, &mp->ip, sizeof (mp->ip));
     197          51 :   a->sep.port = mp->port;
     198          51 :   a->sep.transport_proto = mp->proto;
     199          51 :   a->sep.peer.fib_index = mp->vrf;
     200          51 :   a->sep.dscp = mp->dscp;
     201          51 :   clib_memcpy_fast (&a->sep.peer.ip, &mp->lcl_ip, sizeof (mp->lcl_ip));
     202          51 :   if (mp->is_ip4)
     203             :     {
     204          47 :       ip46_address_mask_ip4 (&a->sep.ip);
     205          47 :       ip46_address_mask_ip4 (&a->sep.peer.ip);
     206             :     }
     207          51 :   a->sep.peer.port = mp->lcl_port;
     208          51 :   a->sep.peer.sw_if_index = ENDPOINT_INVALID_INDEX;
     209          51 :   a->sep_ext.parent_handle = mp->parent_handle;
     210          51 :   a->sep_ext.transport_flags = mp->flags;
     211          51 :   a->api_context = mp->context;
     212          51 :   a->app_index = app->app_index;
     213          51 :   a->wrk_map_index = mp->wrk_index;
     214             : 
     215          51 :   if (mp->ext_config)
     216           7 :     a->sep_ext.ext_cfg = session_mq_get_ext_config (app, mp->ext_config);
     217             : 
     218          51 :   if ((rv = vnet_connect (a)))
     219             :     {
     220           0 :       wrk = session_main_get_worker (vlib_get_thread_index ());
     221           0 :       session_worker_stat_error_inc (wrk, rv, 1);
     222           0 :       app_wrk = application_get_worker (app, mp->wrk_index);
     223           0 :       app_worker_connect_notify (app_wrk, 0, rv, mp->context);
     224             :     }
     225             : 
     226          51 :   if (mp->ext_config)
     227           7 :     session_mq_free_ext_config (app, mp->ext_config);
     228             : }
     229             : 
     230             : static void
     231          51 : session_mq_handle_connects_rpc (void *arg)
     232             : {
     233          51 :   u32 max_connects = 32, n_connects = 0;
     234             :   session_evt_elt_t *he, *elt, *next;
     235             :   session_worker_t *fwrk;
     236             : 
     237          51 :   ASSERT (session_vlib_thread_is_cl_thread ());
     238             : 
     239             :   /* Pending connects on linked list pertaining to first worker */
     240          51 :   fwrk = session_main_get_worker (transport_cl_thread ());
     241          51 :   if (!fwrk->n_pending_connects)
     242           0 :     return;
     243             : 
     244          51 :   he = clib_llist_elt (fwrk->event_elts, fwrk->pending_connects);
     245          51 :   elt = clib_llist_next (fwrk->event_elts, evt_list, he);
     246             : 
     247             :   /* Avoid holding the worker for too long */
     248         102 :   while (n_connects < max_connects && elt != he)
     249             :     {
     250          51 :       next = clib_llist_next (fwrk->event_elts, evt_list, elt);
     251          51 :       clib_llist_remove (fwrk->event_elts, evt_list, elt);
     252          51 :       session_mq_connect_one (session_evt_ctrl_data (fwrk, elt));
     253          51 :       session_evt_ctrl_data_free (fwrk, elt);
     254          51 :       clib_llist_put (fwrk->event_elts, elt);
     255          51 :       elt = next;
     256          51 :       n_connects += 1;
     257             :     }
     258             : 
     259             :   /* Decrement with worker barrier */
     260          51 :   fwrk->n_pending_connects -= n_connects;
     261          51 :   if (fwrk->n_pending_connects > 0)
     262             :     {
     263           0 :       session_send_rpc_evt_to_thread_force (fwrk->vm->thread_index,
     264             :                                             session_mq_handle_connects_rpc, 0);
     265             :     }
     266             : }
     267             : 
     268             : static void
     269          51 : session_mq_connect_handler (session_worker_t *wrk, session_evt_elt_t *elt)
     270             : {
     271          51 :   u32 thread_index = wrk - session_main.wrk;
     272             :   session_evt_elt_t *he;
     273             : 
     274          51 :   if (PREDICT_FALSE (thread_index > transport_cl_thread ()))
     275             :     {
     276           0 :       clib_warning ("Connect on wrong thread. Dropping");
     277           0 :       return;
     278             :     }
     279             : 
     280             :   /* If on worker, check if main has any pending messages. Avoids reordering
     281             :    * with other control messages that need to be handled by main
     282             :    */
     283          51 :   if (thread_index)
     284             :     {
     285           4 :       he = clib_llist_elt (wrk->event_elts, wrk->evts_pending_main);
     286             : 
     287             :       /* Events pending on main, postpone to avoid reordering */
     288           4 :       if (!clib_llist_is_empty (wrk->event_elts, evt_list, he))
     289             :         {
     290           0 :           clib_llist_add_tail (wrk->event_elts, evt_list, elt, he);
     291           0 :           return;
     292             :         }
     293             :     }
     294             : 
     295             :   /* Add to pending list to be handled by first worker */
     296          51 :   he = clib_llist_elt (wrk->event_elts, wrk->pending_connects);
     297          51 :   clib_llist_add_tail (wrk->event_elts, evt_list, elt, he);
     298             : 
     299             :   /* Decremented with worker barrier */
     300          51 :   wrk->n_pending_connects += 1;
     301          51 :   if (wrk->n_pending_connects == 1)
     302             :     {
     303          51 :       session_send_rpc_evt_to_thread_force (thread_index,
     304             :                                             session_mq_handle_connects_rpc, 0);
     305             :     }
     306             : }
     307             : 
     308             : static void
     309           0 : session_mq_connect_uri_handler (session_worker_t *wrk, session_evt_elt_t *elt)
     310             : {
     311           0 :   vnet_connect_args_t _a, *a = &_a;
     312             :   session_connect_uri_msg_t *mp;
     313             :   app_worker_t *app_wrk;
     314             :   application_t *app;
     315             :   int rv;
     316             : 
     317           0 :   app_check_thread_and_barrier (wrk, elt);
     318             : 
     319           0 :   mp = session_evt_ctrl_data (wrk, elt);
     320           0 :   app = application_lookup (mp->client_index);
     321           0 :   if (!app)
     322           0 :     return;
     323             : 
     324           0 :   clib_memset (a, 0, sizeof (*a));
     325           0 :   a->uri = (char *) mp->uri;
     326           0 :   a->api_context = mp->context;
     327           0 :   a->app_index = app->app_index;
     328           0 :   if ((rv = vnet_connect_uri (a)))
     329             :     {
     330           0 :       session_worker_stat_error_inc (wrk, rv, 1);
     331           0 :       app_wrk = application_get_worker (app, 0 /* default wrk only */ );
     332           0 :       app_worker_connect_notify (app_wrk, 0, rv, mp->context);
     333             :     }
     334             : }
     335             : 
     336             : static void
     337           0 : session_mq_shutdown_handler (void *data)
     338             : {
     339           0 :   session_shutdown_msg_t *mp = (session_shutdown_msg_t *) data;
     340           0 :   vnet_shutdown_args_t _a, *a = &_a;
     341             :   application_t *app;
     342             : 
     343           0 :   app = application_lookup (mp->client_index);
     344           0 :   if (!app)
     345           0 :     return;
     346             : 
     347           0 :   a->app_index = app->app_index;
     348           0 :   a->handle = mp->handle;
     349           0 :   vnet_shutdown_session (a);
     350             : }
     351             : 
     352             : static void
     353          60 : session_mq_disconnect_handler (void *data)
     354             : {
     355          60 :   session_disconnect_msg_t *mp = (session_disconnect_msg_t *) data;
     356          60 :   vnet_disconnect_args_t _a, *a = &_a;
     357             :   application_t *app;
     358             : 
     359          60 :   app = application_lookup (mp->client_index);
     360          60 :   if (!app)
     361           1 :     return;
     362             : 
     363          59 :   a->app_index = app->app_index;
     364          59 :   a->handle = mp->handle;
     365          59 :   vnet_disconnect_session (a);
     366             : }
     367             : 
     368             : static void
     369          24 : app_mq_detach_handler (session_worker_t *wrk, session_evt_elt_t *elt)
     370             : {
     371          24 :   vnet_app_detach_args_t _a, *a = &_a;
     372             :   session_app_detach_msg_t *mp;
     373             :   application_t *app;
     374             : 
     375          26 :   app_check_thread_and_barrier (wrk, elt);
     376             : 
     377          24 :   mp = session_evt_ctrl_data (wrk, elt);
     378          24 :   app = application_lookup (mp->client_index);
     379          24 :   if (!app)
     380           2 :     return;
     381             : 
     382          22 :   a->app_index = app->app_index;
     383          22 :   a->api_client_index = mp->client_index;
     384          22 :   vnet_application_detach (a);
     385             : }
     386             : 
     387             : static void
     388          27 : session_mq_unlisten_handler (session_worker_t *wrk, session_evt_elt_t *elt)
     389             : {
     390          27 :   vnet_unlisten_args_t _a, *a = &_a;
     391             :   session_unlisten_msg_t *mp;
     392             :   app_worker_t *app_wrk;
     393             :   session_handle_t sh;
     394             :   application_t *app;
     395             :   int rv;
     396             : 
     397          29 :   app_check_thread_and_barrier (wrk, elt);
     398             : 
     399          25 :   mp = session_evt_ctrl_data (wrk, elt);
     400          25 :   sh = mp->handle;
     401             : 
     402          25 :   app = application_lookup (mp->client_index);
     403          25 :   if (!app)
     404           2 :     return;
     405             : 
     406          23 :   clib_memset (a, 0, sizeof (*a));
     407          23 :   a->app_index = app->app_index;
     408          23 :   a->handle = sh;
     409          23 :   a->wrk_map_index = mp->wrk_index;
     410             : 
     411          23 :   if ((rv = vnet_unlisten (a)))
     412           0 :     session_worker_stat_error_inc (wrk, rv, 1);
     413             : 
     414          23 :   app_wrk = application_get_worker (app, a->wrk_map_index);
     415          23 :   if (!app_wrk)
     416           0 :     return;
     417             : 
     418          23 :   app_worker_unlisten_reply (app_wrk, sh, mp->context, rv);
     419             : }
     420             : 
     421             : static void
     422          47 : session_mq_accepted_reply_handler (session_worker_t *wrk,
     423             :                                    session_evt_elt_t *elt)
     424             : {
     425          47 :   vnet_disconnect_args_t _a = { 0 }, *a = &_a;
     426             :   session_accepted_reply_msg_t *mp;
     427             :   session_state_t old_state;
     428             :   app_worker_t *app_wrk;
     429             :   session_t *s;
     430             : 
     431          47 :   mp = session_evt_ctrl_data (wrk, elt);
     432             : 
     433             :   /* Mail this back from the main thread. We're not polling in main
     434             :    * thread so we're using other workers for notifications. */
     435          47 :   if (session_thread_from_handle (mp->handle) == 0 && vlib_num_workers () &&
     436           0 :       vlib_get_thread_index () != 0)
     437             :     {
     438           0 :       session_wrk_send_evt_to_main (wrk, elt);
     439          12 :       return;
     440             :     }
     441             : 
     442          47 :   s = session_get_from_handle_if_valid (mp->handle);
     443          47 :   if (!s)
     444           0 :     return;
     445             : 
     446          47 :   app_wrk = app_worker_get (s->app_wrk_index);
     447          47 :   if (app_wrk->app_index != mp->context)
     448             :     {
     449           0 :       clib_warning ("app doesn't own session");
     450           0 :       return;
     451             :     }
     452             : 
     453             :   /* Server isn't interested, disconnect the session */
     454          47 :   if (mp->retval)
     455             :     {
     456           0 :       a->app_index = mp->context;
     457           0 :       a->handle = mp->handle;
     458           0 :       vnet_disconnect_session (a);
     459           0 :       s->app_wrk_index = SESSION_INVALID_INDEX;
     460           0 :       return;
     461             :     }
     462             : 
     463             :   /* Special handling for cut-through sessions */
     464          47 :   if (!session_has_transport (s))
     465             :     {
     466          12 :       session_set_state (s, SESSION_STATE_READY);
     467          12 :       ct_session_connect_notify (s, SESSION_E_NONE);
     468          12 :       return;
     469             :     }
     470             : 
     471          35 :   old_state = s->session_state;
     472          35 :   session_set_state (s, SESSION_STATE_READY);
     473             : 
     474          35 :   if (!svm_fifo_is_empty_prod (s->rx_fifo))
     475           3 :     app_worker_rx_notify (app_wrk, s);
     476             : 
     477             :   /* Closed while waiting for app to reply. Resend disconnect */
     478          35 :   if (old_state >= SESSION_STATE_TRANSPORT_CLOSING)
     479             :     {
     480           0 :       app_worker_close_notify (app_wrk, s);
     481           0 :       session_set_state (s, old_state);
     482           0 :       return;
     483             :     }
     484             : }
     485             : 
     486             : static void
     487           6 : session_mq_reset_reply_handler (void *data)
     488             : {
     489           6 :   vnet_disconnect_args_t _a = { 0 }, *a = &_a;
     490             :   session_reset_reply_msg_t *mp;
     491             :   app_worker_t *app_wrk;
     492             :   session_t *s;
     493             :   application_t *app;
     494             :   u32 index, thread_index;
     495             : 
     496           6 :   mp = (session_reset_reply_msg_t *) data;
     497           6 :   app = application_lookup (mp->context);
     498           6 :   if (!app)
     499           0 :     return;
     500             : 
     501           6 :   session_parse_handle (mp->handle, &index, &thread_index);
     502           6 :   s = session_get_if_valid (index, thread_index);
     503             : 
     504             :   /* No session or not the right session */
     505           6 :   if (!s || s->session_state < SESSION_STATE_TRANSPORT_CLOSING)
     506           0 :     return;
     507             : 
     508           6 :   app_wrk = app_worker_get (s->app_wrk_index);
     509           6 :   if (!app_wrk || app_wrk->app_index != app->app_index)
     510             :     {
     511           0 :       clib_warning ("App %u does not own handle 0x%lx!", app->app_index,
     512             :                     mp->handle);
     513           0 :       return;
     514             :     }
     515             : 
     516             :   /* Client objected to resetting the session, log and continue */
     517           6 :   if (mp->retval)
     518             :     {
     519           0 :       clib_warning ("client retval %d", mp->retval);
     520           0 :       return;
     521             :     }
     522             : 
     523             :   /* This comes as a response to a reset, transport only waiting for
     524             :    * confirmation to remove connection state, no need to disconnect */
     525           6 :   a->handle = mp->handle;
     526           6 :   a->app_index = app->app_index;
     527           6 :   vnet_disconnect_session (a);
     528             : }
     529             : 
     530             : static void
     531           0 : session_mq_disconnected_handler (void *data)
     532             : {
     533             :   session_disconnected_reply_msg_t *rmp;
     534           0 :   vnet_disconnect_args_t _a, *a = &_a;
     535           0 :   svm_msg_q_msg_t _msg, *msg = &_msg;
     536             :   session_disconnected_msg_t *mp;
     537             :   app_worker_t *app_wrk;
     538             :   session_event_t *evt;
     539             :   session_t *s;
     540             :   application_t *app;
     541           0 :   int rv = 0;
     542             : 
     543           0 :   mp = (session_disconnected_msg_t *) data;
     544           0 :   if (!(s = session_get_from_handle_if_valid (mp->handle)))
     545             :     {
     546           0 :       clib_warning ("could not disconnect handle %llu", mp->handle);
     547           0 :       return;
     548             :     }
     549           0 :   app_wrk = app_worker_get (s->app_wrk_index);
     550           0 :   app = application_lookup (mp->client_index);
     551           0 :   if (!(app_wrk && app && app->app_index == app_wrk->app_index))
     552             :     {
     553           0 :       clib_warning ("could not disconnect session: %llu app: %u",
     554             :                     mp->handle, mp->client_index);
     555           0 :       return;
     556             :     }
     557             : 
     558           0 :   a->handle = mp->handle;
     559           0 :   a->app_index = app_wrk->wrk_index;
     560           0 :   rv = vnet_disconnect_session (a);
     561             : 
     562           0 :   svm_msg_q_lock_and_alloc_msg_w_ring (app_wrk->event_queue,
     563             :                                        SESSION_MQ_CTRL_EVT_RING,
     564             :                                        SVM_Q_WAIT, msg);
     565           0 :   evt = svm_msg_q_msg_data (app_wrk->event_queue, msg);
     566           0 :   clib_memset (evt, 0, sizeof (*evt));
     567           0 :   evt->event_type = SESSION_CTRL_EVT_DISCONNECTED_REPLY;
     568           0 :   rmp = (session_disconnected_reply_msg_t *) evt->data;
     569           0 :   rmp->handle = mp->handle;
     570           0 :   rmp->context = mp->context;
     571           0 :   rmp->retval = rv;
     572           0 :   svm_msg_q_add_and_unlock (app_wrk->event_queue, msg);
     573             : }
     574             : 
     575             : static void
     576          14 : session_mq_disconnected_reply_handler (void *data)
     577             : {
     578             :   session_disconnected_reply_msg_t *mp;
     579          14 :   vnet_disconnect_args_t _a, *a = &_a;
     580             :   application_t *app;
     581             : 
     582          14 :   mp = (session_disconnected_reply_msg_t *) data;
     583             : 
     584             :   /* Client objected to disconnecting the session, log and continue */
     585          14 :   if (mp->retval)
     586             :     {
     587           0 :       clib_warning ("client retval %d", mp->retval);
     588           0 :       return;
     589             :     }
     590             : 
     591             :   /* Disconnect has been confirmed. Confirm close to transport */
     592          14 :   app = application_lookup (mp->context);
     593          14 :   if (app)
     594             :     {
     595          14 :       a->handle = mp->handle;
     596          14 :       a->app_index = app->app_index;
     597          14 :       vnet_disconnect_session (a);
     598             :     }
     599             : }
     600             : 
     /*
      * Control-message handler for a session worker update request.
      *
      * data is interpreted as a session_worker_update_msg_t. The session named
      * by mp->handle is handed to the app worker selected by mp->wrk_index via
      * app_worker_own_session() and a SESSION_CTRL_EVT_WORKER_UPDATE_REPLY is
      * queued on that worker's event queue.
      *
      * Returns early, without a reply, when the app is unknown, the handle is
      * invalid, or the session's current worker does not belong to the app.
      */
     601             : static void
     602           0 : session_mq_worker_update_handler (void *data)
     603             : {
     604           0 :   session_worker_update_msg_t *mp = (session_worker_update_msg_t *) data;
     605             :   session_worker_update_reply_msg_t *rmp;
     606           0 :   svm_msg_q_msg_t _msg, *msg = &_msg;
     607             :   app_worker_t *app_wrk;
     608             :   u32 owner_app_wrk_map;
     609             :   session_event_t *evt;
     610             :   session_t *s;
     611             :   application_t *app;
     612             :   int rv;
     613             : 
     614           0 :   app = application_lookup (mp->client_index);
     615           0 :   if (!app)
     616           0 :     return;
     617           0 :   if (!(s = session_get_from_handle_if_valid (mp->handle)))
     618             :     {
     619           0 :       clib_warning ("invalid handle %llu", mp->handle);
     620           0 :       return;
     621             :     }
                         /* Ownership check: the session's current worker must belong to app */
     622           0 :   app_wrk = app_worker_get (s->app_wrk_index);
     623           0 :   if (app_wrk->app_index != app->app_index)
     624             :     {
     625           0 :       clib_warning ("app %u does not own session %llu", app->app_index,
     626             :                     mp->handle);
     627           0 :       return;
     628             :     }
                         /* Remember the current owner's map index, then switch app_wrk to the
                          * worker named in the message; app_wrk is the target from here on.
                          * NOTE(review): the result of application_get_worker () is
                          * dereferenced below without a NULL check - confirm it cannot fail
                          * for a client-supplied wrk_index. */
     629           0 :   owner_app_wrk_map = app_wrk->wrk_map_index;
     630           0 :   app_wrk = application_get_worker (app, mp->wrk_index);
     631             : 
     632             :   /* This needs to come from the new owner */
                         /* If the requester is the current owner, only forward a
                          * SESSION_CTRL_EVT_REQ_WORKER_UPDATE carrying the session handle to
                          * the target worker's queue; ownership is not changed here. */
     633           0 :   if (mp->req_wrk_index == owner_app_wrk_map)
     634             :     {
     635             :       session_req_worker_update_msg_t *wump;
     636             : 
     637           0 :       svm_msg_q_lock_and_alloc_msg_w_ring (app_wrk->event_queue,
     638             :                                            SESSION_MQ_CTRL_EVT_RING,
     639             :                                            SVM_Q_WAIT, msg);
     640           0 :       evt = svm_msg_q_msg_data (app_wrk->event_queue, msg);
     641           0 :       clib_memset (evt, 0, sizeof (*evt));
     642           0 :       evt->event_type = SESSION_CTRL_EVT_REQ_WORKER_UPDATE;
     643           0 :       wump = (session_req_worker_update_msg_t *) evt->data;
     644           0 :       wump->session_handle = mp->handle;
     645           0 :       svm_msg_q_add_and_unlock (app_wrk->event_queue, msg);
     646           0 :       return;
     647             :     }
     648             : 
                         /* A non-zero rv is only counted as a session error; the reply below
                          * is still sent. */
     649           0 :   rv = app_worker_own_session (app_wrk, s);
     650           0 :   if (rv)
     651           0 :     session_stat_error_inc (rv, 1);
     652             : 
     653             :   /*
     654             :    * Send reply
     655             :    */
     656           0 :   svm_msg_q_lock_and_alloc_msg_w_ring (app_wrk->event_queue,
     657             :                                        SESSION_MQ_CTRL_EVT_RING,
     658             :                                        SVM_Q_WAIT, msg);
     659           0 :   evt = svm_msg_q_msg_data (app_wrk->event_queue, msg);
     660           0 :   clib_memset (evt, 0, sizeof (*evt));
     661           0 :   evt->event_type = SESSION_CTRL_EVT_WORKER_UPDATE_REPLY;
     662           0 :   rmp = (session_worker_update_reply_msg_t *) evt->data;
     663           0 :   rmp->handle = mp->handle;
                         /* Fifo offsets are filled in only for fifos the session has; the
                          * event was zeroed above, so missing fifos are reported as 0. */
     664           0 :   if (s->rx_fifo)
     665           0 :     rmp->rx_fifo = fifo_segment_fifo_offset (s->rx_fifo);
     666           0 :   if (s->tx_fifo)
     667           0 :     rmp->tx_fifo = fifo_segment_fifo_offset (s->tx_fifo);
     668           0 :   rmp->segment_handle = session_segment_handle (s);
     669           0 :   svm_msg_q_add_and_unlock (app_wrk->event_queue, msg);
     670             : 
     671             :   /*
     672             :    * Retransmit messages that may have been lost
     673             :    */
     674           0 :   if (s->tx_fifo && !svm_fifo_is_empty (s->tx_fifo))
     675           0 :     session_send_io_evt_to_thread (s->tx_fifo, SESSION_IO_EVT_TX);
     676             : 
     677           0 :   if (s->rx_fifo && !svm_fifo_is_empty (s->rx_fifo))
     678           0 :     app_worker_rx_notify (app_wrk, s);
     679             : 
                         /* Sessions at or past TRANSPORT_CLOSING also get a close notification
                          * sent to the target worker. */
     680           0 :   if (s->session_state >= SESSION_STATE_TRANSPORT_CLOSING)
     681           0 :     app_worker_close_notify (app_wrk, s);
     682             : }
     683             : 
     /*
      * Control-message handler that relays an app worker RPC.
      *
      * data is interpreted as a session_app_wrk_rpc_msg_t. The payload mp->data
      * is copied unchanged into a SESSION_CTRL_EVT_APP_WRK_RPC event queued on
      * the event queue of worker mp->wrk_index of the app mp->client_index.
      * Nothing is sent if the app lookup fails.
      */
     684             : static void
     685           0 : session_mq_app_wrk_rpc_handler (void *data)
     686             : {
     687           0 :   session_app_wrk_rpc_msg_t *mp = (session_app_wrk_rpc_msg_t *) data;
     688           0 :   svm_msg_q_msg_t _msg, *msg = &_msg;
     689             :   session_app_wrk_rpc_msg_t *rmp;
     690             :   app_worker_t *app_wrk;
     691             :   session_event_t *evt;
     692             :   application_t *app;
     693             : 
     694           0 :   app = application_lookup (mp->client_index);
     695           0 :   if (!app)
     696           0 :     return;
     697             : 
                         /* NOTE(review): app_wrk is dereferenced below without a NULL check -
                          * confirm application_get_worker () cannot fail for a client-supplied
                          * wrk_index. */
     698           0 :   app_wrk = application_get_worker (app, mp->wrk_index);
     699             : 
     700           0 :   svm_msg_q_lock_and_alloc_msg_w_ring (app_wrk->event_queue,
     701             :                                        SESSION_MQ_CTRL_EVT_RING, SVM_Q_WAIT,
     702             :                                        msg);
     703           0 :   evt = svm_msg_q_msg_data (app_wrk->event_queue, msg);
     704           0 :   clib_memset (evt, 0, sizeof (*evt));
     705           0 :   evt->event_type = SESSION_CTRL_EVT_APP_WRK_RPC;
     706           0 :   rmp = (session_app_wrk_rpc_msg_t *) evt->data;
                         /* Only the data payload is copied; the other fields of the outgoing
                          * message stay zero from the memset above. */
     707           0 :   clib_memcpy (rmp->data, mp->data, sizeof (mp->data));
     708           0 :   svm_msg_q_add_and_unlock (app_wrk->event_queue, msg);
     709             : }
     710             : 
     /*
      * Control-message handler for getting or setting a transport attribute.
      *
      * data is interpreted as a session_transport_attr_msg_t. After checking
      * that the session named by mp->handle belongs to the app
      * mp->client_index, session_transport_attribute () is called with
      * mp->is_get and mp->attr, and a SESSION_CTRL_EVT_TRANSPORT_ATTR_REPLY
      * carrying its return value is queued on the owning worker's event queue.
      *
      * Returns early, without a reply, when the app is unknown, the handle is
      * invalid, or the session's worker does not belong to the app.
      */
     711             : static void
     712           5 : session_mq_transport_attr_handler (void *data)
     713             : {
     714           5 :   session_transport_attr_msg_t *mp = (session_transport_attr_msg_t *) data;
     715             :   session_transport_attr_reply_msg_t *rmp;
     716           5 :   svm_msg_q_msg_t _msg, *msg = &_msg;
     717             :   app_worker_t *app_wrk;
     718             :   session_event_t *evt;
     719             :   application_t *app;
     720             :   session_t *s;
     721             :   int rv;
     722             : 
     723           5 :   app = application_lookup (mp->client_index);
     724           5 :   if (!app)
     725           0 :     return;
     726             : 
     727           5 :   if (!(s = session_get_from_handle_if_valid (mp->handle)))
     728             :     {
     729           0 :       clib_warning ("invalid handle %llu", mp->handle);
     730           0 :       return;
     731             :     }
                         /* Ownership check: the session's worker must belong to app */
     732           5 :   app_wrk = app_worker_get (s->app_wrk_index);
     733           5 :   if (app_wrk->app_index != app->app_index)
     734             :     {
     735           0 :       clib_warning ("app %u does not own session %llu", app->app_index,
     736             :                     mp->handle);
     737           0 :       return;
     738             :     }
     739             : 
                         /* mp->attr is passed by address, so it serves as the output on a get */
     740           5 :   rv = session_transport_attribute (s, mp->is_get, &mp->attr);
     741             : 
     742           5 :   svm_msg_q_lock_and_alloc_msg_w_ring (
     743             :     app_wrk->event_queue, SESSION_MQ_CTRL_EVT_RING, SVM_Q_WAIT, msg);
     744           5 :   evt = svm_msg_q_msg_data (app_wrk->event_queue, msg);
     745           5 :   clib_memset (evt, 0, sizeof (*evt));
     746           5 :   evt->event_type = SESSION_CTRL_EVT_TRANSPORT_ATTR_REPLY;
     747           5 :   rmp = (session_transport_attr_reply_msg_t *) evt->data;
     748           5 :   rmp->handle = mp->handle;
     749           5 :   rmp->retval = rv;
     750           5 :   rmp->is_get = mp->is_get;
                         /* The attribute is echoed back only for a successful get; otherwise
                          * rmp->attr stays zero from the memset above. */
     751           5 :   if (!rv && mp->is_get)
     752           3 :     rmp->attr = mp->attr;
     753           5 :   svm_msg_q_add_and_unlock (app_wrk->event_queue, msg);
     754             : }
     755             : 
     /*
      * Drain a session worker's list of control events pending for main.
      *
      * args carries the session worker's thread index encoded as a pointer
      * (decoded with pointer_to_uword). The whole list walk runs between
      * vlib_worker_thread_barrier_sync () and _release (). Each element of the
      * list headed by fwrk->evts_pending_main is unlinked and dispatched to the
      * handler matching its event type; unknown types warn and hit
      * ALWAYS_ASSERT (0).
      */
     756             : void
     757           4 : session_wrk_handle_evts_main_rpc (void *args)
     758             : {
     759           4 :   vlib_main_t *vm = vlib_get_main ();
     760             :   clib_llist_index_t ei, next_ei;
     761             :   session_evt_elt_t *he, *elt;
     762             :   session_worker_t *fwrk;
     763             :   u32 thread_index;
     764             : 
     765           4 :   vlib_worker_thread_barrier_sync (vm);
     766             : 
     767           4 :   thread_index = pointer_to_uword (args);
     768           4 :   fwrk = session_main_get_worker (thread_index);
     769             : 
                         /* he is the list head; iteration stops once the index wraps back to
                          * it. */
     770           4 :   he = clib_llist_elt (fwrk->event_elts, fwrk->evts_pending_main);
     771           4 :   ei = clib_llist_next_index (he, evt_list);
     772             : 
     773           8 :   while (ei != fwrk->evts_pending_main)
     774             :     {
     775           4 :       elt = clib_llist_elt (fwrk->event_elts, ei);
                             /* Save the successor before unlinking elt from the list */
     776           4 :       next_ei = clib_llist_next_index (elt, evt_list);
     777           4 :       clib_llist_remove (fwrk->event_elts, evt_list, elt);
     778           4 :       switch (elt->evt.event_type)
     779             :         {
     780           2 :         case SESSION_CTRL_EVT_LISTEN:
     781           2 :           session_mq_listen_handler (fwrk, elt);
     782           2 :           break;
     783           2 :         case SESSION_CTRL_EVT_UNLISTEN:
     784           2 :           session_mq_unlisten_handler (fwrk, elt);
     785           2 :           break;
     786           0 :         case SESSION_CTRL_EVT_APP_DETACH:
     787           0 :           app_mq_detach_handler (fwrk, elt);
     788           0 :           break;
     789           0 :         case SESSION_CTRL_EVT_CONNECT_URI:
     790           0 :           session_mq_connect_uri_handler (fwrk, elt);
     791           0 :           break;
     792           0 :         case SESSION_CTRL_EVT_ACCEPTED_REPLY:
     793           0 :           session_mq_accepted_reply_handler (fwrk, elt);
     794           0 :           break;
     795           0 :         case SESSION_CTRL_EVT_CONNECT:
     796           0 :           session_mq_connect_handler (fwrk, elt);
     797           0 :           break;
     798           0 :         default:
     799           0 :           clib_warning ("unhandled %u", elt->evt.event_type);
     800           0 :           ALWAYS_ASSERT (0);
     801           0 :           break;
     802             :         }
     803             : 
     804             :       /* Regrab element in case pool moved */
     805           4 :       elt = clib_llist_elt (fwrk->event_elts, ei);
                             /* Free the element only if it is still unlinked after the handler
                              * ran; an element found linked again is left alone. */
     806           4 :       if (!clib_llist_elt_is_linked (elt, evt_list))
     807             :         {
     808           4 :           session_evt_ctrl_data_free (fwrk, elt);
     809           4 :           clib_llist_put (fwrk->event_elts, elt);
     810             :         }
     811           4 :       ei = next_ei;
     812             :     }
     813             : 
     814           4 :   vlib_worker_thread_barrier_release (vm);
     815           4 : }
     816             : 
     /* File-scope declaration of the session queue node registration (no
      * initializer on this line). */
     817             : vlib_node_registration_t session_queue_node;
     818             : 
     /* Per-buffer trace record: filled in by session_tx_trace_frame () from the
      * session's session_index and thread_index, printed by
      * format_session_queue_trace (). */
     819             : typedef struct
     820             : {
     821             :   u32 session_index;
     822             :   u32 server_thread_index;
     823             : } session_queue_trace_t;
     824             : 
     /*
      * Appends "session index <n> thread index <n>" to s and returns the result.
      * Consumes three va_list arguments in order: a vlib_main_t * and a
      * vlib_node_t * (both unused) and the session_queue_trace_t * to print.
      */
     825             : /* packet trace format function */
     826             : static u8 *
     827           0 : format_session_queue_trace (u8 * s, va_list * args)
     828             : {
     829           0 :   CLIB_UNUSED (vlib_main_t * vm) = va_arg (*args, vlib_main_t *);
     830           0 :   CLIB_UNUSED (vlib_node_t * node) = va_arg (*args, vlib_node_t *);
     831           0 :   session_queue_trace_t *t = va_arg (*args, session_queue_trace_t *);
     832             : 
     833           0 :   s = format (s, "session index %d thread index %d",
     834             :               t->session_index, t->server_thread_index);
     835           0 :   return s;
     836             : }
     837             : 
     /*
      * X-macro list of the node's counters. Each entry is
      * _ (ENUM_SUFFIX, name, SEVERITY, "description") and is expanded twice
      * below: once into the session_queue_error_t enumerators and once into the
      * session_error_counters descriptor table, so both stay in the same order.
      */
     838             : #define foreach_session_queue_error                                           \
     839             :   _ (TX, tx, INFO, "Packets transmitted")                                     \
     840             :   _ (TIMER, timer, INFO, "Timer events")                                      \
     841             :   _ (NO_BUFFER, no_buffer, ERROR, "Out of buffers")
     842             : 
     /* One SESSION_QUEUE_ERROR_<F> enumerator per list entry;
      * SESSION_QUEUE_N_ERROR is the entry count. */
     843             : typedef enum
     844             : {
     845             : #define _(f, n, s, d) SESSION_QUEUE_ERROR_##f,
     846             :   foreach_session_queue_error
     847             : #undef _
     848             :     SESSION_QUEUE_N_ERROR,
     849             : } session_queue_error_t;
     850             : 
     /* One { "<name>", description, VL_COUNTER_SEVERITY_<S> } descriptor per list
      * entry. */
     851             : static vlib_error_desc_t session_error_counters[] = {
     852             : #define _(f, n, s, d) { #n, d, VL_COUNTER_SEVERITY_##s },
     853             :   foreach_session_queue_error
     854             : #undef _
     855             : };
     856             : 
     /* Tx status codes. Only the first value is explicit, so the values are
      * NO_BUFFERS = -2, NO_DATA = -1 and OK = 0. */
     857             : enum
     858             : {
     859             :   SESSION_TX_NO_BUFFERS = -2,
     860             :   SESSION_TX_NO_DATA,
     861             :   SESSION_TX_OK
     862             : };
     863             : 
     /*
      * Add trace records for buffers of session s.
      *
      * Walks at most n_segs buffers from bufs. For each buffer accepted by
      * vlib_trace_buffer () a session_queue_trace_t is added with the session's
      * index and thread index and n_trace is decremented; the walk stops when
      * either n_trace or n_segs reaches zero. The remaining n_trace is written
      * back with vlib_set_trace_count ().
      */
     864             : static void
     865           0 : session_tx_trace_frame (vlib_main_t *vm, vlib_node_runtime_t *node,
     866             :                         u32 next_index, vlib_buffer_t **bufs, u16 n_segs,
     867             :                         session_t *s, u32 n_trace)
     868             : {
     869           0 :   vlib_buffer_t **b = bufs;
     870             : 
     871           0 :   while (n_trace && n_segs)
     872             :     {
     873           0 :       if (PREDICT_TRUE (vlib_trace_buffer (vm, node, next_index, b[0],
     874             :                                            1 /* follow_chain */)))
     875             :         {
     876             :           session_queue_trace_t *t =
     877           0 :             vlib_add_trace (vm, node, b[0], sizeof (*t));
     878           0 :           t->session_index = s->session_index;
     879           0 :           t->server_thread_index = s->thread_index;
     880           0 :           n_trace--;
     881             :         }
                             /* Advance to the next buffer whether or not this one was traced */
     882           0 :       b++;
     883           0 :       n_segs--;
     884             :     }
     885           0 :   vlib_set_trace_count (vm, node, n_trace);
     886           0 : }
     887             : 
     /*
      * DMA variant of filling the first buffer of a segment.
      *
      * Resets b (error, flags, current_data), makes TRANSPORT_MAX_HDRS_LEN of
      * headroom, and asks svm_fifo_segments () for up to two fifo segments
      * covering min (left_to_snd, deq_per_first_buf) bytes at ctx->sp.tx_offset.
      * One vlib_dma_batch_add () copy is queued per returned segment, writing to
      * consecutive positions in the buffer. wrk->batch_num is incremented once
      * per queued copy.
      *
      * Returns the byte count reported by svm_fifo_segments ().
      */
     888             : always_inline int
     889           0 : session_tx_fill_dma_transfers (session_worker_t *wrk,
     890             :                                session_tx_context_t *ctx, vlib_buffer_t *b)
     891             : {
     892           0 :   vlib_main_t *vm = wrk->vm;
     893             :   u32 len_to_deq;
     894           0 :   u8 *data0 = NULL;
     895             :   int n_bytes_read, len_write;
     896             :   svm_fifo_seg_t data_fs[2];
     897             : 
     898           0 :   u32 n_segs = 2;
     899           0 :   u16 n_transfers = 0;
     900             :   /*
     901             :    * Start with the first buffer in chain
     902             :    */
     903           0 :   b->error = 0;
     904           0 :   b->flags = VNET_BUFFER_F_LOCALLY_ORIGINATED;
     905           0 :   b->current_data = 0;
     906           0 :   data0 = vlib_buffer_make_headroom (b, TRANSPORT_MAX_HDRS_LEN);
     907           0 :   len_to_deq = clib_min (ctx->left_to_snd, ctx->deq_per_first_buf);
     908             : 
     909           0 :   n_bytes_read = svm_fifo_segments (ctx->s->tx_fifo, ctx->sp.tx_offset,
     910             :                                     data_fs, &n_segs, len_to_deq);
     911             : 
     912           0 :   len_write = n_bytes_read;
     913           0 :   ASSERT (n_bytes_read == len_to_deq);
     914             : 
                         /* NOTE(review): the loop ends only when the segment lengths sum to
                          * exactly n_bytes_read. A negative or mismatched count would index
                          * past data_fs[2]; only the ASSERT above guards this - confirm. */
     915           0 :   while (n_bytes_read)
     916             :     {
     917           0 :       wrk->batch_num++;
     918           0 :       vlib_dma_batch_add (vm, wrk->batch, data0, data_fs[n_transfers].data,
     919             :                           data_fs[n_transfers].len);
     920           0 :       data0 += data_fs[n_transfers].len;
     921           0 :       n_bytes_read -= data_fs[n_transfers].len;
     922           0 :       n_transfers++;
     923             :     }
     924           0 :   return len_write;
     925             : }
     926             : 
     /*
      * DMA variant of filling a chained (non-first) buffer.
      *
      * Asks svm_fifo_segments () for up to two fifo segments covering len_to_deq
      * bytes at ctx->sp.tx_offset and queues one vlib_dma_batch_add () copy per
      * returned segment, writing to consecutive positions starting at data.
      * wrk->batch_num is incremented once per queued copy. The b argument is
      * not used in this function.
      *
      * Returns the byte count reported by svm_fifo_segments ().
      */
     927             : always_inline int
     928           0 : session_tx_fill_dma_transfers_tail (session_worker_t *wrk,
     929             :                                     session_tx_context_t *ctx,
     930             :                                     vlib_buffer_t *b, u32 len_to_deq, u8 *data)
     931             : {
     932           0 :   vlib_main_t *vm = wrk->vm;
     933             :   int n_bytes_read, len_write;
     934             :   svm_fifo_seg_t data_fs[2];
     935           0 :   u32 n_segs = 2;
     936           0 :   u16 n_transfers = 0;
     937             : 
     938           0 :   n_bytes_read = svm_fifo_segments (ctx->s->tx_fifo, ctx->sp.tx_offset,
     939             :                                     data_fs, &n_segs, len_to_deq);
     940             : 
     941           0 :   len_write = n_bytes_read;
     942             : 
     943           0 :   ASSERT (n_bytes_read == len_to_deq);
     944             : 
                         /* NOTE(review): the loop ends only when the segment lengths sum to
                          * exactly n_bytes_read. A negative or mismatched count would index
                          * past data_fs[2]; only the ASSERT above guards this - confirm. */
     945           0 :   while (n_bytes_read)
     946             :     {
     947           0 :       wrk->batch_num++;
     948           0 :       vlib_dma_batch_add (vm, wrk->batch, data, data_fs[n_transfers].data,
     949             :                           data_fs[n_transfers].len);
     950           0 :       data += data_fs[n_transfers].len;
     951           0 :       n_bytes_read -= data_fs[n_transfers].len;
     952           0 :       n_transfers++;
     953             :     }
     954             : 
     955           0 :   return len_write;
     956             : }
     957             : 
     /*
      * Copy data for the first buffer of a segment without dequeuing it.
      *
      * With DMA disabled, peeks len_to_deq bytes at ctx->sp.tx_offset from the
      * session's tx fifo into data0. With DMA enabled, delegates to
      * session_tx_fill_dma_transfers (), which is passed neither len_to_deq nor
      * data0 and computes its own length and destination from ctx and b.
      *
      * Returns the byte count reported by the chosen path.
      */
     958             : always_inline int
     959     1029090 : session_tx_copy_data (session_worker_t *wrk, session_tx_context_t *ctx,
     960             :                       vlib_buffer_t *b, u32 len_to_deq, u8 *data0)
     961             : {
     962             :   int n_bytes_read;
     963     1029090 :   if (PREDICT_TRUE (!wrk->dma_enabled))
     964             :     n_bytes_read =
     965     1029090 :       svm_fifo_peek (ctx->s->tx_fifo, ctx->sp.tx_offset, len_to_deq, data0);
     966             :   else
     967           0 :     n_bytes_read = session_tx_fill_dma_transfers (wrk, ctx, b);
     968     1029090 :   return n_bytes_read;
     969             : }
     970             : 
     /*
      * Copy data for a chained (non-first) buffer without dequeuing it.
      *
      * With DMA disabled, peeks len_to_deq bytes at ctx->sp.tx_offset from the
      * session's tx fifo into data. With DMA enabled, delegates to
      * session_tx_fill_dma_transfers_tail () with the same arguments.
      *
      * Returns the byte count reported by the chosen path.
      */
     971             : always_inline int
     972           0 : session_tx_copy_data_tail (session_worker_t *wrk, session_tx_context_t *ctx,
     973             :                            vlib_buffer_t *b, u32 len_to_deq, u8 *data)
     974             : {
     975             :   int n_bytes_read;
     976           0 :   if (PREDICT_TRUE (!wrk->dma_enabled))
     977             :     n_bytes_read =
     978           0 :       svm_fifo_peek (ctx->s->tx_fifo, ctx->sp.tx_offset, len_to_deq, data);
     979             :   else
     980             :     n_bytes_read =
     981           0 :       session_tx_fill_dma_transfers_tail (wrk, ctx, b, len_to_deq, data);
     982           0 :   return n_bytes_read;
     983             : }
     984             : 
     /*
      * Fill the buffers chained behind head buffer b for one segment.
      *
      * The amount to place in the chain is
      * min (snd_mss - b->current_length, left_to_snd). For each extra buffer
      * (up to n_bufs_per_seg - 1) a buffer index is taken from the end of
      * ctx->tx_buffers (*n_bufs is decremented first), filled with up to
      * deq_per_buf bytes and linked behind the previous buffer. b's
      * total_length_not_including_first_buffer accumulates the chained bytes,
      * and ctx->left_to_snd is reduced by the chained amount on exit.
      *
      * peek_data selects peeking at ctx->sp.tx_offset (advancing that offset)
      * versus consuming from the fifo (datagram-aware or plain dequeue).
      */
     985             : always_inline void
     986           0 : session_tx_fifo_chain_tail (session_worker_t *wrk, session_tx_context_t *ctx,
     987             :                             vlib_buffer_t *b, u16 *n_bufs, u8 peek_data)
     988             : {
     989           0 :   vlib_main_t *vm = wrk->vm;
     990             :   vlib_buffer_t *chain_b, *prev_b;
     991             :   u32 chain_bi0, to_deq, left_from_seg;
     992             :   int len_to_deq, n_bytes_read;
     993             :   u8 *data, j;
     994             : 
     995           0 :   b->flags |= VLIB_BUFFER_TOTAL_LENGTH_VALID;
     996           0 :   b->total_length_not_including_first_buffer = 0;
     997             : 
     998           0 :   chain_b = b;
     999           0 :   left_from_seg = clib_min (ctx->sp.snd_mss - b->current_length,
    1000             :                             ctx->left_to_snd);
    1001           0 :   to_deq = left_from_seg;
    1002           0 :   for (j = 1; j < ctx->n_bufs_per_seg; j++)
    1003             :     {
    1004           0 :       prev_b = chain_b;
    1005           0 :       len_to_deq = clib_min (to_deq, ctx->deq_per_buf);
    1006             : 
                             /* Take the next chain buffer from the tail of the tx_buffers
                              * array */
    1007           0 :       *n_bufs -= 1;
    1008           0 :       chain_bi0 = ctx->tx_buffers[*n_bufs];
    1009           0 :       chain_b = vlib_get_buffer (vm, chain_bi0);
    1010           0 :       chain_b->current_data = 0;
    1011           0 :       data = vlib_buffer_get_current (chain_b);
    1012           0 :       if (peek_data)
    1013             :         {
                                 /* The head buffer b (not chain_b) is what gets passed down
                                  * here */
    1014             :           n_bytes_read =
    1015           0 :             session_tx_copy_data_tail (wrk, ctx, b, len_to_deq, data);
    1016           0 :           ctx->sp.tx_offset += n_bytes_read;
    1017             :         }
    1018             :       else
    1019             :         {
    1020           0 :           if (ctx->transport_vft->transport_options.tx_type ==
    1021             :               TRANSPORT_TX_DGRAM)
    1022             :             {
    1023           0 :               svm_fifo_t *f = ctx->s->tx_fifo;
    1024           0 :               session_dgram_hdr_t *hdr = &ctx->hdr;
    1025             :               u16 deq_now;
    1026             :               u32 offset;
    1027             : 
                                     /* Payload is peeked past the dgram header, at the current
                                      * offset into the datagram.
                                      * NOTE(review): deq_now may be smaller than len_to_deq
                                      * when the datagram ends first, yet the ASSERT after this
                                      * if/else expects n_bytes_read == len_to_deq - confirm. */
    1028           0 :               deq_now = clib_min (hdr->data_length - hdr->data_offset,
    1029             :                                   len_to_deq);
    1030           0 :               offset = hdr->data_offset + SESSION_CONN_HDR_LEN;
    1031           0 :               n_bytes_read = svm_fifo_peek (f, offset, deq_now, data);
    1032           0 :               ASSERT (n_bytes_read > 0);
    1033             : 
    1034           0 :               hdr->data_offset += n_bytes_read;
                                     /* Datagram fully consumed: drop header plus payload from
                                      * the fifo and, if more is left to send, peek the next
                                      * header from the new fifo head. */
    1035           0 :               if (hdr->data_offset == hdr->data_length)
    1036             :                 {
    1037           0 :                   offset = hdr->data_length + SESSION_CONN_HDR_LEN;
    1038           0 :                   svm_fifo_dequeue_drop (f, offset);
    1039           0 :                   if (ctx->left_to_snd > n_bytes_read)
    1040           0 :                     svm_fifo_peek (ctx->s->tx_fifo, 0, sizeof (ctx->hdr),
    1041           0 :                                    (u8 *) & ctx->hdr);
    1042             :                 }
                                     /* Datagram only partly sent and nothing more goes out
                                      * now: write the leading pre-header bytes of the updated
                                      * ctx->hdr back over the fifo head (presumably to persist
                                      * data_offset - verify the pre-header layout). */
    1043           0 :               else if (ctx->left_to_snd == n_bytes_read)
    1044           0 :                 svm_fifo_overwrite_head (ctx->s->tx_fifo, (u8 *) & ctx->hdr,
    1045             :                                          sizeof (session_dgram_pre_hdr_t));
    1046             :             }
    1047             :           else
    1048           0 :             n_bytes_read = svm_fifo_dequeue (ctx->s->tx_fifo,
    1049             :                                              len_to_deq, data);
    1050             :         }
    1051           0 :       ASSERT (n_bytes_read == len_to_deq);
    1052           0 :       chain_b->current_length = n_bytes_read;
    1053           0 :       b->total_length_not_including_first_buffer += chain_b->current_length;
    1054             : 
    1055             :       /* update previous buffer */
    1056           0 :       prev_b->next_buffer = chain_bi0;
    1057           0 :       prev_b->flags |= VLIB_BUFFER_NEXT_PRESENT;
    1058             : 
    1059             :       /* update current buffer */
    1060           0 :       chain_b->next_buffer = 0;
    1061             : 
    1062           0 :       to_deq -= n_bytes_read;
    1063           0 :       if (to_deq == 0)
    1064           0 :         break;
    1065             :     }
    1066           0 :   ASSERT (to_deq == 0
    1067             :           && b->total_length_not_including_first_buffer == left_from_seg);
    1068           0 :   ctx->left_to_snd -= left_from_seg;
    1069           0 : }
    1070             : 
     /*
      * Fill buffer b with the start of one segment from the session's tx fifo.
      *
      * Resets b (error, flags, current_data), makes TRANSPORT_MAX_HDRS_LEN of
      * headroom and copies up to min (left_to_snd, deq_per_first_buf) bytes:
      *  - peek_data set: data is peeked at ctx->sp.tx_offset and that offset is
      *    advanced;
      *  - datagram transport: payload is peeked past the dgram header and the
      *    datagram is dropped from the fifo once fully consumed;
      *  - otherwise: data is dequeued.
      * b->current_length is set to the copied amount and ctx->left_to_snd is
      * reduced by it. If the segment uses more than one buffer and data remains,
      * session_tx_fifo_chain_tail () fills the chained buffers.
      */
    1071             : always_inline void
    1072     1088160 : session_tx_fill_buffer (session_worker_t *wrk, session_tx_context_t *ctx,
    1073             :                         vlib_buffer_t *b, u16 *n_bufs, u8 peek_data)
    1074             : {
    1075             :   u32 len_to_deq;
    1076             :   u8 *data0;
    1077             :   int n_bytes_read;
    1078             :   /*
    1079             :    * Start with the first buffer in chain
    1080             :    */
    1081     1088160 :   b->error = 0;
    1082     1088160 :   b->flags = VNET_BUFFER_F_LOCALLY_ORIGINATED;
    1083     1088160 :   b->current_data = 0;
    1084             : 
    1085     1088160 :   data0 = vlib_buffer_make_headroom (b, TRANSPORT_MAX_HDRS_LEN);
    1086     1088160 :   len_to_deq = clib_min (ctx->left_to_snd, ctx->deq_per_first_buf);
    1087             : 
    1088     1088160 :   if (peek_data)
    1089             :     {
    1090     1029090 :       n_bytes_read = session_tx_copy_data (wrk, ctx, b, len_to_deq, data0);
    1091     1029090 :       ASSERT (n_bytes_read > 0);
    1092             :       /* Keep track of progress locally, transport is also supposed to
    1093             :        * increment it independently when pushing the header */
    1094     1029090 :       ctx->sp.tx_offset += n_bytes_read;
    1095             :     }
    1096             :   else
    1097             :     {
    1098       59065 :       if (ctx->transport_vft->transport_options.tx_type == TRANSPORT_TX_DGRAM)
    1099             :         {
    1100       59065 :           session_dgram_hdr_t *hdr = &ctx->hdr;
    1101       59065 :           svm_fifo_t *f = ctx->s->tx_fifo;
    1102             :           u16 deq_now;
    1103             :           u32 offset;
    1104             : 
                                 /* Payload is peeked past the dgram header, at the current
                                  * offset into the datagram, and is capped at what remains of
                                  * this datagram. */
    1105       59065 :           ASSERT (hdr->data_length > hdr->data_offset);
    1106       59065 :           deq_now = clib_min (hdr->data_length - hdr->data_offset,
    1107             :                               len_to_deq);
    1108       59065 :           offset = hdr->data_offset + SESSION_CONN_HDR_LEN;
    1109       59065 :           n_bytes_read = svm_fifo_peek (f, offset, deq_now, data0);
    1110       59065 :           ASSERT (n_bytes_read > 0);
    1111             : 
                                 /* For connectionless transports the dgram header is copied
                                  * into the bytes immediately preceding the payload. */
    1112       59065 :           if (transport_connection_is_cless (ctx->tc))
    1113             :             {
    1114           0 :               clib_memcpy_fast (data0 - sizeof (session_dgram_hdr_t), hdr,
    1115             :                                 sizeof (*hdr));
    1116             :             }
    1117       59065 :           hdr->data_offset += n_bytes_read;
                                 /* Datagram fully consumed: drop header plus payload from the
                                  * fifo and, if more is left to send, peek the next header
                                  * from the new fifo head. */
    1118       59065 :           if (hdr->data_offset == hdr->data_length)
    1119             :             {
    1120       59065 :               offset = hdr->data_length + SESSION_CONN_HDR_LEN;
    1121       59065 :               svm_fifo_dequeue_drop (f, offset);
    1122       59065 :               if (ctx->left_to_snd > n_bytes_read)
    1123       42005 :                 svm_fifo_peek (ctx->s->tx_fifo, 0, sizeof (ctx->hdr),
    1124       42005 :                                (u8 *) & ctx->hdr);
    1125             :             }
                                 /* Datagram only partly sent and nothing more goes out now:
                                  * write the leading pre-header bytes of the updated ctx->hdr
                                  * back over the fifo head (presumably to persist data_offset
                                  * - verify the pre-header layout). */
    1126           0 :           else if (ctx->left_to_snd == n_bytes_read)
    1127           0 :             svm_fifo_overwrite_head (ctx->s->tx_fifo, (u8 *) & ctx->hdr,
    1128             :                                      sizeof (session_dgram_pre_hdr_t));
    1129             :         }
    1130             :       else
    1131             :         {
    1132           0 :           n_bytes_read = svm_fifo_dequeue (ctx->s->tx_fifo,
    1133             :                                            len_to_deq, data0);
    1134           0 :           ASSERT (n_bytes_read > 0);
    1135             :         }
    1136             :     }
    1137             : 
    1138     1088160 :   b->current_length = n_bytes_read;
    1139     1088160 :   ctx->left_to_snd -= n_bytes_read;
    1140             : 
    1141             :   /*
    1142             :    * Fill in the remaining buffers in the chain, if any
    1143             :    */
    1144     1088160 :   if (PREDICT_FALSE (ctx->n_bufs_per_seg > 1 && ctx->left_to_snd))
    1145           0 :     session_tx_fifo_chain_tail (wrk, ctx, b, n_bufs, peek_data);
    1146     1088160 : }
    1147             : 
     /*
      * Classify whether session s may transmit, based on its state.
      *
      * Returns:
      *   0 - tx may proceed;
      *   1 - peek mode only: state is below READY, and the session is not an
      *       ACCEPTING session with SESSION_F_CUSTOM_TX set;
      *   2 - peek mode: state is at or past TRANSPORT_CLOSED and either the
      *       state is TRANSPORT_DELETED or SESSION_F_CUSTOM_TX is not set;
      *       non-peek mode: state is TRANSPORT_DELETED.
      * In peek mode, states above READY but below TRANSPORT_CLOSED fall through
      * to the final return 0.
      */
    1148             : always_inline u8
    1149      101925 : session_tx_not_ready (session_t * s, u8 peek_data)
    1150             : {
    1151      101925 :   if (peek_data)
    1152             :     {
    1153       83734 :       if (PREDICT_TRUE (s->session_state == SESSION_STATE_READY))
    1154       83459 :         return 0;
    1155             :       /* Can retransmit for closed sessions but can't send new data if
    1156             :        * session is not ready or closed */
    1157         275 :       else if (s->session_state < SESSION_STATE_READY)
    1158             :         {
    1159             :           /* Allow accepting session to send custom packets.
    1160             :            * For instance, tcp want to send acks in established, but
    1161             :            * the app has not called accept() yet */
    1162           0 :           if (s->session_state == SESSION_STATE_ACCEPTING &&
    1163           0 :               (s->flags & SESSION_F_CUSTOM_TX))
    1164           0 :             return 0;
    1165           0 :           return 1;
    1166             :         }
    1167         275 :       else if (s->session_state >= SESSION_STATE_TRANSPORT_CLOSED)
    1168             :         {
    1169             :           /* Allow closed transports to still send custom packets.
    1170             :            * For instance, tcp may want to send acks in time-wait. */
    1171         131 :           if (s->session_state != SESSION_STATE_TRANSPORT_DELETED
    1172         131 :               && (s->flags & SESSION_F_CUSTOM_TX))
    1173         129 :             return 0;
    1174           2 :           return 2;
    1175             :         }
    1176             :     }
    1177             :   else
    1178             :     {
    1179       18191 :       if (s->session_state == SESSION_STATE_TRANSPORT_DELETED)
    1180           0 :         return 2;
    1181             :     }
    1182       18335 :   return 0;
    1183             : }
    1184             : 
     /*
      * Look up the transport connection for ctx->s through ctx->transport_vft.
      *
      * In peek mode the lookup always uses get_connection () with the session's
      * connection index and thread index. In non-peek mode a session in
      * LISTENING state is resolved with get_listener () instead; any other
      * state uses get_connection ().
      */
    1185             : always_inline transport_connection_t *
    1186      101923 : session_tx_get_transport (session_tx_context_t * ctx, u8 peek_data)
    1187             : {
    1188      101923 :   if (peek_data)
    1189             :     {
    1190       83732 :       return ctx->transport_vft->get_connection (ctx->s->connection_index,
    1191       83732 :                                                  ctx->s->thread_index);
    1192             :     }
    1193             :   else
    1194             :     {
    1195       18191 :       if (ctx->s->session_state == SESSION_STATE_LISTENING)
    1196           0 :         return ctx->transport_vft->get_listener (ctx->s->connection_index);
    1197             :       else
    1198             :         {
    1199       18191 :           return ctx->transport_vft->get_connection (ctx->s->connection_index,
    1200       18191 :                                                      ctx->s->thread_index);
    1201             :         }
    1202             :     }
    1203             : }
    1204             : 
    /* Compute how much can be sent for ctx->s in this dispatch and how many
     * buffers that requires.
     *
     * vm        - used only to read the default buffer data size.
     * ctx       - tx context; reads ctx->s, ctx->sp and ctx->transport_vft.
     * max_segs  - upper bound on segments produced for this event.
     * peek_data - non-zero: data is read starting at ctx->sp.tx_offset.
     *             zero: if the transport's tx_type is TRANSPORT_TX_DGRAM the
     *             dgram header(s) at the head of the tx fifo are parsed.
     *
     * Always writes ctx->max_dequeue and ctx->max_len_to_snd. The early
     * returns set ctx->max_len_to_snd to 0 and leave the segment/buffer
     * fields untouched. Otherwise ctx->n_segs_per_evt, ctx->n_bufs_per_seg,
     * ctx->n_bufs_needed, ctx->deq_per_buf and ctx->deq_per_first_buf are
     * filled in. On the dgram path ctx->hdr is loaded from the fifo and
     * ctx->sp.snd_mss may be lowered. */
    1205             : always_inline void
    1206       87251 : session_tx_set_dequeue_params (vlib_main_t * vm, session_tx_context_t * ctx,
    1207             :                                u32 max_segs, u8 peek_data)
    1208             : {
    1209             :   u32 n_bytes_per_buf, n_bytes_per_seg;
    1210             : 
    1211       87251 :   n_bytes_per_buf = vlib_buffer_get_default_data_size (vm);
    1212       87251 :   ctx->max_dequeue = svm_fifo_max_dequeue_cons (ctx->s->tx_fifo);
    1213             : 
    1214       87251 :   if (peek_data)
    1215             :     {
    1216             :       /* Offset in tx fifo from where to peek data */
    1217       69060 :       if (PREDICT_FALSE (ctx->sp.tx_offset >= ctx->max_dequeue))
    1218             :         {
    1219       27040 :           ctx->max_len_to_snd = 0;
    1220       27040 :           return;
    1221             :         }
    1222       42020 :       ctx->max_dequeue -= ctx->sp.tx_offset;
    1223             :     }
    1224             :   else
    1225             :     {
    1226       18191 :       if (ctx->transport_vft->transport_options.tx_type == TRANSPORT_TX_DGRAM)
    1227             :         {
    1228             :           u32 len, chain_limit;
    1229             : 
                                 /* Nothing to send unless the fifo holds more than a header */
    1230       18191 :           if (ctx->max_dequeue <= sizeof (ctx->hdr))
    1231             :             {
    1232        1131 :               ctx->max_len_to_snd = 0;
    1233        1131 :               return;
    1234             :             }
    1235             : 
    1236       17060 :           svm_fifo_peek (ctx->s->tx_fifo, 0, sizeof (ctx->hdr),
    1237       17060 :                          (u8 *) & ctx->hdr);
    1238             :           /* Zero length dgrams not supported */
    1239       17060 :           if (PREDICT_FALSE (ctx->hdr.data_length == 0))
    1240             :             {
    1241           0 :               svm_fifo_dequeue_drop (ctx->s->tx_fifo, sizeof (ctx->hdr));
    1242           0 :               ctx->max_len_to_snd = 0;
    1243           0 :               return;
    1244             :             }
    1245             :           /* We cannot be sure apps have not enqueued incomplete dgrams */
    1246       17060 :           if (PREDICT_FALSE (ctx->max_dequeue <
    1247             :                              ctx->hdr.data_length + sizeof (ctx->hdr)))
    1248             :             {
    1249           0 :               ctx->max_len_to_snd = 0;
    1250           0 :               return;
    1251             :             }
    1252       17060 :           ASSERT (ctx->hdr.data_length > ctx->hdr.data_offset);
    1253       17060 :           len = ctx->hdr.data_length - ctx->hdr.data_offset;
    1254             : 
    1255       17060 :           if (ctx->hdr.gso_size)
    1256             :             {
    1257           0 :               ctx->sp.snd_mss = clib_min (ctx->sp.snd_mss, ctx->hdr.gso_size);
    1258             :             }
    1259             : 
    1260             :           /* Process multiple dgrams if smaller than min (buf_space, mss).
    1261             :            * This avoids handling multiple dgrams if they require buffer
    1262             :            * chains */
    1263       17060 :           chain_limit = clib_min (n_bytes_per_buf - TRANSPORT_MAX_HDRS_LEN,
    1264             :                                   ctx->sp.snd_mss);
    1265       17060 :           if (ctx->hdr.data_length <= chain_limit)
    1266             :             {
    1267             :               u32 first_dgram_len, dgram_len, offset, max_offset;
    1268             :               session_dgram_hdr_t hdr;
    1269             : 
    1270       17060 :               ctx->sp.snd_mss = clib_min (ctx->sp.snd_mss, len);
    1271       17060 :               offset = ctx->hdr.data_length + sizeof (session_dgram_hdr_t);
    1272       17060 :               first_dgram_len = len;
    1273       17060 :               max_offset = clib_min (ctx->max_dequeue, 16 << 10);
    1274             : 
                                     /* Walk the dgram headers that follow, while their start
                                      * offset is below min (max_dequeue, 16 << 10), and fold a
                                      * dgram into this send only if it is completely enqueued
                                      * and its payload length equals that of the first one */
    1275       59355 :               while (offset < max_offset)
    1276             :                 {
    1277       45420 :                   svm_fifo_peek (ctx->s->tx_fifo, offset, sizeof (ctx->hdr),
    1278             :                                  (u8 *) & hdr);
    1279       45420 :                   dgram_len = hdr.data_length - hdr.data_offset;
    1280       45420 :                   if (offset + sizeof (hdr) + hdr.data_length >
    1281       45420 :                         ctx->max_dequeue ||
    1282             :                       first_dgram_len != dgram_len)
    1283             :                     break;
    1284             :                   /* Assert here to allow test above with zero length dgrams */
    1285       42295 :                   ASSERT (hdr.data_length > hdr.data_offset);
    1286       42295 :                   len += dgram_len;
    1287       42295 :                   offset += sizeof (hdr) + hdr.data_length;
    1288             :                 }
    1289             :             }
    1290             : 
                                 /* From here on max_dequeue is the payload byte count of the
                                  * dgram(s) selected above; dgram headers are not included */
    1291       17060 :           ctx->max_dequeue = len;
    1292             :         }
    1293             :     }
    1294       59080 :   ASSERT (ctx->max_dequeue > 0);
    1295             : 
    1296             :   /* Ensure we're not writing more than transport window allows */
    1297       59080 :   if (ctx->max_dequeue < ctx->sp.snd_space)
    1298             :     {
    1299             :       /* Constrained by tx queue. Try to send only fully formed segments */
    1300       18342 :       ctx->max_len_to_snd = (ctx->max_dequeue > ctx->sp.snd_mss) ?
    1301       18342 :         (ctx->max_dequeue - (ctx->max_dequeue % ctx->sp.snd_mss)) :
    1302             :         ctx->max_dequeue;
    1303             :       /* TODO Nagle ? */
    1304             :     }
    1305             :   else
    1306             :     {
    1307             :       /* Expectation is that snd_space is already a multiple of snd_mss */
    1308       40738 :       ctx->max_len_to_snd = ctx->sp.snd_space;
    1309             :     }
    1310             : 
    1311             :   /* Check if we're tx constrained by the node */
    1312       59080 :   ctx->n_segs_per_evt = ceil ((f64) ctx->max_len_to_snd / ctx->sp.snd_mss);
    1313       59080 :   if (ctx->n_segs_per_evt > max_segs)
    1314             :     {
    1315         589 :       ctx->n_segs_per_evt = max_segs;
    1316         589 :       ctx->max_len_to_snd = max_segs * ctx->sp.snd_mss;
    1317             :     }
    1318             : 
                         /* Buffer accounting: every segment is budgeted TRANSPORT_MAX_HDRS_LEN
                          * bytes on top of its payload. With several segments all but the last
                          * carry snd_mss bytes and the last carries the remainder */
    1319       59080 :   ASSERT (n_bytes_per_buf > TRANSPORT_MAX_HDRS_LEN);
    1320       59080 :   if (ctx->n_segs_per_evt > 1)
    1321             :     {
    1322             :       u32 n_bytes_last_seg, n_bufs_last_seg;
    1323             : 
    1324       30330 :       n_bytes_per_seg = TRANSPORT_MAX_HDRS_LEN + ctx->sp.snd_mss;
    1325       30330 :       n_bytes_last_seg = TRANSPORT_MAX_HDRS_LEN + ctx->max_len_to_snd
    1326       30330 :         - ((ctx->n_segs_per_evt - 1) * ctx->sp.snd_mss);
    1327       30330 :       ctx->n_bufs_per_seg = ceil ((f64) n_bytes_per_seg / n_bytes_per_buf);
    1328       30330 :       n_bufs_last_seg = ceil ((f64) n_bytes_last_seg / n_bytes_per_buf);
    1329       30330 :       ctx->n_bufs_needed = ((ctx->n_segs_per_evt - 1) * ctx->n_bufs_per_seg)
    1330       30330 :         + n_bufs_last_seg;
    1331             :     }
    1332             :   else
    1333             :     {
    1334       28750 :       n_bytes_per_seg = TRANSPORT_MAX_HDRS_LEN + ctx->max_len_to_snd;
    1335       28750 :       ctx->n_bufs_per_seg = ceil ((f64) n_bytes_per_seg / n_bytes_per_buf);
    1336       28750 :       ctx->n_bufs_needed = ctx->n_bufs_per_seg;
    1337             :     }
    1338             : 
    1339       59080 :   ctx->deq_per_buf = clib_min (ctx->sp.snd_mss, n_bytes_per_buf);
    1340       59080 :   ctx->deq_per_first_buf = clib_min (ctx->sp.snd_mss,
    1341             :                                      n_bytes_per_buf -
    1342             :                                      TRANSPORT_MAX_HDRS_LEN);
    1343             : }
    1344             : 
    /* Clear the tx fifo event flag of ctx->s and decide whether the session
     * stays scheduled.
     *
     * If the tx fifo holds more bytes than ctx->sp.tx_offset, the event flag
     * is set again and, when svm_fifo_set_event () returns non-zero, elt is
     * re-queued with session_evt_add_head_old (). Otherwise the transport
     * connection ctx->tc is descheduled. */
    1345             : always_inline void
    1346       54227 : session_tx_maybe_reschedule (session_worker_t * wrk,
    1347             :                              session_tx_context_t * ctx,
    1348             :                              session_evt_elt_t * elt)
    1349             : {
    1350       54227 :   session_t *s = ctx->s;
    1351             : 
    1352       54227 :   svm_fifo_unset_event (s->tx_fifo);
    1353       54227 :   if (svm_fifo_max_dequeue_cons (s->tx_fifo) > ctx->sp.tx_offset)
    1354             :     {
    1355        5324 :       if (svm_fifo_set_event (s->tx_fifo))
    1356        5324 :         session_evt_add_head_old (wrk, elt);
    1357             :     }
    1358             :   else
    1359             :     {
    1360       48903 :       transport_connection_deschedule (ctx->tc);
    1361             :     }
    1362       54227 : }
    1363             : 
    /* Append buffer index bi and its next_index to the pending tx vectors.
     *
     * When wrk->dma_enabled is zero the worker's own pending_tx_buffers /
     * pending_tx_nexts vectors are used. Otherwise the pair is appended to
     * the same-named vectors of the dma transfer at
     * wrk->dma_trans[wrk->trans_tail]. */
    1364             : always_inline void
    1365     1088160 : session_tx_add_pending_buffer (session_worker_t *wrk, u32 bi, u32 next_index)
    1366             : {
    1367     1088160 :   if (PREDICT_TRUE (!wrk->dma_enabled))
    1368             :     {
    1369     1088160 :       vec_add1 (wrk->pending_tx_buffers, bi);
    1370     1088160 :       vec_add1 (wrk->pending_tx_nexts, next_index);
    1371             :     }
    1372             :   else
    1373             :     {
    1374           0 :       session_dma_transfer *dma_transfer = &wrk->dma_trans[wrk->trans_tail];
    1375           0 :       vec_add1 (dma_transfer->pending_tx_buffers, bi);
    1376           0 :       vec_add1 (dma_transfer->pending_tx_nexts, next_index);
    1377             :     }
    1378     1088160 : }
    1379             : 
    /* Common tx path shared by session_tx_fifo_peek_and_snd () and
     * session_tx_fifo_dequeue_and_snd () for the session in wrk->ctx.s.
     *
     * Steps: check the session may transmit, resolve the transport, run the
     * transport's custom_tx if SESSION_F_CUSTOM_TX is set, fetch the send
     * parameters (and pacer burst, if paced), size the transfer with
     * session_tx_set_dequeue_params (), allocate buffers, fill one buffer
     * (chain) per segment, have the transport push headers and queue the
     * buffers as pending tx. Finally elt is re-queued or the connection is
     * descheduled depending on what is left to send.
     *
     * wrk          - session worker; wrk->ctx is the tx context used.
     * node         - used for the no-buffer error counter and for tracing.
     * elt          - event element being serviced; may be re-queued.
     * n_tx_packets - in/out; SESSION_NODE_FRAME_SIZE - *n_tx_packets is the
     *                burst budget, and the value is increased by the packets
     *                custom_tx reports plus the segments built here.
     * peek_data    - forwarded to the helpers; when zero a dequeue
     *                notification check is also done before returning.
     *
     * Returns SESSION_TX_OK, SESSION_TX_NO_DATA or SESSION_TX_NO_BUFFERS. */
    1380             : always_inline int
    1381      101925 : session_tx_fifo_read_and_snd_i (session_worker_t * wrk,
    1382             :                                 vlib_node_runtime_t * node,
    1383             :                                 session_evt_elt_t * elt,
    1384             :                                 int *n_tx_packets, u8 peek_data)
    1385             : {
    1386             :   u32 n_trace, n_left, pbi, next_index, max_burst;
    1387      101925 :   session_tx_context_t *ctx = &wrk->ctx;
    1388      101925 :   session_main_t *smm = &session_main;
    1389      101925 :   session_event_t *e = &elt->evt;
    1390      101925 :   vlib_main_t *vm = wrk->vm;
    1391             :   transport_proto_t tp;
    1392             :   vlib_buffer_t *pb;
    1393             :   u16 n_bufs, rv;
    1394             : 
                         /* A non-zero rv means nothing is sent now. elt is re-queued only
                          * when rv is below 2; for larger values it is not re-queued here */
    1395      101925 :   if (PREDICT_FALSE ((rv = session_tx_not_ready (ctx->s, peek_data))))
    1396             :     {
    1397           2 :       if (rv < 2)
    1398           0 :         session_evt_add_old (wrk, elt);
    1399           2 :       return SESSION_TX_NO_DATA;
    1400             :     }
    1401             : 
    1402      101923 :   next_index = smm->session_type_to_next[ctx->s->session_type];
    1403      101923 :   max_burst = SESSION_NODE_FRAME_SIZE - *n_tx_packets;
    1404             : 
    1405      101923 :   tp = session_get_transport_proto (ctx->s);
    1406      101923 :   ctx->transport_vft = transport_protocol_get_vft (tp);
    1407      101923 :   ctx->tc = session_tx_get_transport (ctx, peek_data);
    1408             : 
                         /* A flush event calls the transport's optional flush_data callback
                          * and is then handled as a regular tx event */
    1409      101923 :   if (PREDICT_FALSE (e->event_type == SESSION_IO_EVT_TX_FLUSH))
    1410             :     {
    1411         697 :       if (ctx->transport_vft->flush_data)
    1412         317 :         ctx->transport_vft->flush_data (ctx->tc);
    1413         697 :       e->event_type = SESSION_IO_EVT_TX;
    1414             :     }
    1415             : 
    1416      101923 :   if (ctx->s->flags & SESSION_F_CUSTOM_TX)
    1417             :     {
    1418             :       u32 n_custom_tx;
    1419       41829 :       ctx->s->flags &= ~SESSION_F_CUSTOM_TX;
    1420       41829 :       ctx->sp.max_burst_size = max_burst;
    1421       41829 :       n_custom_tx = ctx->transport_vft->custom_tx (ctx->tc, &ctx->sp);
    1422       41829 :       *n_tx_packets += n_custom_tx;
    1423       41829 :       if (PREDICT_FALSE
    1424             :           (ctx->s->session_state >= SESSION_STATE_TRANSPORT_CLOSED))
    1425         129 :         return SESSION_TX_OK;
    1426       41700 :       max_burst -= n_custom_tx;
                             /* Burst budget used up, or the flag cleared above is set again:
                              * re-queue and stop here */
    1427       41700 :       if (!max_burst || (ctx->s->flags & SESSION_F_CUSTOM_TX))
    1428             :         {
    1429           0 :           session_evt_add_old (wrk, elt);
    1430           0 :           return SESSION_TX_OK;
    1431             :         }
    1432             :     }
    1433             : 
    1434             :   /* Connection previously descheduled because it had no data to send.
    1435             :    * Clear descheduled flag and reset pacer if in use */
    1436      101794 :   if (transport_connection_is_descheduled (ctx->tc))
    1437       22214 :     transport_connection_clear_descheduled (ctx->tc);
    1438             : 
    1439      101794 :   transport_connection_snd_params (ctx->tc, &ctx->sp);
    1440             : 
    1441      101794 :   if (!ctx->sp.snd_space)
    1442             :     {
    1443             :       /* If the deschedule flag was set, remove session from scheduler.
    1444             :        * Transport is responsible for rescheduling this session. */
    1445       14457 :       if (ctx->sp.flags & TRANSPORT_SND_F_DESCHED)
    1446       14457 :         transport_connection_deschedule (ctx->tc);
    1447             :       /* Request to postpone the session, e.g., zero-wnd and transport
    1448             :        * is not currently probing */
    1449           0 :       else if (ctx->sp.flags & TRANSPORT_SND_F_POSTPONE)
    1450           0 :         session_evt_add_old (wrk, elt);
    1451             :       /* This flow queue is "empty" so it should be re-evaluated before
    1452             :        * the ones that have data to send. */
    1453             :       else
    1454           0 :         session_evt_add_head_old (wrk, elt);
    1455             : 
    1456       14457 :       return SESSION_TX_NO_DATA;
    1457             :     }
    1458             : 
    1459       87337 :   if (transport_connection_is_tx_paced (ctx->tc))
    1460             :     {
    1461       69146 :       u32 snd_space = transport_connection_tx_pacer_burst (ctx->tc);
    1462       69146 :       if (snd_space < TRANSPORT_PACER_MIN_BURST)
    1463             :         {
    1464          86 :           session_evt_add_head_old (wrk, elt);
    1465          86 :           return SESSION_TX_NO_DATA;
    1466             :         }
                             /* Limit snd_space to the pacer burst and, when at least one mss
                              * fits, round it down to a multiple of snd_mss */
    1467       69060 :       snd_space = clib_min (ctx->sp.snd_space, snd_space);
    1468       69060 :       ctx->sp.snd_space = snd_space >= ctx->sp.snd_mss ?
    1469       69060 :         snd_space - snd_space % ctx->sp.snd_mss : snd_space;
    1470             :     }
    1471             : 
    1472             :   /* Check how much we can pull. */
    1473       87251 :   session_tx_set_dequeue_params (vm, ctx, max_burst, peek_data);
    1474             : 
    1475       87251 :   if (PREDICT_FALSE (!ctx->max_len_to_snd))
    1476             :     {
    1477       28171 :       transport_connection_tx_pacer_reset_bucket (ctx->tc, 0);
    1478       28171 :       session_tx_maybe_reschedule (wrk, ctx, elt);
    1479       28171 :       return SESSION_TX_NO_DATA;
    1480             :     }
    1481             : 
    1482       59080 :   vec_validate_aligned (ctx->tx_buffers, ctx->n_bufs_needed - 1,
    1483             :                         CLIB_CACHE_LINE_BYTES);
    1484       59080 :   n_bufs = vlib_buffer_alloc (vm, ctx->tx_buffers, ctx->n_bufs_needed);
                         /* Partial allocation: give back what was obtained, re-queue the
                          * event and report the shortage */
    1485       59080 :   if (PREDICT_FALSE (n_bufs < ctx->n_bufs_needed))
    1486             :     {
    1487           0 :       if (n_bufs)
    1488           0 :         vlib_buffer_free (vm, ctx->tx_buffers, n_bufs);
    1489           0 :       session_evt_add_head_old (wrk, elt);
    1490           0 :       vlib_node_increment_counter (wrk->vm, node->node_index,
    1491             :                                    SESSION_QUEUE_ERROR_NO_BUFFER, 1);
    1492           0 :       return SESSION_TX_NO_BUFFERS;
    1493             :     }
    1494             : 
    1495       59080 :   if (transport_connection_is_tx_paced (ctx->tc))
    1496       42020 :     transport_connection_tx_pacer_update_bytes (ctx->tc, ctx->max_len_to_snd);
    1497             : 
    1498       59080 :   ctx->left_to_snd = ctx->max_len_to_snd;
    1499       59080 :   n_left = ctx->n_segs_per_evt;
    1500             : 
    1501       59080 :   vec_validate (ctx->transport_pending_bufs, n_left);
    1502             : 
                         /* Two segments per iteration. Buffers are taken from the tail of
                          * tx_buffers; session_tx_fill_buffer () gets &n_bufs, so it can
                          * take further buffers itself (presumably for chained segments;
                          * the helper is defined outside this view) */
    1503      544812 :   while (n_left >= 4)
    1504             :     {
    1505             :       vlib_buffer_t *b0, *b1;
    1506             :       u32 bi0, bi1;
    1507             : 
    1508      485732 :       pbi = ctx->tx_buffers[n_bufs - 3];
    1509      485732 :       pb = vlib_get_buffer (vm, pbi);
    1510      485732 :       vlib_prefetch_buffer_header (pb, STORE);
    1511      485732 :       pbi = ctx->tx_buffers[n_bufs - 4];
    1512      485732 :       pb = vlib_get_buffer (vm, pbi);
    1513      485732 :       vlib_prefetch_buffer_header (pb, STORE);
    1514             : 
    1515      485732 :       bi0 = ctx->tx_buffers[--n_bufs];
    1516      485732 :       bi1 = ctx->tx_buffers[--n_bufs];
    1517             : 
    1518      485732 :       b0 = vlib_get_buffer (vm, bi0);
    1519      485732 :       b1 = vlib_get_buffer (vm, bi1);
    1520             : 
    1521      485732 :       session_tx_fill_buffer (wrk, ctx, b0, &n_bufs, peek_data);
    1522      485732 :       session_tx_fill_buffer (wrk, ctx, b1, &n_bufs, peek_data);
    1523             : 
    1524      485732 :       ctx->transport_pending_bufs[ctx->n_segs_per_evt - n_left] = b0;
    1525      485732 :       ctx->transport_pending_bufs[ctx->n_segs_per_evt - n_left + 1] = b1;
    1526      485732 :       n_left -= 2;
    1527             : 
    1528      485732 :       session_tx_add_pending_buffer (wrk, bi0, next_index);
    1529      485732 :       session_tx_add_pending_buffer (wrk, bi1, next_index);
    1530             :     }
                         /* Remaining segments, one per iteration */
    1531      175773 :   while (n_left)
    1532             :     {
    1533             :       vlib_buffer_t *b0;
    1534             :       u32 bi0;
    1535             : 
    1536      116693 :       if (n_left > 1)
    1537             :         {
    1538       57613 :           pbi = ctx->tx_buffers[n_bufs - 2];
    1539       57613 :           pb = vlib_get_buffer (vm, pbi);
    1540       57613 :           vlib_prefetch_buffer_header (pb, STORE);
    1541             :         }
    1542             : 
    1543      116693 :       bi0 = ctx->tx_buffers[--n_bufs];
    1544      116693 :       b0 = vlib_get_buffer (vm, bi0);
    1545      116693 :       session_tx_fill_buffer (wrk, ctx, b0, &n_bufs, peek_data);
    1546             : 
    1547      116693 :       ctx->transport_pending_bufs[ctx->n_segs_per_evt - n_left] = b0;
    1548      116693 :       n_left -= 1;
    1549             : 
    1550      116693 :       session_tx_add_pending_buffer (wrk, bi0, next_index);
    1551             :     }
    1552             : 
    1553             :   /* Ask transport to push headers */
    1554       59080 :   ctx->transport_vft->push_header (ctx->tc, ctx->transport_pending_bufs,
    1555       59080 :                                    ctx->n_segs_per_evt);
    1556             : 
    1557       59080 :   if (PREDICT_FALSE ((n_trace = vlib_get_trace_count (vm, node)) > 0))
    1558           0 :     session_tx_trace_frame (vm, node, next_index, ctx->transport_pending_bufs,
    1559           0 :                             ctx->n_segs_per_evt, ctx->s, n_trace);
    1560             : 
                         /* Return any allocated buffers that were not consumed above */
    1561       59080 :   if (PREDICT_FALSE (n_bufs))
    1562           0 :     vlib_buffer_free (vm, ctx->tx_buffers, n_bufs);
    1563             : 
    1564       59080 :   *n_tx_packets += ctx->n_segs_per_evt;
    1565             : 
    1566             :   SESSION_EVT (SESSION_EVT_DEQ, ctx->s, ctx->max_len_to_snd, ctx->max_dequeue,
    1567             :                ctx->s->tx_fifo->shr->has_event, wrk->last_vlib_time);
    1568             : 
    1569       59080 :   ASSERT (ctx->left_to_snd == 0);
    1570             : 
    1571             :   /* If we couldn't dequeue all bytes reschedule as old flow. Otherwise,
    1572             :    * check if application enqueued more data and reschedule accordingly */
    1573       59080 :   if (ctx->max_len_to_snd < ctx->max_dequeue)
    1574       33024 :     session_evt_add_old (wrk, elt);
    1575             :   else
    1576       26056 :     session_tx_maybe_reschedule (wrk, ctx, elt);
    1577             : 
    1578       59080 :   if (!peek_data)
    1579             :     {
                             /* For dgram transports SESSION_CONN_HDR_LEN bytes per segment
                              * are added to the payload count (presumably the per-dgram
                              * header also removed from the fifo; verify against
                              * session_tx_fill_buffer) */
    1580       17060 :       u32 n_dequeued = ctx->max_len_to_snd;
    1581       17060 :       if (ctx->transport_vft->transport_options.tx_type == TRANSPORT_TX_DGRAM)
    1582       17060 :         n_dequeued += ctx->n_segs_per_evt * SESSION_CONN_HDR_LEN;
    1583       17060 :       if (svm_fifo_needs_deq_ntf (ctx->s->tx_fifo, n_dequeued))
    1584          51 :         session_dequeue_notify (ctx->s);
    1585             :     }
    1586       59080 :   return SESSION_TX_OK;
    1587             : }
    1588             : 
    /* Tx function that runs session_tx_fifo_read_and_snd_i () with
     * peek_data = 1 and returns its result. */
    1589             : int
    1590       83734 : session_tx_fifo_peek_and_snd (session_worker_t * wrk,
    1591             :                               vlib_node_runtime_t * node,
    1592             :                               session_evt_elt_t * e, int *n_tx_packets)
    1593             : {
    1594       83734 :   return session_tx_fifo_read_and_snd_i (wrk, node, e, n_tx_packets, 1);
    1595             : }
    1596             : 
    /* Tx function that runs session_tx_fifo_read_and_snd_i () with
     * peek_data = 0 and returns its result. */
    1597             : int
    1598       18191 : session_tx_fifo_dequeue_and_snd (session_worker_t * wrk,
    1599             :                                  vlib_node_runtime_t * node,
    1600             :                                  session_evt_elt_t * e, int *n_tx_packets)
    1601             : {
    1602       18191 :   return session_tx_fifo_read_and_snd_i (wrk, node, e, n_tx_packets, 0);
    1603             : }
    1604             : 
    /* Tx function that hands transmission for the session in wrk->ctx.s to
     * transport_custom_tx ().
     *
     * Returns 0 without side effects when the session state is at or past
     * SESSION_STATE_TRANSPORT_CLOSED, or when it is SESSION_STATE_CONNECTING
     * with SESSION_F_HALF_OPEN set. Otherwise returns the packet count
     * reported by transport_custom_tx (), which is also added to
     * *n_tx_packets.
     *
     * node is not used by this function. */
    1605             : int
    1606     1472670 : session_tx_fifo_dequeue_internal (session_worker_t * wrk,
    1607             :                                   vlib_node_runtime_t * node,
    1608             :                                   session_evt_elt_t * elt, int *n_tx_packets)
    1609             : {
    1610     1472670 :   transport_send_params_t *sp = &wrk->ctx.sp;
    1611     1472670 :   session_t *s = wrk->ctx.s;
    1612             :   clib_llist_index_t ei;
    1613             :   u32 n_packets;
    1614             : 
    1615     1472670 :   if (PREDICT_FALSE ((s->session_state >= SESSION_STATE_TRANSPORT_CLOSED) ||
    1616             :                      (s->session_state == SESSION_STATE_CONNECTING &&
    1617             :                       (s->flags & SESSION_F_HALF_OPEN))))
    1618          14 :     return 0;
    1619             : 
    1620             :   /* Clear custom-tx flag used to request reschedule for tx */
    1621     1472660 :   s->flags &= ~SESSION_F_CUSTOM_TX;
    1622             : 
    1623     1472660 :   sp->flags = 0;
    1624     1472660 :   sp->bytes_dequeued = 0;
    1625     1472660 :   sp->max_burst_size = clib_min (SESSION_NODE_FRAME_SIZE - *n_tx_packets,
    1626             :                                  TRANSPORT_PACER_MAX_BURST_PKTS);
    1627             : 
    1628             :   /* Grab elt index since app transports can enqueue events on tx */
    1629     1472660 :   ei = clib_llist_entry_index (wrk->event_elts, elt);
    1630             : 
    1631     1472660 :   n_packets = transport_custom_tx (session_get_transport_proto (s), s, sp);
    1632     1472660 :   *n_tx_packets += n_packets;
    1633             : 
                         /* Re-resolve elt from the index saved before the custom tx call */
    1634     1472660 :   elt = clib_llist_elt (wrk->event_elts, ei);
    1635             : 
                         /* The flag was cleared above, so if it is set now it was raised
                          * again since: keep the session scheduled. Otherwise, unless
                          * TRANSPORT_SND_F_DESCHED was set in sp->flags, re-queue only
                          * if the tx fifo still has data */
    1636     1472660 :   if (s->flags & SESSION_F_CUSTOM_TX)
    1637             :     {
    1638     1273180 :       session_evt_add_old (wrk, elt);
    1639             :     }
    1640      199478 :   else if (!(sp->flags & TRANSPORT_SND_F_DESCHED))
    1641             :     {
    1642       62105 :       svm_fifo_unset_event (s->tx_fifo);
    1643       62105 :       if (svm_fifo_max_dequeue_cons (s->tx_fifo))
    1644       62080 :         if (svm_fifo_set_event (s->tx_fifo))
    1645       62080 :           session_evt_add_head_old (wrk, elt);
    1646             :     }
    1647             : 
    1648     1473300 :   if (sp->bytes_dequeued &&
    1649         641 :       svm_fifo_needs_deq_ntf (s->tx_fifo, sp->bytes_dequeued))
    1650           1 :     session_dequeue_notify (s);
    1651             : 
    1652     1472660 :   return n_packets;
    1653             : }
    1654             : 
    /* Return the session at e->session_index in wrk->sessions, or 0 when
     * that pool index is free. */
    1655             : always_inline session_t *
    1656     1582710 : session_event_get_session (session_worker_t * wrk, session_event_t * e)
    1657             : {
    1658     1582710 :   if (PREDICT_FALSE (pool_is_free_index (wrk->sessions, e->session_index)))
    1659           5 :     return 0;
    1660             : 
    1661     1582710 :   ASSERT (session_is_valid (e->session_index, wrk->vm->thread_index));
    1662     1582710 :   return pool_elt_at_index (wrk->sessions, e->session_index);
    1663             : }
    1664             : 
    /* Dispatch one control event to the handler selected by its event_type
     * and then release the event element.
     *
     * RPC events call the stored function pointer with its argument.
     * HALF_CLOSE, CLOSE and RESET look the session up by handle and are
     * ignored if the handle is no longer valid. The remaining types are
     * forwarded to their handlers, either with (wrk, elt) or with the
     * control data returned by session_evt_ctrl_data (). Unknown types are
     * logged with clib_warning ().
     *
     * After the handler returns, elt is looked up again by index. If it is
     * not linked into an event list it is returned to wrk->event_elts; for
     * event types >= SESSION_CTRL_EVT_BOUND its control data is freed
     * first. */
    1665             : always_inline void
    1666         928 : session_event_dispatch_ctrl (session_worker_t * wrk, session_evt_elt_t * elt)
    1667             : {
    1668             :   clib_llist_index_t ei;
    1669             :   void (*fp) (void *);
    1670             :   session_event_t *e;
    1671             :   session_t *s;
    1672             : 
                         /* Save the index so elt can be looked up again after dispatch */
    1673         928 :   ei = clib_llist_entry_index (wrk->event_elts, elt);
    1674         928 :   e = &elt->evt;
    1675             : 
    1676         928 :   switch (e->event_type)
    1677             :     {
    1678         146 :     case SESSION_CTRL_EVT_RPC:
    1679         146 :       fp = e->rpc_args.fp;
    1680         146 :       (*fp) (e->rpc_args.arg);
    1681         146 :       break;
    1682           0 :     case SESSION_CTRL_EVT_HALF_CLOSE:
    1683           0 :       s = session_get_from_handle_if_valid (e->session_handle);
    1684           0 :       if (PREDICT_FALSE (!s))
    1685           0 :         break;
    1686           0 :       session_transport_half_close (s);
    1687           0 :       break;
    1688         507 :     case SESSION_CTRL_EVT_CLOSE:
    1689         507 :       s = session_get_from_handle_if_valid (e->session_handle);
    1690         507 :       if (PREDICT_FALSE (!s))
    1691           0 :         break;
    1692         507 :       session_transport_close (s);
    1693         507 :       break;
    1694           0 :     case SESSION_CTRL_EVT_RESET:
    1695           0 :       s = session_get_from_handle_if_valid (e->session_handle);
    1696           0 :       if (PREDICT_FALSE (!s))
    1697           0 :         break;
    1698           0 :       session_transport_reset (s);
    1699           0 :       break;
    1700          43 :     case SESSION_CTRL_EVT_LISTEN:
    1701          43 :       session_mq_listen_handler (wrk, elt);
    1702          43 :       break;
    1703           0 :     case SESSION_CTRL_EVT_LISTEN_URI:
    1704           0 :       session_mq_listen_uri_handler (wrk, elt);
    1705           0 :       break;
    1706          25 :     case SESSION_CTRL_EVT_UNLISTEN:
    1707          25 :       session_mq_unlisten_handler (wrk, elt);
    1708          25 :       break;
    1709          51 :     case SESSION_CTRL_EVT_CONNECT:
    1710          51 :       session_mq_connect_handler (wrk, elt);
    1711          51 :       break;
    1712           0 :     case SESSION_CTRL_EVT_CONNECT_URI:
    1713           0 :       session_mq_connect_uri_handler (wrk, elt);
    1714           0 :       break;
    1715           0 :     case SESSION_CTRL_EVT_SHUTDOWN:
    1716           0 :       session_mq_shutdown_handler (session_evt_ctrl_data (wrk, elt));
    1717           0 :       break;
    1718          60 :     case SESSION_CTRL_EVT_DISCONNECT:
    1719          60 :       session_mq_disconnect_handler (session_evt_ctrl_data (wrk, elt));
    1720          60 :       break;
    1721           0 :     case SESSION_CTRL_EVT_DISCONNECTED:
    1722           0 :       session_mq_disconnected_handler (session_evt_ctrl_data (wrk, elt));
    1723           0 :       break;
    1724          47 :     case SESSION_CTRL_EVT_ACCEPTED_REPLY:
    1725          47 :       session_mq_accepted_reply_handler (wrk, elt);
    1726          47 :       break;
    1727          14 :     case SESSION_CTRL_EVT_DISCONNECTED_REPLY:
    1728          14 :       session_mq_disconnected_reply_handler (session_evt_ctrl_data (wrk,
    1729             :                                                                     elt));
    1730          14 :       break;
    1731           6 :     case SESSION_CTRL_EVT_RESET_REPLY:
    1732           6 :       session_mq_reset_reply_handler (session_evt_ctrl_data (wrk, elt));
    1733           6 :       break;
    1734           0 :     case SESSION_CTRL_EVT_WORKER_UPDATE:
    1735           0 :       session_mq_worker_update_handler (session_evt_ctrl_data (wrk, elt));
    1736           0 :       break;
    1737          24 :     case SESSION_CTRL_EVT_APP_DETACH:
    1738          24 :       app_mq_detach_handler (wrk, elt);
    1739          24 :       break;
    1740           0 :     case SESSION_CTRL_EVT_APP_WRK_RPC:
    1741           0 :       session_mq_app_wrk_rpc_handler (session_evt_ctrl_data (wrk, elt));
    1742           0 :       break;
    1743           5 :     case SESSION_CTRL_EVT_TRANSPORT_ATTR:
    1744           5 :       session_mq_transport_attr_handler (session_evt_ctrl_data (wrk, elt));
    1745           5 :       break;
    1746           0 :     default:
    1747           0 :       clib_warning ("unhandled event type %d", e->event_type);
    1748             :     }
    1749             : 
    1750             :   /* Regrab elements in case pool moved */
    1751         928 :   elt = clib_llist_elt (wrk->event_elts, ei);
                         /* An element that is linked into an event list again is left
                          * alone; only unlinked elements are released here */
    1752         928 :   if (!clib_llist_elt_is_linked (elt, evt_list))
    1753             :     {
    1754         873 :       e = &elt->evt;
    1755         873 :       if (e->event_type >= SESSION_CTRL_EVT_BOUND)
    1756         220 :         session_evt_ctrl_data_free (wrk, elt);
    1757         873 :       clib_llist_put (wrk->event_elts, elt);
    1758             :     }
    1759             :   SESSION_EVT (SESSION_EVT_COUNTS, CNT_CTRL_EVTS, 1, wrk);
    1760         928 : }
    1761             : 
    1762             : always_inline void
    1763     1582710 : session_event_dispatch_io (session_worker_t * wrk, vlib_node_runtime_t * node,
    1764             :                            session_evt_elt_t * elt, int *n_tx_packets)
    1765             : {
    1766     1582710 :   session_main_t *smm = &session_main;
    1767             :   app_worker_t *app_wrk;
    1768             :   clib_llist_index_t ei;
    1769             :   session_event_t *e;
    1770             :   session_t *s;
    1771             : 
    1772     1582710 :   ei = clib_llist_entry_index (wrk->event_elts, elt);
    1773     1582710 :   e = &elt->evt;
    1774             : 
    1775     1582710 :   switch (e->event_type)
    1776             :     {
    1777     1574600 :     case SESSION_IO_EVT_TX_FLUSH:
    1778             :     case SESSION_IO_EVT_TX:
    1779     1574600 :       s = session_event_get_session (wrk, e);
    1780     1574600 :       if (PREDICT_FALSE (!s))
    1781           5 :         break;
    1782     1574600 :       CLIB_PREFETCH (s->tx_fifo, sizeof (*(s->tx_fifo)), LOAD);
    1783     1574600 :       wrk->ctx.s = s;
    1784             :       /* Spray packets in per session type frames, since they go to
    1785             :        * different nodes */
    1786     1574600 :       (smm->session_tx_fns[s->session_type]) (wrk, node, elt, n_tx_packets);
    1787     1574600 :       break;
    1788        5604 :     case SESSION_IO_EVT_RX:
    1789        5604 :       s = session_event_get_session (wrk, e);
    1790        5604 :       if (!s || s->session_state >= SESSION_STATE_TRANSPORT_CLOSED)
    1791             :         break;
    1792        5604 :       transport_app_rx_evt (session_get_transport_proto (s),
    1793        5604 :                             s->connection_index, s->thread_index);
    1794        5604 :       break;
    1795        2506 :     case SESSION_IO_EVT_BUILTIN_RX:
    1796        2506 :       s = session_event_get_session (wrk, e);
    1797        2506 :       if (PREDICT_FALSE (!s || s->session_state >= SESSION_STATE_CLOSING))
    1798             :         break;
    1799        2506 :       svm_fifo_unset_event (s->rx_fifo);
    1800        2506 :       app_wrk = app_worker_get (s->app_wrk_index);
    1801        2506 :       app_worker_rx_notify (app_wrk, s);
    1802        2506 :       break;
    1803           0 :     case SESSION_IO_EVT_TX_MAIN:
    1804           0 :       s = session_get_if_valid (e->session_index, 0 /* main thread */);
    1805           0 :       if (PREDICT_FALSE (!s))
    1806           0 :         break;
    1807           0 :       wrk->ctx.s = s;
    1808           0 :       if (PREDICT_TRUE (s != 0))
    1809           0 :         (smm->session_tx_fns[s->session_type]) (wrk, node, elt, n_tx_packets);
    1810           0 :       break;
    1811           0 :     default:
    1812           0 :       clib_warning ("unhandled event type %d", e->event_type);
    1813             :     }
    1814             : 
    1815           0 :   SESSION_EVT (SESSION_EVT_IO_EVT_COUNTS, e->event_type, 1, wrk);
    1816             : 
    1817             :   /* Regrab elements in case pool moved */
    1818     1582710 :   elt = clib_llist_elt (wrk->event_elts, ei);
    1819     1582710 :   if (!clib_llist_elt_is_linked (elt, evt_list))
    1820      209018 :     clib_llist_put (wrk->event_elts, elt);
    1821     1582710 : }
    1822             : 
    1823             : /* *INDENT-OFF* */
                       /* Control event message sizes: each entry is indexed by its
                        * SESSION_CTRL_EVT_* value and holds sizeof the matching
                        * session_*_msg_t. The entries are generated by expanding
                        * foreach_session_ctrl_evt with the local _ () macro. */
    1824             : static const u32 session_evt_msg_sizes[] = {
    1825             : #define _(symc, sym)                                                    \
    1826             :   [SESSION_CTRL_EVT_ ## symc] = sizeof (session_ ## sym ##_msg_t),
    1827             :   foreach_session_ctrl_evt
    1828             : #undef _
    1829             : };
    1830             : /* *INDENT-ON* */
    1831             : 
    1832             : always_inline void
    1833    73706900 : session_update_time_subscribers (session_main_t *smm, clib_time_type_t now,
    1834             :                                  u32 thread_index)
    1835             : {
    1836             :   session_update_time_fn *fn;
    1837             : 
    1838   155430000 :   vec_foreach (fn, smm->update_time_fns)
    1839    81706700 :     (*fn) (now, thread_index);
    1840    73668200 : }
    1841             : 
    1842             : always_inline void
    1843      167705 : session_evt_add_to_list (session_worker_t * wrk, session_event_t * evt)
    1844             : {
    1845             :   session_evt_elt_t *elt;
    1846             : 
    1847      167705 :   if (evt->event_type >= SESSION_CTRL_EVT_RPC)
    1848             :     {
    1849         421 :       elt = session_evt_alloc_ctrl (wrk);
    1850         421 :       if (evt->event_type >= SESSION_CTRL_EVT_BOUND)
    1851             :         {
    1852         275 :           elt->evt.ctrl_data_index = session_evt_ctrl_data_alloc (wrk);
    1853         275 :           elt->evt.event_type = evt->event_type;
    1854         275 :           clib_memcpy_fast (session_evt_ctrl_data (wrk, elt), evt->data,
    1855         275 :                             session_evt_msg_sizes[evt->event_type]);
    1856             :         }
    1857             :       else
    1858             :         {
    1859             :           /* Internal control events fit into io events footprint */
    1860         146 :           clib_memcpy_fast (&elt->evt, evt, sizeof (elt->evt));
    1861             :         }
    1862             :     }
    1863             :   else
    1864             :     {
    1865      167284 :       elt = session_evt_alloc_new (wrk);
    1866      167284 :       clib_memcpy_fast (&elt->evt, evt, sizeof (elt->evt));
    1867             :     }
    1868      167705 : }
    1869             : 
    1870             : static void
    1871       48962 : session_flush_pending_tx_buffers (session_worker_t * wrk,
    1872             :                                   vlib_node_runtime_t * node)
    1873             : {
    1874       48962 :   vlib_buffer_enqueue_to_next_vec (wrk->vm, node, &wrk->pending_tx_buffers,
    1875             :                                    &wrk->pending_tx_nexts,
    1876       48962 :                                    vec_len (wrk->pending_tx_nexts));
    1877       48962 :   vec_reset_length (wrk->pending_tx_buffers);
    1878       48962 :   vec_reset_length (wrk->pending_tx_nexts);
    1879       48962 : }
    1880             : 
    1881             : int
    1882    73662500 : session_wrk_handle_mq (session_worker_t *wrk, svm_msg_q_t *mq)
    1883             : {
    1884    73662500 :   svm_msg_q_msg_t _msg, *msg = &_msg;
    1885    73662500 :   u32 i, n_to_dequeue = 0;
    1886             :   session_event_t *evt;
    1887             : 
    1888    73662500 :   n_to_dequeue = svm_msg_q_size (mq);
    1889    73822300 :   for (i = 0; i < n_to_dequeue; i++)
    1890             :     {
    1891      167705 :       svm_msg_q_sub_raw (mq, msg);
    1892      167705 :       evt = svm_msg_q_msg_data (mq, msg);
    1893      167705 :       session_evt_add_to_list (wrk, evt);
    1894      167705 :       svm_msg_q_free_msg (mq, msg);
    1895             :     }
    1896             : 
    1897    73654600 :   return n_to_dequeue;
    1898             : }
    1899             : 
    1900             : static void
    1901      589313 : session_wrk_update_state (session_worker_t *wrk)
    1902             : {
    1903      589313 :   vlib_main_t *vm = wrk->vm;
    1904             : 
    1905      589313 :   if (wrk->state == SESSION_WRK_POLLING)
    1906             :     {
    1907      584276 :       if (clib_llist_elts (wrk->event_elts) == 5 &&
    1908          11 :           vlib_last_vectors_per_main_loop (vm) < 1)
    1909             :         {
    1910          11 :           session_wrk_set_state (wrk, SESSION_WRK_INTERRUPT);
    1911          11 :           vlib_node_set_state (vm, session_queue_node.index,
    1912             :                                VLIB_NODE_STATE_INTERRUPT);
    1913             :         }
    1914             :     }
    1915        5048 :   else if (wrk->state == SESSION_WRK_INTERRUPT)
    1916             :     {
    1917       10079 :       if (clib_llist_elts (wrk->event_elts) > 5 ||
    1918        5035 :           vlib_last_vectors_per_main_loop (vm) > 1)
    1919             :         {
    1920           9 :           session_wrk_set_state (wrk, SESSION_WRK_POLLING);
    1921           9 :           vlib_node_set_state (vm, session_queue_node.index,
    1922             :                                VLIB_NODE_STATE_POLLING);
    1923             :         }
    1924        5035 :       else if (PREDICT_FALSE (!pool_elts (wrk->sessions)))
    1925             :         {
    1926           4 :           session_wrk_set_state (wrk, SESSION_WRK_IDLE);
    1927             :         }
    1928             :     }
    1929             :   else
    1930             :     {
    1931           4 :       if (clib_llist_elts (wrk->event_elts))
    1932             :         {
    1933           4 :           session_wrk_set_state (wrk, SESSION_WRK_INTERRUPT);
    1934             :         }
    1935             :     }
    1936      589313 : }
    1937             : 
    1938             : static uword
    1939    73709400 : session_queue_node_fn (vlib_main_t * vm, vlib_node_runtime_t * node,
    1940             :                        vlib_frame_t * frame)
    1941             : {
                         /*
                          * One dispatch pass for the session worker of the calling thread:
                          * update the worker time and call the time subscribers, dequeue the
                          * worker's message queue, then dispatch control events, new io events
                          * and finally the previously pending (old) io events.
                          *
                          * Returns n_tx_packets, which starts from the number of buffers
                          * already in wrk->pending_tx_buffers and is passed by reference to
                          * the io dispatch calls. Returns 0 early when dma is enabled and
                          * either of the dma checks below fails.
                          */
    1942    73709400 :   u32 thread_index = vm->thread_index, __clib_unused n_evts;
    1943             :   session_evt_elt_t *elt, *ctrl_he, *new_he, *old_he;
    1944    73709400 :   session_main_t *smm = vnet_get_session_main ();
    1945    73699500 :   session_worker_t *wrk = &smm->wrk[thread_index];
    1946             :   clib_llist_index_t ei, next_ei, old_ti;
    1947             :   int n_tx_packets;
    1948             : 
    1949             :   SESSION_EVT (SESSION_EVT_DISPATCH_START, wrk);
    1950             : 
    1951    73699500 :   session_wrk_update_time (wrk, vlib_time_now (vm));
    1952             : 
    1953             :   /*
    1954             :    *  Update transport time
    1955             :    */
    1956    73720900 :   session_update_time_subscribers (smm, wrk->last_vlib_time, thread_index);
    1957    73754100 :   n_tx_packets = vec_len (wrk->pending_tx_buffers);
    1958             :   SESSION_EVT (SESSION_EVT_DSP_CNTRS, UPDATE_TIME, wrk);
    1959             : 
    1960    73754100 :   if (PREDICT_FALSE (wrk->dma_enabled))
    1961             :     {
                             /* NOTE(review): looks like a "ring full" test on the transfer
                              * ring, which would assume trans_size is a power of two -
                              * confirm. Nothing is dispatched in this pass if it hits. */
    1962           0 :       if (wrk->trans_head == ((wrk->trans_tail + 1) & (wrk->trans_size - 1)))
    1963           0 :         return 0;
    1964           0 :       wrk->batch = vlib_dma_batch_new (vm, wrk->config_index);
    1965           0 :       if (!wrk->batch)
    1966           0 :         return 0;
    1967             :     }
    1968             : 
    1969             :   /*
    1970             :    *  Dequeue new internal mq events
    1971             :    */
    1972             : 
    1973    73754100 :   n_evts = session_wrk_handle_mq (wrk, wrk->vpp_event_queue);
    1974             :   SESSION_EVT (SESSION_EVT_DSP_CNTRS, MQ_DEQ, wrk, n_evts);
    1975             : 
    1976             :   /*
    1977             :    * Handle control events
    1978             :    */
    1979             : 
                         /* old_ti is the element preceding the control head when the walk
                          * starts. The loop stops once that element has been dispatched, so
                          * the walk is bounded by the list contents seen at this point. The
                          * next index is read before the element is removed and dispatched. */
    1980    73653900 :   ei = wrk->ctrl_head;
    1981    73653900 :   ctrl_he = clib_llist_elt (wrk->event_elts, ei);
    1982    73619200 :   next_ei = clib_llist_next_index (ctrl_he, evt_list);
    1983    73619200 :   old_ti = clib_llist_prev_index (ctrl_he, evt_list);
    1984    73616600 :   while (ei != old_ti)
    1985             :     {
    1986         928 :       ei = next_ei;
    1987         928 :       elt = clib_llist_elt (wrk->event_elts, next_ei);
    1988         928 :       next_ei = clib_llist_next_index (elt, evt_list);
    1989         928 :       clib_llist_remove (wrk->event_elts, evt_list, elt);
    1990         928 :       session_event_dispatch_ctrl (wrk, elt);
    1991             :     }
    1992             : 
    1993             :   SESSION_EVT (SESSION_EVT_DSP_CNTRS, CTRL_EVTS, wrk);
    1994             : 
    1995             :   /*
    1996             :    * Handle the new io events.
    1997             :    */
    1998             : 
                         /* The tail of the old io list is captured here, before any new io
                          * event is dispatched; it bounds the old io walk further down. */
    1999    73615600 :   new_he = clib_llist_elt (wrk->event_elts, wrk->new_head);
    2000    73593600 :   old_he = clib_llist_elt (wrk->event_elts, wrk->old_head);
    2001    73592900 :   old_ti = clib_llist_prev_index (old_he, evt_list);
    2002             : 
                         /* Walk until the list wraps back to the new head or the tx budget
                          * of SESSION_NODE_FRAME_SIZE packets is used up. */
    2003    73592900 :   ei = clib_llist_next_index (new_he, evt_list);
    2004    73801900 :   while (ei != wrk->new_head && n_tx_packets < SESSION_NODE_FRAME_SIZE)
    2005             :     {
    2006      209018 :       elt = clib_llist_elt (wrk->event_elts, ei);
    2007      209018 :       ei = clib_llist_next_index (elt, evt_list);
    2008      209018 :       clib_llist_remove (wrk->event_elts, evt_list, elt);
    2009      209018 :       session_event_dispatch_io (wrk, node, elt, &n_tx_packets);
    2010             :     }
    2011             : 
    2012             :   SESSION_EVT (SESSION_EVT_DSP_CNTRS, NEW_IO_EVTS, wrk);
    2013             : 
    2014             :   /*
    2015             :    * Handle the old io events, if we had any prior to processing the new ones
    2016             :    */
    2017             : 
    2018    73592900 :   if (old_ti != wrk->old_head)
    2019             :     {
    2020     1346170 :       old_he = clib_llist_elt (wrk->event_elts, wrk->old_head);
    2021     1346170 :       ei = clib_llist_next_index (old_he, evt_list);
    2022             : 
                             /* Ends after the element captured in old_ti has been dispatched
                              * or when the tx budget is used up, whichever comes first. */
    2023     1374170 :       while (n_tx_packets < SESSION_NODE_FRAME_SIZE)
    2024             :         {
    2025     1373690 :           elt = clib_llist_elt (wrk->event_elts, ei);
    2026     1373690 :           next_ei = clib_llist_next_index (elt, evt_list);
    2027     1373690 :           clib_llist_remove (wrk->event_elts, evt_list, elt);
    2028             : 
    2029     1373690 :           session_event_dispatch_io (wrk, node, elt, &n_tx_packets);
    2030             : 
    2031     1373690 :           if (ei == old_ti)
    2032     1345700 :             break;
    2033             : 
    2034       27997 :           ei = next_ei;
    2035             :         };
    2036             :     }
    2037             : 
    2038    73592900 :   if (PREDICT_FALSE (wrk->dma_enabled))
    2039             :     {
                             /* If anything was batched, tag the batch with the current tail
                              * and advance the tail, wrapping to 0 at trans_size. The batch
                              * is submitted whether or not anything was batched. */
    2040           0 :       if (wrk->batch_num)
    2041             :         {
    2042           0 :           vlib_dma_batch_set_cookie (vm, wrk->batch, wrk->trans_tail);
    2043           0 :           wrk->batch_num = 0;
    2044           0 :           wrk->trans_tail++;
    2045           0 :           if (wrk->trans_tail == wrk->trans_size)
    2046           0 :             wrk->trans_tail = 0;
    2047             :         }
    2048             : 
    2049           0 :       vlib_dma_batch_submit (vm, wrk->batch);
    2050             :     }
    2051             : 
    2052             :   SESSION_EVT (SESSION_EVT_DSP_CNTRS, OLD_IO_EVTS, wrk);
    2053             : 
    2054    73592900 :   if (vec_len (wrk->pending_tx_buffers))
    2055       48962 :     session_flush_pending_tx_buffers (wrk, node);
    2056             : 
    2057    73592900 :   vlib_node_increment_counter (vm, session_queue_node.index,
    2058             :                                SESSION_QUEUE_ERROR_TX, n_tx_packets);
    2059             : 
    2060             :   SESSION_EVT (SESSION_EVT_DISPATCH_END, wrk, n_tx_packets);
    2061             : 
                         /* Only workers flagged as adaptive re-evaluate their state */
    2062    73606700 :   if (wrk->flags & SESSION_WRK_F_ADAPTIVE)
    2063      589313 :     session_wrk_update_state (wrk);
    2064             : 
    2065    73606900 :   return n_tx_packets;
    2066             : }
    2067             : 
    2068             : /* *INDENT-OFF* */
                       /* Registration of the "session-queue" input node. Its function is
                        * session_queue_node_fn, tracing is supported through
                        * format_session_queue_trace and the node is registered in the
                        * disabled state. */
    2069      183788 : VLIB_REGISTER_NODE (session_queue_node) = {
    2070             :   .function = session_queue_node_fn,
    2071             :   .flags = VLIB_NODE_FLAG_TRACE_SUPPORTED,
    2072             :   .name = "session-queue",
    2073             :   .format_trace = format_session_queue_trace,
    2074             :   .type = VLIB_NODE_TYPE_INPUT,
    2075             :   .n_errors = SESSION_QUEUE_N_ERROR,
    2076             :   .error_counters = session_error_counters,
    2077             :   .state = VLIB_NODE_STATE_DISABLED,
    2078             : };
    2079             : /* *INDENT-ON* */
    2080             : 
    2081             : static clib_error_t *
    2082        4996 : session_wrk_tfd_read_ready (clib_file_t *cf)
    2083             : {
    2084        4996 :   session_worker_t *wrk = session_main_get_worker (cf->private_data);
    2085             :   u64 buf;
    2086             :   int rv;
    2087             : 
    2088        4996 :   vlib_node_set_interrupt_pending (wrk->vm, session_queue_node.index);
    2089        4996 :   rv = read (wrk->timerfd, &buf, sizeof (buf));
    2090        4996 :   if (rv < 0 && errno != EAGAIN)
    2091           0 :     clib_unix_warning ("failed");
    2092        4996 :   return 0;
    2093             : }
    2094             : 
    2095             : static clib_error_t *
    2096           0 : session_wrk_tfd_write_ready (clib_file_t *cf)
    2097             : {
    2098           0 :   return 0;
    2099             : }
    2100             : 
    2101             : void
    2102           2 : session_wrk_enable_adaptive_mode (session_worker_t *wrk)
    2103             : {
    2104           2 :   u32 thread_index = wrk->vm->thread_index;
    2105           2 :   clib_file_t template = { 0 };
    2106             : 
    2107           2 :   if ((wrk->timerfd = timerfd_create (CLOCK_MONOTONIC, TFD_NONBLOCK)) < 0)
    2108           0 :     clib_warning ("timerfd_create");
    2109             : 
    2110           2 :   template.read_function = session_wrk_tfd_read_ready;
    2111           2 :   template.write_function = session_wrk_tfd_write_ready;
    2112           2 :   template.file_descriptor = wrk->timerfd;
    2113           2 :   template.private_data = thread_index;
    2114           2 :   template.polling_thread_index = thread_index;
    2115           2 :   template.description = format (0, "session-wrk-tfd-%u", thread_index);
    2116             : 
    2117           2 :   wrk->timerfd_file = clib_file_add (&file_main, &template);
    2118           2 :   wrk->flags |= SESSION_WRK_F_ADAPTIVE;
    2119           2 : }
    2120             : 
    2121             : static clib_error_t *
    2122         575 : session_queue_exit (vlib_main_t * vm)
    2123             : {
    2124         575 :   if (vlib_get_n_threads () < 2)
    2125         538 :     return 0;
    2126             : 
    2127             :   /*
    2128             :    * Shut off (especially) worker-thread session nodes.
    2129             :    * Otherwise, vpp can crash as the main thread unmaps the
    2130             :    * API segment.
    2131             :    */
    2132          37 :   vlib_worker_thread_barrier_sync (vm);
    2133          37 :   session_node_enable_disable (0 /* is_enable */ );
    2134          37 :   vlib_worker_thread_barrier_release (vm);
    2135          37 :   return 0;
    2136             : }
    2137             : 
    2138        2301 : VLIB_MAIN_LOOP_EXIT_FUNCTION (session_queue_exit);
    2139             : 
    2140             : static uword
    2141          12 : session_queue_run_on_main (vlib_main_t * vm)
    2142             : {
    2143             :   vlib_node_runtime_t *node;
    2144             : 
    2145          12 :   node = vlib_node_get_runtime (vm, session_queue_node.index);
    2146          12 :   return session_queue_node_fn (vm, node, 0);
    2147             : }
    2148             : 
    2149             : static uword
    2150          21 : session_queue_process (vlib_main_t * vm, vlib_node_runtime_t * rt,
    2151             :                        vlib_frame_t * f)
    2152             : {
    2153          21 :   uword *event_data = 0;
    2154          21 :   f64 timeout = 1.0;
    2155             :   uword event_type;
    2156             : 
    2157             :   while (1)
    2158             :     {
    2159          33 :       vlib_process_wait_for_event_or_clock (vm, timeout);
    2160          12 :       event_type = vlib_process_get_events (vm, (uword **) & event_data);
    2161             : 
    2162          12 :       switch (event_type)
    2163             :         {
    2164           0 :         case SESSION_Q_PROCESS_RUN_ON_MAIN:
    2165             :           /* Run session queue node on main thread */
    2166           0 :           session_queue_run_on_main (vm);
    2167           0 :           break;
    2168           0 :         case SESSION_Q_PROCESS_STOP:
    2169           0 :           vlib_node_set_state (vm, session_queue_process_node.index,
    2170             :                                VLIB_NODE_STATE_DISABLED);
    2171           0 :           timeout = 100000.0;
    2172           0 :           break;
    2173          12 :         case ~0:
    2174             :           /* Timed out. Run on main to ensure all events are handled */
    2175          12 :           session_queue_run_on_main (vm);
    2176          12 :           break;
    2177             :         }
    2178          12 :       vec_reset_length (event_data);
    2179             :     }
    2180             :   return 0;
    2181             : }
    2182             : 
    2183             : /* *INDENT-OFF* */
                       /* Registration of the "session-queue-process" process node. Its
                        * function is session_queue_process and it is registered in the
                        * disabled state. */
    2184      183788 : VLIB_REGISTER_NODE (session_queue_process_node) =
    2185             : {
    2186             :   .function = session_queue_process,
    2187             :   .type = VLIB_NODE_TYPE_PROCESS,
    2188             :   .name = "session-queue-process",
    2189             :   .state = VLIB_NODE_STATE_DISABLED,
    2190             : };
    2191             : /* *INDENT-ON* */
    2192             : 
    2193             : static_always_inline uword
    2194           0 : session_queue_pre_input_inline (vlib_main_t * vm, vlib_node_runtime_t * node,
    2195             :                                 vlib_frame_t * frame)
    2196             : {
    2197           0 :   session_main_t *sm = &session_main;
    2198           0 :   if (!sm->wrk[0].vpp_event_queue)
    2199           0 :     return 0;
    2200           0 :   node = vlib_node_get_runtime (vm, session_queue_node.index);
    2201           0 :   return session_queue_node_fn (vm, node, frame);
    2202             : }
    2203             : 
    2204             : /* *INDENT-OFF* */
                       /* Registration of the "session-queue-main" pre-input node. Its
                        * function is session_queue_pre_input_inline and it is registered
                        * in the disabled state. */
    2205      183788 : VLIB_REGISTER_NODE (session_queue_pre_input_node) =
    2206             : {
    2207             :   .function = session_queue_pre_input_inline,
    2208             :   .type = VLIB_NODE_TYPE_PRE_INPUT,
    2209             :   .name = "session-queue-main",
    2210             :   .state = VLIB_NODE_STATE_DISABLED,
    2211             : };
    2212             : /* *INDENT-ON* */
    2213             : 
    2214             : /*
    2215             :  * fd.io coding-style-patch-verification: ON
    2216             :  *
    2217             :  * Local Variables:
    2218             :  * eval: (c-set-style "gnu")
    2219             :  * End:
    2220             :  */

Generated by: LCOV version 1.14