LCOV - code coverage report
Current view: top level - vnet/session - application_local.c (source / functions) Hit Total Coverage
Test: coverage-filtered.info Lines: 497 645 77.1 %
Date: 2023-10-26 01:39:38 Functions: 41 48 85.4 %

          Line data    Source code
       1             : /*
       2             :  * Copyright (c) 2019 Cisco and/or its affiliates.
       3             :  * Licensed under the Apache License, Version 2.0 (the "License");
       4             :  * you may not use this file except in compliance with the License.
       5             :  * You may obtain a copy of the License at:
       6             :  *
       7             :  *     http://www.apache.org/licenses/LICENSE-2.0
       8             :  *
       9             :  * Unless required by applicable law or agreed to in writing, software
      10             :  * distributed under the License is distributed on an "AS IS" BASIS,
      11             :  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
      12             :  * See the License for the specific language governing permissions and
      13             :  * limitations under the License.
      14             :  */
      15             : 
      16             : #include <vnet/session/application_local.h>
      17             : #include <vnet/session/session.h>
      18             : 
      19             : typedef enum ct_segment_flags_
      20             : {
      21             :   CT_SEGMENT_F_CLIENT_DETACHED = 1 << 0,
      22             :   CT_SEGMENT_F_SERVER_DETACHED = 1 << 1,
      23             : } ct_segment_flags_t;
      24             : 
      25             : typedef struct ct_segment_
      26             : {
      27             :   u32 client_n_sessions;
      28             :   u32 server_n_sessions;
      29             :   u32 seg_ctx_index;
      30             :   u32 ct_seg_index;
      31             :   u32 segment_index;
      32             :   ct_segment_flags_t flags;
      33             : } ct_segment_t;
      34             : 
      35             : typedef struct ct_segments_
      36             : {
      37             :   u32 sm_index;
      38             :   u32 server_wrk;
      39             :   u32 client_wrk;
      40             :   u32 fifo_pair_bytes;
      41             :   ct_segment_t *segments;
      42             : } ct_segments_ctx_t;
      43             : 
      44             : typedef struct ct_cleanup_req_
      45             : {
      46             :   u32 ct_index;
      47             : } ct_cleanup_req_t;
      48             : 
      49             : typedef struct ct_worker_
      50             : {
      51             :   ct_connection_t *connections;       /**< Per-worker connection pools */
      52             :   u32 *pending_connects;              /**< Fifo of pending ho indices */
      53             :   ct_cleanup_req_t *pending_cleanups; /**< Fifo of pending indices */
      54             :   u8 have_connects;                   /**< Set if connect rpc pending */
      55             :   u8 have_cleanups;                   /**< Set if cleanup rpc pending */
      56             :   clib_spinlock_t pending_connects_lock; /**< Lock for pending connects */
      57             :   u32 *new_connects;                     /**< Burst of connects to be done */
      58             : } ct_worker_t;
      59             : 
      60             : typedef struct ct_main_
      61             : {
      62             :   ct_worker_t *wrk;                     /**< Per-worker state */
      63             :   u32 n_workers;                        /**< Number of vpp workers */
      64             :   u32 n_sessions;                       /**< Cumulative sessions counter */
      65             :   u32 *ho_reusable;                     /**< Vector of reusable ho indices */
      66             :   clib_spinlock_t ho_reuseable_lock;    /**< Lock for reusable ho indices */
      67             :   clib_rwlock_t app_segs_lock;          /**< RW lock for seg contexts */
      68             :   uword *app_segs_ctxs_table;           /**< App handle to segment pool map */
      69             :   ct_segments_ctx_t *app_seg_ctxs;      /**< Pool of ct segment contexts */
      70             :   u32 **fwrk_pending_connects;          /**< First wrk pending half-opens */
      71             :   u32 fwrk_thread;                      /**< First worker thread */
      72             :   u8 fwrk_have_flush;                   /**< Flag for connect flush rpc */
      73             : } ct_main_t;
      74             : 
      75             : static ct_main_t ct_main;
      76             : 
      77             : static inline ct_worker_t *
      78      717065 : ct_worker_get (u32 thread_index)
      79             : {
      80      717065 :   return &ct_main.wrk[thread_index];
      81             : }
      82             : 
      83             : static ct_connection_t *
      84          57 : ct_connection_alloc (u32 thread_index)
      85             : {
      86          57 :   ct_worker_t *wrk = ct_worker_get (thread_index);
      87             :   ct_connection_t *ct;
      88             : 
      89          57 :   pool_get_aligned_safe (wrk->connections, ct, CLIB_CACHE_LINE_BYTES);
      90          57 :   clib_memset (ct, 0, sizeof (*ct));
      91          57 :   ct->c_c_index = ct - wrk->connections;
      92          57 :   ct->c_thread_index = thread_index;
      93          57 :   ct->client_wrk = ~0;
      94          57 :   ct->server_wrk = ~0;
      95          57 :   ct->seg_ctx_index = ~0;
      96          57 :   ct->ct_seg_index = ~0;
      97          57 :   return ct;
      98             : }
      99             : 
     100             : static ct_connection_t *
     101      716893 : ct_connection_get (u32 ct_index, u32 thread_index)
     102             : {
     103      716893 :   ct_worker_t *wrk = ct_worker_get (thread_index);
     104             : 
     105      716893 :   if (pool_is_free_index (wrk->connections, ct_index))
     106          30 :     return 0;
     107      716863 :   return pool_elt_at_index (wrk->connections, ct_index);
     108             : }
     109             : 
     110             : static void
     111          44 : ct_connection_free (ct_connection_t * ct)
     112             : {
     113          44 :   ct_worker_t *wrk = ct_worker_get (ct->c_thread_index);
     114             : 
     115             :   if (CLIB_DEBUG)
     116             :     {
     117          44 :       clib_memset (ct, 0xfc, sizeof (*ct));
     118          44 :       pool_put (wrk->connections, ct);
     119          44 :       return;
     120             :     }
     121             :   pool_put (wrk->connections, ct);
     122             : }
     123             : 
     124             : static ct_connection_t *
     125          13 : ct_half_open_alloc (void)
     126             : {
     127          13 :   ct_main_t *cm = &ct_main;
     128             :   u32 *hip;
     129             : 
     130          13 :   clib_spinlock_lock (&cm->ho_reuseable_lock);
     131          21 :   vec_foreach (hip, cm->ho_reusable)
     132           8 :     pool_put_index (cm->wrk[cm->fwrk_thread].connections, *hip);
     133          13 :   vec_reset_length (cm->ho_reusable);
     134          13 :   clib_spinlock_unlock (&cm->ho_reuseable_lock);
     135             : 
     136          13 :   return ct_connection_alloc (cm->fwrk_thread);
     137             : }
     138             : 
     139             : static ct_connection_t *
     140          26 : ct_half_open_get (u32 ho_index)
     141             : {
     142          26 :   ct_main_t *cm = &ct_main;
     143          26 :   return ct_connection_get (ho_index, cm->fwrk_thread);
     144             : }
     145             : 
     146             : void
     147          13 : ct_half_open_add_reusable (u32 ho_index)
     148             : {
     149          13 :   ct_main_t *cm = &ct_main;
     150             : 
     151          13 :   clib_spinlock_lock (&cm->ho_reuseable_lock);
     152          13 :   vec_add1 (cm->ho_reusable, ho_index);
     153          13 :   clib_spinlock_unlock (&cm->ho_reuseable_lock);
     154          13 : }
     155             : 
     156             : session_t *
     157          12 : ct_session_get_peer (session_t * s)
     158             : {
     159             :   ct_connection_t *ct, *peer_ct;
     160          12 :   ct = ct_connection_get (s->connection_index, s->thread_index);
     161          12 :   peer_ct = ct_connection_get (ct->peer_index, s->thread_index);
     162          12 :   return session_get (peer_ct->c_s_index, s->thread_index);
     163             : }
     164             : 
     165             : void
     166          18 : ct_session_endpoint (session_t * ll, session_endpoint_t * sep)
     167             : {
     168             :   ct_connection_t *ct;
     169          18 :   ct = (ct_connection_t *) session_get_transport (ll);
     170          18 :   sep->transport_proto = ct->actual_tp;
     171          18 :   sep->port = ct->c_lcl_port;
     172          18 :   sep->is_ip4 = ct->c_is_ip4;
     173          18 :   ip_copy (&sep->ip, &ct->c_lcl_ip, ct->c_is_ip4);
     174          18 : }
     175             : 
     176             : static void
     177           4 : ct_set_invalid_app_wrk (ct_connection_t *ct, u8 is_client)
     178             : {
     179             :   ct_connection_t *peer_ct;
     180             : 
     181           4 :   peer_ct = ct_connection_get (ct->peer_index, ct->c_thread_index);
     182             : 
     183           4 :   if (is_client)
     184             :     {
     185           4 :       ct->client_wrk = APP_INVALID_INDEX;
     186           4 :       if (peer_ct)
     187           0 :         ct->client_wrk = APP_INVALID_INDEX;
     188             :     }
     189             :   else
     190             :     {
     191           0 :       ct->server_wrk = APP_INVALID_INDEX;
     192           0 :       if (peer_ct)
     193           0 :         ct->server_wrk = APP_INVALID_INDEX;
     194             :     }
     195           4 : }
     196             : 
     197             : static void
     198          24 : ct_session_dealloc_fifos (ct_connection_t *ct, svm_fifo_t *rx_fifo,
     199             :                           svm_fifo_t *tx_fifo)
     200             : {
     201             :   ct_segments_ctx_t *seg_ctx;
     202          24 :   ct_main_t *cm = &ct_main;
     203             :   segment_manager_t *sm;
     204             :   app_worker_t *app_wrk;
     205             :   ct_segment_t *ct_seg;
     206             :   fifo_segment_t *fs;
     207             :   u32 seg_index;
     208             :   session_t *s;
     209             :   int cnt;
     210             : 
     211             :   /*
     212             :    * Cleanup fifos
     213             :    */
     214             : 
     215          24 :   sm = segment_manager_get (rx_fifo->segment_manager);
     216          24 :   seg_index = rx_fifo->segment_index;
     217             : 
     218          24 :   fs = segment_manager_get_segment_w_lock (sm, seg_index);
     219          24 :   fifo_segment_free_fifo (fs, rx_fifo);
     220          24 :   fifo_segment_free_fifo (fs, tx_fifo);
     221          24 :   segment_manager_segment_reader_unlock (sm);
     222             : 
     223             :   /*
     224             :    * Atomically update segment context with readers lock
     225             :    */
     226             : 
     227          24 :   clib_rwlock_reader_lock (&cm->app_segs_lock);
     228             : 
     229          24 :   seg_ctx = pool_elt_at_index (cm->app_seg_ctxs, ct->seg_ctx_index);
     230          24 :   ct_seg = pool_elt_at_index (seg_ctx->segments, ct->ct_seg_index);
     231             : 
     232          24 :   if (ct->flags & CT_CONN_F_CLIENT)
     233             :     {
     234          12 :       cnt =
     235          12 :         __atomic_sub_fetch (&ct_seg->client_n_sessions, 1, __ATOMIC_RELAXED);
     236             :     }
     237             :   else
     238             :     {
     239          12 :       cnt =
     240          12 :         __atomic_sub_fetch (&ct_seg->server_n_sessions, 1, __ATOMIC_RELAXED);
     241             :     }
     242             : 
     243          24 :   clib_rwlock_reader_unlock (&cm->app_segs_lock);
     244             : 
     245             :   /*
     246             :    * No need to do any app updates, return
     247             :    */
     248          24 :   ASSERT (cnt >= 0);
     249          24 :   if (cnt)
     250           8 :     return;
     251             : 
     252             :   /*
     253             :    * Grab exclusive lock and update flags unless some other thread
     254             :    * added more sessions
     255             :    */
     256          16 :   clib_rwlock_writer_lock (&cm->app_segs_lock);
     257             : 
     258          16 :   seg_ctx = pool_elt_at_index (cm->app_seg_ctxs, ct->seg_ctx_index);
     259          16 :   ct_seg = pool_elt_at_index (seg_ctx->segments, ct->ct_seg_index);
     260          16 :   if (ct->flags & CT_CONN_F_CLIENT)
     261             :     {
     262           8 :       cnt = ct_seg->client_n_sessions;
     263           8 :       if (cnt)
     264           0 :         goto done;
     265           8 :       ct_seg->flags |= CT_SEGMENT_F_CLIENT_DETACHED;
     266           8 :       s = session_get (ct->c_s_index, ct->c_thread_index);
     267           8 :       if (s->app_wrk_index == APP_INVALID_INDEX)
     268           4 :         ct_set_invalid_app_wrk (ct, 1 /* is_client */);
     269             :     }
     270             :   else
     271             :     {
     272           8 :       cnt = ct_seg->server_n_sessions;
     273           8 :       if (cnt)
     274           0 :         goto done;
     275           8 :       ct_seg->flags |= CT_SEGMENT_F_SERVER_DETACHED;
     276           8 :       s = session_get (ct->c_s_index, ct->c_thread_index);
     277           8 :       if (s->app_wrk_index == APP_INVALID_INDEX)
     278           0 :         ct_set_invalid_app_wrk (ct, 0 /* is_client */);
     279             :     }
     280             : 
     281          16 :   if (!(ct_seg->flags & CT_SEGMENT_F_CLIENT_DETACHED) ||
     282          12 :       !(ct_seg->flags & CT_SEGMENT_F_SERVER_DETACHED))
     283           8 :     goto done;
     284             : 
     285             :   /*
     286             :    * Remove segment context because both client and server detached
     287             :    */
     288             : 
     289           8 :   pool_put_index (seg_ctx->segments, ct->ct_seg_index);
     290             : 
     291             :   /*
     292             :    * No more segment indices left, remove the segments context
     293             :    */
     294           8 :   if (!pool_elts (seg_ctx->segments))
     295             :     {
     296           8 :       u64 table_handle = seg_ctx->client_wrk << 16 | seg_ctx->server_wrk;
     297           8 :       table_handle = (u64) seg_ctx->sm_index << 32 | table_handle;
     298           8 :       hash_unset (cm->app_segs_ctxs_table, table_handle);
     299           8 :       pool_free (seg_ctx->segments);
     300           8 :       pool_put_index (cm->app_seg_ctxs, ct->seg_ctx_index);
     301             :     }
     302             : 
     303             :   /*
     304             :    * Segment to be removed so notify both apps
     305             :    */
     306             : 
     307           8 :   app_wrk = app_worker_get_if_valid (ct->client_wrk);
     308             :   /* Determine if client app still needs notification, i.e., if it is
     309             :    * still attached. If client detached and this is the last ct session
     310             :    * on this segment, then its connects segment manager should also be
     311             :    * detached, so do not send notification */
     312           8 :   if (app_wrk)
     313             :     {
     314             :       segment_manager_t *csm;
     315           4 :       csm = app_worker_get_connect_segment_manager (app_wrk);
     316           4 :       if (!segment_manager_app_detached (csm))
     317           4 :         app_worker_del_segment_notify (app_wrk, ct->segment_handle);
     318             :     }
     319             : 
     320             :   /* Notify server app and free segment */
     321           8 :   segment_manager_lock_and_del_segment (sm, seg_index);
     322             : 
     323             :   /* Cleanup segment manager if needed. If server detaches there's a chance
     324             :    * the client's sessions will hold up segment removal */
     325           8 :   if (segment_manager_app_detached (sm) && !segment_manager_has_fifos (sm))
     326           6 :     segment_manager_free_safe (sm);
     327             : 
     328           2 : done:
     329             : 
     330          16 :   clib_rwlock_writer_unlock (&cm->app_segs_lock);
     331             : }
     332             : 
     333             : static void
     334           0 : ct_session_force_disconnect_server (ct_connection_t *sct)
     335             : {
     336           0 :   sct->peer_index = ~0;
     337           0 :   session_transport_closing_notify (&sct->connection);
     338           0 : }
     339             : 
     340             : int
     341          12 : ct_session_connect_notify (session_t *ss, session_error_t err)
     342             : {
     343             :   u32 ss_index, opaque, thread_index;
     344             :   ct_connection_t *sct, *cct;
     345             :   app_worker_t *client_wrk;
     346             :   session_t *cs;
     347             : 
     348          12 :   ss_index = ss->session_index;
     349          12 :   thread_index = ss->thread_index;
     350          12 :   sct = (ct_connection_t *) session_get_transport (ss);
     351          12 :   client_wrk = app_worker_get (sct->client_wrk);
     352          12 :   opaque = sct->client_opaque;
     353             : 
     354          12 :   cct = ct_connection_get (sct->peer_index, thread_index);
     355             : 
     356             :   /* Client closed while waiting for reply from server */
     357          12 :   if (PREDICT_FALSE (!cct))
     358             :     {
     359           0 :       ct_session_force_disconnect_server (sct);
     360           0 :       return 0;
     361             :     }
     362             : 
     363          12 :   session_half_open_delete_notify (&cct->connection);
     364          12 :   cct->flags &= ~CT_CONN_F_HALF_OPEN;
     365             : 
     366          12 :   if (PREDICT_FALSE (err))
     367           0 :     goto connect_error;
     368             : 
     369             :   /*
     370             :    * Alloc client session, server session assumed to be established
     371             :    */
     372             : 
     373          12 :   ASSERT (ss->session_state >= SESSION_STATE_READY);
     374             : 
     375          12 :   cs = session_alloc (thread_index);
     376          12 :   ss = session_get (ss_index, thread_index);
     377          12 :   cs->session_type = ss->session_type;
     378          12 :   cs->listener_handle = SESSION_INVALID_HANDLE;
     379          12 :   session_set_state (cs, SESSION_STATE_CONNECTING);
     380          12 :   cs->app_wrk_index = client_wrk->wrk_index;
     381          12 :   cs->connection_index = cct->c_c_index;
     382          12 :   cs->opaque = opaque;
     383          12 :   cct->c_s_index = cs->session_index;
     384             : 
     385             :   /* This will allocate fifos for the session. They won't be used for
     386             :    * exchanging data but they will be used to close the connection if
     387             :    * the segment manager/worker is freed */
     388          12 :   if ((err = app_worker_init_connected (client_wrk, cs)))
     389             :     {
     390           0 :       session_free (cs);
     391           0 :       ct_session_force_disconnect_server (sct);
     392           0 :       err = SESSION_E_ALLOC;
     393           0 :       goto connect_error;
     394             :     }
     395             : 
     396          12 :   session_set_state (cs, SESSION_STATE_CONNECTING);
     397             : 
     398          12 :   if (app_worker_connect_notify (client_wrk, cs, 0, opaque))
     399             :     {
     400           0 :       segment_manager_dealloc_fifos (cs->rx_fifo, cs->tx_fifo);
     401           0 :       session_free (cs);
     402           0 :       ct_session_force_disconnect_server (sct);
     403           0 :       goto cleanup_client;
     404             :     }
     405             : 
     406          12 :   cs = session_get (cct->c_s_index, cct->c_thread_index);
     407          12 :   session_set_state (cs, SESSION_STATE_READY);
     408             : 
     409          12 :   return 0;
     410             : 
     411           0 : connect_error:
     412             : 
     413           0 :   app_worker_connect_notify (client_wrk, 0, err, cct->client_opaque);
     414             : 
     415           0 : cleanup_client:
     416             : 
     417           0 :   if (cct->client_rx_fifo)
     418           0 :     ct_session_dealloc_fifos (cct, cct->client_rx_fifo, cct->client_tx_fifo);
     419           0 :   ct_connection_free (cct);
     420           0 :   return -1;
     421             : }
     422             : 
     423             : static inline ct_segment_t *
     424           4 : ct_lookup_free_segment (ct_main_t *cm, segment_manager_t *sm,
     425             :                         u32 seg_ctx_index)
     426             : {
     427             :   uword free_bytes, max_free_bytes;
     428           4 :   ct_segment_t *ct_seg, *res = 0;
     429             :   ct_segments_ctx_t *seg_ctx;
     430             :   fifo_segment_t *fs;
     431             :   u32 max_fifos;
     432             : 
     433           4 :   seg_ctx = pool_elt_at_index (cm->app_seg_ctxs, seg_ctx_index);
     434           4 :   max_free_bytes = seg_ctx->fifo_pair_bytes;
     435             : 
     436           8 :   pool_foreach (ct_seg, seg_ctx->segments)
     437             :     {
     438             :       /* Client or server has detached so segment cannot be used */
     439           4 :       fs = segment_manager_get_segment (sm, ct_seg->segment_index);
     440           4 :       free_bytes = fifo_segment_available_bytes (fs);
     441           4 :       max_fifos = fifo_segment_size (fs) / seg_ctx->fifo_pair_bytes;
     442           4 :       if (free_bytes > max_free_bytes &&
     443           4 :           fifo_segment_num_fifos (fs) / 2 < max_fifos)
     444             :         {
     445           4 :           max_free_bytes = free_bytes;
     446           4 :           res = ct_seg;
     447             :         }
     448             :     }
     449             : 
     450           4 :   return res;
     451             : }
     452             : 
     453             : static ct_segment_t *
     454           9 : ct_alloc_segment (ct_main_t *cm, app_worker_t *server_wrk, u64 table_handle,
     455             :                   segment_manager_t *sm, u32 client_wrk_index)
     456             : {
     457           9 :   u32 seg_ctx_index = ~0, sm_index, pair_bytes;
     458             :   segment_manager_props_t *props;
     459           9 :   const u32 margin = 16 << 10;
     460             :   ct_segments_ctx_t *seg_ctx;
     461             :   app_worker_t *client_wrk;
     462             :   u64 seg_size, seg_handle;
     463             :   application_t *server;
     464             :   ct_segment_t *ct_seg;
     465             :   uword *spp;
     466             :   int fs_index;
     467             : 
     468           9 :   server = application_get (server_wrk->app_index);
     469           9 :   props = application_segment_manager_properties (server);
     470           9 :   sm_index = segment_manager_index (sm);
     471           9 :   pair_bytes = props->rx_fifo_size + props->tx_fifo_size + margin;
     472             : 
     473             :   /*
     474             :    * Make sure another thread did not alloc a segment while acquiring the lock
     475             :    */
     476             : 
     477           9 :   spp = hash_get (cm->app_segs_ctxs_table, table_handle);
     478           9 :   if (spp)
     479             :     {
     480           0 :       seg_ctx_index = *spp;
     481           0 :       ct_seg = ct_lookup_free_segment (cm, sm, seg_ctx_index);
     482           0 :       if (ct_seg)
     483           0 :         return ct_seg;
     484             :     }
     485             : 
     486             :   /*
     487             :    * No segment, try to alloc one and notify the server and the client.
     488             :    * Make sure the segment is not used for other fifos
     489             :    */
     490           9 :   seg_size = clib_max (props->segment_size, 128 << 20);
     491             :   fs_index =
     492           9 :     segment_manager_add_segment2 (sm, seg_size, FIFO_SEGMENT_F_CUSTOM_USE);
     493           9 :   if (fs_index < 0)
     494           0 :     return 0;
     495             : 
     496           9 :   if (seg_ctx_index == ~0)
     497             :     {
     498           9 :       pool_get_zero (cm->app_seg_ctxs, seg_ctx);
     499           9 :       seg_ctx_index = seg_ctx - cm->app_seg_ctxs;
     500           9 :       hash_set (cm->app_segs_ctxs_table, table_handle, seg_ctx_index);
     501           9 :       seg_ctx->server_wrk = server_wrk->wrk_index;
     502           9 :       seg_ctx->client_wrk = client_wrk_index;
     503           9 :       seg_ctx->sm_index = sm_index;
     504           9 :       seg_ctx->fifo_pair_bytes = pair_bytes;
     505             :     }
     506             :   else
     507             :     {
     508           0 :       seg_ctx = pool_elt_at_index (cm->app_seg_ctxs, seg_ctx_index);
     509             :     }
     510             : 
     511           9 :   pool_get_zero (seg_ctx->segments, ct_seg);
     512           9 :   ct_seg->segment_index = fs_index;
     513           9 :   ct_seg->server_n_sessions = 0;
     514           9 :   ct_seg->client_n_sessions = 0;
     515           9 :   ct_seg->ct_seg_index = ct_seg - seg_ctx->segments;
     516           9 :   ct_seg->seg_ctx_index = seg_ctx_index;
     517             : 
     518             :   /* New segment, notify the server and client */
     519           9 :   seg_handle = segment_manager_make_segment_handle (sm_index, fs_index);
     520           9 :   if (app_worker_add_segment_notify (server_wrk, seg_handle))
     521           0 :     goto error;
     522             : 
     523           9 :   client_wrk = app_worker_get (client_wrk_index);
     524           9 :   if (app_worker_add_segment_notify (client_wrk, seg_handle))
     525             :     {
     526           0 :       app_worker_del_segment_notify (server_wrk, seg_handle);
     527           0 :       goto error;
     528             :     }
     529             : 
     530           9 :   return ct_seg;
     531             : 
     532           0 : error:
     533             : 
     534           0 :   segment_manager_lock_and_del_segment (sm, fs_index);
     535           0 :   pool_put_index (seg_ctx->segments, ct_seg->seg_ctx_index);
     536           0 :   return 0;
     537             : }
     538             : 
     539             : static int
     540          13 : ct_init_accepted_session (app_worker_t *server_wrk, ct_connection_t *ct,
     541             :                           session_t *ls, session_t *ll)
     542             : {
     543             :   segment_manager_props_t *props;
     544             :   u64 seg_handle, table_handle;
     545          13 :   u32 sm_index, fs_index = ~0;
     546             :   ct_segments_ctx_t *seg_ctx;
     547          13 :   ct_main_t *cm = &ct_main;
     548             :   application_t *server;
     549             :   segment_manager_t *sm;
     550             :   ct_segment_t *ct_seg;
     551             :   fifo_segment_t *fs;
     552             :   uword *spp;
     553             :   int rv;
     554             : 
     555          13 :   sm = app_worker_get_listen_segment_manager (server_wrk, ll);
     556          13 :   sm_index = segment_manager_index (sm);
     557          13 :   server = application_get (server_wrk->app_index);
     558          13 :   props = application_segment_manager_properties (server);
     559             : 
     560          13 :   table_handle = ct->client_wrk << 16 | server_wrk->wrk_index;
     561          13 :   table_handle = (u64) sm_index << 32 | table_handle;
     562             : 
     563             :   /*
     564             :    * Check if we already have a segment that can hold the fifos
     565             :    */
     566             : 
     567          13 :   clib_rwlock_reader_lock (&cm->app_segs_lock);
     568             : 
     569          13 :   spp = hash_get (cm->app_segs_ctxs_table, table_handle);
     570          13 :   if (spp)
     571             :     {
     572           4 :       ct_seg = ct_lookup_free_segment (cm, sm, *spp);
     573           4 :       if (ct_seg)
     574             :         {
     575           4 :           ct->seg_ctx_index = ct_seg->seg_ctx_index;
     576           4 :           ct->ct_seg_index = ct_seg->ct_seg_index;
     577           4 :           fs_index = ct_seg->segment_index;
     578           4 :           ct_seg->flags &=
     579             :             ~(CT_SEGMENT_F_SERVER_DETACHED | CT_SEGMENT_F_CLIENT_DETACHED);
     580           4 :           __atomic_add_fetch (&ct_seg->server_n_sessions, 1, __ATOMIC_RELAXED);
     581           4 :           __atomic_add_fetch (&ct_seg->client_n_sessions, 1, __ATOMIC_RELAXED);
     582             :         }
     583             :     }
     584             : 
     585          13 :   clib_rwlock_reader_unlock (&cm->app_segs_lock);
     586             : 
     587             :   /*
     588             :    * If not, grab exclusive lock and allocate segment
     589             :    */
     590          13 :   if (fs_index == ~0)
     591             :     {
     592           9 :       clib_rwlock_writer_lock (&cm->app_segs_lock);
     593             : 
     594             :       ct_seg =
     595           9 :         ct_alloc_segment (cm, server_wrk, table_handle, sm, ct->client_wrk);
     596           9 :       if (!ct_seg)
     597             :         {
     598           0 :           clib_rwlock_writer_unlock (&cm->app_segs_lock);
     599           0 :           return -1;
     600             :         }
     601             : 
     602           9 :       ct->seg_ctx_index = ct_seg->seg_ctx_index;
     603           9 :       ct->ct_seg_index = ct_seg->ct_seg_index;
     604           9 :       ct_seg->server_n_sessions += 1;
     605           9 :       ct_seg->client_n_sessions += 1;
     606           9 :       fs_index = ct_seg->segment_index;
     607             : 
     608           9 :       clib_rwlock_writer_unlock (&cm->app_segs_lock);
     609             :     }
     610             : 
     611             :   /*
     612             :    * Allocate and initialize the fifos
     613             :    */
     614          13 :   fs = segment_manager_get_segment_w_lock (sm, fs_index);
     615          13 :   rv = segment_manager_try_alloc_fifos (
     616          13 :     fs, ls->thread_index, props->rx_fifo_size, props->tx_fifo_size,
     617             :     &ls->rx_fifo, &ls->tx_fifo);
     618          13 :   if (rv)
     619             :     {
     620           0 :       segment_manager_segment_reader_unlock (sm);
     621             : 
     622           0 :       clib_rwlock_reader_lock (&cm->app_segs_lock);
     623             : 
     624           0 :       seg_ctx = pool_elt_at_index (cm->app_seg_ctxs, ct->seg_ctx_index);
     625           0 :       ct_seg = pool_elt_at_index (seg_ctx->segments, ct->ct_seg_index);
     626           0 :       __atomic_sub_fetch (&ct_seg->server_n_sessions, 1, __ATOMIC_RELAXED);
     627           0 :       __atomic_sub_fetch (&ct_seg->client_n_sessions, 1, __ATOMIC_RELAXED);
     628             : 
     629           0 :       clib_rwlock_reader_unlock (&cm->app_segs_lock);
     630             : 
     631           0 :       return rv;
     632             :     }
     633             : 
     634          13 :   ls->rx_fifo->shr->master_session_index = ls->session_index;
     635          13 :   ls->tx_fifo->shr->master_session_index = ls->session_index;
     636          13 :   ls->rx_fifo->master_thread_index = ls->thread_index;
     637          13 :   ls->tx_fifo->master_thread_index = ls->thread_index;
     638             : 
     639          13 :   seg_handle = segment_manager_segment_handle (sm, fs);
     640          13 :   segment_manager_segment_reader_unlock (sm);
     641             : 
     642          13 :   ct->segment_handle = seg_handle;
     643             : 
     644          13 :   return 0;
     645             : }
     646             : 
     647             : static void
     648          13 : ct_accept_one (u32 thread_index, u32 ho_index)
     649             : {
     650             :   ct_connection_t *sct, *cct, *ho;
     651             :   transport_connection_t *ll_ct;
     652             :   app_worker_t *server_wrk;
     653             :   u32 cct_index, ll_index;
     654             :   session_t *ss, *ll;
     655             : 
     656             :   /*
     657             :    * Alloc client ct and initialize from ho
     658             :    */
     659          13 :   cct = ct_connection_alloc (thread_index);
     660          13 :   cct_index = cct->c_c_index;
     661             : 
     662          13 :   ho = ct_half_open_get (ho_index);
     663             : 
     664             :   /* Unlikely but half-open session and transport could have been freed */
     665          13 :   if (PREDICT_FALSE (!ho))
     666             :     {
     667           0 :       ct_connection_free (cct);
     668           0 :       return;
     669             :     }
     670             : 
     671          13 :   clib_memcpy (cct, ho, sizeof (*ho));
     672          13 :   cct->c_c_index = cct_index;
     673          13 :   cct->c_thread_index = thread_index;
     674          13 :   cct->flags |= CT_CONN_F_HALF_OPEN;
     675             : 
     676             :   /* Notify session layer that half-open is on a different thread
     677             :    * and mark ho connection index reusable. Avoids another rpc
     678             :    */
     679          13 :   session_half_open_migrate_notify (&cct->connection);
     680          13 :   session_half_open_migrated_notify (&cct->connection);
     681          13 :   ct_half_open_add_reusable (ho_index);
     682             : 
     683             :   /*
     684             :    * Alloc and init server transport
     685             :    */
     686             : 
     687          13 :   ll_index = cct->peer_index;
     688          13 :   ll = listen_session_get (ll_index);
     689          13 :   sct = ct_connection_alloc (thread_index);
     690             :   /* Transport not necessarily ct but it might, so grab after sct alloc */
     691          13 :   ll_ct = listen_session_get_transport (ll);
     692             : 
     693             :   /* Make sure cct is valid after sct alloc */
     694          13 :   cct = ct_connection_get (cct_index, thread_index);
     695             : 
     696          13 :   sct->c_rmt_port = 0;
     697          13 :   sct->c_lcl_port = ll_ct->lcl_port;
     698          13 :   sct->c_is_ip4 = cct->c_is_ip4;
     699          13 :   clib_memcpy (&sct->c_lcl_ip, &cct->c_rmt_ip, sizeof (cct->c_rmt_ip));
     700          13 :   sct->client_wrk = cct->client_wrk;
     701          13 :   sct->c_proto = TRANSPORT_PROTO_NONE;
     702          13 :   sct->client_opaque = cct->client_opaque;
     703          13 :   sct->actual_tp = cct->actual_tp;
     704             : 
     705          13 :   sct->peer_index = cct->c_c_index;
     706          13 :   cct->peer_index = sct->c_c_index;
     707             : 
     708             :   /*
     709             :    * Accept server session. Client session is created only after
     710             :    * server confirms accept.
     711             :    */
     712          13 :   ss = session_alloc (thread_index);
     713          13 :   ll = listen_session_get (ll_index);
     714          26 :   ss->session_type = session_type_from_proto_and_ip (TRANSPORT_PROTO_NONE,
     715          13 :                                                      sct->c_is_ip4);
     716          13 :   ss->connection_index = sct->c_c_index;
     717          13 :   ss->listener_handle = listen_session_get_handle (ll);
     718          13 :   session_set_state (ss, SESSION_STATE_CREATED);
     719             : 
     720          13 :   server_wrk = application_listener_select_worker (ll);
     721          13 :   ss->app_wrk_index = server_wrk->wrk_index;
     722             : 
     723          13 :   sct->c_s_index = ss->session_index;
     724          13 :   sct->server_wrk = ss->app_wrk_index;
     725             : 
     726          13 :   if (ct_init_accepted_session (server_wrk, sct, ss, ll))
     727             :     {
     728           0 :       ct_session_connect_notify (ss, SESSION_E_ALLOC);
     729           0 :       ct_connection_free (sct);
     730           0 :       session_free (ss);
     731           0 :       return;
     732             :     }
     733             : 
     734          13 :   cct->server_wrk = sct->server_wrk;
     735          13 :   cct->seg_ctx_index = sct->seg_ctx_index;
     736          13 :   cct->ct_seg_index = sct->ct_seg_index;
     737          13 :   cct->client_rx_fifo = ss->tx_fifo;
     738          13 :   cct->client_tx_fifo = ss->rx_fifo;
     739          13 :   cct->client_rx_fifo->refcnt++;
     740          13 :   cct->client_tx_fifo->refcnt++;
     741          13 :   cct->segment_handle = sct->segment_handle;
     742             : 
     743          13 :   session_set_state (ss, SESSION_STATE_ACCEPTING);
     744          13 :   if (app_worker_accept_notify (server_wrk, ss))
     745             :     {
     746           0 :       ct_session_connect_notify (ss, SESSION_E_REFUSED);
     747           0 :       ct_session_dealloc_fifos (sct, ss->rx_fifo, ss->tx_fifo);
     748           0 :       ct_connection_free (sct);
     749           0 :       session_free (ss);
     750             :     }
     751             : }
     752             : 
     753             : static void
     754          13 : ct_accept_rpc_wrk_handler (void *rpc_args)
     755             : {
     756             :   u32 thread_index, n_connects, i, n_pending;
     757          13 :   const u32 max_connects = 32;
     758             :   ct_worker_t *wrk;
     759          13 :   u8 need_rpc = 0;
     760             : 
     761          13 :   thread_index = pointer_to_uword (rpc_args);
     762          13 :   wrk = ct_worker_get (thread_index);
     763             : 
     764             :   /* Connects could be handled without worker barrier so grab lock */
     765          13 :   clib_spinlock_lock (&wrk->pending_connects_lock);
     766             : 
     767          13 :   n_pending = clib_fifo_elts (wrk->pending_connects);
     768          13 :   n_connects = clib_min (n_pending, max_connects);
     769          13 :   vec_validate (wrk->new_connects, n_connects);
     770             : 
     771          26 :   for (i = 0; i < n_connects; i++)
     772          13 :     clib_fifo_sub1 (wrk->pending_connects, wrk->new_connects[i]);
     773             : 
     774          13 :   if (n_pending == n_connects)
     775          13 :     wrk->have_connects = 0;
     776             :   else
     777           0 :     need_rpc = 1;
     778             : 
     779          13 :   clib_spinlock_unlock (&wrk->pending_connects_lock);
     780             : 
     781          26 :   for (i = 0; i < n_connects; i++)
     782          13 :     ct_accept_one (thread_index, wrk->new_connects[i]);
     783             : 
     784          13 :   if (need_rpc)
     785           0 :     session_send_rpc_evt_to_thread_force (
     786             :       thread_index, ct_accept_rpc_wrk_handler,
     787           0 :       uword_to_pointer (thread_index, void *));
     788          13 : }
     789             : 
     790             : static void
     791          13 : ct_fwrk_flush_connects (void *rpc_args)
     792             : {
     793             :   u32 thread_index, fwrk_index, n_workers;
     794          13 :   ct_main_t *cm = &ct_main;
     795             :   ct_worker_t *wrk;
     796             :   u8 need_rpc;
     797             : 
     798          13 :   fwrk_index = cm->fwrk_thread;
     799          13 :   n_workers = vec_len (cm->fwrk_pending_connects);
     800             : 
     801          26 :   for (thread_index = fwrk_index; thread_index < n_workers; thread_index++)
     802             :     {
     803          13 :       if (!vec_len (cm->fwrk_pending_connects[thread_index]))
     804           0 :         continue;
     805             : 
     806          13 :       wrk = ct_worker_get (thread_index);
     807             : 
     808             :       /* Connects can be done without worker barrier, grab dst worker lock */
     809          13 :       if (thread_index != fwrk_index)
     810           0 :         clib_spinlock_lock (&wrk->pending_connects_lock);
     811             : 
     812          13 :       clib_fifo_add (wrk->pending_connects,
     813             :                      cm->fwrk_pending_connects[thread_index],
     814             :                      vec_len (cm->fwrk_pending_connects[thread_index]));
     815          13 :       if (!wrk->have_connects)
     816             :         {
     817          13 :           wrk->have_connects = 1;
     818          13 :           need_rpc = 1;
     819             :         }
     820             : 
     821          13 :       if (thread_index != fwrk_index)
     822           0 :         clib_spinlock_unlock (&wrk->pending_connects_lock);
     823             : 
     824          13 :       vec_reset_length (cm->fwrk_pending_connects[thread_index]);
     825             : 
     826          13 :       if (need_rpc)
     827          13 :         session_send_rpc_evt_to_thread_force (
     828             :           thread_index, ct_accept_rpc_wrk_handler,
     829          13 :           uword_to_pointer (thread_index, void *));
     830             :     }
     831             : 
     832          13 :   cm->fwrk_have_flush = 0;
     833          13 : }
     834             : 
     835             : static void
     836          13 : ct_program_connect_to_wrk (u32 ho_index)
     837             : {
     838          13 :   ct_main_t *cm = &ct_main;
     839             :   u32 thread_index;
     840             : 
     841             :   /* Simple round-robin policy for spreading sessions over workers. We skip
     842             :    * thread index 0, i.e., offset the index by 1, when we have workers as it
     843             :    * is the one dedicated to main thread. Note that n_workers does not include
     844             :    * main thread */
     845          13 :   cm->n_sessions += 1;
     846          13 :   thread_index = cm->n_workers ? (cm->n_sessions % cm->n_workers) + 1 : 0;
     847             : 
     848             :   /* Pospone flushing of connect request to dst worker until after session
     849             :    * layer fully initializes the half-open session. */
     850          13 :   vec_add1 (cm->fwrk_pending_connects[thread_index], ho_index);
     851          13 :   if (!cm->fwrk_have_flush)
     852             :     {
     853          13 :       session_send_rpc_evt_to_thread_force (
     854             :         cm->fwrk_thread, ct_fwrk_flush_connects,
     855          13 :         uword_to_pointer (thread_index, void *));
     856          13 :       cm->fwrk_have_flush = 1;
     857             :     }
     858          13 : }
     859             : 
     860             : static int
     861          13 : ct_connect (app_worker_t *client_wrk, session_t *ll,
     862             :             session_endpoint_cfg_t *sep)
     863             : {
     864             :   ct_connection_t *ho;
     865             :   u32 ho_index;
     866             : 
     867             :   /*
     868             :    * Alloc and init client half-open transport
     869             :    */
     870             : 
     871          13 :   ho = ct_half_open_alloc ();
     872          13 :   ho_index = ho->c_c_index;
     873          13 :   ho->c_rmt_port = sep->port;
     874          13 :   ho->c_lcl_port = 0;
     875          13 :   ho->c_is_ip4 = sep->is_ip4;
     876          13 :   ho->client_opaque = sep->opaque;
     877          13 :   ho->client_wrk = client_wrk->wrk_index;
     878          13 :   ho->peer_index = ll->session_index;
     879          13 :   ho->c_proto = TRANSPORT_PROTO_NONE;
     880          13 :   ho->c_flags |= TRANSPORT_CONNECTION_F_NO_LOOKUP;
     881          13 :   clib_memcpy (&ho->c_rmt_ip, &sep->ip, sizeof (sep->ip));
     882          13 :   ho->flags |= CT_CONN_F_CLIENT;
     883          13 :   ho->c_s_index = ~0;
     884          13 :   ho->actual_tp = sep->original_tp;
     885             : 
     886             :   /*
     887             :    * Program connect on a worker, connected reply comes
     888             :    * after server accepts the connection.
     889             :    */
     890          13 :   ct_program_connect_to_wrk (ho_index);
     891             : 
     892          13 :   return ho_index;
     893             : }
     894             : 
     895             : static u32
     896          18 : ct_start_listen (u32 app_listener_index, transport_endpoint_cfg_t *tep)
     897             : {
     898             :   session_endpoint_cfg_t *sep;
     899             :   ct_connection_t *ct;
     900             : 
     901          18 :   sep = (session_endpoint_cfg_t *) tep;
     902          18 :   ct = ct_connection_alloc (0);
     903          18 :   ct->server_wrk = sep->app_wrk_index;
     904          18 :   ct->c_is_ip4 = sep->is_ip4;
     905          18 :   clib_memcpy (&ct->c_lcl_ip, &sep->ip, sizeof (sep->ip));
     906          18 :   ct->c_lcl_port = sep->port;
     907          18 :   ct->c_s_index = app_listener_index;
     908          18 :   ct->actual_tp = sep->transport_proto;
     909          18 :   return ct->c_c_index;
     910             : }
     911             : 
     912             : static u32
     913          18 : ct_stop_listen (u32 ct_index)
     914             : {
     915             :   ct_connection_t *ct;
     916          18 :   ct = ct_connection_get (ct_index, 0);
     917          18 :   ct_connection_free (ct);
     918          18 :   return 0;
     919             : }
     920             : 
     921             : static transport_connection_t *
     922          79 : ct_listener_get (u32 ct_index)
     923             : {
     924          79 :   return (transport_connection_t *) ct_connection_get (ct_index, 0);
     925             : }
     926             : 
     927             : static transport_connection_t *
     928          13 : ct_session_half_open_get (u32 ct_index)
     929             : {
     930          13 :   return (transport_connection_t *) ct_half_open_get (ct_index);
     931             : }
     932             : 
     933             : static void
     934           1 : ct_session_cleanup (u32 conn_index, u32 thread_index)
     935             : {
     936             :   ct_connection_t *ct, *peer_ct;
     937             : 
     938           1 :   ct = ct_connection_get (conn_index, thread_index);
     939           1 :   if (!ct)
     940           0 :     return;
     941             : 
     942           1 :   peer_ct = ct_connection_get (ct->peer_index, thread_index);
     943           1 :   if (peer_ct)
     944           1 :     peer_ct->peer_index = ~0;
     945             : 
     946           1 :   ct_connection_free (ct);
     947             : }
     948             : 
     949             : static void
     950           0 : ct_cleanup_ho (u32 ho_index)
     951             : {
     952             :   ct_connection_t *ho;
     953             : 
     954           0 :   ho = ct_half_open_get (ho_index);
     955           0 :   ct_connection_free (ho);
     956           0 : }
     957             : 
     958             : static int
     959          17 : ct_session_connect (transport_endpoint_cfg_t * tep)
     960             : {
     961             :   session_endpoint_cfg_t *sep_ext;
     962          17 :   session_endpoint_t _sep, *sep = &_sep;
     963             :   app_worker_t *app_wrk;
     964             :   session_handle_t lh;
     965             :   application_t *app;
     966             :   app_listener_t *al;
     967             :   u32 table_index;
     968             :   session_t *ll;
     969             :   u8 fib_proto;
     970             : 
     971          17 :   sep_ext = (session_endpoint_cfg_t *) tep;
     972          17 :   _sep = *(session_endpoint_t *) tep;
     973          17 :   app_wrk = app_worker_get (sep_ext->app_wrk_index);
     974          17 :   app = application_get (app_wrk->app_index);
     975             : 
     976          17 :   sep->transport_proto = sep_ext->original_tp;
     977          17 :   table_index = application_local_session_table (app);
     978          17 :   lh = session_lookup_local_endpoint (table_index, sep);
     979          17 :   if (lh == SESSION_DROP_HANDLE)
     980           3 :     return SESSION_E_FILTERED;
     981             : 
     982          14 :   if (lh == SESSION_INVALID_HANDLE)
     983           1 :     goto global_scope;
     984             : 
     985          13 :   ll = listen_session_get_from_handle (lh);
     986          13 :   al = app_listener_get_w_session (ll);
     987             : 
     988             :   /*
     989             :    * Break loop if rule in local table points to connecting app. This
     990             :    * can happen if client is a generic proxy. Route connect through
     991             :    * global table instead.
     992             :    */
     993          13 :   if (al->app_index == app->app_index)
     994           0 :     goto global_scope;
     995             : 
     996          13 :   return ct_connect (app_wrk, ll, sep_ext);
     997             : 
     998             :   /*
     999             :    * If nothing found, check the global scope for locally attached
    1000             :    * destinations. Make sure first that we're allowed to.
    1001             :    */
    1002             : 
    1003           1 : global_scope:
    1004           1 :   if (session_endpoint_is_local (sep))
    1005           1 :     return SESSION_E_NOROUTE;
    1006             : 
    1007           0 :   if (!application_has_global_scope (app))
    1008           0 :     return SESSION_E_SCOPE;
    1009             : 
    1010           0 :   fib_proto = session_endpoint_fib_proto (sep);
    1011           0 :   table_index = session_lookup_get_index_for_fib (fib_proto, sep->fib_index);
    1012           0 :   ll = session_lookup_listener_wildcard (table_index, sep);
    1013             : 
    1014             :   /* Avoid connecting app to own listener */
    1015           0 :   if (ll && ll->app_index != app->app_index)
    1016           0 :     return ct_connect (app_wrk, ll, sep_ext);
    1017             : 
    1018             :   /* Failed to connect but no error */
    1019           0 :   return SESSION_E_LOCAL_CONNECT;
    1020             : }
    1021             : 
    1022             : static inline int
    1023          12 : ct_close_is_reset (ct_connection_t *ct, session_t *s)
    1024             : {
    1025          12 :   if (ct->flags & CT_CONN_F_CLIENT)
    1026           6 :     return (svm_fifo_max_dequeue (ct->client_rx_fifo) > 0);
    1027             :   else
    1028           6 :     return (svm_fifo_max_dequeue (s->rx_fifo) > 0);
    1029             : }
    1030             : 
    1031             : static void
    1032          12 : ct_session_cleanup_server_session (session_t *s)
    1033             : {
    1034             :   ct_connection_t *ct;
    1035             : 
    1036          12 :   ct = (ct_connection_t *) session_get_transport (s);
    1037          12 :   ct_session_dealloc_fifos (ct, s->rx_fifo, s->tx_fifo);
    1038          12 :   session_free (s);
    1039          12 :   ct_connection_free (ct);
    1040          12 : }
    1041             : 
    1042             : static void
    1043          25 : ct_session_postponed_cleanup (ct_connection_t *ct)
    1044             : {
    1045             :   ct_connection_t *peer_ct;
    1046             :   app_worker_t *app_wrk;
    1047             :   session_t *s;
    1048             : 
    1049          25 :   s = session_get (ct->c_s_index, ct->c_thread_index);
    1050          25 :   app_wrk = app_worker_get_if_valid (s->app_wrk_index);
    1051             : 
    1052          25 :   peer_ct = ct_connection_get (ct->peer_index, ct->c_thread_index);
    1053          25 :   if (peer_ct)
    1054             :     {
    1055          12 :       if (ct_close_is_reset (ct, s))
    1056           2 :         session_transport_reset_notify (&peer_ct->connection);
    1057             :       else
    1058          10 :         session_transport_closing_notify (&peer_ct->connection);
    1059             :     }
    1060          25 :   session_transport_closed_notify (&ct->connection);
    1061             : 
    1062             :   /* It would be cleaner to call session_transport_delete_notify
    1063             :    * but then we can't control session cleanup lower */
    1064          25 :   session_set_state (s, SESSION_STATE_TRANSPORT_DELETED);
    1065          25 :   if (app_wrk)
    1066          20 :     app_worker_cleanup_notify (app_wrk, s, SESSION_CLEANUP_TRANSPORT);
    1067             : 
    1068          25 :   if (ct->flags & CT_CONN_F_CLIENT)
    1069             :     {
    1070             :       /* Normal free for client session as the fifos are allocated through
    1071             :        * the connects segment manager in a segment that's not shared with
    1072             :        * the server */
    1073          12 :       ct_session_dealloc_fifos (ct, ct->client_rx_fifo, ct->client_tx_fifo);
    1074          12 :       session_program_cleanup (s);
    1075          12 :       ct_connection_free (ct);
    1076             :     }
    1077             :   else
    1078             :     {
    1079             :       /* Manual session and fifo segment cleanup to avoid implicit
    1080             :        * segment manager cleanups and notifications */
    1081          13 :       if (app_wrk)
    1082             :         {
    1083             :           /* Remove custom cleanup notify infra when/if switching to normal
    1084             :            * session cleanup. Note that ct is freed in the cb function */
    1085          12 :           app_worker_cleanup_notify_custom (app_wrk, s,
    1086             :                                             SESSION_CLEANUP_SESSION,
    1087             :                                             ct_session_cleanup_server_session);
    1088             :         }
    1089             :       else
    1090             :         {
    1091           1 :           ct_connection_free (ct);
    1092             :         }
    1093             :     }
    1094          25 : }
    1095             : 
    1096             : static void
    1097          20 : ct_handle_cleanups (void *args)
    1098             : {
    1099          20 :   uword thread_index = pointer_to_uword (args);
    1100          20 :   const u32 max_cleanups = 100;
    1101             :   ct_cleanup_req_t *req;
    1102             :   ct_connection_t *ct;
    1103          20 :   u32 n_to_handle = 0;
    1104             :   ct_worker_t *wrk;
    1105             :   session_t *s;
    1106             : 
    1107          20 :   wrk = ct_worker_get (thread_index);
    1108          20 :   wrk->have_cleanups = 0;
    1109          20 :   n_to_handle = clib_fifo_elts (wrk->pending_cleanups);
    1110          20 :   n_to_handle = clib_min (n_to_handle, max_cleanups);
    1111             : 
    1112          45 :   while (n_to_handle)
    1113             :     {
    1114          25 :       clib_fifo_sub2 (wrk->pending_cleanups, req);
    1115          25 :       ct = ct_connection_get (req->ct_index, thread_index);
    1116          25 :       s = session_get (ct->c_s_index, ct->c_thread_index);
    1117          25 :       if (!svm_fifo_has_event (s->tx_fifo))
    1118          25 :         ct_session_postponed_cleanup (ct);
    1119             :       else
    1120           0 :         clib_fifo_add1 (wrk->pending_cleanups, *req);
    1121          25 :       n_to_handle -= 1;
    1122             :     }
    1123             : 
    1124          20 :   if (clib_fifo_elts (wrk->pending_cleanups))
    1125             :     {
    1126           0 :       wrk->have_cleanups = 1;
    1127           0 :       session_send_rpc_evt_to_thread_force (
    1128             :         thread_index, ct_handle_cleanups,
    1129             :         uword_to_pointer (thread_index, void *));
    1130             :     }
    1131          20 : }
    1132             : 
    1133             : static void
    1134          25 : ct_program_cleanup (ct_connection_t *ct)
    1135             : {
    1136             :   ct_cleanup_req_t *req;
    1137             :   uword thread_index;
    1138             :   ct_worker_t *wrk;
    1139             : 
    1140          25 :   thread_index = ct->c_thread_index;
    1141          25 :   wrk = ct_worker_get (ct->c_thread_index);
    1142             : 
    1143          25 :   clib_fifo_add2 (wrk->pending_cleanups, req);
    1144          25 :   req->ct_index = ct->c_c_index;
    1145             : 
    1146          25 :   if (wrk->have_cleanups)
    1147           5 :     return;
    1148             : 
    1149          20 :   wrk->have_cleanups = 1;
    1150          20 :   session_send_rpc_evt_to_thread_force (
    1151             :     thread_index, ct_handle_cleanups, uword_to_pointer (thread_index, void *));
    1152             : }
    1153             : 
    1154             : static void
    1155          25 : ct_session_close (u32 ct_index, u32 thread_index)
    1156             : {
    1157             :   ct_connection_t *ct, *peer_ct;
    1158             :   session_t *s;
    1159             : 
    1160          25 :   ct = ct_connection_get (ct_index, thread_index);
    1161          25 :   s = session_get (ct->c_s_index, ct->c_thread_index);
    1162          25 :   peer_ct = ct_connection_get (ct->peer_index, thread_index);
    1163          25 :   if (peer_ct)
    1164             :     {
    1165          12 :       peer_ct->peer_index = ~0;
    1166             :       /* Make sure session was allocated */
    1167          12 :       if (peer_ct->flags & CT_CONN_F_HALF_OPEN)
    1168             :         {
    1169           0 :           ct_session_connect_notify (s, SESSION_E_REFUSED);
    1170           0 :           ct->peer_index = ~0;
    1171             :         }
    1172          12 :       else if (peer_ct->c_s_index == ~0)
    1173             :         {
    1174             :           /* should not happen */
    1175           0 :           clib_warning ("ct peer without session");
    1176           0 :           ct_connection_free (peer_ct);
    1177             :         }
    1178             :     }
    1179             : 
    1180             :   /* Do not send closed notify to make sure pending tx events are
    1181             :    * still delivered and program cleanup */
    1182          25 :   ct_program_cleanup (ct);
    1183          25 : }
    1184             : 
    1185             : static transport_connection_t *
    1186      574349 : ct_session_get (u32 ct_index, u32 thread_index)
    1187             : {
    1188      574349 :   return (transport_connection_t *) ct_connection_get (ct_index,
    1189             :                                                        thread_index);
    1190             : }
    1191             : 
    1192             : static u8 *
    1193           0 : format_ct_connection_id (u8 * s, va_list * args)
    1194             : {
    1195           0 :   ct_connection_t *ct = va_arg (*args, ct_connection_t *);
    1196           0 :   if (!ct)
    1197           0 :     return s;
    1198           0 :   if (ct->c_is_ip4)
    1199             :     {
    1200           0 :       s = format (s, "[%d:%d][CT:%U] %U:%d->%U:%d", ct->c_thread_index,
    1201           0 :                   ct->c_s_index, format_transport_proto_short, ct->actual_tp,
    1202             :                   format_ip4_address, &ct->c_lcl_ip4,
    1203           0 :                   clib_net_to_host_u16 (ct->c_lcl_port), format_ip4_address,
    1204           0 :                   &ct->c_rmt_ip4, clib_net_to_host_u16 (ct->c_rmt_port));
    1205             :     }
    1206             :   else
    1207             :     {
    1208           0 :       s = format (s, "[%d:%d][CT:%U] %U:%d->%U:%d", ct->c_thread_index,
    1209           0 :                   ct->c_s_index, format_transport_proto_short, ct->actual_tp,
    1210             :                   format_ip6_address, &ct->c_lcl_ip6,
    1211           0 :                   clib_net_to_host_u16 (ct->c_lcl_port), format_ip6_address,
    1212           0 :                   &ct->c_rmt_ip6, clib_net_to_host_u16 (ct->c_rmt_port));
    1213             :     }
    1214             : 
    1215           0 :   return s;
    1216             : }
    1217             : 
    1218             : static int
    1219      137279 : ct_custom_tx (void *session, transport_send_params_t * sp)
    1220             : {
    1221      137279 :   session_t *s = (session_t *) session;
    1222      137279 :   if (session_has_transport (s))
    1223           0 :     return 0;
    1224             :   /* If event enqueued towards peer, remove from scheduler and remove
    1225             :    * session tx flag, i.e., accept new tx events. Unset fifo flag now to
    1226             :    * avoid missing events if peer did not clear fifo flag yet, which is
    1227             :    * interpreted as successful notification and session is descheduled. */
    1228      137279 :   svm_fifo_unset_event (s->tx_fifo);
    1229      137279 :   if (!ct_session_tx (s))
    1230      137279 :     sp->flags = TRANSPORT_SND_F_DESCHED;
    1231             : 
    1232             :   /* The scheduler uses packet count as a means of upper bounding the amount
    1233             :    * of work done per dispatch. So make it look like we have sent something */
    1234      137279 :   return 1;
    1235             : }
    1236             : 
    1237             : static int
    1238        4987 : ct_app_rx_evt (transport_connection_t * tc)
    1239             : {
    1240        4987 :   ct_connection_t *ct = (ct_connection_t *) tc, *peer_ct;
    1241             :   session_t *ps, *s;
    1242             : 
    1243        4987 :   s = session_get (ct->c_s_index, ct->c_thread_index);
    1244        4987 :   if (session_has_transport (s) || s->session_state < SESSION_STATE_READY)
    1245           0 :     return -1;
    1246        4987 :   peer_ct = ct_connection_get (ct->peer_index, tc->thread_index);
    1247        4987 :   if (!peer_ct || (peer_ct->flags & CT_CONN_F_HALF_OPEN))
    1248           0 :     return -1;
    1249        4987 :   ps = session_get (peer_ct->c_s_index, peer_ct->c_thread_index);
    1250        4987 :   if (ps->session_state >= SESSION_STATE_TRANSPORT_CLOSING)
    1251           0 :     return -1;
    1252        4987 :   return session_dequeue_notify (ps);
    1253             : }
    1254             : 
    1255             : static u8 *
    1256           0 : format_ct_listener (u8 * s, va_list * args)
    1257             : {
    1258           0 :   u32 tc_index = va_arg (*args, u32);
    1259           0 :   u32 __clib_unused thread_index = va_arg (*args, u32);
    1260           0 :   u32 __clib_unused verbose = va_arg (*args, u32);
    1261           0 :   ct_connection_t *ct = ct_connection_get (tc_index, 0);
    1262           0 :   s = format (s, "%-" SESSION_CLI_ID_LEN "U", format_ct_connection_id, ct);
    1263           0 :   if (verbose)
    1264           0 :     s = format (s, "%-" SESSION_CLI_STATE_LEN "s", "LISTEN");
    1265           0 :   return s;
    1266             : }
    1267             : 
    1268             : static u8 *
    1269           0 : format_ct_half_open (u8 *s, va_list *args)
    1270             : {
    1271           0 :   u32 ho_index = va_arg (*args, u32);
    1272           0 :   u32 verbose = va_arg (*args, u32);
    1273           0 :   ct_connection_t *ct = ct_half_open_get (ho_index);
    1274           0 :   s = format (s, "%-" SESSION_CLI_ID_LEN "U", format_ct_connection_id, ct);
    1275           0 :   if (verbose)
    1276           0 :     s = format (s, "%-" SESSION_CLI_STATE_LEN "s", "HALF-OPEN");
    1277           0 :   return s;
    1278             : }
    1279             : 
    1280             : static u8 *
    1281           0 : format_ct_connection (u8 * s, va_list * args)
    1282             : {
    1283           0 :   ct_connection_t *ct = va_arg (*args, ct_connection_t *);
    1284           0 :   u32 verbose = va_arg (*args, u32);
    1285             : 
    1286           0 :   if (!ct)
    1287           0 :     return s;
    1288           0 :   s = format (s, "%-" SESSION_CLI_ID_LEN "U", format_ct_connection_id, ct);
    1289           0 :   if (verbose)
    1290             :     {
    1291           0 :       s = format (s, "%-" SESSION_CLI_STATE_LEN "s", "ESTABLISHED");
    1292           0 :       if (verbose > 1)
    1293             :         {
    1294           0 :           s = format (s, "\n");
    1295             :         }
    1296             :     }
    1297           0 :   return s;
    1298             : }
    1299             : 
    1300             : static u8 *
    1301           0 : format_ct_session (u8 * s, va_list * args)
    1302             : {
    1303           0 :   u32 ct_index = va_arg (*args, u32);
    1304           0 :   u32 thread_index = va_arg (*args, u32);
    1305           0 :   u32 verbose = va_arg (*args, u32);
    1306             :   ct_connection_t *ct;
    1307             : 
    1308           0 :   ct = ct_connection_get (ct_index, thread_index);
    1309           0 :   if (!ct)
    1310             :     {
    1311           0 :       s = format (s, "empty\n");
    1312           0 :       return s;
    1313             :     }
    1314             : 
    1315           0 :   s = format (s, "%U", format_ct_connection, ct, verbose);
    1316           0 :   return s;
    1317             : }
    1318             : 
    1319             : clib_error_t *
    1320          57 : ct_enable_disable (vlib_main_t * vm, u8 is_en)
    1321             : {
    1322          57 :   vlib_thread_main_t *vtm = &vlib_thread_main;
    1323          57 :   ct_main_t *cm = &ct_main;
    1324             :   ct_worker_t *wrk;
    1325             : 
    1326          57 :   cm->n_workers = vlib_num_workers ();
    1327          57 :   cm->fwrk_thread = transport_cl_thread ();
    1328          57 :   vec_validate (cm->wrk, vtm->n_vlib_mains);
    1329         192 :   vec_foreach (wrk, cm->wrk)
    1330         135 :     clib_spinlock_init (&wrk->pending_connects_lock);
    1331          57 :   clib_spinlock_init (&cm->ho_reuseable_lock);
    1332          57 :   clib_rwlock_init (&cm->app_segs_lock);
    1333          57 :   vec_validate (cm->fwrk_pending_connects, cm->n_workers);
    1334          57 :   return 0;
    1335             : }
    1336             : 
    1337             : /* *INDENT-OFF* */
    1338             : static const transport_proto_vft_t cut_thru_proto = {
    1339             :   .enable = ct_enable_disable,
    1340             :   .start_listen = ct_start_listen,
    1341             :   .stop_listen = ct_stop_listen,
    1342             :   .get_connection = ct_session_get,
    1343             :   .get_listener = ct_listener_get,
    1344             :   .get_half_open = ct_session_half_open_get,
    1345             :   .cleanup = ct_session_cleanup,
    1346             :   .cleanup_ho = ct_cleanup_ho,
    1347             :   .connect = ct_session_connect,
    1348             :   .close = ct_session_close,
    1349             :   .custom_tx = ct_custom_tx,
    1350             :   .app_rx_evt = ct_app_rx_evt,
    1351             :   .format_listener = format_ct_listener,
    1352             :   .format_half_open = format_ct_half_open,
    1353             :   .format_connection = format_ct_session,
    1354             :   .transport_options = {
    1355             :     .name = "ct",
    1356             :     .short_name = "C",
    1357             :     .tx_type = TRANSPORT_TX_INTERNAL,
    1358             :     .service_type = TRANSPORT_SERVICE_VC,
    1359             :   },
    1360             : };
    1361             : /* *INDENT-ON* */
    1362             : 
    1363             : static inline int
    1364      137279 : ct_session_can_tx (session_t *s)
    1365             : {
    1366      137281 :   return (s->session_state == SESSION_STATE_READY ||
    1367      137279 :           s->session_state == SESSION_STATE_CLOSING ||
    1368           0 :           s->session_state == SESSION_STATE_APP_CLOSED);
    1369             : }
    1370             : 
    1371             : int
    1372      137279 : ct_session_tx (session_t * s)
    1373             : {
    1374             :   ct_connection_t *ct, *peer_ct;
    1375             :   session_t *peer_s;
    1376             : 
    1377      137279 :   if (!ct_session_can_tx (s))
    1378           0 :     return 0;
    1379      137279 :   ct = (ct_connection_t *) session_get_transport (s);
    1380      137279 :   peer_ct = ct_connection_get (ct->peer_index, ct->c_thread_index);
    1381      137279 :   if (!peer_ct)
    1382           0 :     return 0;
    1383      137279 :   peer_s = session_get (peer_ct->c_s_index, peer_ct->c_thread_index);
    1384      137279 :   if (peer_s->session_state >= SESSION_STATE_TRANSPORT_CLOSING)
    1385           2 :     return 0;
    1386      137277 :   return session_enqueue_notify (peer_s);
    1387             : }
    1388             : 
    1389             : static clib_error_t *
    1390         575 : ct_transport_init (vlib_main_t * vm)
    1391             : {
    1392         575 :   transport_register_protocol (TRANSPORT_PROTO_NONE, &cut_thru_proto,
    1393             :                                FIB_PROTOCOL_IP4, ~0);
    1394         575 :   transport_register_protocol (TRANSPORT_PROTO_NONE, &cut_thru_proto,
    1395             :                                FIB_PROTOCOL_IP6, ~0);
    1396         575 :   return 0;
    1397             : }
    1398             : 
    1399       78911 : VLIB_INIT_FUNCTION (ct_transport_init);
    1400             : 
    1401             : /*
    1402             :  * fd.io coding-style-patch-verification: ON
    1403             :  *
    1404             :  * Local Variables:
    1405             :  * eval: (c-set-style "gnu")
    1406             :  * End:
    1407             :  */

Generated by: LCOV version 1.14