LCOV - code coverage report
Current view: top level - vnet/session - application_local.c (source / functions) Hit Total Coverage
Test: coverage-filtered.info Lines: 495 640 77.3 %
Date: 2023-07-05 22:20:52 Functions: 40 47 85.1 %

          Line data    Source code
       1             : /*
       2             :  * Copyright (c) 2019 Cisco and/or its affiliates.
       3             :  * Licensed under the Apache License, Version 2.0 (the "License");
       4             :  * you may not use this file except in compliance with the License.
       5             :  * You may obtain a copy of the License at:
       6             :  *
       7             :  *     http://www.apache.org/licenses/LICENSE-2.0
       8             :  *
       9             :  * Unless required by applicable law or agreed to in writing, software
      10             :  * distributed under the License is distributed on an "AS IS" BASIS,
      11             :  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
      12             :  * See the License for the specific language governing permissions and
      13             :  * limitations under the License.
      14             :  */
      15             : 
      16             : #include <vnet/session/application_local.h>
      17             : #include <vnet/session/session.h>
      18             : 
      19             : typedef enum ct_segment_flags_
      20             : {
      21             :   CT_SEGMENT_F_CLIENT_DETACHED = 1 << 0,
      22             :   CT_SEGMENT_F_SERVER_DETACHED = 1 << 1,
      23             : } ct_segment_flags_t;
      24             : 
      25             : typedef struct ct_segment_
      26             : {
      27             :   u32 client_n_sessions;
      28             :   u32 server_n_sessions;
      29             :   u32 seg_ctx_index;
      30             :   u32 ct_seg_index;
      31             :   u32 segment_index;
      32             :   ct_segment_flags_t flags;
      33             : } ct_segment_t;
      34             : 
      35             : typedef struct ct_segments_
      36             : {
      37             :   u32 sm_index;
      38             :   u32 server_wrk;
      39             :   u32 client_wrk;
      40             :   u32 fifo_pair_bytes;
      41             :   ct_segment_t *segments;
      42             : } ct_segments_ctx_t;
      43             : 
      44             : typedef struct ct_cleanup_req_
      45             : {
      46             :   u32 ct_index;
      47             : } ct_cleanup_req_t;
      48             : 
      49             : typedef struct ct_worker_
      50             : {
      51             :   ct_connection_t *connections;       /**< Per-worker connection pools */
      52             :   u32 *pending_connects;              /**< Fifo of pending ho indices */
      53             :   ct_cleanup_req_t *pending_cleanups; /**< Fifo of pending indices */
      54             :   u8 have_connects;                   /**< Set if connect rpc pending */
      55             :   u8 have_cleanups;                   /**< Set if cleanup rpc pending */
      56             :   clib_spinlock_t pending_connects_lock; /**< Lock for pending connects */
      57             :   u32 *new_connects;                     /**< Burst of connects to be done */
      58             : } ct_worker_t;
      59             : 
      60             : typedef struct ct_main_
      61             : {
      62             :   ct_worker_t *wrk;                     /**< Per-worker state */
      63             :   u32 n_workers;                        /**< Number of vpp workers */
      64             :   u32 n_sessions;                       /**< Cumulative sessions counter */
      65             :   u32 *ho_reusable;                     /**< Vector of reusable ho indices */
      66             :   clib_spinlock_t ho_reuseable_lock;    /**< Lock for reusable ho indices */
      67             :   clib_rwlock_t app_segs_lock;          /**< RW lock for seg contexts */
      68             :   uword *app_segs_ctxs_table;           /**< App handle to segment pool map */
      69             :   ct_segments_ctx_t *app_seg_ctxs;      /**< Pool of ct segment contexts */
      70             :   u32 **fwrk_pending_connects;          /**< First wrk pending half-opens */
      71             :   u32 fwrk_thread;                      /**< First worker thread */
      72             :   u8 fwrk_have_flush;                   /**< Flag for connect flush rpc */
      73             : } ct_main_t;
      74             : 
      75             : static ct_main_t ct_main;
      76             : 
      77             : static inline ct_worker_t *
      78      499797 : ct_worker_get (u32 thread_index)
      79             : {
      80      499797 :   return &ct_main.wrk[thread_index];
      81             : }
      82             : 
      83             : static ct_connection_t *
      84          57 : ct_connection_alloc (u32 thread_index)
      85             : {
      86          57 :   ct_worker_t *wrk = ct_worker_get (thread_index);
      87             :   ct_connection_t *ct;
      88             : 
      89          57 :   pool_get_aligned_safe (wrk->connections, ct, CLIB_CACHE_LINE_BYTES);
      90          57 :   clib_memset (ct, 0, sizeof (*ct));
      91          57 :   ct->c_c_index = ct - wrk->connections;
      92          57 :   ct->c_thread_index = thread_index;
      93          57 :   ct->client_wrk = ~0;
      94          57 :   ct->server_wrk = ~0;
      95          57 :   ct->seg_ctx_index = ~0;
      96          57 :   ct->ct_seg_index = ~0;
      97          57 :   return ct;
      98             : }
      99             : 
     100             : static ct_connection_t *
     101      499625 : ct_connection_get (u32 ct_index, u32 thread_index)
     102             : {
     103      499625 :   ct_worker_t *wrk = ct_worker_get (thread_index);
     104             : 
     105      499625 :   if (pool_is_free_index (wrk->connections, ct_index))
     106          31 :     return 0;
     107      499594 :   return pool_elt_at_index (wrk->connections, ct_index);
     108             : }
     109             : 
     110             : static void
     111          44 : ct_connection_free (ct_connection_t * ct)
     112             : {
     113          44 :   ct_worker_t *wrk = ct_worker_get (ct->c_thread_index);
     114             : 
     115             :   if (CLIB_DEBUG)
     116             :     {
     117          44 :       clib_memset (ct, 0xfc, sizeof (*ct));
     118          44 :       pool_put (wrk->connections, ct);
     119          44 :       return;
     120             :     }
     121             :   pool_put (wrk->connections, ct);
     122             : }
     123             : 
     124             : static ct_connection_t *
     125          13 : ct_half_open_alloc (void)
     126             : {
     127          13 :   ct_main_t *cm = &ct_main;
     128             :   u32 *hip;
     129             : 
     130          13 :   clib_spinlock_lock (&cm->ho_reuseable_lock);
     131          21 :   vec_foreach (hip, cm->ho_reusable)
     132           8 :     pool_put_index (cm->wrk[cm->fwrk_thread].connections, *hip);
     133          13 :   vec_reset_length (cm->ho_reusable);
     134          13 :   clib_spinlock_unlock (&cm->ho_reuseable_lock);
     135             : 
     136          13 :   return ct_connection_alloc (cm->fwrk_thread);
     137             : }
     138             : 
     139             : static ct_connection_t *
     140          26 : ct_half_open_get (u32 ho_index)
     141             : {
     142          26 :   ct_main_t *cm = &ct_main;
     143          26 :   return ct_connection_get (ho_index, cm->fwrk_thread);
     144             : }
     145             : 
     146             : void
     147          13 : ct_half_open_add_reusable (u32 ho_index)
     148             : {
     149          13 :   ct_main_t *cm = &ct_main;
     150             : 
     151          13 :   clib_spinlock_lock (&cm->ho_reuseable_lock);
     152          13 :   vec_add1 (cm->ho_reusable, ho_index);
     153          13 :   clib_spinlock_unlock (&cm->ho_reuseable_lock);
     154          13 : }
     155             : 
     156             : session_t *
     157          12 : ct_session_get_peer (session_t * s)
     158             : {
     159             :   ct_connection_t *ct, *peer_ct;
     160          12 :   ct = ct_connection_get (s->connection_index, s->thread_index);
     161          12 :   peer_ct = ct_connection_get (ct->peer_index, s->thread_index);
     162          12 :   return session_get (peer_ct->c_s_index, s->thread_index);
     163             : }
     164             : 
     165             : void
     166          18 : ct_session_endpoint (session_t * ll, session_endpoint_t * sep)
     167             : {
     168             :   ct_connection_t *ct;
     169          18 :   ct = (ct_connection_t *) session_get_transport (ll);
     170          18 :   sep->transport_proto = ct->actual_tp;
     171          18 :   sep->port = ct->c_lcl_port;
     172          18 :   sep->is_ip4 = ct->c_is_ip4;
     173          18 :   ip_copy (&sep->ip, &ct->c_lcl_ip, ct->c_is_ip4);
     174          18 : }
     175             : 
     176             : static void
     177           5 : ct_set_invalid_app_wrk (ct_connection_t *ct, u8 is_client)
     178             : {
     179             :   ct_connection_t *peer_ct;
     180             : 
     181           5 :   peer_ct = ct_connection_get (ct->peer_index, ct->c_thread_index);
     182             : 
     183           5 :   if (is_client)
     184             :     {
     185           4 :       ct->client_wrk = APP_INVALID_INDEX;
     186           4 :       if (peer_ct)
     187           0 :         ct->client_wrk = APP_INVALID_INDEX;
     188             :     }
     189             :   else
     190             :     {
     191           1 :       ct->server_wrk = APP_INVALID_INDEX;
     192           1 :       if (peer_ct)
     193           0 :         ct->server_wrk = APP_INVALID_INDEX;
     194             :     }
     195           5 : }
     196             : 
     197             : static void
     198          25 : ct_session_dealloc_fifos (ct_connection_t *ct, svm_fifo_t *rx_fifo,
     199             :                           svm_fifo_t *tx_fifo)
     200             : {
     201             :   ct_segments_ctx_t *seg_ctx;
     202          25 :   ct_main_t *cm = &ct_main;
     203             :   segment_manager_t *sm;
     204             :   app_worker_t *app_wrk;
     205             :   ct_segment_t *ct_seg;
     206             :   fifo_segment_t *fs;
     207             :   u32 seg_index;
     208             :   session_t *s;
     209             :   int cnt;
     210             : 
     211             :   /*
     212             :    * Cleanup fifos
     213             :    */
     214             : 
     215          25 :   sm = segment_manager_get (rx_fifo->segment_manager);
     216          25 :   seg_index = rx_fifo->segment_index;
     217             : 
     218          25 :   fs = segment_manager_get_segment_w_lock (sm, seg_index);
     219          25 :   fifo_segment_free_fifo (fs, rx_fifo);
     220          25 :   fifo_segment_free_fifo (fs, tx_fifo);
     221          25 :   segment_manager_segment_reader_unlock (sm);
     222             : 
     223             :   /*
     224             :    * Atomically update segment context with readers lock
     225             :    */
     226             : 
     227          25 :   clib_rwlock_reader_lock (&cm->app_segs_lock);
     228             : 
     229          25 :   seg_ctx = pool_elt_at_index (cm->app_seg_ctxs, ct->seg_ctx_index);
     230          25 :   ct_seg = pool_elt_at_index (seg_ctx->segments, ct->ct_seg_index);
     231             : 
     232          25 :   if (ct->flags & CT_CONN_F_CLIENT)
     233             :     {
     234          12 :       cnt =
     235          12 :         __atomic_sub_fetch (&ct_seg->client_n_sessions, 1, __ATOMIC_RELAXED);
     236             :     }
     237             :   else
     238             :     {
     239          13 :       cnt =
     240          13 :         __atomic_sub_fetch (&ct_seg->server_n_sessions, 1, __ATOMIC_RELAXED);
     241             :     }
     242             : 
     243          25 :   clib_rwlock_reader_unlock (&cm->app_segs_lock);
     244             : 
     245             :   /*
     246             :    * No need to do any app updates, return
     247             :    */
     248          25 :   ASSERT (cnt >= 0);
     249          25 :   if (cnt)
     250           8 :     return;
     251             : 
     252             :   /*
     253             :    * Grab exclusive lock and update flags unless some other thread
     254             :    * added more sessions
     255             :    */
     256          17 :   clib_rwlock_writer_lock (&cm->app_segs_lock);
     257             : 
     258          17 :   seg_ctx = pool_elt_at_index (cm->app_seg_ctxs, ct->seg_ctx_index);
     259          17 :   ct_seg = pool_elt_at_index (seg_ctx->segments, ct->ct_seg_index);
     260          17 :   if (ct->flags & CT_CONN_F_CLIENT)
     261             :     {
     262           8 :       cnt = ct_seg->client_n_sessions;
     263           8 :       if (cnt)
     264           0 :         goto done;
     265           8 :       ct_seg->flags |= CT_SEGMENT_F_CLIENT_DETACHED;
     266           8 :       s = session_get (ct->c_s_index, ct->c_thread_index);
     267           8 :       if (s->app_wrk_index == APP_INVALID_INDEX)
     268           4 :         ct_set_invalid_app_wrk (ct, 1 /* is_client */);
     269             :     }
     270             :   else
     271             :     {
     272           9 :       cnt = ct_seg->server_n_sessions;
     273           9 :       if (cnt)
     274           0 :         goto done;
     275           9 :       ct_seg->flags |= CT_SEGMENT_F_SERVER_DETACHED;
     276           9 :       s = session_get (ct->c_s_index, ct->c_thread_index);
     277           9 :       if (s->app_wrk_index == APP_INVALID_INDEX)
     278           1 :         ct_set_invalid_app_wrk (ct, 0 /* is_client */);
     279             :     }
     280             : 
     281          17 :   if (!(ct_seg->flags & CT_SEGMENT_F_CLIENT_DETACHED) ||
     282          12 :       !(ct_seg->flags & CT_SEGMENT_F_SERVER_DETACHED))
     283           9 :     goto done;
     284             : 
     285             :   /*
     286             :    * Remove segment context because both client and server detached
     287             :    */
     288             : 
     289           8 :   pool_put_index (seg_ctx->segments, ct->ct_seg_index);
     290             : 
     291             :   /*
     292             :    * No more segment indices left, remove the segments context
     293             :    */
     294           8 :   if (!pool_elts (seg_ctx->segments))
     295             :     {
     296           8 :       u64 table_handle = seg_ctx->client_wrk << 16 | seg_ctx->server_wrk;
     297           8 :       table_handle = (u64) seg_ctx->sm_index << 32 | table_handle;
     298           8 :       hash_unset (cm->app_segs_ctxs_table, table_handle);
     299           8 :       pool_free (seg_ctx->segments);
     300           8 :       pool_put_index (cm->app_seg_ctxs, ct->seg_ctx_index);
     301             :     }
     302             : 
     303             :   /*
     304             :    * Segment to be removed so notify both apps
     305             :    */
     306             : 
     307           8 :   app_wrk = app_worker_get_if_valid (ct->client_wrk);
     308             :   /* Determine if client app still needs notification, i.e., if it is
     309             :    * still attached. If client detached and this is the last ct session
     310             :    * on this segment, then its connects segment manager should also be
     311             :    * detached, so do not send notification */
     312           8 :   if (app_wrk)
     313             :     {
     314             :       segment_manager_t *csm;
     315           4 :       csm = app_worker_get_connect_segment_manager (app_wrk);
     316           4 :       if (!segment_manager_app_detached (csm))
     317           4 :         app_worker_del_segment_notify (app_wrk, ct->segment_handle);
     318             :     }
     319             : 
     320             :   /* Notify server app and free segment */
     321           8 :   segment_manager_lock_and_del_segment (sm, seg_index);
     322             : 
     323             :   /* Cleanup segment manager if needed. If server detaches there's a chance
     324             :    * the client's sessions will hold up segment removal */
     325           8 :   if (segment_manager_app_detached (sm) && !segment_manager_has_fifos (sm))
     326           6 :     segment_manager_free_safe (sm);
     327             : 
     328           2 : done:
     329             : 
     330          17 :   clib_rwlock_writer_unlock (&cm->app_segs_lock);
     331             : }
     332             : 
     333             : static void
     334           0 : ct_session_force_disconnect_server (ct_connection_t *sct)
     335             : {
     336           0 :   sct->peer_index = ~0;
     337           0 :   session_transport_closing_notify (&sct->connection);
     338           0 : }
     339             : 
     340             : int
     341          12 : ct_session_connect_notify (session_t *ss, session_error_t err)
     342             : {
     343             :   u32 ss_index, opaque, thread_index;
     344             :   ct_connection_t *sct, *cct;
     345             :   app_worker_t *client_wrk;
     346             :   session_t *cs;
     347             : 
     348          12 :   ss_index = ss->session_index;
     349          12 :   thread_index = ss->thread_index;
     350          12 :   sct = (ct_connection_t *) session_get_transport (ss);
     351          12 :   client_wrk = app_worker_get (sct->client_wrk);
     352          12 :   opaque = sct->client_opaque;
     353             : 
     354          12 :   cct = ct_connection_get (sct->peer_index, thread_index);
     355             : 
     356             :   /* Client closed while waiting for reply from server */
     357          12 :   if (PREDICT_FALSE (!cct))
     358             :     {
     359           0 :       ct_session_force_disconnect_server (sct);
     360           0 :       return 0;
     361             :     }
     362             : 
     363          12 :   session_half_open_delete_notify (&cct->connection);
     364          12 :   cct->flags &= ~CT_CONN_F_HALF_OPEN;
     365             : 
     366          12 :   if (PREDICT_FALSE (err))
     367           0 :     goto connect_error;
     368             : 
     369             :   /*
     370             :    * Alloc client session, server session assumed to be established
     371             :    */
     372             : 
     373          12 :   ASSERT (ss->session_state >= SESSION_STATE_READY);
     374             : 
     375          12 :   cs = session_alloc (thread_index);
     376          12 :   ss = session_get (ss_index, thread_index);
     377          12 :   cs->session_type = ss->session_type;
     378          12 :   cs->listener_handle = SESSION_INVALID_HANDLE;
     379          12 :   session_set_state (cs, SESSION_STATE_CONNECTING);
     380          12 :   cs->app_wrk_index = client_wrk->wrk_index;
     381          12 :   cs->connection_index = cct->c_c_index;
     382          12 :   cct->c_s_index = cs->session_index;
     383             : 
     384             :   /* This will allocate fifos for the session. They won't be used for
     385             :    * exchanging data but they will be used to close the connection if
     386             :    * the segment manager/worker is freed */
     387          12 :   if ((err = app_worker_init_connected (client_wrk, cs)))
     388             :     {
     389           0 :       session_free (cs);
     390           0 :       ct_session_force_disconnect_server (sct);
     391           0 :       err = SESSION_E_ALLOC;
     392           0 :       goto connect_error;
     393             :     }
     394             : 
     395          12 :   session_set_state (cs, SESSION_STATE_CONNECTING);
     396             : 
     397          12 :   if (app_worker_connect_notify (client_wrk, cs, 0, opaque))
     398             :     {
     399           0 :       segment_manager_dealloc_fifos (cs->rx_fifo, cs->tx_fifo);
     400           0 :       session_free (cs);
     401           0 :       ct_session_force_disconnect_server (sct);
     402           0 :       goto cleanup_client;
     403             :     }
     404             : 
     405          12 :   cs = session_get (cct->c_s_index, cct->c_thread_index);
     406          12 :   session_set_state (cs, SESSION_STATE_READY);
     407             : 
     408          12 :   return 0;
     409             : 
     410           0 : connect_error:
     411             : 
     412           0 :   app_worker_connect_notify (client_wrk, 0, err, cct->client_opaque);
     413             : 
     414           0 : cleanup_client:
     415             : 
     416           0 :   if (cct->client_rx_fifo)
     417           0 :     ct_session_dealloc_fifos (cct, cct->client_rx_fifo, cct->client_tx_fifo);
     418           0 :   ct_connection_free (cct);
     419           0 :   return -1;
     420             : }
     421             : 
     422             : static inline ct_segment_t *
     423           4 : ct_lookup_free_segment (ct_main_t *cm, segment_manager_t *sm,
     424             :                         u32 seg_ctx_index)
     425             : {
     426             :   uword free_bytes, max_free_bytes;
     427           4 :   ct_segment_t *ct_seg, *res = 0;
     428             :   ct_segments_ctx_t *seg_ctx;
     429             :   fifo_segment_t *fs;
     430             :   u32 max_fifos;
     431             : 
     432           4 :   seg_ctx = pool_elt_at_index (cm->app_seg_ctxs, seg_ctx_index);
     433           4 :   max_free_bytes = seg_ctx->fifo_pair_bytes;
     434             : 
     435           8 :   pool_foreach (ct_seg, seg_ctx->segments)
     436             :     {
     437             :       /* Client or server has detached so segment cannot be used */
     438           4 :       fs = segment_manager_get_segment (sm, ct_seg->segment_index);
     439           4 :       free_bytes = fifo_segment_available_bytes (fs);
     440           4 :       max_fifos = fifo_segment_size (fs) / seg_ctx->fifo_pair_bytes;
     441           4 :       if (free_bytes > max_free_bytes &&
     442           4 :           fifo_segment_num_fifos (fs) / 2 < max_fifos)
     443             :         {
     444           4 :           max_free_bytes = free_bytes;
     445           4 :           res = ct_seg;
     446             :         }
     447             :     }
     448             : 
     449           4 :   return res;
     450             : }
     451             : 
     452             : static ct_segment_t *
     453           9 : ct_alloc_segment (ct_main_t *cm, app_worker_t *server_wrk, u64 table_handle,
     454             :                   segment_manager_t *sm, u32 client_wrk_index)
     455             : {
     456           9 :   u32 seg_ctx_index = ~0, sm_index, pair_bytes;
     457             :   segment_manager_props_t *props;
     458           9 :   const u32 margin = 16 << 10;
     459             :   ct_segments_ctx_t *seg_ctx;
     460             :   app_worker_t *client_wrk;
     461             :   u64 seg_size, seg_handle;
     462             :   application_t *server;
     463             :   ct_segment_t *ct_seg;
     464             :   uword *spp;
     465             :   int fs_index;
     466             : 
     467           9 :   server = application_get (server_wrk->app_index);
     468           9 :   props = application_segment_manager_properties (server);
     469           9 :   sm_index = segment_manager_index (sm);
     470           9 :   pair_bytes = props->rx_fifo_size + props->tx_fifo_size + margin;
     471             : 
     472             :   /*
     473             :    * Make sure another thread did not alloc a segment while acquiring the lock
     474             :    */
     475             : 
     476           9 :   spp = hash_get (cm->app_segs_ctxs_table, table_handle);
     477           9 :   if (spp)
     478             :     {
     479           0 :       seg_ctx_index = *spp;
     480           0 :       ct_seg = ct_lookup_free_segment (cm, sm, seg_ctx_index);
     481           0 :       if (ct_seg)
     482           0 :         return ct_seg;
     483             :     }
     484             : 
     485             :   /*
     486             :    * No segment, try to alloc one and notify the server and the client.
     487             :    * Make sure the segment is not used for other fifos
     488             :    */
     489           9 :   seg_size = clib_max (props->segment_size, 128 << 20);
     490             :   fs_index =
     491           9 :     segment_manager_add_segment2 (sm, seg_size, FIFO_SEGMENT_F_CUSTOM_USE);
     492           9 :   if (fs_index < 0)
     493           0 :     return 0;
     494             : 
     495           9 :   if (seg_ctx_index == ~0)
     496             :     {
     497           9 :       pool_get_zero (cm->app_seg_ctxs, seg_ctx);
     498           9 :       seg_ctx_index = seg_ctx - cm->app_seg_ctxs;
     499           9 :       hash_set (cm->app_segs_ctxs_table, table_handle, seg_ctx_index);
     500           9 :       seg_ctx->server_wrk = server_wrk->wrk_index;
     501           9 :       seg_ctx->client_wrk = client_wrk_index;
     502           9 :       seg_ctx->sm_index = sm_index;
     503           9 :       seg_ctx->fifo_pair_bytes = pair_bytes;
     504             :     }
     505             :   else
     506             :     {
     507           0 :       seg_ctx = pool_elt_at_index (cm->app_seg_ctxs, seg_ctx_index);
     508             :     }
     509             : 
     510           9 :   pool_get_zero (seg_ctx->segments, ct_seg);
     511           9 :   ct_seg->segment_index = fs_index;
     512           9 :   ct_seg->server_n_sessions = 0;
     513           9 :   ct_seg->client_n_sessions = 0;
     514           9 :   ct_seg->ct_seg_index = ct_seg - seg_ctx->segments;
     515           9 :   ct_seg->seg_ctx_index = seg_ctx_index;
     516             : 
     517             :   /* New segment, notify the server and client */
     518           9 :   seg_handle = segment_manager_make_segment_handle (sm_index, fs_index);
     519           9 :   if (app_worker_add_segment_notify (server_wrk, seg_handle))
     520           0 :     goto error;
     521             : 
     522           9 :   client_wrk = app_worker_get (client_wrk_index);
     523           9 :   if (app_worker_add_segment_notify (client_wrk, seg_handle))
     524             :     {
     525           0 :       app_worker_del_segment_notify (server_wrk, seg_handle);
     526           0 :       goto error;
     527             :     }
     528             : 
     529           9 :   return ct_seg;
     530             : 
     531           0 : error:
     532             : 
     533           0 :   segment_manager_lock_and_del_segment (sm, fs_index);
     534           0 :   pool_put_index (seg_ctx->segments, ct_seg->seg_ctx_index);
     535           0 :   return 0;
     536             : }
     537             : 
     538             : static int
     539          13 : ct_init_accepted_session (app_worker_t *server_wrk, ct_connection_t *ct,
     540             :                           session_t *ls, session_t *ll)
     541             : {
     542             :   segment_manager_props_t *props;
     543             :   u64 seg_handle, table_handle;
     544          13 :   u32 sm_index, fs_index = ~0;
     545             :   ct_segments_ctx_t *seg_ctx;
     546          13 :   ct_main_t *cm = &ct_main;
     547             :   application_t *server;
     548             :   segment_manager_t *sm;
     549             :   ct_segment_t *ct_seg;
     550             :   fifo_segment_t *fs;
     551             :   uword *spp;
     552             :   int rv;
     553             : 
     554          13 :   sm = app_worker_get_listen_segment_manager (server_wrk, ll);
     555          13 :   sm_index = segment_manager_index (sm);
     556          13 :   server = application_get (server_wrk->app_index);
     557          13 :   props = application_segment_manager_properties (server);
     558             : 
     559          13 :   table_handle = ct->client_wrk << 16 | server_wrk->wrk_index;
     560          13 :   table_handle = (u64) sm_index << 32 | table_handle;
     561             : 
     562             :   /*
     563             :    * Check if we already have a segment that can hold the fifos
     564             :    */
     565             : 
     566          13 :   clib_rwlock_reader_lock (&cm->app_segs_lock);
     567             : 
     568          13 :   spp = hash_get (cm->app_segs_ctxs_table, table_handle);
     569          13 :   if (spp)
     570             :     {
     571           4 :       ct_seg = ct_lookup_free_segment (cm, sm, *spp);
     572           4 :       if (ct_seg)
     573             :         {
     574           4 :           ct->seg_ctx_index = ct_seg->seg_ctx_index;
     575           4 :           ct->ct_seg_index = ct_seg->ct_seg_index;
     576           4 :           fs_index = ct_seg->segment_index;
     577           4 :           ct_seg->flags &=
     578             :             ~(CT_SEGMENT_F_SERVER_DETACHED | CT_SEGMENT_F_CLIENT_DETACHED);
     579           4 :           __atomic_add_fetch (&ct_seg->server_n_sessions, 1, __ATOMIC_RELAXED);
     580           4 :           __atomic_add_fetch (&ct_seg->client_n_sessions, 1, __ATOMIC_RELAXED);
     581             :         }
     582             :     }
     583             : 
     584          13 :   clib_rwlock_reader_unlock (&cm->app_segs_lock);
     585             : 
     586             :   /*
     587             :    * If not, grab exclusive lock and allocate segment
     588             :    */
     589          13 :   if (fs_index == ~0)
     590             :     {
     591           9 :       clib_rwlock_writer_lock (&cm->app_segs_lock);
     592             : 
     593             :       ct_seg =
     594           9 :         ct_alloc_segment (cm, server_wrk, table_handle, sm, ct->client_wrk);
     595           9 :       if (!ct_seg)
     596             :         {
     597           0 :           clib_rwlock_writer_unlock (&cm->app_segs_lock);
     598           0 :           return -1;
     599             :         }
     600             : 
     601           9 :       ct->seg_ctx_index = ct_seg->seg_ctx_index;
     602           9 :       ct->ct_seg_index = ct_seg->ct_seg_index;
     603           9 :       ct_seg->server_n_sessions += 1;
     604           9 :       ct_seg->client_n_sessions += 1;
     605           9 :       fs_index = ct_seg->segment_index;
     606             : 
     607           9 :       clib_rwlock_writer_unlock (&cm->app_segs_lock);
     608             :     }
     609             : 
     610             :   /*
     611             :    * Allocate and initialize the fifos
     612             :    */
     613          13 :   fs = segment_manager_get_segment_w_lock (sm, fs_index);
     614          13 :   rv = segment_manager_try_alloc_fifos (
     615          13 :     fs, ls->thread_index, props->rx_fifo_size, props->tx_fifo_size,
     616             :     &ls->rx_fifo, &ls->tx_fifo);
     617          13 :   if (rv)
     618             :     {
     619           0 :       segment_manager_segment_reader_unlock (sm);
     620             : 
     621           0 :       clib_rwlock_reader_lock (&cm->app_segs_lock);
     622             : 
     623           0 :       seg_ctx = pool_elt_at_index (cm->app_seg_ctxs, ct->seg_ctx_index);
     624           0 :       ct_seg = pool_elt_at_index (seg_ctx->segments, ct->ct_seg_index);
     625           0 :       __atomic_sub_fetch (&ct_seg->server_n_sessions, 1, __ATOMIC_RELAXED);
     626           0 :       __atomic_sub_fetch (&ct_seg->client_n_sessions, 1, __ATOMIC_RELAXED);
     627             : 
     628           0 :       clib_rwlock_reader_unlock (&cm->app_segs_lock);
     629             : 
     630           0 :       return rv;
     631             :     }
     632             : 
     633          13 :   ls->rx_fifo->shr->master_session_index = ls->session_index;
     634          13 :   ls->tx_fifo->shr->master_session_index = ls->session_index;
     635          13 :   ls->rx_fifo->master_thread_index = ls->thread_index;
     636          13 :   ls->tx_fifo->master_thread_index = ls->thread_index;
     637             : 
     638          13 :   seg_handle = segment_manager_segment_handle (sm, fs);
     639          13 :   segment_manager_segment_reader_unlock (sm);
     640             : 
     641          13 :   ct->segment_handle = seg_handle;
     642             : 
     643          13 :   return 0;
     644             : }
     645             : 
                       /*
                        * Accept, on thread thread_index, the connect request described by the
                        * half-open ct ho_index.
                        *
                        * Allocates the client ct as a copy of the half-open, allocates the server
                        * ct and its session, sets up the session fifos through
                        * ct_init_accepted_session () and notifies the server app worker of the
                        * new session.
                        *
                        * Returns early, freeing the just allocated client ct, if the half-open no
                        * longer exists. If fifo setup or the accept notification fails, the
                        * failure is reported with ct_session_connect_notify () and the server ct
                        * and session are freed.
                        */
     646             : static void
     647          13 : ct_accept_one (u32 thread_index, u32 ho_index)
     648             : {
     649             :   ct_connection_t *sct, *cct, *ho;
     650             :   transport_connection_t *ll_ct;
     651             :   app_worker_t *server_wrk;
     652             :   u32 cct_index, ll_index;
     653             :   session_t *ss, *ll;
     654             : 
     655             :   /*
     656             :    * Alloc client ct and initialize from ho
     657             :    */
     658          13 :   cct = ct_connection_alloc (thread_index);
     659          13 :   cct_index = cct->c_c_index;
     660             : 
     661          13 :   ho = ct_half_open_get (ho_index);
     662             : 
     663             :   /* Unlikely but half-open session and transport could have been freed */
     664          13 :   if (PREDICT_FALSE (!ho))
     665             :     {
     666           0 :       ct_connection_free (cct);
     667           0 :       return;
     668             :     }
     669             : 
                         /* The copy overwrites cct's own index and thread, so restore them */
     670          13 :   clib_memcpy (cct, ho, sizeof (*ho));
     671          13 :   cct->c_c_index = cct_index;
     672          13 :   cct->c_thread_index = thread_index;
     673          13 :   cct->flags |= CT_CONN_F_HALF_OPEN;
     674             : 
     675             :   /* Notify session layer that half-open is on a different thread
     676             :    * and mark ho connection index reusable. Avoids another rpc
     677             :    */
     678          13 :   session_half_open_migrate_notify (&cct->connection);
     679          13 :   session_half_open_migrated_notify (&cct->connection);
     680          13 :   ct_half_open_add_reusable (ho_index);
     681             : 
     682             :   /*
     683             :    * Alloc and init server transport
     684             :    */
     685             : 
                         /* For a half-open, peer_index holds the listener's session index
                          * (set in ct_connect ()) */
     686          13 :   ll_index = cct->peer_index;
     687          13 :   ll = listen_session_get (ll_index);
     688          13 :   sct = ct_connection_alloc (thread_index);
     689             :   /* Transport not necessarily ct but it might, so grab after sct alloc */
     690          13 :   ll_ct = listen_session_get_transport (ll);
     691             : 
     692             :   /* Make sure cct is valid after sct alloc */
     693          13 :   cct = ct_connection_get (cct_index, thread_index);
     694             : 
     695          13 :   sct->c_rmt_port = 0;
     696          13 :   sct->c_lcl_port = ll_ct->lcl_port;
     697          13 :   sct->c_is_ip4 = cct->c_is_ip4;
     698          13 :   clib_memcpy (&sct->c_lcl_ip, &cct->c_rmt_ip, sizeof (cct->c_rmt_ip));
     699          13 :   sct->client_wrk = cct->client_wrk;
     700          13 :   sct->c_proto = TRANSPORT_PROTO_NONE;
     701          13 :   sct->client_opaque = cct->client_opaque;
     702          13 :   sct->actual_tp = cct->actual_tp;
     703             : 
                         /* Cross-link client and server cts */
     704          13 :   sct->peer_index = cct->c_c_index;
     705          13 :   cct->peer_index = sct->c_c_index;
     706             : 
     707             :   /*
     708             :    * Accept server session. Client session is created only after
     709             :    * server confirms accept.
     710             :    */
     711          13 :   ss = session_alloc (thread_index);
                         /* NOTE(review): listener is fetched again after session_alloc (),
                          * presumably because the allocation may invalidate ll - confirm */
     712          13 :   ll = listen_session_get (ll_index);
     713          26 :   ss->session_type = session_type_from_proto_and_ip (TRANSPORT_PROTO_NONE,
     714          13 :                                                      sct->c_is_ip4);
     715          13 :   ss->connection_index = sct->c_c_index;
     716          13 :   ss->listener_handle = listen_session_get_handle (ll);
     717          13 :   session_set_state (ss, SESSION_STATE_CREATED);
     718             : 
     719          13 :   server_wrk = application_listener_select_worker (ll);
     720          13 :   ss->app_wrk_index = server_wrk->wrk_index;
     721             : 
     722          13 :   sct->c_s_index = ss->session_index;
     723          13 :   sct->server_wrk = ss->app_wrk_index;
     724             : 
     725          13 :   if (ct_init_accepted_session (server_wrk, sct, ss, ll))
     726             :     {
                             /* NOTE(review): cct is not freed on this path; presumably the
                              * client side is handled by ct_session_connect_notify () -
                              * confirm */
     727           0 :       ct_session_connect_notify (ss, SESSION_E_ALLOC);
     728           0 :       ct_connection_free (sct);
     729           0 :       session_free (ss);
     730           0 :       return;
     731             :     }
     732             : 
                         /* Client shares the server session's fifos with directions swapped:
                          * client rx is server tx and client tx is server rx. A reference is
                          * taken on each fifo for the client. */
     733          13 :   cct->server_wrk = sct->server_wrk;
     734          13 :   cct->seg_ctx_index = sct->seg_ctx_index;
     735          13 :   cct->ct_seg_index = sct->ct_seg_index;
     736          13 :   cct->client_rx_fifo = ss->tx_fifo;
     737          13 :   cct->client_tx_fifo = ss->rx_fifo;
     738          13 :   cct->client_rx_fifo->refcnt++;
     739          13 :   cct->client_tx_fifo->refcnt++;
     740          13 :   cct->segment_handle = sct->segment_handle;
     741             : 
     742          13 :   session_set_state (ss, SESSION_STATE_ACCEPTING);
     743          13 :   if (app_worker_accept_notify (server_wrk, ss))
     744             :     {
     745           0 :       ct_session_connect_notify (ss, SESSION_E_REFUSED);
     746           0 :       ct_session_dealloc_fifos (sct, ss->rx_fifo, ss->tx_fifo);
     747           0 :       ct_connection_free (sct);
     748           0 :       session_free (ss);
     749             :     }
     750             : }
     751             : 
                       /*
                        * RPC handler that accepts connects queued for a worker. rpc_args carries
                        * the worker's thread index.
                        *
                        * Under pending_connects_lock, at most max_connects (32) half-open indices
                        * are moved from wrk->pending_connects to wrk->new_connects. They are then
                        * accepted one by one with ct_accept_one () after the lock is released. If
                        * requests are left in the fifo, the handler posts itself again to the same
                        * thread.
                        */
     752             : static void
     753          13 : ct_accept_rpc_wrk_handler (void *rpc_args)
     754             : {
     755             :   u32 thread_index, n_connects, i, n_pending;
     756          13 :   const u32 max_connects = 32;
     757             :   ct_worker_t *wrk;
     758          13 :   u8 need_rpc = 0;
     759             : 
     760          13 :   thread_index = pointer_to_uword (rpc_args);
     761          13 :   wrk = ct_worker_get (thread_index);
     762             : 
     763             :   /* Connects could be handled without worker barrier so grab lock */
     764          13 :   clib_spinlock_lock (&wrk->pending_connects_lock);
     765             : 
     766          13 :   n_pending = clib_fifo_elts (wrk->pending_connects);
     767          13 :   n_connects = clib_min (n_pending, max_connects);
                         /* NOTE(review): vec_validate () appears to take the highest index to
                          * make valid, so this would size new_connects to n_connects + 1
                          * elements; only the first n_connects are used below - confirm */
     768          13 :   vec_validate (wrk->new_connects, n_connects);
     769             : 
     770          26 :   for (i = 0; i < n_connects; i++)
     771          13 :     clib_fifo_sub1 (wrk->pending_connects, wrk->new_connects[i]);
     772             : 
                         /* Fifo fully drained: clear the flag so that the next flush posts a
                          * new rpc. Otherwise reschedule to handle what is left. */
     773          13 :   if (n_pending == n_connects)
     774          13 :     wrk->have_connects = 0;
     775             :   else
     776           0 :     need_rpc = 1;
     777             : 
     778          13 :   clib_spinlock_unlock (&wrk->pending_connects_lock);
     779             : 
     780          26 :   for (i = 0; i < n_connects; i++)
     781          13 :     ct_accept_one (thread_index, wrk->new_connects[i]);
     782             : 
     783          13 :   if (need_rpc)
     784           0 :     session_send_rpc_evt_to_thread_force (
     785             :       thread_index, ct_accept_rpc_wrk_handler,
     786           0 :       uword_to_pointer (thread_index, void *));
     787          13 : }
     788             : 
                       /*
                        * RPC handler, posted to cm->fwrk_thread, that hands the connects staged in
                        * cm->fwrk_pending_connects[] over to their destination workers.
                        *
                        * For every destination with staged connects, the half-open indices are
                        * appended to the worker's pending_connects fifo and the staging vector is
                        * reset. An accept rpc is posted to the destination only if it did not
                        * already have connects flagged (have_connects). rpc_args is not read.
                        */
     789             : static void
     790          13 : ct_fwrk_flush_connects (void *rpc_args)
     791             : {
     792             :   u32 thread_index, fwrk_index, n_workers;
     793          13 :   ct_main_t *cm = &ct_main;
     794             :   ct_worker_t *wrk;
     795             :   u8 need_rpc;
     796             : 
     797          13 :   fwrk_index = cm->fwrk_thread;
     798          13 :   n_workers = vec_len (cm->fwrk_pending_connects);
     799             : 
     800          26 :   for (thread_index = fwrk_index; thread_index < n_workers; thread_index++)
     801             :     {
                             /* Decide per destination whether an rpc is needed. Without this
                              * reset the flag is read uninitialized when the first destination
                              * already has have_connects set, and a value of 1 leaks into the
                              * iterations that follow. */
                             need_rpc = 0;

     802          13 :       if (!vec_len (cm->fwrk_pending_connects[thread_index]))
     803           0 :         continue;
     804             : 
     805          13 :       wrk = ct_worker_get (thread_index);
     806             : 
     807             :       /* Connects can be done without worker barrier, grab dst worker lock */
     808          13 :       if (thread_index != fwrk_index)
     809           0 :         clib_spinlock_lock (&wrk->pending_connects_lock);
     810             : 
     811          13 :       clib_fifo_add (wrk->pending_connects,
     812             :                      cm->fwrk_pending_connects[thread_index],
     813             :                      vec_len (cm->fwrk_pending_connects[thread_index]));
                             /* Only the transition to have_connects needs an rpc; if the flag
                              * is already set, an accept rpc is expected to be outstanding */
     814          13 :       if (!wrk->have_connects)
     815             :         {
     816          13 :           wrk->have_connects = 1;
     817          13 :           need_rpc = 1;
     818             :         }
     819             : 
     820          13 :       if (thread_index != fwrk_index)
     821           0 :         clib_spinlock_unlock (&wrk->pending_connects_lock);
     822             : 
     823          13 :       vec_reset_length (cm->fwrk_pending_connects[thread_index]);
     824             : 
     825          13 :       if (need_rpc)
     826          13 :         session_send_rpc_evt_to_thread_force (
     827             :           thread_index, ct_accept_rpc_wrk_handler,
     828          13 :           uword_to_pointer (thread_index, void *));
     829             :     }
     830             : 
                         /* Allow ct_program_connect_to_wrk () to schedule the next flush */
     831          13 :   cm->fwrk_have_flush = 0;
     832          13 : }
     833             : 
                       /*
                        * Choose a destination thread for the connect identified by ho_index and
                        * stage it in cm->fwrk_pending_connects[]. If no flush is pending
                        * (cm->fwrk_have_flush is 0), post ct_fwrk_flush_connects to
                        * cm->fwrk_thread and mark the flush as pending.
                        */
     834             : static void
     835          13 : ct_program_connect_to_wrk (u32 ho_index)
     836             : {
     837          13 :   ct_main_t *cm = &ct_main;
     838             :   u32 thread_index;
     839             : 
     840             :   /* Simple round-robin policy for spreading sessions over workers. We skip
     841             :    * thread index 0, i.e., offset the index by 1, when we have workers as it
     842             :    * is the one dedicated to main thread. Note that n_workers does not include
     843             :    * main thread */
     844          13 :   cm->n_sessions += 1;
     845          13 :   thread_index = cm->n_workers ? (cm->n_sessions % cm->n_workers) + 1 : 0;
     846             : 
     847             :   /* Postpone flushing of connect request to dst worker until after session
     848             :    * layer fully initializes the half-open session. */
     849          13 :   vec_add1 (cm->fwrk_pending_connects[thread_index], ho_index);
     850          13 :   if (!cm->fwrk_have_flush)
     851             :     {
                             /* The rpc argument is not read by ct_fwrk_flush_connects (), which
                              * walks all destinations with staged connects */
     852          13 :       session_send_rpc_evt_to_thread_force (
     853             :         cm->fwrk_thread, ct_fwrk_flush_connects,
     854          13 :         uword_to_pointer (thread_index, void *));
     855          13 :       cm->fwrk_have_flush = 1;
     856             :     }
     857          13 : }
     858             : 
                       /*
                        * Start a ct connect from client_wrk towards listener ll. A half-open ct
                        * is allocated and filled from sep, then the connect is programmed on a
                        * worker with ct_program_connect_to_wrk ().
                        *
                        * Returns the index of the half-open ct.
                        */
     859             : static int
     860          13 : ct_connect (app_worker_t *client_wrk, session_t *ll,
     861             :             session_endpoint_cfg_t *sep)
     862             : {
     863             :   ct_connection_t *ho;
     864             :   u32 ho_index;
     865             : 
     866             :   /*
     867             :    * Alloc and init client half-open transport
     868             :    */
     869             : 
     870          13 :   ho = ct_half_open_alloc ();
     871          13 :   ho_index = ho->c_c_index;
     872          13 :   ho->c_rmt_port = sep->port;
     873          13 :   ho->c_lcl_port = 0;
     874          13 :   ho->c_is_ip4 = sep->is_ip4;
     875          13 :   ho->client_opaque = sep->opaque;
     876          13 :   ho->client_wrk = client_wrk->wrk_index;
                         /* Until accepted, peer_index carries the listener's session index;
                          * ct_accept_one () reads it back to find the listener */
     877          13 :   ho->peer_index = ll->session_index;
     878          13 :   ho->c_proto = TRANSPORT_PROTO_NONE;
     879          13 :   ho->c_flags |= TRANSPORT_CONNECTION_F_NO_LOOKUP;
     880          13 :   clib_memcpy (&ho->c_rmt_ip, &sep->ip, sizeof (sep->ip));
     881          13 :   ho->flags |= CT_CONN_F_CLIENT;
                         /* No session attached to the half-open ct yet */
     882          13 :   ho->c_s_index = ~0;
     883          13 :   ho->actual_tp = sep->original_tp;
     884             : 
     885             :   /*
     886             :    * Program connect on a worker, connected reply comes
     887             :    * after server accepts the connection.
     888             :    */
     889          13 :   ct_program_connect_to_wrk (ho_index);
     890             : 
     891          13 :   return ho_index;
     892             : }
     893             : 
                       /*
                        * Allocate a listener ct with ct_connection_alloc (0) and initialize it
                        * from tep, which is accessed as a session_endpoint_cfg_t. The ct's
                        * session index is set to app_listener_index.
                        *
                        * Returns the index of the new ct.
                        */
     894             : static u32
     895          18 : ct_start_listen (u32 app_listener_index, transport_endpoint_cfg_t *tep)
     896             : {
     897             :   session_endpoint_cfg_t *sep;
     898             :   ct_connection_t *ct;
     899             : 
     900          18 :   sep = (session_endpoint_cfg_t *) tep;
     901          18 :   ct = ct_connection_alloc (0);
     902          18 :   ct->server_wrk = sep->app_wrk_index;
     903          18 :   ct->c_is_ip4 = sep->is_ip4;
     904          18 :   clib_memcpy (&ct->c_lcl_ip, &sep->ip, sizeof (sep->ip));
     905          18 :   ct->c_lcl_port = sep->port;
     906          18 :   ct->c_s_index = app_listener_index;
     907          18 :   ct->actual_tp = sep->transport_proto;
     908          18 :   return ct->c_c_index;
     909             : }
     910             : 
                       /*
                        * Free the listener ct ct_index, looked up with thread index 0 as used by
                        * ct_start_listen (). Always returns 0.
                        */
     911             : static u32
     912          18 : ct_stop_listen (u32 ct_index)
     913             : {
     914             :   ct_connection_t *ct;
     915          18 :   ct = ct_connection_get (ct_index, 0);
     916          18 :   ct_connection_free (ct);
     917          18 :   return 0;
     918             : }
     919             : 
                       /*
                        * Return the listener ct ct_index, looked up with thread index 0, cast to
                        * a transport connection.
                        */
     920             : static transport_connection_t *
     921          79 : ct_listener_get (u32 ct_index)
     922             : {
     923          79 :   return (transport_connection_t *) ct_connection_get (ct_index, 0);
     924             : }
     925             : 
                       /*
                        * Return the half-open ct ct_index cast to a transport connection. The
                        * result is whatever ct_half_open_get () returns, which can be NULL (see
                        * the check in ct_accept_one ()).
                        */
     926             : static transport_connection_t *
     927          13 : ct_session_half_open_get (u32 ct_index)
     928             : {
     929          13 :   return (transport_connection_t *) ct_half_open_get (ct_index);
     930             : }
     931             : 
                       /*
                        * Free the ct conn_index on thread_index. If its peer still exists, the
                        * peer's back-reference is invalidated first (peer_index set to ~0).
                        * Does nothing if the ct is not found.
                        */
     932             : static void
     933           1 : ct_session_cleanup (u32 conn_index, u32 thread_index)
     934             : {
     935             :   ct_connection_t *ct, *peer_ct;
     936             : 
     937           1 :   ct = ct_connection_get (conn_index, thread_index);
     938           1 :   if (!ct)
     939           0 :     return;
     940             : 
     941           1 :   peer_ct = ct_connection_get (ct->peer_index, thread_index);
     942           1 :   if (peer_ct)
     943           1 :     peer_ct->peer_index = ~0;
     944             : 
     945           1 :   ct_connection_free (ct);
     946             : }
     947             : 
                       /*
                        * Free the half-open ct ho_index.
                        *
                        * NOTE(review): ct_half_open_get () can return NULL (ct_accept_one ()
                        * checks for it) and the result is passed to ct_connection_free ()
                        * unchecked - confirm callers only pass indices of live half-opens.
                        */
     948             : static void
     949           0 : ct_cleanup_ho (u32 ho_index)
     950             : {
     951             :   ct_connection_t *ho;
     952             : 
     953           0 :   ho = ct_half_open_get (ho_index);
     954           0 :   ct_connection_free (ho);
     955           0 : }
     956             : 
                       /*
                        * Connect entry point for ct. tep is accessed as a session_endpoint_cfg_t.
                        *
                        * The destination is first looked up in the connecting app's local session
                        * table, using the endpoint's original transport proto:
                        *  - SESSION_DROP_HANDLE: return SESSION_E_FILTERED
                        *  - listener found and owned by another app: ct_connect () to it
                        *  - nothing found, or listener owned by the connecting app: fall through
                        *    to the global scope
                        *
                        * In the global scope, SESSION_E_NOROUTE is returned if the endpoint is
                        * local and SESSION_E_SCOPE if the app has no global scope. Otherwise a
                        * wildcard listener lookup is done in the fib's table and, if a listener
                        * of another app is found, ct_connect () is called.
                        *
                        * Returns the value of ct_connect () (the half-open index) on success,
                        * one of the errors above, or SESSION_E_LOCAL_CONNECT if no listener was
                        * found.
                        */
     957             : static int
     958          17 : ct_session_connect (transport_endpoint_cfg_t * tep)
     959             : {
     960             :   session_endpoint_cfg_t *sep_ext;
     961          17 :   session_endpoint_t _sep, *sep = &_sep;
     962             :   app_worker_t *app_wrk;
     963             :   session_handle_t lh;
     964             :   application_t *app;
     965             :   app_listener_t *al;
     966             :   u32 table_index;
     967             :   session_t *ll;
     968             :   u8 fib_proto;
     969             : 
     970          17 :   sep_ext = (session_endpoint_cfg_t *) tep;
                         /* Work on a local copy of the endpoint so that transport_proto can be
                          * overridden for the lookups without modifying tep */
     971          17 :   _sep = *(session_endpoint_t *) tep;
     972          17 :   app_wrk = app_worker_get (sep_ext->app_wrk_index);
     973          17 :   app = application_get (app_wrk->app_index);
     974             : 
     975          17 :   sep->transport_proto = sep_ext->original_tp;
     976          17 :   table_index = application_local_session_table (app);
     977          17 :   lh = session_lookup_local_endpoint (table_index, sep);
     978          17 :   if (lh == SESSION_DROP_HANDLE)
     979           3 :     return SESSION_E_FILTERED;
     980             : 
     981          14 :   if (lh == SESSION_INVALID_HANDLE)
     982           1 :     goto global_scope;
     983             : 
     984          13 :   ll = listen_session_get_from_handle (lh);
     985          13 :   al = app_listener_get_w_session (ll);
     986             : 
     987             :   /*
     988             :    * Break loop if rule in local table points to connecting app. This
     989             :    * can happen if client is a generic proxy. Route connect through
     990             :    * global table instead.
     991             :    */
     992          13 :   if (al->app_index == app->app_index)
     993           0 :     goto global_scope;
     994             : 
     995          13 :   return ct_connect (app_wrk, ll, sep_ext);
     996             : 
     997             :   /*
     998             :    * If nothing found, check the global scope for locally attached
     999             :    * destinations. Make sure first that we're allowed to.
    1000             :    */
    1001             : 
    1002           1 : global_scope:
    1003           1 :   if (session_endpoint_is_local (sep))
    1004           1 :     return SESSION_E_NOROUTE;
    1005             : 
    1006           0 :   if (!application_has_global_scope (app))
    1007           0 :     return SESSION_E_SCOPE;
    1008             : 
    1009           0 :   fib_proto = session_endpoint_fib_proto (sep);
    1010           0 :   table_index = session_lookup_get_index_for_fib (fib_proto, sep->fib_index);
    1011           0 :   ll = session_lookup_listener_wildcard (table_index, sep);
    1012             : 
    1013             :   /* Avoid connecting app to own listener */
    1014           0 :   if (ll && ll->app_index != app->app_index)
    1015           0 :     return ct_connect (app_wrk, ll, sep_ext);
    1016             : 
    1017             :   /* Failed to connect but no error */
    1018           0 :   return SESSION_E_LOCAL_CONNECT;
    1019             : }
    1020             : 
                       /*
                        * Return non-zero if the closing side still has data to dequeue from its
                        * rx fifo. The caller, ct_session_postponed_cleanup (), then notifies the
                        * peer of a reset instead of a close. For client cts the rx fifo is
                        * ct->client_rx_fifo, otherwise it is the session's rx fifo.
                        */
    1021             : static inline int
    1022          12 : ct_close_is_reset (ct_connection_t *ct, session_t *s)
    1023             : {
    1024          12 :   if (ct->flags & CT_CONN_F_CLIENT)
    1025           6 :     return (svm_fifo_max_dequeue (ct->client_rx_fifo) > 0);
    1026             :   else
    1027           6 :     return (svm_fifo_max_dequeue (s->rx_fifo) > 0);
    1028             : }
    1029             : 
                       /*
                        * Complete the close of ct after it was deferred by ct_program_cleanup ().
                        *
                        * If the peer ct still exists, it is notified of a reset or of a close,
                        * depending on ct_close_is_reset (). The closed notification is then
                        * delivered for ct itself, the app worker (if still valid) is notified of
                        * the cleanup, fifos and session are released and ct is freed. Client and
                        * server sides release their session differently, see the comments below.
                        */
    1030             : static void
    1031          25 : ct_session_postponed_cleanup (ct_connection_t *ct)
    1032             : {
    1033             :   ct_connection_t *peer_ct;
    1034             :   app_worker_t *app_wrk;
    1035             :   session_t *s;
    1036             : 
    1037          25 :   s = session_get (ct->c_s_index, ct->c_thread_index);
    1038          25 :   app_wrk = app_worker_get_if_valid (s->app_wrk_index);
    1039             : 
    1040          25 :   peer_ct = ct_connection_get (ct->peer_index, ct->c_thread_index);
    1041          25 :   if (peer_ct)
    1042             :     {
    1043          12 :       if (ct_close_is_reset (ct, s))
    1044           1 :         session_transport_reset_notify (&peer_ct->connection);
    1045             :       else
    1046          11 :         session_transport_closing_notify (&peer_ct->connection);
    1047             :     }
    1048          25 :   session_transport_closed_notify (&ct->connection);
    1049             : 
    1050          25 :   if (ct->flags & CT_CONN_F_CLIENT)
    1051             :     {
    1052          12 :       if (app_wrk)
    1053           8 :         app_worker_cleanup_notify (app_wrk, s, SESSION_CLEANUP_TRANSPORT);
    1054             : 
    1055             :       /* Normal free for client session as the fifos are allocated through
    1056             :        * the connects segment manager in a segment that's not shared with
    1057             :        * the server */
    1058          12 :       ct_session_dealloc_fifos (ct, ct->client_rx_fifo, ct->client_tx_fifo);
    1059          12 :       session_free_w_fifos (s);
    1060             :     }
    1061             :   else
    1062             :     {
    1063             :       /* Manual session and fifo segment cleanup to avoid implicit
    1064             :        * segment manager cleanups and notifications */
                             /* Same lookup, with the same index, as at function entry */
    1065          13 :       app_wrk = app_worker_get_if_valid (s->app_wrk_index);
    1066          13 :       if (app_wrk)
    1067             :         {
    1068          12 :           app_worker_cleanup_notify (app_wrk, s, SESSION_CLEANUP_TRANSPORT);
    1069          12 :           app_worker_cleanup_notify (app_wrk, s, SESSION_CLEANUP_SESSION);
    1070             :         }
    1071             : 
    1072          13 :       ct_session_dealloc_fifos (ct, s->rx_fifo, s->tx_fifo);
    1073          13 :       session_free (s);
    1074             :     }
    1075             : 
    1076          25 :   ct_connection_free (ct);
    1077          25 : }
    1078             : 
                       /*
                        * RPC handler that processes cleanup requests queued on a worker. args
                        * carries the worker's thread index.
                        *
                        * At most max_cleanups (100) of the requests present on entry are handled
                        * per call. A request whose session tx fifo still has its event flag set
                        * is put back at the end of the fifo; all others are completed with
                        * ct_session_postponed_cleanup (). If the fifo is not empty afterwards,
                        * the handler posts itself again to the same thread.
                        */
    1079             : static void
    1080          20 : ct_handle_cleanups (void *args)
    1081             : {
    1082          20 :   uword thread_index = pointer_to_uword (args);
    1083          20 :   const u32 max_cleanups = 100;
    1084             :   ct_cleanup_req_t *req;
    1085             :   ct_connection_t *ct;
    1086          20 :   u32 n_to_handle = 0;
    1087             :   ct_worker_t *wrk;
    1088             :   session_t *s;
    1089             : 
    1090          20 :   wrk = ct_worker_get (thread_index);
    1091          20 :   wrk->have_cleanups = 0;
    1092          20 :   n_to_handle = clib_fifo_elts (wrk->pending_cleanups);
    1093          20 :   n_to_handle = clib_min (n_to_handle, max_cleanups);
    1094             : 
    1095          45 :   while (n_to_handle)
    1096             :     {
    1097          25 :       clib_fifo_sub2 (wrk->pending_cleanups, req);
                             /* NOTE(review): ct is dereferenced without a NULL check; this
                              * assumes a queued ct is never freed before its request is
                              * handled - confirm */
    1098          25 :       ct = ct_connection_get (req->ct_index, thread_index);
    1099          25 :       s = session_get (ct->c_s_index, ct->c_thread_index);
    1100          25 :       if (!svm_fifo_has_event (s->tx_fifo))
    1101          25 :         ct_session_postponed_cleanup (ct);
    1102             :       else
    1103           0 :         clib_fifo_add1 (wrk->pending_cleanups, *req);
    1104          25 :       n_to_handle -= 1;
    1105             :     }
    1106             : 
    1107          20 :   if (clib_fifo_elts (wrk->pending_cleanups))
    1108             :     {
    1109           0 :       wrk->have_cleanups = 1;
    1110           0 :       session_send_rpc_evt_to_thread_force (
    1111             :         thread_index, ct_handle_cleanups,
    1112             :         uword_to_pointer (thread_index, void *));
    1113             :     }
    1114          20 : }
    1115             : 
                       /*
                        * Queue a cleanup request for ct on its own thread's worker. If the
                        * worker has no cleanups flagged yet (have_cleanups is 0), set the flag
                        * and post ct_handle_cleanups to that thread; otherwise an rpc is
                        * expected to be pending already and only the request is queued.
                        */
    1116             : static void
    1117          25 : ct_program_cleanup (ct_connection_t *ct)
    1118             : {
    1119             :   ct_cleanup_req_t *req;
    1120             :   uword thread_index;
    1121             :   ct_worker_t *wrk;
    1122             : 
    1123          25 :   thread_index = ct->c_thread_index;
    1124          25 :   wrk = ct_worker_get (ct->c_thread_index);
    1125             : 
    1126          25 :   clib_fifo_add2 (wrk->pending_cleanups, req);
    1127          25 :   req->ct_index = ct->c_c_index;
    1128             : 
    1129          25 :   if (wrk->have_cleanups)
    1130           5 :     return;
    1131             : 
    1132          20 :   wrk->have_cleanups = 1;
    1133          20 :   session_send_rpc_evt_to_thread_force (
    1134             :     thread_index, ct_handle_cleanups, uword_to_pointer (thread_index, void *));
    1135             : }
    1136             : 
                       /*
                        * Close the ct ct_index on thread_index.
                        *
                        * If the peer ct exists, its back-reference to this ct is invalidated. If
                        * the peer is still flagged half-open, the connect is reported as refused
                        * with ct_session_connect_notify () and this ct's peer reference is
                        * dropped as well. A peer that is not half-open but has no session is
                        * freed after a warning. The actual cleanup of ct is deferred with
                        * ct_program_cleanup ().
                        */
    1137             : static void
    1138          25 : ct_session_close (u32 ct_index, u32 thread_index)
    1139             : {
    1140             :   ct_connection_t *ct, *peer_ct;
    1141             :   session_t *s;
    1142             : 
    1143          25 :   ct = ct_connection_get (ct_index, thread_index);
    1144          25 :   s = session_get (ct->c_s_index, ct->c_thread_index);
    1145          25 :   peer_ct = ct_connection_get (ct->peer_index, thread_index);
    1146          25 :   if (peer_ct)
    1147             :     {
    1148          12 :       peer_ct->peer_index = ~0;
    1149             :       /* Make sure session was allocated */
    1150          12 :       if (peer_ct->flags & CT_CONN_F_HALF_OPEN)
    1151             :         {
    1152           0 :           ct_session_connect_notify (s, SESSION_E_REFUSED);
    1153           0 :           ct->peer_index = ~0;
    1154             :         }
    1155          12 :       else if (peer_ct->c_s_index == ~0)
    1156             :         {
    1157             :           /* should not happen */
    1158           0 :           clib_warning ("ct peer without session");
    1159           0 :           ct_connection_free (peer_ct);
    1160             :         }
    1161             :     }
    1162             : 
    1163             :   /* Do not send closed notify to make sure pending tx events are
    1164             :    * still delivered and program cleanup */
    1165          25 :   ct_program_cleanup (ct);
    1166          25 : }
    1167             : 
                       /*
                        * Return the ct ct_index of thread_index cast to a transport connection.
                        * The result is whatever ct_connection_get () returns, which other callers
                        * in this file check for NULL.
                        */
    1168             : static transport_connection_t *
    1169      374580 : ct_session_get (u32 ct_index, u32 thread_index)
    1170             : {
    1171      374580 :   return (transport_connection_t *) ct_connection_get (ct_index,
    1172             :                                                        thread_index);
    1173             : }
    1174             : 
                       /*
                        * Format function for a ct connection id. Takes one vararg, a
                        * ct_connection_t pointer, and appends
                        * "[thread:session][CT:proto] lcl_ip:lcl_port->rmt_ip:rmt_port" to s, with
                        * addresses formatted as ip4 or ip6 depending on c_is_ip4 and ports
                        * converted from network to host byte order. Returns s unchanged if the
                        * pointer is NULL.
                        */
    1175             : static u8 *
    1176           0 : format_ct_connection_id (u8 * s, va_list * args)
    1177             : {
    1178           0 :   ct_connection_t *ct = va_arg (*args, ct_connection_t *);
    1179           0 :   if (!ct)
    1180           0 :     return s;
    1181           0 :   if (ct->c_is_ip4)
    1182             :     {
    1183           0 :       s = format (s, "[%d:%d][CT:%U] %U:%d->%U:%d", ct->c_thread_index,
    1184           0 :                   ct->c_s_index, format_transport_proto_short, ct->actual_tp,
    1185             :                   format_ip4_address, &ct->c_lcl_ip4,
    1186           0 :                   clib_net_to_host_u16 (ct->c_lcl_port), format_ip4_address,
    1187           0 :                   &ct->c_rmt_ip4, clib_net_to_host_u16 (ct->c_rmt_port));
    1188             :     }
    1189             :   else
    1190             :     {
    1191           0 :       s = format (s, "[%d:%d][CT:%U] %U:%d->%U:%d", ct->c_thread_index,
    1192           0 :                   ct->c_s_index, format_transport_proto_short, ct->actual_tp,
    1193             :                   format_ip6_address, &ct->c_lcl_ip6,
    1194           0 :                   clib_net_to_host_u16 (ct->c_lcl_port), format_ip6_address,
    1195           0 :                   &ct->c_rmt_ip6, clib_net_to_host_u16 (ct->c_rmt_port));
    1196             :     }
    1197             : 
    1198           0 :   return s;
    1199             : }
    1200             : 
                       /*
                        * Custom tx function for ct sessions. session is a session_t pointer.
                        *
                        * Clears the event flag of the session's tx fifo, calls ct_session_tx ()
                        * and, if that returns 0, sets TRANSPORT_SND_F_DESCHED in sp->flags.
                        *
                        * Returns 0 without doing anything if session_has_transport () is true
                        * for the session, 1 otherwise.
                        */
    1201             : static int
    1202      124753 : ct_custom_tx (void *session, transport_send_params_t * sp)
    1203             : {
    1204      124753 :   session_t *s = (session_t *) session;
    1205      124753 :   if (session_has_transport (s))
    1206           0 :     return 0;
    1207             :   /* If event enqueued towards peer, remove from scheduler and remove
    1208             :    * session tx flag, i.e., accept new tx events. Unset fifo flag now to
    1209             :    * avoid missing events if peer did not clear fifo flag yet, which is
    1210             :    * interpreted as successful notification and session is descheduled. */
    1211      124753 :   svm_fifo_unset_event (s->tx_fifo);
    1212      124753 :   if (!ct_session_tx (s))
    1213      124753 :     sp->flags = TRANSPORT_SND_F_DESCHED;
    1214             : 
    1215             :   /* The scheduler uses packet count as a means of upper bounding the amount
    1216             :    * of work done per dispatch. So make it look like we have sent something */
    1217      124753 :   return 1;
    1218             : }
    1219             : 
                       /*
                        * Forward a dequeue notification for the ct tc to its peer's session.
                        *
                        * Returns -1, without notifying, if the local session has a transport or
                        * has not reached SESSION_STATE_READY, if the peer ct is missing or still
                        * flagged half-open, or if the peer session is in
                        * SESSION_STATE_TRANSPORT_CLOSING or a later state. Otherwise returns the
                        * result of session_dequeue_notify () on the peer session.
                        */
    1220             : static int
    1221          13 : ct_app_rx_evt (transport_connection_t * tc)
    1222             : {
    1223          13 :   ct_connection_t *ct = (ct_connection_t *) tc, *peer_ct;
    1224             :   session_t *ps, *s;
    1225             : 
    1226          13 :   s = session_get (ct->c_s_index, ct->c_thread_index);
    1227          13 :   if (session_has_transport (s) || s->session_state < SESSION_STATE_READY)
    1228           0 :     return -1;
    1229          13 :   peer_ct = ct_connection_get (ct->peer_index, tc->thread_index);
    1230          13 :   if (!peer_ct || (peer_ct->flags & CT_CONN_F_HALF_OPEN))
    1231           0 :     return -1;
    1232          13 :   ps = session_get (peer_ct->c_s_index, peer_ct->c_thread_index);
    1233          13 :   if (ps->session_state >= SESSION_STATE_TRANSPORT_CLOSING)
    1234           0 :     return -1;
    1235          13 :   return session_dequeue_notify (ps);
    1236             : }
    1237             : 
                       /* Format callback for listeners, installed as .format_listener in
                        * cut_thru_proto.
                        *
                        * Consumes three u32 varargs in order: connection index, thread index
                        * and verbose. Appends the connection id (format_ct_connection_id) and,
                        * when verbose is non-zero, the string "LISTEN". Returns the updated
                        * vector s.
                        *
                        * NOTE(review): verbose is tagged __clib_unused although it is read
                        * below; thread_index is consumed but the lookup always uses thread 0.
                        * NOTE(review): ct is handed to format () without a NULL check; whether
                        * ct_connection_get () can return NULL here is not visible in this
                        * function - confirm. */
    1238             : static u8 *
    1239           0 : format_ct_listener (u8 * s, va_list * args)
    1240             : {
    1241           0 :   u32 tc_index = va_arg (*args, u32);
    1242           0 :   u32 __clib_unused thread_index = va_arg (*args, u32);
    1243           0 :   u32 __clib_unused verbose = va_arg (*args, u32);
    1244           0 :   ct_connection_t *ct = ct_connection_get (tc_index, 0);
    1245           0 :   s = format (s, "%-" SESSION_CLI_ID_LEN "U", format_ct_connection_id, ct);
    1246           0 :   if (verbose)
    1247           0 :     s = format (s, "%-" SESSION_CLI_STATE_LEN "s", "LISTEN");
    1248           0 :   return s;
    1249             : }
    1250             : 
                       /* Format callback for half-open connections, installed as
                        * .format_half_open in cut_thru_proto.
                        *
                        * Consumes two u32 varargs in order: half-open index and verbose.
                        * Appends the connection id (format_ct_connection_id) and, when verbose
                        * is non-zero, the string "HALF-OPEN". Returns the updated vector s.
                        *
                        * NOTE(review): the result of ct_half_open_get () is passed to
                        * format () without a NULL check - confirm it cannot be NULL here. */
    1251             : static u8 *
    1252           0 : format_ct_half_open (u8 *s, va_list *args)
    1253             : {
    1254           0 :   u32 ho_index = va_arg (*args, u32);
    1255           0 :   u32 verbose = va_arg (*args, u32);
    1256           0 :   ct_connection_t *ct = ct_half_open_get (ho_index);
    1257           0 :   s = format (s, "%-" SESSION_CLI_ID_LEN "U", format_ct_connection_id, ct);
    1258           0 :   if (verbose)
    1259           0 :     s = format (s, "%-" SESSION_CLI_STATE_LEN "s", "HALF-OPEN");
    1260           0 :   return s;
    1261             : }
    1262             : 
                       /* Format an established cut-through connection.
                        *
                        * Consumes two varargs in order: a ct_connection_t pointer and a u32
                        * verbose level. A NULL connection leaves s untouched. Otherwise the
                        * connection id is appended; with verbose non-zero the string
                        * "ESTABLISHED" follows, and with verbose greater than 1 a newline is
                        * appended after it. Returns the updated vector s. */
    1263             : static u8 *
    1264           0 : format_ct_connection (u8 * s, va_list * args)
    1265             : {
    1266           0 :   ct_connection_t *ct = va_arg (*args, ct_connection_t *);
    1267           0 :   u32 verbose = va_arg (*args, u32);
    1268             : 
    1269           0 :   if (!ct)
    1270           0 :     return s;
    1271           0 :   s = format (s, "%-" SESSION_CLI_ID_LEN "U", format_ct_connection_id, ct);
    1272           0 :   if (verbose)
    1273             :     {
    1274           0 :       s = format (s, "%-" SESSION_CLI_STATE_LEN "s", "ESTABLISHED");
    1275           0 :       if (verbose > 1)
    1276             :         {
    1277           0 :           s = format (s, "\n");
    1278             :         }
    1279             :     }
    1280           0 :   return s;
    1281             : }
    1282             : 
                       /* Format callback for connections, installed as .format_connection in
                        * cut_thru_proto.
                        *
                        * Consumes three u32 varargs in order: connection index, thread index
                        * and verbose. Looks the connection up with ct_connection_get (); if
                        * none is found "empty\n" is appended, otherwise formatting is
                        * delegated to format_ct_connection with the same verbose level.
                        * Returns the updated vector s. */
    1283             : static u8 *
    1284           0 : format_ct_session (u8 * s, va_list * args)
    1285             : {
    1286           0 :   u32 ct_index = va_arg (*args, u32);
    1287           0 :   u32 thread_index = va_arg (*args, u32);
    1288           0 :   u32 verbose = va_arg (*args, u32);
    1289             :   ct_connection_t *ct;
    1290             : 
    1291           0 :   ct = ct_connection_get (ct_index, thread_index);
    1292           0 :   if (!ct)
    1293             :     {
    1294           0 :       s = format (s, "empty\n");
    1295           0 :       return s;
    1296             :     }
    1297             : 
    1298           0 :   s = format (s, "%U", format_ct_connection, ct, verbose);
    1299           0 :   return s;
    1300             : }
    1301             : 
                       /* Enable hook, installed as .enable in cut_thru_proto.
                        *
                        * Records the worker count and the thread returned by
                        * transport_cl_thread () in ct_main, sizes the per-worker vector,
                        * initialises each worker's pending_connects_lock plus the
                        * ho_reuseable and app_segs locks, and sizes fwrk_pending_connects.
                        * Always returns 0.
                        *
                        * Neither vm nor is_en is read: the same initialisation runs whatever
                        * value is_en has.
                        * NOTE(review): nothing here guards against the locks being
                        * initialised again on a repeated call - confirm how callers use this.
                        * NOTE(review): vec_validate () presumably takes a highest valid
                        * index, so passing the counts n_vlib_mains / n_workers would allocate
                        * one element more than the count - confirm this is intended. */
    1302             : clib_error_t *
    1303          57 : ct_enable_disable (vlib_main_t * vm, u8 is_en)
    1304             : {
    1305          57 :   vlib_thread_main_t *vtm = &vlib_thread_main;
    1306          57 :   ct_main_t *cm = &ct_main;
    1307             :   ct_worker_t *wrk;
    1308             : 
    1309          57 :   cm->n_workers = vlib_num_workers ();
    1310          57 :   cm->fwrk_thread = transport_cl_thread ();
    1311          57 :   vec_validate (cm->wrk, vtm->n_vlib_mains);
    1312         192 :   vec_foreach (wrk, cm->wrk)
    1313         135 :     clib_spinlock_init (&wrk->pending_connects_lock);
    1314          57 :   clib_spinlock_init (&cm->ho_reuseable_lock);
    1315          57 :   clib_rwlock_init (&cm->app_segs_lock);
    1316          57 :   vec_validate (cm->fwrk_pending_connects, cm->n_workers);
    1317          57 :   return 0;
    1318             : }
    1319             : 
                       /* Transport virtual function table for the cut-through ("ct")
                        * transport. It wires the ct_* and format_ct_* functions of this file
                        * into the transport_proto_vft_t callbacks and is the table that
                        * ct_transport_init () passes to transport_register_protocol () for
                        * both IP4 and IP6. */
    1320             : /* *INDENT-OFF* */
    1321             : static const transport_proto_vft_t cut_thru_proto = {
    1322             :   .enable = ct_enable_disable,
    1323             :   .start_listen = ct_start_listen,
    1324             :   .stop_listen = ct_stop_listen,
    1325             :   .get_connection = ct_session_get,
    1326             :   .get_listener = ct_listener_get,
    1327             :   .get_half_open = ct_session_half_open_get,
    1328             :   .cleanup = ct_session_cleanup,
    1329             :   .cleanup_ho = ct_cleanup_ho,
    1330             :   .connect = ct_session_connect,
    1331             :   .close = ct_session_close,
    1332             :   .custom_tx = ct_custom_tx,
    1333             :   .app_rx_evt = ct_app_rx_evt,
    1334             :   .format_listener = format_ct_listener,
    1335             :   .format_half_open = format_ct_half_open,
    1336             :   .format_connection = format_ct_session,
    1337             :   .transport_options = {
    1338             :     .name = "ct",
    1339             :     .short_name = "C",
    1340             :     .tx_type = TRANSPORT_TX_INTERNAL,
    1341             :     .service_type = TRANSPORT_SERVICE_VC,
    1342             :   },
    1343             : };
    1344             : /* *INDENT-ON* */
    1345             : 
                       /* Return non-zero when the session state permits a tx notification,
                        * i.e. when it is SESSION_STATE_READY, SESSION_STATE_CLOSING or
                        * SESSION_STATE_APP_CLOSED; return 0 for any other state. */
    1346             : static inline int
    1347      124753 : ct_session_can_tx (session_t *s)
    1348             : {
    1349      124755 :   return (s->session_state == SESSION_STATE_READY ||
    1350      124753 :           s->session_state == SESSION_STATE_CLOSING ||
    1351           0 :           s->session_state == SESSION_STATE_APP_CLOSED);
    1352             : }
    1353             : 
                       /* Notify the peer session of a cut-through connection that data was
                        * enqueued.
                        *
                        * Returns 0 without notifying when s is not in a state accepted by
                        * ct_session_can_tx (), when the peer connection cannot be found, or
                        * when the peer session state is SESSION_STATE_TRANSPORT_CLOSING or
                        * later. Otherwise returns the result of session_enqueue_notify () on
                        * the peer session.
                        *
                        * Callers therefore cannot tell a skipped notification from a
                        * notification that returned 0. */
    1354             : int
    1355      124753 : ct_session_tx (session_t * s)
    1356             : {
    1357             :   ct_connection_t *ct, *peer_ct;
    1358             :   session_t *peer_s;
    1359             : 
    1360      124753 :   if (!ct_session_can_tx (s))
    1361           0 :     return 0;
    1362      124753 :   ct = (ct_connection_t *) session_get_transport (s);
                         /* The peer is looked up on this connection's own thread index. */
    1363      124753 :   peer_ct = ct_connection_get (ct->peer_index, ct->c_thread_index);
    1364      124753 :   if (!peer_ct)
    1365           0 :     return 0;
    1366      124753 :   peer_s = session_get (peer_ct->c_s_index, peer_ct->c_thread_index);
    1367      124753 :   if (peer_s->session_state >= SESSION_STATE_TRANSPORT_CLOSING)
    1368           2 :     return 0;
    1369      124751 :   return session_enqueue_notify (peer_s);
    1370             : }
    1371             : 
                       /* Init function that registers the cut_thru_proto table with the
                        * transport layer under TRANSPORT_PROTO_NONE, once for
                        * FIB_PROTOCOL_IP4 and once for FIB_PROTOCOL_IP6. vm is not used and
                        * the function always returns 0.
                        *
                        * NOTE(review): the meaning of the trailing ~0 argument to
                        * transport_register_protocol () is not visible here - see its
                        * declaration. */
    1372             : static clib_error_t *
    1373         559 : ct_transport_init (vlib_main_t * vm)
    1374             : {
    1375         559 :   transport_register_protocol (TRANSPORT_PROTO_NONE, &cut_thru_proto,
    1376             :                                FIB_PROTOCOL_IP4, ~0);
    1377         559 :   transport_register_protocol (TRANSPORT_PROTO_NONE, &cut_thru_proto,
    1378             :                                FIB_PROTOCOL_IP6, ~0);
    1379         559 :   return 0;
    1380             : }
    1381             : 
                       /* Hands ct_transport_init to the VLIB_INIT_FUNCTION macro (defined
                        * elsewhere; presumably schedules it to run during vlib init). */
    1382       77279 : VLIB_INIT_FUNCTION (ct_transport_init);
    1383             : 
    1384             : /*
    1385             :  * fd.io coding-style-patch-verification: ON
    1386             :  *
    1387             :  * Local Variables:
    1388             :  * eval: (c-set-style "gnu")
    1389             :  * End:
    1390             :  */

Generated by: LCOV version 1.14