LCOV - code coverage report
Current view: top level - vnet/session - application_worker.c (source / functions) Hit Total Coverage
Test: coverage-filtered.info Lines: 301 444 67.8 %
Date: 2023-07-05 22:20:52 Functions: 41 49 83.7 %

          Line data    Source code
       1             : /*
       2             :  * Copyright (c) 2019 Cisco and/or its affiliates.
       3             :  * Licensed under the Apache License, Version 2.0 (the "License");
       4             :  * you may not use this file except in compliance with the License.
       5             :  * You may obtain a copy of the License at:
       6             :  *
       7             :  *     http://www.apache.org/licenses/LICENSE-2.0
       8             :  *
       9             :  * Unless required by applicable law or agreed to in writing, software
      10             :  * distributed under the License is distributed on an "AS IS" BASIS,
      11             :  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
      12             :  * See the License for the specific language governing permissions and
      13             :  * limitations under the License.
      14             :  */
      15             : 
      16             : #include <vnet/session/application.h>
      17             : #include <vnet/session/application_interface.h>
      18             : #include <vnet/session/session.h>
      19             : 
      20             : /**
      21             :  * Pool of workers associated to apps
      22             :  */
      23             : static app_worker_t *app_workers;
      24             : 
      25             : app_worker_t *
      26         216 : app_worker_alloc (application_t * app)
      27             : {
      28             :   app_worker_t *app_wrk;
      29         216 :   pool_get (app_workers, app_wrk);
      30         216 :   clib_memset (app_wrk, 0, sizeof (*app_wrk));
      31         216 :   app_wrk->wrk_index = app_wrk - app_workers;
      32         216 :   app_wrk->app_index = app->app_index;
      33         216 :   app_wrk->wrk_map_index = ~0;
      34         216 :   app_wrk->connects_seg_manager = APP_INVALID_SEGMENT_MANAGER_INDEX;
      35         216 :   clib_spinlock_init (&app_wrk->detached_seg_managers_lock);
      36         216 :   clib_spinlock_init (&app_wrk->postponed_mq_msgs_lock);
      37             :   APP_DBG ("New app %v worker %u", app->name, app_wrk->wrk_index);
      38         216 :   return app_wrk;
      39             : }
      40             : 
      41             : app_worker_t *
      42       13659 : app_worker_get (u32 wrk_index)
      43             : {
      44       13659 :   return pool_elt_at_index (app_workers, wrk_index);
      45             : }
      46             : 
      47             : app_worker_t *
      48      252242 : app_worker_get_if_valid (u32 wrk_index)
      49             : {
      50      252242 :   if (pool_is_free_index (app_workers, wrk_index))
      51         185 :     return 0;
      52      252057 :   return pool_elt_at_index (app_workers, wrk_index);
      53             : }
      54             : 
      55             : void
      56          87 : app_worker_free (app_worker_t * app_wrk)
      57             : {
      58          87 :   application_t *app = application_get (app_wrk->app_index);
      59          87 :   vnet_unlisten_args_t _a, *a = &_a;
      60          87 :   u64 handle, *handles = 0, *sm_indices = 0;
      61             :   segment_manager_t *sm;
      62             :   session_handle_t *sh;
      63             :   session_t *ls;
      64             :   u32 sm_index;
      65             :   int i;
      66             : 
      67             :   /*
      68             :    *  Listener cleanup
      69             :    */
      70             : 
      71             :   /* *INDENT-OFF* */
      72        5680 :   hash_foreach (handle, sm_index, app_wrk->listeners_table, ({
      73             :     ls = listen_session_get_from_handle (handle);
      74             :     vec_add1 (handles, app_listen_session_handle (ls));
      75             :     vec_add1 (sm_indices, sm_index);
      76             :     sm = segment_manager_get (sm_index);
      77             :   }));
      78             :   /* *INDENT-ON* */
      79             : 
      80         112 :   for (i = 0; i < vec_len (handles); i++)
      81             :     {
      82             :       /* Cleanup listener */
      83          25 :       a->app_index = app->app_index;
      84          25 :       a->wrk_map_index = app_wrk->wrk_map_index;
      85          25 :       a->handle = handles[i];
      86          25 :       (void) vnet_unlisten (a);
      87             : 
      88          25 :       sm = segment_manager_get_if_valid (sm_indices[i]);
      89          25 :       if (sm && !segment_manager_app_detached (sm))
      90             :         {
      91           0 :           sm->first_is_protected = 0;
      92           0 :           segment_manager_init_free (sm);
      93             :         }
      94             :     }
      95          87 :   vec_reset_length (handles);
      96          87 :   vec_free (sm_indices);
      97          87 :   hash_free (app_wrk->listeners_table);
      98             : 
      99             :   /*
     100             :    * Connects segment manager cleanup
     101             :    */
     102             : 
     103          87 :   if (app_wrk->connects_seg_manager != APP_INVALID_SEGMENT_MANAGER_INDEX)
     104             :     {
     105          87 :       sm = segment_manager_get (app_wrk->connects_seg_manager);
     106          87 :       sm->app_wrk_index = SEGMENT_MANAGER_INVALID_APP_INDEX;
     107          87 :       sm->first_is_protected = 0;
     108          87 :       segment_manager_init_free (sm);
     109             :     }
     110             : 
     111             :   /*
     112             :    * Half-open cleanup
     113             :    */
     114             : 
     115          88 :   pool_foreach (sh, app_wrk->half_open_table)
     116           1 :     session_cleanup_half_open (*sh);
     117             : 
     118          87 :   pool_free (app_wrk->half_open_table);
     119             : 
     120             :   /*
     121             :    * Detached listener segment managers cleanup
     122             :    */
     123         112 :   for (i = 0; i < vec_len (app_wrk->detached_seg_managers); i++)
     124             :     {
     125          25 :       sm = segment_manager_get (app_wrk->detached_seg_managers[i]);
     126          25 :       segment_manager_init_free (sm);
     127             :     }
     128          87 :   vec_free (app_wrk->detached_seg_managers);
     129          87 :   clib_spinlock_free (&app_wrk->detached_seg_managers_lock);
     130          87 :   clib_spinlock_free (&app_wrk->postponed_mq_msgs_lock);
     131             : 
     132             :   if (CLIB_DEBUG)
     133          87 :     clib_memset (app_wrk, 0xfe, sizeof (*app_wrk));
     134          87 :   pool_put (app_workers, app_wrk);
     135          87 : }
     136             : 
     137             : application_t *
     138           0 : app_worker_get_app (u32 wrk_index)
     139             : {
     140             :   app_worker_t *app_wrk;
     141           0 :   app_wrk = app_worker_get_if_valid (wrk_index);
     142           0 :   if (!app_wrk)
     143           0 :     return 0;
     144           0 :   return application_get_if_valid (app_wrk->app_index);
     145             : }
     146             : 
     147             : static segment_manager_t *
     148          78 : app_worker_alloc_segment_manager (app_worker_t * app_wrk)
     149             : {
     150             :   segment_manager_t *sm;
     151             : 
     152          78 :   sm = segment_manager_alloc ();
     153          78 :   sm->app_wrk_index = app_wrk->wrk_index;
     154          78 :   segment_manager_init (sm);
     155          78 :   return sm;
     156             : }
     157             : 
     158             : static int
     159         404 : app_worker_alloc_session_fifos (segment_manager_t * sm, session_t * s)
     160             : {
     161         404 :   svm_fifo_t *rx_fifo = 0, *tx_fifo = 0;
     162             :   int rv;
     163             : 
     164         404 :   if ((rv = segment_manager_alloc_session_fifos (sm, s->thread_index,
     165             :                                                  &rx_fifo, &tx_fifo)))
     166           0 :     return rv;
     167             : 
     168         404 :   rx_fifo->shr->master_session_index = s->session_index;
     169         404 :   rx_fifo->master_thread_index = s->thread_index;
     170             : 
     171         404 :   tx_fifo->shr->master_session_index = s->session_index;
     172         404 :   tx_fifo->master_thread_index = s->thread_index;
     173             : 
     174         404 :   s->rx_fifo = rx_fifo;
     175         404 :   s->tx_fifo = tx_fifo;
     176         404 :   return 0;
     177             : }
     178             : 
     179             : int
     180          78 : app_worker_init_listener (app_worker_t * app_wrk, session_t * ls)
     181             : {
     182             :   segment_manager_t *sm;
     183             : 
     184             :   /* Allocate segment manager. All sessions derived out of a listen session
     185             :    * have fifos allocated by the same segment manager. */
     186          78 :   if (!(sm = app_worker_alloc_segment_manager (app_wrk)))
     187           0 :     return SESSION_E_ALLOC;
     188             : 
     189             :   /* Once the first segment is mapped, don't remove it until unlisten */
     190          78 :   sm->first_is_protected = 1;
     191             : 
     192             :   /* Keep track of the segment manager for the listener or this worker */
     193          78 :   hash_set (app_wrk->listeners_table, listen_session_get_handle (ls),
     194             :             segment_manager_index (sm));
     195             : 
     196          78 :   if (transport_connection_is_cless (session_get_transport (ls)))
     197             :     {
     198           3 :       if (ls->rx_fifo)
     199           0 :         return SESSION_E_NOSUPPORT;
     200           3 :       return app_worker_alloc_session_fifos (sm, ls);
     201             :     }
     202          75 :   return 0;
     203             : }
     204             : 
     205             : int
     206          75 : app_worker_start_listen (app_worker_t * app_wrk,
     207             :                          app_listener_t * app_listener)
     208             : {
     209             :   session_t *ls;
     210             :   int rv;
     211             : 
     212          75 :   if (clib_bitmap_get (app_listener->workers, app_wrk->wrk_map_index))
     213           2 :     return SESSION_E_ALREADY_LISTENING;
     214             : 
     215         146 :   app_listener->workers = clib_bitmap_set (app_listener->workers,
     216          73 :                                            app_wrk->wrk_map_index, 1);
     217             : 
     218          73 :   if (app_listener->session_index != SESSION_INVALID_INDEX)
     219             :     {
     220          60 :       ls = session_get (app_listener->session_index, 0);
     221          60 :       if ((rv = app_worker_init_listener (app_wrk, ls)))
     222           0 :         return rv;
     223             :     }
     224             : 
     225          73 :   if (app_listener->local_index != SESSION_INVALID_INDEX)
     226             :     {
     227          18 :       ls = session_get (app_listener->local_index, 0);
     228          18 :       if ((rv = app_worker_init_listener (app_wrk, ls)))
     229           0 :         return rv;
     230             :     }
     231             : 
     232          73 :   return 0;
     233             : }
     234             : 
     235             : static void
     236          31 : app_worker_add_detached_sm (app_worker_t * app_wrk, u32 sm_index)
     237             : {
     238          31 :   vec_add1 (app_wrk->detached_seg_managers, sm_index);
     239          31 : }
     240             : 
     241             : void
     242           6 : app_worker_del_detached_sm (app_worker_t * app_wrk, u32 sm_index)
     243             : {
     244             :   u32 i;
     245             : 
     246           6 :   clib_spinlock_lock (&app_wrk->detached_seg_managers_lock);
     247           6 :   for (i = 0; i < vec_len (app_wrk->detached_seg_managers); i++)
     248             :     {
     249           6 :       if (app_wrk->detached_seg_managers[i] == sm_index)
     250             :         {
     251           6 :           vec_del1 (app_wrk->detached_seg_managers, i);
     252           6 :           break;
     253             :         }
     254             :     }
     255           6 :   clib_spinlock_unlock (&app_wrk->detached_seg_managers_lock);
     256           6 : }
     257             : 
     258             : static void
     259          62 : app_worker_stop_listen_session (app_worker_t * app_wrk, session_t * ls)
     260             : {
     261             :   session_handle_t handle;
     262             :   segment_manager_t *sm;
     263             :   uword *sm_indexp;
     264          62 :   session_state_t *states = 0;
     265             : 
     266          62 :   handle = listen_session_get_handle (ls);
     267          62 :   sm_indexp = hash_get (app_wrk->listeners_table, handle);
     268          62 :   if (PREDICT_FALSE (!sm_indexp))
     269           0 :     return;
     270             : 
     271             :   /* Dealloc fifos, if any (dgram listeners) */
     272          62 :   if (ls->rx_fifo)
     273             :     {
     274           3 :       segment_manager_dealloc_fifos (ls->rx_fifo, ls->tx_fifo);
     275           3 :       ls->tx_fifo = ls->rx_fifo = 0;
     276             :     }
     277             : 
     278             :   /* Try to cleanup segment manager */
     279          62 :   sm = segment_manager_get (*sm_indexp);
     280          62 :   if (sm)
     281             :     {
     282          62 :       sm->first_is_protected = 0;
     283          62 :       segment_manager_app_detach (sm);
     284          62 :       if (!segment_manager_has_fifos (sm))
     285             :         {
     286             :           /* Empty segment manager, cleanup it up */
     287          31 :           segment_manager_free (sm);
     288             :         }
     289             :       else
     290             :         {
     291             :           /* Delete sessions in CREATED state */
     292          31 :           vec_add1 (states, SESSION_STATE_CREATED);
     293          31 :           segment_manager_del_sessions_filter (sm, states);
     294          31 :           vec_free (states);
     295             : 
     296             :           /* Track segment manager in case app detaches and all the
     297             :            * outstanding sessions need to be closed */
     298          31 :           app_worker_add_detached_sm (app_wrk, *sm_indexp);
     299          31 :           sm->flags |= SEG_MANAGER_F_DETACHED_LISTENER;
     300             :         }
     301             :     }
     302             : 
     303          62 :   hash_unset (app_wrk->listeners_table, handle);
     304             : }
     305             : 
     306             : int
     307          57 : app_worker_stop_listen (app_worker_t * app_wrk, app_listener_t * al)
     308             : {
     309             :   session_t *ls;
     310             : 
     311          57 :   if (!clib_bitmap_get (al->workers, app_wrk->wrk_map_index))
     312           0 :     return 0;
     313             : 
     314          57 :   if (al->session_index != SESSION_INVALID_INDEX)
     315             :     {
     316          44 :       ls = listen_session_get (al->session_index);
     317          44 :       app_worker_stop_listen_session (app_wrk, ls);
     318             :     }
     319             : 
     320          57 :   if (al->local_index != SESSION_INVALID_INDEX)
     321             :     {
     322          18 :       ls = listen_session_get (al->local_index);
     323          18 :       app_worker_stop_listen_session (app_wrk, ls);
     324             :     }
     325             : 
     326          57 :   clib_bitmap_set_no_check (al->workers, app_wrk->wrk_map_index, 0);
     327          57 :   if (clib_bitmap_is_zero (al->workers))
     328          57 :     app_listener_cleanup (al);
     329             : 
     330          57 :   return 0;
     331             : }
     332             : 
     333             : int
     334         176 : app_worker_init_accepted (session_t * s)
     335             : {
     336             :   app_worker_t *app_wrk;
     337             :   segment_manager_t *sm;
     338             :   session_t *listener;
     339             :   application_t *app;
     340             : 
     341         176 :   listener = listen_session_get_from_handle (s->listener_handle);
     342         176 :   app_wrk = application_listener_select_worker (listener);
     343         176 :   if (PREDICT_FALSE (app_wrk->mq_congested))
     344           0 :     return -1;
     345             : 
     346         176 :   s->app_wrk_index = app_wrk->wrk_index;
     347         176 :   app = application_get (app_wrk->app_index);
     348         176 :   if (app->cb_fns.fifo_tuning_callback)
     349           0 :     s->flags |= SESSION_F_CUSTOM_FIFO_TUNING;
     350             : 
     351         176 :   sm = app_worker_get_listen_segment_manager (app_wrk, listener);
     352         176 :   if (app_worker_alloc_session_fifos (sm, s))
     353           0 :     return -1;
     354             : 
     355         176 :   return 0;
     356             : }
     357             : 
     358             : int
     359         204 : app_worker_accept_notify (app_worker_t * app_wrk, session_t * s)
     360             : {
     361         204 :   application_t *app = application_get (app_wrk->app_index);
     362         204 :   return app->cb_fns.session_accept_callback (s);
     363             : }
     364             : 
     365             : int
     366         225 : app_worker_init_connected (app_worker_t * app_wrk, session_t * s)
     367             : {
     368         225 :   application_t *app = application_get (app_wrk->app_index);
     369             :   segment_manager_t *sm;
     370             : 
     371         225 :   if (app->cb_fns.fifo_tuning_callback)
     372           0 :     s->flags |= SESSION_F_CUSTOM_FIFO_TUNING;
     373             : 
     374             :   /* Allocate fifos for session, unless the app is a builtin proxy */
     375         225 :   if (application_is_builtin_proxy (app))
     376           0 :     return 0;
     377             : 
     378         225 :   sm = app_worker_get_connect_segment_manager (app_wrk);
     379         225 :   return app_worker_alloc_session_fifos (sm, s);
     380             : }
     381             : 
     382             : int
     383         207 : app_worker_connect_notify (app_worker_t * app_wrk, session_t * s,
     384             :                            session_error_t err, u32 opaque)
     385             : {
     386         207 :   application_t *app = application_get (app_wrk->app_index);
     387         207 :   return app->cb_fns.session_connected_callback (app_wrk->wrk_index, opaque,
     388             :                                                  s, err);
     389             : }
     390             : 
     391             : int
     392         148 : app_worker_add_half_open (app_worker_t *app_wrk, session_handle_t sh)
     393             : {
     394             :   session_handle_t *shp;
     395             : 
     396         148 :   ASSERT (session_vlib_thread_is_cl_thread ());
     397         148 :   pool_get (app_wrk->half_open_table, shp);
     398         148 :   *shp = sh;
     399             : 
     400         148 :   return (shp - app_wrk->half_open_table);
     401             : }
     402             : 
     403             : int
     404         147 : app_worker_del_half_open (app_worker_t *app_wrk, session_t *s)
     405             : {
     406         147 :   application_t *app = application_get (app_wrk->app_index);
     407         147 :   ASSERT (session_vlib_thread_is_cl_thread ());
     408         147 :   pool_put_index (app_wrk->half_open_table, s->ho_index);
     409         147 :   if (app->cb_fns.half_open_cleanup_callback)
     410           2 :     app->cb_fns.half_open_cleanup_callback (s);
     411         147 :   return 0;
     412             : }
     413             : 
     414             : int
     415         152 : app_worker_close_notify (app_worker_t * app_wrk, session_t * s)
     416             : {
     417         152 :   application_t *app = application_get (app_wrk->app_index);
     418         152 :   app->cb_fns.session_disconnect_callback (s);
     419         152 :   return 0;
     420             : }
     421             : 
     422             : int
     423         264 : app_worker_transport_closed_notify (app_worker_t * app_wrk, session_t * s)
     424             : {
     425         264 :   application_t *app = application_get (app_wrk->app_index);
     426         264 :   if (app->cb_fns.session_transport_closed_callback)
     427           0 :     app->cb_fns.session_transport_closed_callback (s);
     428         264 :   return 0;
     429             : }
     430             : 
     431             : int
     432           5 : app_worker_reset_notify (app_worker_t * app_wrk, session_t * s)
     433             : {
     434           5 :   application_t *app = application_get (app_wrk->app_index);
     435           5 :   app->cb_fns.session_reset_callback (s);
     436           5 :   return 0;
     437             : }
     438             : 
     439             : int
     440         420 : app_worker_cleanup_notify (app_worker_t * app_wrk, session_t * s,
     441             :                            session_cleanup_ntf_t ntf)
     442             : {
     443         420 :   application_t *app = application_get (app_wrk->app_index);
     444         420 :   if (app->cb_fns.session_cleanup_callback)
     445         154 :     app->cb_fns.session_cleanup_callback (s, ntf);
     446         420 :   return 0;
     447             : }
     448             : 
     449             : int
     450       79264 : app_worker_builtin_rx (app_worker_t * app_wrk, session_t * s)
     451             : {
     452       79264 :   application_t *app = application_get (app_wrk->app_index);
     453       79264 :   app->cb_fns.builtin_app_rx_callback (s);
     454       79264 :   return 0;
     455             : }
     456             : 
     457             : int
     458         134 : app_worker_builtin_tx (app_worker_t * app_wrk, session_t * s)
     459             : {
     460         134 :   application_t *app = application_get (app_wrk->app_index);
     461             : 
     462         134 :   if (!app->cb_fns.builtin_app_tx_callback)
     463           0 :     return 0;
     464             : 
     465         134 :   app->cb_fns.builtin_app_tx_callback (s);
     466         134 :   return 0;
     467             : }
     468             : 
     469             : int
     470           0 : app_worker_migrate_notify (app_worker_t * app_wrk, session_t * s,
     471             :                            session_handle_t new_sh)
     472             : {
     473           0 :   application_t *app = application_get (app_wrk->app_index);
     474           0 :   app->cb_fns.session_migrate_callback (s, new_sh);
     475           0 :   return 0;
     476             : }
     477             : 
     478             : int
     479           0 : app_worker_own_session (app_worker_t * app_wrk, session_t * s)
     480             : {
     481             :   segment_manager_t *sm;
     482             :   svm_fifo_t *rxf, *txf;
     483             :   int rv;
     484             : 
     485           0 :   if (s->session_state == SESSION_STATE_LISTENING)
     486           0 :     return application_change_listener_owner (s, app_wrk);
     487             : 
     488           0 :   s->app_wrk_index = app_wrk->wrk_index;
     489             : 
     490           0 :   rxf = s->rx_fifo;
     491           0 :   txf = s->tx_fifo;
     492             : 
     493           0 :   if (!rxf || !txf)
     494           0 :     return 0;
     495             : 
     496           0 :   s->rx_fifo = 0;
     497           0 :   s->tx_fifo = 0;
     498             : 
     499           0 :   sm = app_worker_get_connect_segment_manager (app_wrk);
     500           0 :   if ((rv = app_worker_alloc_session_fifos (sm, s)))
     501           0 :     return rv;
     502             : 
     503           0 :   if (!svm_fifo_is_empty_cons (rxf))
     504           0 :     svm_fifo_clone (s->rx_fifo, rxf);
     505             : 
     506           0 :   if (!svm_fifo_is_empty_cons (txf))
     507           0 :     svm_fifo_clone (s->tx_fifo, txf);
     508             : 
     509           0 :   segment_manager_dealloc_fifos (rxf, txf);
     510             : 
     511           0 :   return 0;
     512             : }
     513             : 
     514             : int
     515         213 : app_worker_connect_session (app_worker_t *app_wrk, session_endpoint_cfg_t *sep,
     516             :                             session_handle_t *rsh)
     517             : {
     518         213 :   if (PREDICT_FALSE (app_wrk->mq_congested))
     519           0 :     return SESSION_E_REFUSED;
     520             : 
     521         213 :   sep->app_wrk_index = app_wrk->wrk_index;
     522             : 
     523         213 :   return session_open (sep, rsh);
     524             : }
     525             : 
     526             : int
     527           0 : app_worker_session_fifo_tuning (app_worker_t * app_wrk, session_t * s,
     528             :                                 svm_fifo_t * f,
     529             :                                 session_ft_action_t act, u32 len)
     530             : {
     531           0 :   application_t *app = application_get (app_wrk->app_index);
     532           0 :   return app->cb_fns.fifo_tuning_callback (s, f, act, len);
     533             : }
     534             : 
     535             : segment_manager_t *
     536         230 : app_worker_get_connect_segment_manager (app_worker_t * app)
     537             : {
     538         230 :   ASSERT (app->connects_seg_manager != (u32) ~ 0);
     539         230 :   return segment_manager_get (app->connects_seg_manager);
     540             : }
     541             : 
     542             : segment_manager_t *
     543         189 : app_worker_get_listen_segment_manager (app_worker_t * app,
     544             :                                        session_t * listener)
     545             : {
     546             :   uword *smp;
     547         189 :   smp = hash_get (app->listeners_table, listen_session_get_handle (listener));
     548         189 :   ALWAYS_ASSERT (smp != 0);
     549         189 :   return segment_manager_get (*smp);
     550             : }
     551             : 
     552             : session_t *
     553           8 : app_worker_first_listener (app_worker_t * app_wrk, u8 fib_proto,
     554             :                            u8 transport_proto)
     555             : {
     556             :   session_t *listener;
     557             :   u64 handle;
     558             :   u32 sm_index;
     559             :   u8 sst;
     560             : 
     561           8 :   sst = session_type_from_proto_and_ip (transport_proto,
     562             :                                         fib_proto == FIB_PROTOCOL_IP4);
     563             : 
     564             :   /* *INDENT-OFF* */
     565         353 :    hash_foreach (handle, sm_index, app_wrk->listeners_table, ({
     566             :      listener = listen_session_get_from_handle (handle);
     567             :      if (listener->session_type == sst
     568             :          && !(listener->flags & SESSION_F_PROXY))
     569             :        return listener;
     570             :    }));
     571             :   /* *INDENT-ON* */
     572             : 
     573           2 :   return 0;
     574             : }
     575             : 
     576             : session_t *
     577           2 : app_worker_proxy_listener (app_worker_t * app_wrk, u8 fib_proto,
     578             :                            u8 transport_proto)
     579             : {
     580             :   session_t *listener;
     581             :   u64 handle;
     582             :   u32 sm_index;
     583             :   u8 sst;
     584             : 
     585           2 :   sst = session_type_from_proto_and_ip (transport_proto,
     586             :                                         fib_proto == FIB_PROTOCOL_IP4);
     587             : 
     588             :   /* *INDENT-OFF* */
     589          41 :    hash_foreach (handle, sm_index, app_wrk->listeners_table, ({
     590             :      listener = listen_session_get_from_handle (handle);
     591             :      if (listener->session_type == sst && (listener->flags & SESSION_F_PROXY))
     592             :        return listener;
     593             :    }));
     594             :   /* *INDENT-ON* */
     595             : 
     596           0 :   return 0;
     597             : }
     598             : 
     599             : /**
     600             :  * Send an API message to the external app, to map new segment
     601             :  */
     602             : int
     603          88 : app_worker_add_segment_notify (app_worker_t * app_wrk, u64 segment_handle)
     604             : {
     605          88 :   application_t *app = application_get (app_wrk->app_index);
     606             : 
     607          88 :   return app->cb_fns.add_segment_callback (app_wrk->wrk_index,
     608             :                                            segment_handle);
     609             : }
     610             : 
     611             : int
     612           6 : app_worker_del_segment_notify (app_worker_t * app_wrk, u64 segment_handle)
     613             : {
     614           6 :   application_t *app = application_get (app_wrk->app_index);
     615           6 :   return app->cb_fns.del_segment_callback (app_wrk->wrk_index,
     616             :                                            segment_handle);
     617             : }
     618             : 
     619             : static inline u8
     620      251906 : app_worker_application_is_builtin (app_worker_t * app_wrk)
     621             : {
     622      251906 :   return app_wrk->app_is_builtin;
     623             : }
     624             : 
     625             : static int
     626          40 : app_wrk_send_fd (app_worker_t *app_wrk, int fd)
     627             : {
     628          40 :   if (!appns_sapi_enabled ())
     629             :     {
     630             :       vl_api_registration_t *reg;
     631             :       clib_error_t *error;
     632             : 
     633             :       reg =
     634          31 :         vl_mem_api_client_index_to_registration (app_wrk->api_client_index);
     635          31 :       if (!reg)
     636             :         {
     637           0 :           clib_warning ("no api registration for client: %u",
     638             :                         app_wrk->api_client_index);
     639           0 :           return -1;
     640             :         }
     641             : 
     642          31 :       if (vl_api_registration_file_index (reg) == VL_API_INVALID_FI)
     643           0 :         return -1;
     644             : 
     645          31 :       error = vl_api_send_fd_msg (reg, &fd, 1);
     646          31 :       if (error)
     647             :         {
     648           0 :           clib_error_report (error);
     649           0 :           return -1;
     650             :         }
     651             : 
     652          31 :       return 0;
     653             :     }
     654             : 
     655           9 :   app_sapi_msg_t smsg = { 0 };
     656             :   app_namespace_t *app_ns;
     657             :   clib_error_t *error;
     658             :   application_t *app;
     659             :   clib_socket_t *cs;
     660             :   u32 cs_index;
     661             : 
     662           9 :   app = application_get (app_wrk->app_index);
     663           9 :   app_ns = app_namespace_get (app->ns_index);
     664           9 :   cs_index = appns_sapi_handle_sock_index (app_wrk->api_client_index);
     665           9 :   cs = appns_sapi_get_socket (app_ns, cs_index);
     666           9 :   if (PREDICT_FALSE (!cs))
     667           0 :     return -1;
     668             : 
     669             :   /* There's no payload for the message only the type */
     670           9 :   smsg.type = APP_SAPI_MSG_TYPE_SEND_FDS;
     671           9 :   error = clib_socket_sendmsg (cs, &smsg, sizeof (smsg), &fd, 1);
     672           9 :   if (error)
     673             :     {
     674           0 :       clib_error_report (error);
     675           0 :       return -1;
     676             :     }
     677             : 
     678           9 :   return 0;
     679             : }
     680             : 
     681             : static int
     682      116667 : mq_try_lock_and_alloc_msg (svm_msg_q_t *mq, session_mq_rings_e ring,
     683             :                            svm_msg_q_msg_t *msg)
     684             : {
     685      116667 :   int rv, n_try = 0;
     686             : 
     687      116720 :   while (n_try < 75)
     688             :     {
     689      116720 :       rv = svm_msg_q_lock_and_alloc_msg_w_ring (mq, ring, SVM_Q_NOWAIT, msg);
     690      116720 :       if (!rv)
     691      116667 :         return 0;
     692             :       /*
     693             :        * Break the loop if mq is full, usually this is because the
     694             :        * app has crashed or is hanging on somewhere.
     695             :        */
     696          53 :       if (rv != -1)
     697           0 :         break;
     698          53 :       n_try += 1;
     699          53 :       usleep (1);
     700             :     }
     701             : 
     702           0 :   return -1;
     703             : }
     704             : 
     705             : typedef union app_wrk_mq_rpc_args_
     706             : {
     707             :   struct
     708             :   {
     709             :     u32 thread_index;
     710             :     u32 app_wrk_index;
     711             :   };
     712             :   uword as_uword;
     713             : } app_wrk_mq_rpc_ags_t;
     714             : 
     715             : static int
     716           0 : app_wrk_handle_mq_postponed_msgs (void *arg)
     717             : {
     718           0 :   svm_msg_q_msg_t _mq_msg, *mq_msg = &_mq_msg;
     719             :   app_wrk_postponed_msg_t *pm;
     720             :   app_wrk_mq_rpc_ags_t args;
     721           0 :   u32 max_msg, n_msg = 0;
     722             :   app_worker_t *app_wrk;
     723             :   session_event_t *evt;
     724             :   svm_msg_q_t *mq;
     725             : 
     726           0 :   args.as_uword = pointer_to_uword (arg);
     727           0 :   app_wrk = app_worker_get_if_valid (args.app_wrk_index);
     728           0 :   if (!app_wrk)
     729           0 :     return 0;
     730             : 
     731           0 :   mq = app_wrk->event_queue;
     732             : 
     733           0 :   clib_spinlock_lock (&app_wrk->postponed_mq_msgs_lock);
     734             : 
     735           0 :   max_msg = clib_min (32, clib_fifo_elts (app_wrk->postponed_mq_msgs));
     736             : 
     737           0 :   while (n_msg < max_msg)
     738             :     {
     739           0 :       pm = clib_fifo_head (app_wrk->postponed_mq_msgs);
     740           0 :       if (mq_try_lock_and_alloc_msg (mq, pm->ring, mq_msg))
     741           0 :         break;
     742             : 
     743           0 :       evt = svm_msg_q_msg_data (mq, mq_msg);
     744           0 :       clib_memset (evt, 0, sizeof (*evt));
     745           0 :       evt->event_type = pm->event_type;
     746           0 :       clib_memcpy_fast (evt->data, pm->data, pm->len);
     747             : 
     748           0 :       if (pm->fd != -1)
     749           0 :         app_wrk_send_fd (app_wrk, pm->fd);
     750             : 
     751           0 :       svm_msg_q_add_and_unlock (mq, mq_msg);
     752             : 
     753           0 :       clib_fifo_advance_head (app_wrk->postponed_mq_msgs, 1);
     754           0 :       n_msg += 1;
     755             :     }
     756             : 
     757           0 :   if (!clib_fifo_elts (app_wrk->postponed_mq_msgs))
     758             :     {
     759           0 :       app_wrk->mq_congested = 0;
     760             :     }
     761             :   else
     762             :     {
     763           0 :       session_send_rpc_evt_to_thread_force (
     764             :         args.thread_index, app_wrk_handle_mq_postponed_msgs,
     765           0 :         uword_to_pointer (args.as_uword, void *));
     766             :     }
     767             : 
     768           0 :   clib_spinlock_unlock (&app_wrk->postponed_mq_msgs_lock);
     769             : 
     770           0 :   return 0;
     771             : }
     772             : 
     773             : static void
     774           0 : app_wrk_add_mq_postponed_msg (app_worker_t *app_wrk, session_mq_rings_e ring,
     775             :                               u8 evt_type, void *msg, u32 msg_len, int fd)
     776             : {
     777             :   app_wrk_postponed_msg_t *pm;
     778             : 
     779           0 :   clib_spinlock_lock (&app_wrk->postponed_mq_msgs_lock);
     780             : 
     781           0 :   app_wrk->mq_congested = 1;
     782             : 
     783           0 :   clib_fifo_add2 (app_wrk->postponed_mq_msgs, pm);
     784           0 :   clib_memcpy_fast (pm->data, msg, msg_len);
     785           0 :   pm->event_type = evt_type;
     786           0 :   pm->ring = ring;
     787           0 :   pm->len = msg_len;
     788           0 :   pm->fd = fd;
     789             : 
     790           0 :   if (clib_fifo_elts (app_wrk->postponed_mq_msgs) == 1)
     791             :     {
     792           0 :       app_wrk_mq_rpc_ags_t args = { .thread_index = vlib_get_thread_index (),
     793           0 :                                     .app_wrk_index = app_wrk->wrk_index };
     794             : 
     795           0 :       session_send_rpc_evt_to_thread_force (
     796             :         args.thread_index, app_wrk_handle_mq_postponed_msgs,
     797           0 :         uword_to_pointer (args.as_uword, void *));
     798             :     }
     799             : 
     800           0 :   clib_spinlock_unlock (&app_wrk->postponed_mq_msgs_lock);
     801           0 : }
     802             : 
     803             : always_inline void
     804         290 : app_wrk_send_ctrl_evt_inline (app_worker_t *app_wrk, u8 evt_type, void *msg,
     805             :                               u32 msg_len, int fd)
     806             : {
     807         290 :   svm_msg_q_msg_t _mq_msg, *mq_msg = &_mq_msg;
     808         290 :   svm_msg_q_t *mq = app_wrk->event_queue;
     809             :   session_event_t *evt;
     810             :   int rv;
     811             : 
     812         290 :   if (PREDICT_FALSE (app_wrk->mq_congested))
     813           0 :     goto handle_congestion;
     814             : 
     815         290 :   rv = mq_try_lock_and_alloc_msg (mq, SESSION_MQ_CTRL_EVT_RING, mq_msg);
     816         290 :   if (PREDICT_FALSE (rv))
     817           0 :     goto handle_congestion;
     818             : 
     819         290 :   evt = svm_msg_q_msg_data (mq, mq_msg);
     820         290 :   clib_memset (evt, 0, sizeof (*evt));
     821         290 :   evt->event_type = evt_type;
     822         290 :   clib_memcpy_fast (evt->data, msg, msg_len);
     823             : 
     824         290 :   if (fd != -1)
     825          40 :     app_wrk_send_fd (app_wrk, fd);
     826             : 
     827         290 :   svm_msg_q_add_and_unlock (mq, mq_msg);
     828             : 
     829         290 :   return;
     830             : 
     831           0 : handle_congestion:
     832             : 
     833           0 :   app_wrk_add_mq_postponed_msg (app_wrk, SESSION_MQ_CTRL_EVT_RING, evt_type,
     834             :                                 msg, msg_len, fd);
     835             : }
     836             : 
     837             : void
     838          40 : app_wrk_send_ctrl_evt_fd (app_worker_t *app_wrk, u8 evt_type, void *msg,
     839             :                           u32 msg_len, int fd)
     840             : {
     841          40 :   app_wrk_send_ctrl_evt_inline (app_wrk, evt_type, msg, msg_len, fd);
     842          40 : }
     843             : 
     844             : void
     845         250 : app_wrk_send_ctrl_evt (app_worker_t *app_wrk, u8 evt_type, void *msg,
     846             :                        u32 msg_len)
     847             : {
     848         250 :   app_wrk_send_ctrl_evt_inline (app_wrk, evt_type, msg, msg_len, -1);
     849         250 : }
     850             : 
     851             : static inline int
     852      229503 : app_send_io_evt_rx (app_worker_t * app_wrk, session_t * s)
     853             : {
     854      229503 :   svm_msg_q_msg_t _mq_msg = { 0 }, *mq_msg = &_mq_msg;
     855             :   session_event_t *evt;
     856             :   svm_msg_q_t *mq;
     857             :   u32 app_session;
     858             :   int rv;
     859             : 
     860      229503 :   if (app_worker_application_is_builtin (app_wrk))
     861       69114 :     return app_worker_builtin_rx (app_wrk, s);
     862             : 
     863      160389 :   if (svm_fifo_has_event (s->rx_fifo))
     864       66281 :     return 0;
     865             : 
     866       94108 :   app_session = s->rx_fifo->shr->client_session_index;
     867       94108 :   mq = app_wrk->event_queue;
     868             : 
     869       94108 :   if (PREDICT_FALSE (app_wrk->mq_congested))
     870           0 :     goto handle_congestion;
     871             : 
     872       94108 :   rv = mq_try_lock_and_alloc_msg (mq, SESSION_MQ_IO_EVT_RING, mq_msg);
     873             : 
     874       94108 :   if (PREDICT_FALSE (rv))
     875           0 :     goto handle_congestion;
     876             : 
     877       94108 :   evt = svm_msg_q_msg_data (mq, mq_msg);
     878       94108 :   evt->event_type = SESSION_IO_EVT_RX;
     879       94108 :   evt->session_index = app_session;
     880             : 
     881       94108 :   (void) svm_fifo_set_event (s->rx_fifo);
     882             : 
     883       94108 :   svm_msg_q_add_and_unlock (mq, mq_msg);
     884             : 
     885       94108 :   return 0;
     886             : 
     887           0 : handle_congestion:
     888             : 
     889           0 :   app_wrk_add_mq_postponed_msg (app_wrk, SESSION_MQ_IO_EVT_RING,
     890             :                                 SESSION_IO_EVT_RX, &app_session,
     891             :                                 sizeof (app_session), -1);
     892           0 :   return -1;
     893             : }
     894             : 
     895             : static inline int
     896       22403 : app_send_io_evt_tx (app_worker_t * app_wrk, session_t * s)
     897             : {
     898       22403 :   svm_msg_q_msg_t _mq_msg = { 0 }, *mq_msg = &_mq_msg;
     899             :   session_event_t *evt;
     900             :   svm_msg_q_t *mq;
     901             :   u32 app_session;
     902             :   int rv;
     903             : 
     904       22403 :   if (app_worker_application_is_builtin (app_wrk))
     905         134 :     return app_worker_builtin_tx (app_wrk, s);
     906             : 
     907       22269 :   app_session = s->tx_fifo->shr->client_session_index;
     908       22269 :   mq = app_wrk->event_queue;
     909             : 
     910       22269 :   if (PREDICT_FALSE (app_wrk->mq_congested))
     911           0 :     goto handle_congestion;
     912             : 
     913       22269 :   rv = mq_try_lock_and_alloc_msg (mq, SESSION_MQ_IO_EVT_RING, mq_msg);
     914             : 
     915       22269 :   if (PREDICT_FALSE (rv))
     916           0 :     goto handle_congestion;
     917             : 
     918       22269 :   evt = svm_msg_q_msg_data (mq, mq_msg);
     919       22269 :   evt->event_type = SESSION_IO_EVT_TX;
     920       22269 :   evt->session_index = app_session;
     921             : 
     922       22269 :   svm_msg_q_add_and_unlock (mq, mq_msg);
     923             : 
     924       22269 :   return 0;
     925             : 
     926           0 : handle_congestion:
     927             : 
     928           0 :   app_wrk_add_mq_postponed_msg (app_wrk, SESSION_MQ_IO_EVT_RING,
     929             :                                 SESSION_IO_EVT_TX, &app_session,
     930             :                                 sizeof (app_session), -1);
     931           0 :   return -1;
     932             : }
     933             : 
     934             : /* *INDENT-OFF* */
     935             : typedef int (app_send_evt_handler_fn) (app_worker_t *app,
     936             :                                        session_t *s);
     937             : static app_send_evt_handler_fn * const app_send_evt_handler_fns[2] = {
     938             :     app_send_io_evt_rx,
     939             :     app_send_io_evt_tx,
     940             : };
     941             : /* *INDENT-ON* */
     942             : 
     943             : /**
     944             :  * Send event to application
     945             :  *
     946             :  * Logic from queue perspective is blocking. However, if queue is full,
     947             :  * we return.
     948             :  */
     949             : int
     950      251906 : app_worker_lock_and_send_event (app_worker_t * app, session_t * s,
     951             :                                 u8 evt_type)
     952             : {
     953      251906 :   return app_send_evt_handler_fns[evt_type] (app, s);
     954             : }
     955             : 
     956             : u8 *
     957           5 : format_app_worker_listener (u8 * s, va_list * args)
     958             : {
     959           5 :   app_worker_t *app_wrk = va_arg (*args, app_worker_t *);
     960           5 :   u64 handle = va_arg (*args, u64);
     961           5 :   u32 sm_index = va_arg (*args, u32);
     962           5 :   int verbose = va_arg (*args, int);
     963             :   session_t *listener;
     964             :   const u8 *app_name;
     965             :   u8 *str;
     966             : 
     967           5 :   if (!app_wrk)
     968             :     {
     969           5 :       if (verbose)
     970           0 :         s = format (s, "%-" SESSION_CLI_ID_LEN "s%-25s%-10s%-15s%-15s%-10s",
     971             :                     "Connection", "App", "Wrk", "API Client", "ListenerID",
     972             :                     "SegManager");
     973             :       else
     974           5 :         s = format (s, "%-" SESSION_CLI_ID_LEN "s%-25s%-10s", "Connection",
     975             :                     "App", "Wrk");
     976             : 
     977           5 :       return s;
     978             :     }
     979             : 
     980           0 :   app_name = application_name_from_index (app_wrk->app_index);
     981           0 :   listener = listen_session_get_from_handle (handle);
     982           0 :   str = format (0, "%U", format_session, listener, verbose);
     983             : 
     984           0 :   if (verbose)
     985             :     {
     986             :       u8 *buf;
     987           0 :       buf = format (0, "%u(%u)", app_wrk->wrk_map_index, app_wrk->wrk_index);
     988           0 :       s = format (s, "%-" SESSION_CLI_ID_LEN "v%-25v%-10v%-15u%-15u%-10u", str,
     989             :                   app_name, buf, app_wrk->api_client_index, handle, sm_index);
     990           0 :       vec_free (buf);
     991             :     }
     992             :   else
     993           0 :     s = format (s, "%-" SESSION_CLI_ID_LEN "v%-25v%=10u", str, app_name,
     994             :                 app_wrk->wrk_map_index);
     995             : 
     996           0 :   vec_free (str);
     997             : 
     998           0 :   return s;
     999             : }
    1000             : 
    1001             : u8 *
    1002           0 : format_app_worker (u8 * s, va_list * args)
    1003             : {
    1004           0 :   app_worker_t *app_wrk = va_arg (*args, app_worker_t *);
    1005           0 :   u32 indent = 1;
    1006             : 
    1007           0 :   s = format (s,
    1008             :               "%U wrk-index %u app-index %u map-index %u "
    1009             :               "api-client-index %d mq-cong %u\n",
    1010             :               format_white_space, indent, app_wrk->wrk_index,
    1011             :               app_wrk->app_index, app_wrk->wrk_map_index,
    1012           0 :               app_wrk->api_client_index, app_wrk->mq_congested);
    1013           0 :   return s;
    1014             : }
    1015             : 
    1016             : void
    1017           0 : app_worker_format_connects (app_worker_t * app_wrk, int verbose)
    1018             : {
    1019             :   segment_manager_t *sm;
    1020             : 
    1021             :   /* Header */
    1022           0 :   if (!app_wrk)
    1023             :     {
    1024           0 :       segment_manager_format_sessions (0, verbose);
    1025           0 :       return;
    1026             :     }
    1027             : 
    1028           0 :   if (app_wrk->connects_seg_manager == (u32) ~ 0)
    1029           0 :     return;
    1030             : 
    1031           0 :   sm = segment_manager_get (app_wrk->connects_seg_manager);
    1032           0 :   segment_manager_format_sessions (sm, verbose);
    1033             : }
    1034             : 
    1035             : /*
    1036             :  * fd.io coding-style-patch-verification: ON
    1037             :  *
    1038             :  * Local Variables:
    1039             :  * eval: (c-set-style "gnu")
    1040             :  * End:
    1041             :  */

Generated by: LCOV version 1.14