Line data Source code
1 : /* SPDX-License-Identifier: Apache-2.0
2 : * Copyright(c) 2023 Cisco Systems, Inc.
3 : */
4 :
5 : #include <vnet/session/session.h>
6 : #include <vnet/session/application.h>
7 :
8 : static inline int
9 187297 : mq_try_lock (svm_msg_q_t *mq)
10 : {
11 187297 : int rv, n_try = 0;
12 :
13 187297 : while (n_try < 100)
14 : {
15 187297 : rv = svm_msg_q_try_lock (mq);
16 187297 : if (!rv)
17 187297 : return 0;
18 0 : n_try += 1;
19 0 : usleep (1);
20 : }
21 :
22 0 : return -1;
23 : }
24 :
25 : always_inline u8
26 200258 : mq_event_ring_index (session_evt_type_t et)
27 : {
28 200258 : return (et >= SESSION_CTRL_EVT_RPC ? SESSION_MQ_CTRL_EVT_RING :
29 : SESSION_MQ_IO_EVT_RING);
30 : }
31 :
32 : void
33 87 : app_worker_del_all_events (app_worker_t *app_wrk)
34 : {
35 : session_worker_t *wrk;
36 : session_event_t *evt;
37 : u32 thread_index;
38 : session_t *s;
39 :
40 178 : for (thread_index = 0; thread_index < vec_len (app_wrk->wrk_evts);
41 91 : thread_index++)
42 : {
43 102 : while (clib_fifo_elts (app_wrk->wrk_evts[thread_index]))
44 : {
45 11 : clib_fifo_sub2 (app_wrk->wrk_evts[thread_index], evt);
46 11 : switch (evt->event_type)
47 : {
48 0 : case SESSION_CTRL_EVT_MIGRATED:
49 0 : s = session_get (evt->session_index, thread_index);
50 0 : transport_cleanup (session_get_transport_proto (s),
51 0 : s->connection_index, s->thread_index);
52 0 : session_free (s);
53 0 : break;
54 4 : case SESSION_CTRL_EVT_CLEANUP:
55 4 : s = session_get (evt->as_u64[0] & 0xffffffff, thread_index);
56 4 : if (evt->as_u64[0] >> 32 != SESSION_CLEANUP_SESSION)
57 2 : break;
58 2 : uword_to_pointer (evt->as_u64[1], void (*) (session_t * s)) (s);
59 2 : break;
60 0 : case SESSION_CTRL_EVT_HALF_CLEANUP:
61 0 : s = ho_session_get (evt->session_index);
62 0 : pool_put_index (app_wrk->half_open_table, s->ho_index);
63 0 : session_free (s);
64 0 : break;
65 7 : default:
66 7 : break;
67 : }
68 : }
69 91 : wrk = session_main_get_worker (thread_index);
70 91 : clib_bitmap_set (wrk->app_wrks_pending_ntf, app_wrk->wrk_index, 0);
71 : }
72 87 : }
73 :
/*
 * Dispatch a batch of pending events queued for an app worker on one thread.
 *
 * At most 128 events are taken from app_wrk->wrk_evts[thread_index] per
 * call. Each event is looked at while still at the fifo head, handled in the
 * switch below and only then consumed with clib_fifo_advance_head().
 *
 * app_wrk       app worker whose events are flushed
 * thread_index  index into app_wrk->wrk_evts, also used for session lookups
 * is_builtin    passed as a literal 0/1 by app_wrk_flush_wrk_events(). When
 *               0, the worker's message queue (app_wrk->event_queue) is
 *               try-locked around the whole batch and a per-event check on
 *               the target ring can stop the batch early.
 *
 * Always returns 0, including when the message queue lock cannot be taken
 * (in which case no event is processed).
 */
74 : always_inline int
75 216329 : app_worker_flush_events_inline (app_worker_t *app_wrk, u32 thread_index,
76 : u8 is_builtin)
77 : {
78 216329 : application_t *app = application_get (app_wrk->app_index);
79 216329 : svm_msg_q_t *mq = app_wrk->event_queue;
80 : u8 ring_index, mq_is_cong;
81 : session_state_t old_state;
82 : session_event_t *evt;
83 216329 : u32 n_evts = 128, i;
84 : session_t *s;
85 : int rv;
86 :
/* Batch size: 128 or the number of events currently queued, if smaller */
87 216329 : n_evts = clib_min (n_evts, clib_fifo_elts (app_wrk->wrk_evts[thread_index]));
88 :
89 216329 : if (!is_builtin)
90 : {
/* Congestion state is sampled before the flush; it is only read again at
 * the end of the function, under the same !is_builtin condition */
91 187297 : mq_is_cong = app_worker_mq_is_congested (app_wrk);
92 187297 : if (mq_try_lock (mq))
93 : {
/* Lock not obtained: flag congestion for this thread and leave all events
 * queued */
94 0 : app_worker_set_mq_wrk_congested (app_wrk, thread_index);
95 0 : return 0;
96 : }
97 : }
98 :
99 490474 : for (i = 0; i < n_evts; i++)
100 : {
101 274145 : evt = clib_fifo_head (app_wrk->wrk_evts[thread_index]);
102 274145 : if (!is_builtin)
103 : {
104 200258 : ring_index = mq_event_ring_index (evt->event_type);
105 200258 : if (svm_msg_q_or_ring_is_full (mq, ring_index))
106 : {
/* This break leaves the for loop, not the switch: the event stays at the
 * fifo head and i < n_evts on exit */
107 0 : app_worker_set_mq_wrk_congested (app_wrk, thread_index);
108 0 : break;
109 : }
110 : }
111 :
/* Every break inside this switch falls through to clib_fifo_advance_head()
 * below, so the event is consumed even when no callback was invoked */
112 274145 : switch (evt->event_type)
113 : {
114 245185 : case SESSION_IO_EVT_RX:
115 245185 : s = session_get (evt->session_index, thread_index);
116 : /* Application didn't confirm accept yet */
117 245185 : if (PREDICT_FALSE (s->session_state == SESSION_STATE_ACCEPTING ||
118 : s->session_state == SESSION_STATE_CONNECTING))
119 : break;
120 245172 : s->flags &= ~SESSION_F_RX_EVT;
121 245172 : app->cb_fns.builtin_app_rx_callback (s);
122 245172 : break;
123 : /* Handle sessions that might not be on current thread */
124 0 : case SESSION_IO_EVT_BUILTIN_RX:
125 0 : s = session_get_from_handle_if_valid (evt->session_handle);
126 0 : if (!s || s->session_state == SESSION_STATE_ACCEPTING ||
127 0 : s->session_state == SESSION_STATE_CONNECTING)
128 : break;
129 0 : s->flags &= ~SESSION_F_RX_EVT;
130 0 : app->cb_fns.builtin_app_rx_callback (s);
131 0 : break;
132 27390 : case SESSION_IO_EVT_TX:
133 27390 : s = session_get (evt->session_index, thread_index);
134 27390 : app->cb_fns.builtin_app_tx_callback (s);
135 27390 : break;
136 0 : case SESSION_IO_EVT_TX_MAIN:
137 0 : s = session_get_from_handle_if_valid (evt->session_handle);
138 0 : if (!s)
139 0 : break;
140 0 : app->cb_fns.builtin_app_tx_callback (s);
141 0 : break;
142 43 : case SESSION_CTRL_EVT_BOUND:
143 : /* No app cb function currently */
144 43 : if (is_builtin)
145 0 : break;
/* as_u64[1] is split into its high and low 32-bit halves for the call;
 * NOTE(review): their meaning is defined by mq_send_session_bound_cb() */
146 43 : mq_send_session_bound_cb (app_wrk->wrk_index, evt->as_u64[1] >> 32,
147 : evt->session_handle,
148 43 : evt->as_u64[1] & 0xffffffff);
149 43 : break;
150 204 : case SESSION_CTRL_EVT_ACCEPTED:
151 204 : s = session_get (evt->session_index, thread_index);
/* Remember the state the session had before the accept callback ran */
152 204 : old_state = s->session_state;
153 204 : if (app->cb_fns.session_accept_callback (s))
154 : {
/* Non-zero return from the accept callback: close the session and
 * invalidate its app worker index */
155 0 : session_close (s);
156 0 : s->app_wrk_index = SESSION_INVALID_INDEX;
157 0 : break;
158 : }
159 204 : if (is_builtin)
160 : {
/* State sampled before the callback was already TRANSPORT_CLOSING or
 * later: restore it and send the close notification */
161 157 : if (old_state >= SESSION_STATE_TRANSPORT_CLOSING)
162 : {
163 0 : session_set_state (s, old_state);
164 0 : app_worker_close_notify (app_wrk, s);
165 : }
166 : }
167 204 : break;
168 207 : case SESSION_CTRL_EVT_CONNECTED:
/* Low 32 bits of as_u64[1] are forwarded as the callback's last argument
 * (presumably an error code - confirm against the callback prototype).
 * A session is looked up only when they are zero; otherwise the callback
 * is given a null session */
169 207 : if (!(evt->as_u64[1] & 0xffffffff))
170 : {
171 207 : s = session_get (evt->session_index, thread_index);
172 207 : old_state = s->session_state;
173 : }
174 : else
175 0 : s = 0;
176 207 : rv = app->cb_fns.session_connected_callback (
177 207 : app_wrk->wrk_index, evt->as_u64[1] >> 32, s,
178 207 : evt->as_u64[1] & 0xffffffff);
/* No session: nothing more to do, old_state was never set on this path */
179 207 : if (!s)
180 0 : break;
181 207 : if (rv)
182 : {
183 0 : session_close (s);
184 0 : s->app_wrk_index = SESSION_INVALID_INDEX;
185 0 : break;
186 : }
187 207 : if (old_state >= SESSION_STATE_TRANSPORT_CLOSING)
188 : {
189 0 : session_set_state (s, old_state);
190 0 : app_worker_close_notify (app_wrk, s);
191 : }
192 207 : break;
193 169 : case SESSION_CTRL_EVT_DISCONNECTED:
194 169 : s = session_get (evt->session_index, thread_index);
195 169 : app->cb_fns.session_disconnect_callback (s);
196 169 : break;
197 6 : case SESSION_CTRL_EVT_RESET:
198 6 : s = session_get (evt->session_index, thread_index);
199 6 : app->cb_fns.session_reset_callback (s);
200 6 : break;
201 18 : case SESSION_CTRL_EVT_UNLISTEN_REPLY:
202 18 : if (is_builtin)
203 0 : break;
204 18 : mq_send_unlisten_reply (app_wrk, evt->session_handle,
205 18 : evt->as_u64[1] >> 32,
206 18 : evt->as_u64[1] & 0xffffffff);
207 18 : break;
208 0 : case SESSION_CTRL_EVT_MIGRATED:
/* as_u64[1] is handed to the migrate callback and later used as the handle
 * of the new session; the old session is cleaned up and freed in between */
209 0 : s = session_get (evt->session_index, thread_index);
210 0 : app->cb_fns.session_migrate_callback (s, evt->as_u64[1]);
211 0 : transport_cleanup (session_get_transport_proto (s),
212 0 : s->connection_index, s->thread_index);
213 0 : session_free (s);
214 : /* Notify app that it has data on the new session */
215 0 : s = session_get_from_handle (evt->as_u64[1]);
216 0 : session_send_io_evt_to_thread (s->rx_fifo,
217 : SESSION_IO_EVT_BUILTIN_RX);
218 0 : break;
219 262 : case SESSION_CTRL_EVT_TRANSPORT_CLOSED:
220 262 : s = session_get (evt->session_index, thread_index);
/* Callback is optional */
221 262 : if (app->cb_fns.session_transport_closed_callback)
222 0 : app->cb_fns.session_transport_closed_callback (s);
223 262 : break;
224 420 : case SESSION_CTRL_EVT_CLEANUP:
/* as_u64[0]: session index in the low 32 bits, cleanup type in the high
 * 32 bits. as_u64[1]: function pointer invoked with the session, but only
 * for SESSION_CLEANUP_SESSION and after the optional app callback */
225 420 : s = session_get (evt->as_u64[0] & 0xffffffff, thread_index);
226 420 : if (app->cb_fns.session_cleanup_callback)
227 154 : app->cb_fns.session_cleanup_callback (s, evt->as_u64[0] >> 32);
228 420 : if (evt->as_u64[0] >> 32 != SESSION_CLEANUP_SESSION)
229 210 : break;
230 210 : uword_to_pointer (evt->as_u64[1], void (*) (session_t * s)) (s);
231 210 : break;
232 147 : case SESSION_CTRL_EVT_HALF_CLEANUP:
233 147 : s = ho_session_get (evt->session_index);
234 147 : ASSERT (session_vlib_thread_is_cl_thread ());
235 147 : if (app->cb_fns.half_open_cleanup_callback)
236 2 : app->cb_fns.half_open_cleanup_callback (s);
/* Release the half-open table entry, then the half-open session itself */
237 147 : pool_put_index (app_wrk->half_open_table, s->ho_index);
238 147 : session_free (s);
239 147 : break;
240 88 : case SESSION_CTRL_EVT_APP_ADD_SEGMENT:
241 88 : app->cb_fns.add_segment_callback (app_wrk->wrk_index,
242 : evt->as_u64[1]);
243 88 : break;
244 6 : case SESSION_CTRL_EVT_APP_DEL_SEGMENT:
245 6 : app->cb_fns.del_segment_callback (app_wrk->wrk_index,
246 : evt->as_u64[1]);
247 6 : break;
248 0 : default:
249 0 : clib_warning ("unexpected event: %u", evt->event_type);
250 0 : ASSERT (0);
251 0 : break;
252 : }
/* Event handled (or deliberately skipped): consume it */
253 274145 : clib_fifo_advance_head (app_wrk->wrk_evts[thread_index], 1);
254 : }
255 :
256 216329 : if (!is_builtin)
257 : {
258 187297 : svm_msg_q_unlock (mq);
/* Clear the congestion flag only if it was set on entry and the whole
 * batch went through, i.e. the loop was not cut short */
259 187297 : if (mq_is_cong && i == n_evts)
260 0 : app_worker_unset_wrk_mq_congested (app_wrk, thread_index);
261 : }
262 :
263 216329 : return 0;
264 : }
265 :
266 : int
267 216329 : app_wrk_flush_wrk_events (app_worker_t *app_wrk, u32 thread_index)
268 : {
269 216329 : if (app_worker_application_is_builtin (app_wrk))
270 29032 : return app_worker_flush_events_inline (app_wrk, thread_index,
271 : 1 /* is_builtin */);
272 : else
273 187297 : return app_worker_flush_events_inline (app_wrk, thread_index,
274 : 0 /* is_builtin */);
275 : }
276 :
277 : static inline int
278 188207 : session_wrk_flush_events (session_worker_t *wrk)
279 : {
280 : app_worker_t *app_wrk;
281 : uword app_wrk_index;
282 : u32 thread_index;
283 :
284 188207 : thread_index = wrk->vm->thread_index;
285 188207 : app_wrk_index = clib_bitmap_first_set (wrk->app_wrks_pending_ntf);
286 :
287 404493 : while (app_wrk_index != ~0)
288 : {
289 216286 : app_wrk = app_worker_get_if_valid (app_wrk_index);
290 : /* app_wrk events are flushed on free, so should be valid here */
291 216286 : ASSERT (app_wrk != 0);
292 216286 : app_wrk_flush_wrk_events (app_wrk, thread_index);
293 :
294 216286 : if (!clib_fifo_elts (app_wrk->wrk_evts[thread_index]))
295 215747 : clib_bitmap_set (wrk->app_wrks_pending_ntf, app_wrk->wrk_index, 0);
296 :
297 : app_wrk_index =
298 216286 : clib_bitmap_next_set (wrk->app_wrks_pending_ntf, app_wrk_index + 1);
299 : }
300 :
301 188207 : if (!clib_bitmap_is_zero (wrk->app_wrks_pending_ntf))
302 539 : vlib_node_set_interrupt_pending (wrk->vm, session_input_node.index);
303 :
304 188207 : return 0;
305 : }
306 :
307 188782 : VLIB_NODE_FN (session_input_node)
308 : (vlib_main_t *vm, vlib_node_runtime_t *node, vlib_frame_t *frame)
309 : {
310 188207 : u32 thread_index = vm->thread_index;
311 : session_worker_t *wrk;
312 :
313 188207 : wrk = session_main_get_worker (thread_index);
314 188207 : session_wrk_flush_events (wrk);
315 :
316 188207 : return 0;
317 : }
318 :
/* Registration of the "session-input" node: input type, initially disabled.
 * session_wrk_flush_events() marks it interrupt pending when app workers
 * still have events queued. */
319 183788 : VLIB_REGISTER_NODE (session_input_node) = {
320 : .name = "session-input",
321 : .type = VLIB_NODE_TYPE_INPUT,
322 : .state = VLIB_NODE_STATE_DISABLED,
323 : };
324 :
325 : /*
326 : * fd.io coding-style-patch-verification: ON
327 : *
328 : * Local Variables:
329 : * eval: (c-set-style "gnu")
330 : * End:
331 : */
|