Line data Source code
1 : /*
2 : * Copyright (c) 2017-2019 Cisco and/or its affiliates.
3 : * Licensed under the Apache License, Version 2.0 (the "License");
4 : * you may not use this file except in compliance with the License.
5 : * You may obtain a copy of the License at:
6 : *
7 : * http://www.apache.org/licenses/LICENSE-2.0
8 : *
9 : * Unless required by applicable law or agreed to in writing, software
10 : * distributed under the License is distributed on an "AS IS" BASIS,
11 : * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 : * See the License for the specific language governing permissions and
13 : * limitations under the License.
14 : */
15 :
16 : #include <math.h>
17 : #include <vlib/vlib.h>
18 : #include <vnet/vnet.h>
19 : #include <vppinfra/elog.h>
20 : #include <vnet/session/transport.h>
21 : #include <vnet/session/session.h>
22 : #include <vnet/session/application.h>
23 : #include <vnet/session/application_interface.h>
24 : #include <vnet/session/application_local.h>
25 : #include <vnet/session/session_debug.h>
26 : #include <svm/queue.h>
27 : #include <sys/timerfd.h>
28 :
/**
 * Queue a control event for handling by the main thread.
 *
 * Appends @a elt to this worker's list of events pending on main and, if
 * the list was previously empty, sends an rpc to thread 0 asking it to
 * drain the list. The rpc is sent only on the empty -> non-empty
 * transition, so a batch of queued events triggers a single notification.
 */
static inline void
session_wrk_send_evt_to_main (session_worker_t *wrk, session_evt_elt_t *elt)
{
  session_evt_elt_t *he;
  uword thread_index;
  u8 is_empty;

  thread_index = wrk->vm->thread_index;
  he = clib_llist_elt (wrk->event_elts, wrk->evts_pending_main);
  /* Must sample emptiness before adding the new element */
  is_empty = clib_llist_is_empty (wrk->event_elts, evt_list, he);
  clib_llist_add_tail (wrk->event_elts, evt_list, elt, he);
  /* Only notify main on first pending event */
  if (is_empty)
    session_send_rpc_evt_to_thread (0, session_wrk_handle_evts_main_rpc,
				    uword_to_pointer (thread_index, void *));
}
44 :
/* Control events that must run on main thread with the worker barrier held.
 * If we are not there, hand the event off to main and bail out of the
 * calling handler. Note: the macro expands a `return;`, so it may only be
 * used inside void handlers.
 *
 * Fix: the body previously referenced `wrk`/`elt` instead of the declared
 * macro parameters `_wrk`/`_elt`, relying on every call site naming its
 * variables exactly that way. Use the parameters for macro hygiene. */
#define app_check_thread_and_barrier(_wrk, _elt)                              \
  if (!vlib_thread_is_main_w_barrier ())                                      \
    {                                                                         \
      session_wrk_send_evt_to_main (_wrk, _elt);                              \
      return;                                                                 \
    }
51 :
52 : static void
53 28 : session_wrk_timerfd_update (session_worker_t *wrk, u64 time_ns)
54 : {
55 : struct itimerspec its;
56 :
57 28 : its.it_value.tv_sec = 0;
58 28 : its.it_value.tv_nsec = time_ns;
59 28 : its.it_interval.tv_sec = 0;
60 28 : its.it_interval.tv_nsec = its.it_value.tv_nsec;
61 :
62 28 : if (timerfd_settime (wrk->timerfd, 0, &its, NULL) == -1)
63 0 : clib_warning ("timerfd_settime");
64 28 : }
65 :
66 : always_inline u64
67 28 : session_wrk_tfd_timeout (session_wrk_state_t state, u32 thread_index)
68 : {
69 28 : if (state == SESSION_WRK_INTERRUPT)
70 15 : return thread_index ? 1e6 : vlib_num_workers () ? 5e8 : 1e6;
71 13 : else if (state == SESSION_WRK_IDLE)
72 4 : return thread_index ? 1e8 : vlib_num_workers () ? 5e8 : 1e8;
73 : else
74 9 : return 0;
75 : }
76 :
77 : static inline void
78 28 : session_wrk_set_state (session_worker_t *wrk, session_wrk_state_t state)
79 : {
80 : u64 time_ns;
81 :
82 28 : wrk->state = state;
83 28 : if (wrk->timerfd == -1)
84 0 : return;
85 28 : time_ns = session_wrk_tfd_timeout (state, wrk->vm->thread_index);
86 28 : session_wrk_timerfd_update (wrk, time_ns);
87 : }
88 :
89 : static transport_endpt_ext_cfg_t *
90 12 : session_mq_get_ext_config (application_t *app, uword offset)
91 : {
92 : svm_fifo_chunk_t *c;
93 : fifo_segment_t *fs;
94 :
95 12 : fs = application_get_rx_mqs_segment (app);
96 12 : c = fs_chunk_ptr (fs->h, offset);
97 12 : return (transport_endpt_ext_cfg_t *) c->data;
98 : }
99 :
100 : static void
101 12 : session_mq_free_ext_config (application_t *app, uword offset)
102 : {
103 : svm_fifo_chunk_t *c;
104 : fifo_segment_t *fs;
105 :
106 12 : fs = application_get_rx_mqs_segment (app);
107 12 : c = fs_chunk_ptr (fs->h, offset);
108 12 : fifo_segment_collect_chunk (fs, 0 /* only one slice */, c);
109 12 : }
110 :
/**
 * Handle an app's listen request. Must run on main with the barrier held;
 * if invoked on a worker the event is forwarded to main.
 */
static void
session_mq_listen_handler (session_worker_t *wrk, session_evt_elt_t *elt)
{
  vnet_listen_args_t _a, *a = &_a;
  session_listen_msg_t *mp;
  app_worker_t *app_wrk;
  application_t *app;
  int rv;

  app_check_thread_and_barrier (wrk, elt);

  mp = session_evt_ctrl_data (wrk, elt);
  app = application_lookup (mp->client_index);
  if (!app)
    return;

  /* Build listen args from the mq message */
  clib_memset (a, 0, sizeof (*a));
  a->sep.is_ip4 = mp->is_ip4;
  ip_copy (&a->sep.ip, &mp->ip, mp->is_ip4);
  a->sep.port = mp->port;
  a->sep.fib_index = mp->vrf;
  a->sep.sw_if_index = ENDPOINT_INVALID_INDEX;
  a->sep.transport_proto = mp->proto;
  a->app_index = app->app_index;
  a->wrk_map_index = mp->wrk_index;
  a->sep_ext.transport_flags = mp->flags;

  /* Optional transport extended config is carried in the app's rx mq
   * segment; resolve it before calling into the transport */
  if (mp->ext_config)
    a->sep_ext.ext_cfg = session_mq_get_ext_config (app, mp->ext_config);

  if ((rv = vnet_listen (a)))
    session_worker_stat_error_inc (wrk, rv, 1);

  /* Notify requesting app worker of the outcome, success or failure */
  app_wrk = application_get_worker (app, mp->wrk_index);
  app_worker_listened_notify (app_wrk, a->handle, mp->context, rv);

  if (mp->ext_config)
    session_mq_free_ext_config (app, mp->ext_config);

  /* Make sure events are flushed before releasing barrier, to avoid
   * potential race with accept. */
  app_wrk_flush_wrk_events (app_wrk, 0);
}
154 :
155 : static void
156 0 : session_mq_listen_uri_handler (session_worker_t *wrk, session_evt_elt_t *elt)
157 : {
158 0 : vnet_listen_args_t _a, *a = &_a;
159 : session_listen_uri_msg_t *mp;
160 : app_worker_t *app_wrk;
161 : application_t *app;
162 : int rv;
163 :
164 0 : app_check_thread_and_barrier (wrk, elt);
165 :
166 0 : mp = session_evt_ctrl_data (wrk, elt);
167 0 : app = application_lookup (mp->client_index);
168 0 : if (!app)
169 0 : return;
170 :
171 0 : clib_memset (a, 0, sizeof (*a));
172 0 : a->uri = (char *) mp->uri;
173 0 : a->app_index = app->app_index;
174 0 : rv = vnet_bind_uri (a);
175 :
176 0 : app_wrk = application_get_worker (app, 0);
177 0 : app_worker_listened_notify (app_wrk, a->handle, mp->context, rv);
178 0 : app_wrk_flush_wrk_events (app_wrk, 0);
179 : }
180 :
/**
 * Process one queued connect request on the connects (first worker) thread.
 * Synchronous failures are counted and reported back to the app worker;
 * successful connects are notified later by the transport.
 */
static void
session_mq_connect_one (session_connect_msg_t *mp)
{
  vnet_connect_args_t _a, *a = &_a;
  app_worker_t *app_wrk;
  session_worker_t *wrk;
  application_t *app;
  int rv;

  app = application_lookup (mp->client_index);
  if (!app)
    return;

  /* Translate the mq message into connect args */
  clib_memset (a, 0, sizeof (*a));
  a->sep.is_ip4 = mp->is_ip4;
  clib_memcpy_fast (&a->sep.ip, &mp->ip, sizeof (mp->ip));
  a->sep.port = mp->port;
  a->sep.transport_proto = mp->proto;
  a->sep.peer.fib_index = mp->vrf;
  a->sep.dscp = mp->dscp;
  clib_memcpy_fast (&a->sep.peer.ip, &mp->lcl_ip, sizeof (mp->lcl_ip));
  /* Clear the bytes not covered by an ip4 address */
  if (mp->is_ip4)
    {
      ip46_address_mask_ip4 (&a->sep.ip);
      ip46_address_mask_ip4 (&a->sep.peer.ip);
    }
  a->sep.peer.port = mp->lcl_port;
  a->sep.peer.sw_if_index = ENDPOINT_INVALID_INDEX;
  a->sep_ext.parent_handle = mp->parent_handle;
  a->sep_ext.transport_flags = mp->flags;
  a->api_context = mp->context;
  a->app_index = app->app_index;
  a->wrk_map_index = mp->wrk_index;

  /* Optional transport extended config lives in the app's rx mq segment */
  if (mp->ext_config)
    a->sep_ext.ext_cfg = session_mq_get_ext_config (app, mp->ext_config);

  if ((rv = vnet_connect (a)))
    {
      /* Synchronous failure: count it and notify the app worker now */
      wrk = session_main_get_worker (vlib_get_thread_index ());
      session_worker_stat_error_inc (wrk, rv, 1);
      app_wrk = application_get_worker (app, mp->wrk_index);
      app_worker_connect_notify (app_wrk, 0, rv, mp->context);
    }

  if (mp->ext_config)
    session_mq_free_ext_config (app, mp->ext_config);
}
229 :
/**
 * Rpc that drains the first worker's list of pending connects. Processes at
 * most a fixed batch per invocation and reschedules itself if more remain,
 * so the worker is not held for too long.
 */
static void
session_mq_handle_connects_rpc (void *arg)
{
  u32 max_connects = 32, n_connects = 0;
  session_evt_elt_t *he, *elt, *next;
  session_worker_t *fwrk;

  /* Must run on the connects (cl) thread */
  ASSERT (session_vlib_thread_is_cl_thread ());

  /* Pending connects on linked list pertaining to first worker */
  fwrk = session_main_get_worker (transport_cl_thread ());
  if (!fwrk->n_pending_connects)
    return;

  he = clib_llist_elt (fwrk->event_elts, fwrk->pending_connects);
  elt = clib_llist_next (fwrk->event_elts, evt_list, he);

  /* Avoid holding the worker for too long */
  while (n_connects < max_connects && elt != he)
    {
      /* Grab successor before unlinking and returning elt to the pool */
      next = clib_llist_next (fwrk->event_elts, evt_list, elt);
      clib_llist_remove (fwrk->event_elts, evt_list, elt);
      session_mq_connect_one (session_evt_ctrl_data (fwrk, elt));
      session_evt_ctrl_data_free (fwrk, elt);
      clib_llist_put (fwrk->event_elts, elt);
      elt = next;
      n_connects += 1;
    }

  /* Decrement with worker barrier */
  fwrk->n_pending_connects -= n_connects;
  if (fwrk->n_pending_connects > 0)
    {
      /* Batch limit hit; reschedule to drain the remainder */
      session_send_rpc_evt_to_thread_force (fwrk->vm->thread_index,
					    session_mq_handle_connects_rpc, 0);
    }
}
267 :
/**
 * Queue an app connect request for asynchronous handling by the first
 * worker. If the current worker still has control events pending on main,
 * the connect is parked behind them to preserve ordering.
 */
static void
session_mq_connect_handler (session_worker_t *wrk, session_evt_elt_t *elt)
{
  u32 thread_index = wrk - session_main.wrk;
  session_evt_elt_t *he;

  /* Connects may only arrive on threads up to the connects (cl) thread */
  if (PREDICT_FALSE (thread_index > transport_cl_thread ()))
    {
      clib_warning ("Connect on wrong thread. Dropping");
      return;
    }

  /* If on worker, check if main has any pending messages. Avoids reordering
   * with other control messages that need to be handled by main
   */
  if (thread_index)
    {
      he = clib_llist_elt (wrk->event_elts, wrk->evts_pending_main);

      /* Events pending on main, postpone to avoid reordering */
      if (!clib_llist_is_empty (wrk->event_elts, evt_list, he))
	{
	  clib_llist_add_tail (wrk->event_elts, evt_list, elt, he);
	  return;
	}
    }

  /* Add to pending list to be handled by first worker */
  he = clib_llist_elt (wrk->event_elts, wrk->pending_connects);
  clib_llist_add_tail (wrk->event_elts, evt_list, elt, he);

  /* Decremented with worker barrier */
  wrk->n_pending_connects += 1;
  /* Only kick the rpc on the empty -> non-empty transition */
  if (wrk->n_pending_connects == 1)
    {
      session_send_rpc_evt_to_thread_force (thread_index,
					    session_mq_handle_connects_rpc, 0);
    }
}
307 :
/**
 * Handle an app's uri-based connect request. Must run on main with the
 * barrier held; forwarded there if invoked on a worker. Only synchronous
 * failures are notified here; success is reported by the transport.
 */
static void
session_mq_connect_uri_handler (session_worker_t *wrk, session_evt_elt_t *elt)
{
  vnet_connect_args_t _a, *a = &_a;
  session_connect_uri_msg_t *mp;
  app_worker_t *app_wrk;
  application_t *app;
  int rv;

  app_check_thread_and_barrier (wrk, elt);

  mp = session_evt_ctrl_data (wrk, elt);
  app = application_lookup (mp->client_index);
  if (!app)
    return;

  clib_memset (a, 0, sizeof (*a));
  a->uri = (char *) mp->uri;
  a->api_context = mp->context;
  a->app_index = app->app_index;
  if ((rv = vnet_connect_uri (a)))
    {
      session_worker_stat_error_inc (wrk, rv, 1);
      app_wrk = application_get_worker (app, 0 /* default wrk only */ );
      app_worker_connect_notify (app_wrk, 0, rv, mp->context);
    }
}
335 :
336 : static void
337 0 : session_mq_shutdown_handler (void *data)
338 : {
339 0 : session_shutdown_msg_t *mp = (session_shutdown_msg_t *) data;
340 0 : vnet_shutdown_args_t _a, *a = &_a;
341 : application_t *app;
342 :
343 0 : app = application_lookup (mp->client_index);
344 0 : if (!app)
345 0 : return;
346 :
347 0 : a->app_index = app->app_index;
348 0 : a->handle = mp->handle;
349 0 : vnet_shutdown_session (a);
350 : }
351 :
352 : static void
353 60 : session_mq_disconnect_handler (void *data)
354 : {
355 60 : session_disconnect_msg_t *mp = (session_disconnect_msg_t *) data;
356 60 : vnet_disconnect_args_t _a, *a = &_a;
357 : application_t *app;
358 :
359 60 : app = application_lookup (mp->client_index);
360 60 : if (!app)
361 1 : return;
362 :
363 59 : a->app_index = app->app_index;
364 59 : a->handle = mp->handle;
365 59 : vnet_disconnect_session (a);
366 : }
367 :
368 : static void
369 24 : app_mq_detach_handler (session_worker_t *wrk, session_evt_elt_t *elt)
370 : {
371 24 : vnet_app_detach_args_t _a, *a = &_a;
372 : session_app_detach_msg_t *mp;
373 : application_t *app;
374 :
375 26 : app_check_thread_and_barrier (wrk, elt);
376 :
377 24 : mp = session_evt_ctrl_data (wrk, elt);
378 24 : app = application_lookup (mp->client_index);
379 24 : if (!app)
380 2 : return;
381 :
382 22 : a->app_index = app->app_index;
383 22 : a->api_client_index = mp->client_index;
384 22 : vnet_application_detach (a);
385 : }
386 :
/**
 * Handle an app's unlisten request. Must run on main with the barrier held;
 * forwarded there if invoked on a worker. Result is reported back to the
 * requesting app worker.
 */
static void
session_mq_unlisten_handler (session_worker_t *wrk, session_evt_elt_t *elt)
{
  vnet_unlisten_args_t _a, *a = &_a;
  session_unlisten_msg_t *mp;
  app_worker_t *app_wrk;
  session_handle_t sh;
  application_t *app;
  int rv;

  app_check_thread_and_barrier (wrk, elt);

  mp = session_evt_ctrl_data (wrk, elt);
  sh = mp->handle;

  app = application_lookup (mp->client_index);
  if (!app)
    return;

  clib_memset (a, 0, sizeof (*a));
  a->app_index = app->app_index;
  a->handle = sh;
  a->wrk_map_index = mp->wrk_index;

  if ((rv = vnet_unlisten (a)))
    session_worker_stat_error_inc (wrk, rv, 1);

  /* Worker may have been removed while the request was in flight */
  app_wrk = application_get_worker (app, a->wrk_map_index);
  if (!app_wrk)
    return;

  app_worker_unlisten_reply (app_wrk, sh, mp->context, rv);
}
420 :
/**
 * Handle an app's reply to an accept notification. On success the session
 * transitions to ready; a non-zero retval from the app disconnects it.
 */
static void
session_mq_accepted_reply_handler (session_worker_t *wrk,
				   session_evt_elt_t *elt)
{
  vnet_disconnect_args_t _a = { 0 }, *a = &_a;
  session_accepted_reply_msg_t *mp;
  session_state_t old_state;
  app_worker_t *app_wrk;
  session_t *s;

  mp = session_evt_ctrl_data (wrk, elt);

  /* Mail this back from the main thread. We're not polling in main
   * thread so we're using other workers for notifications. */
  if (session_thread_from_handle (mp->handle) == 0 && vlib_num_workers () &&
      vlib_get_thread_index () != 0)
    {
      session_wrk_send_evt_to_main (wrk, elt);
      return;
    }

  s = session_get_from_handle_if_valid (mp->handle);
  if (!s)
    return;

  app_wrk = app_worker_get (s->app_wrk_index);
  if (app_wrk->app_index != mp->context)
    {
      clib_warning ("app doesn't own session");
      return;
    }

  /* Server isn't interested, disconnect the session */
  if (mp->retval)
    {
      a->app_index = mp->context;
      a->handle = mp->handle;
      vnet_disconnect_session (a);
      /* Detach session from the app worker that rejected it */
      s->app_wrk_index = SESSION_INVALID_INDEX;
      return;
    }

  /* Special handling for cut-through sessions */
  if (!session_has_transport (s))
    {
      session_set_state (s, SESSION_STATE_READY);
      ct_session_connect_notify (s, SESSION_E_NONE);
      return;
    }

  old_state = s->session_state;
  session_set_state (s, SESSION_STATE_READY);

  /* Data may have accumulated while waiting for the app's reply */
  if (!svm_fifo_is_empty_prod (s->rx_fifo))
    app_worker_rx_notify (app_wrk, s);

  /* Closed while waiting for app to reply. Resend disconnect */
  if (old_state >= SESSION_STATE_TRANSPORT_CLOSING)
    {
      app_worker_close_notify (app_wrk, s);
      session_set_state (s, old_state);
      return;
    }
}
485 :
/**
 * Handle an app's reply to a session reset notification. On confirmation,
 * disconnect so the transport can clean up its connection state.
 */
static void
session_mq_reset_reply_handler (void *data)
{
  vnet_disconnect_args_t _a = { 0 }, *a = &_a;
  session_reset_reply_msg_t *mp;
  app_worker_t *app_wrk;
  session_t *s;
  application_t *app;
  u32 index, thread_index;

  mp = (session_reset_reply_msg_t *) data;
  app = application_lookup (mp->context);
  if (!app)
    return;

  session_parse_handle (mp->handle, &index, &thread_index);
  s = session_get_if_valid (index, thread_index);

  /* No session or not the right session */
  if (!s || s->session_state < SESSION_STATE_TRANSPORT_CLOSING)
    return;

  /* Guard against stale replies for reallocated sessions */
  app_wrk = app_worker_get (s->app_wrk_index);
  if (!app_wrk || app_wrk->app_index != app->app_index)
    {
      clib_warning ("App %u does not own handle 0x%lx!", app->app_index,
		    mp->handle);
      return;
    }

  /* Client objected to resetting the session, log and continue */
  if (mp->retval)
    {
      clib_warning ("client retval %d", mp->retval);
      return;
    }

  /* This comes as a response to a reset, transport only waiting for
   * confirmation to remove connection state, no need to disconnect */
  a->handle = mp->handle;
  a->app_index = app->app_index;
  vnet_disconnect_session (a);
}
529 :
/**
 * Handle a transport-initiated disconnect notification relayed by the app.
 * Disconnects the session and mails a disconnected-reply with the outcome
 * back to the app worker's event queue.
 */
static void
session_mq_disconnected_handler (void *data)
{
  session_disconnected_reply_msg_t *rmp;
  vnet_disconnect_args_t _a, *a = &_a;
  svm_msg_q_msg_t _msg, *msg = &_msg;
  session_disconnected_msg_t *mp;
  app_worker_t *app_wrk;
  session_event_t *evt;
  session_t *s;
  application_t *app;
  int rv = 0;

  mp = (session_disconnected_msg_t *) data;
  if (!(s = session_get_from_handle_if_valid (mp->handle)))
    {
      clib_warning ("could not disconnect handle %llu", mp->handle);
      return;
    }
  /* Make sure the requesting app actually owns the session */
  app_wrk = app_worker_get (s->app_wrk_index);
  app = application_lookup (mp->client_index);
  if (!(app_wrk && app && app->app_index == app_wrk->app_index))
    {
      clib_warning ("could not disconnect session: %llu app: %u",
		    mp->handle, mp->client_index);
      return;
    }

  a->handle = mp->handle;
  a->app_index = app_wrk->wrk_index;
  rv = vnet_disconnect_session (a);

  /* Allocate a ctrl-ring message, fill in the reply and post it to the
   * app worker's event queue */
  svm_msg_q_lock_and_alloc_msg_w_ring (app_wrk->event_queue,
				       SESSION_MQ_CTRL_EVT_RING,
				       SVM_Q_WAIT, msg);
  evt = svm_msg_q_msg_data (app_wrk->event_queue, msg);
  clib_memset (evt, 0, sizeof (*evt));
  evt->event_type = SESSION_CTRL_EVT_DISCONNECTED_REPLY;
  rmp = (session_disconnected_reply_msg_t *) evt->data;
  rmp->handle = mp->handle;
  rmp->context = mp->context;
  rmp->retval = rv;
  svm_msg_q_add_and_unlock (app_wrk->event_queue, msg);
}
574 :
575 : static void
576 14 : session_mq_disconnected_reply_handler (void *data)
577 : {
578 : session_disconnected_reply_msg_t *mp;
579 14 : vnet_disconnect_args_t _a, *a = &_a;
580 : application_t *app;
581 :
582 14 : mp = (session_disconnected_reply_msg_t *) data;
583 :
584 : /* Client objected to disconnecting the session, log and continue */
585 14 : if (mp->retval)
586 : {
587 0 : clib_warning ("client retval %d", mp->retval);
588 0 : return;
589 : }
590 :
591 : /* Disconnect has been confirmed. Confirm close to transport */
592 14 : app = application_lookup (mp->context);
593 14 : if (app)
594 : {
595 14 : a->handle = mp->handle;
596 14 : a->app_index = app->app_index;
597 14 : vnet_disconnect_session (a);
598 : }
599 : }
600 :
/**
 * Handle an app's request to move a session to a different app worker.
 * The request must come from the prospective new owner; a request from the
 * current owner instead triggers a req-worker-update message to the new
 * worker. On success a reply with the session's fifos and segment is
 * posted and potentially lost notifications are retransmitted.
 */
static void
session_mq_worker_update_handler (void *data)
{
  session_worker_update_msg_t *mp = (session_worker_update_msg_t *) data;
  session_worker_update_reply_msg_t *rmp;
  svm_msg_q_msg_t _msg, *msg = &_msg;
  app_worker_t *app_wrk;
  u32 owner_app_wrk_map;
  session_event_t *evt;
  session_t *s;
  application_t *app;
  int rv;

  app = application_lookup (mp->client_index);
  if (!app)
    return;
  if (!(s = session_get_from_handle_if_valid (mp->handle)))
    {
      clib_warning ("invalid handle %llu", mp->handle);
      return;
    }
  app_wrk = app_worker_get (s->app_wrk_index);
  if (app_wrk->app_index != app->app_index)
    {
      clib_warning ("app %u does not own session %llu", app->app_index,
		    mp->handle);
      return;
    }
  owner_app_wrk_map = app_wrk->wrk_map_index;
  app_wrk = application_get_worker (app, mp->wrk_index);

  /* This needs to come from the new owner */
  if (mp->req_wrk_index == owner_app_wrk_map)
    {
      session_req_worker_update_msg_t *wump;

      /* Ask the requested worker to re-issue the update itself */
      svm_msg_q_lock_and_alloc_msg_w_ring (app_wrk->event_queue,
					   SESSION_MQ_CTRL_EVT_RING,
					   SVM_Q_WAIT, msg);
      evt = svm_msg_q_msg_data (app_wrk->event_queue, msg);
      clib_memset (evt, 0, sizeof (*evt));
      evt->event_type = SESSION_CTRL_EVT_REQ_WORKER_UPDATE;
      wump = (session_req_worker_update_msg_t *) evt->data;
      wump->session_handle = mp->handle;
      svm_msg_q_add_and_unlock (app_wrk->event_queue, msg);
      return;
    }

  rv = app_worker_own_session (app_wrk, s);
  if (rv)
    session_stat_error_inc (rv, 1);

  /*
   * Send reply
   */
  svm_msg_q_lock_and_alloc_msg_w_ring (app_wrk->event_queue,
				       SESSION_MQ_CTRL_EVT_RING,
				       SVM_Q_WAIT, msg);
  evt = svm_msg_q_msg_data (app_wrk->event_queue, msg);
  clib_memset (evt, 0, sizeof (*evt));
  evt->event_type = SESSION_CTRL_EVT_WORKER_UPDATE_REPLY;
  rmp = (session_worker_update_reply_msg_t *) evt->data;
  rmp->handle = mp->handle;
  /* Fifos are reported as offsets so the new worker can map them */
  if (s->rx_fifo)
    rmp->rx_fifo = fifo_segment_fifo_offset (s->rx_fifo);
  if (s->tx_fifo)
    rmp->tx_fifo = fifo_segment_fifo_offset (s->tx_fifo);
  rmp->segment_handle = session_segment_handle (s);
  svm_msg_q_add_and_unlock (app_wrk->event_queue, msg);

  /*
   * Retransmit messages that may have been lost
   */
  if (s->tx_fifo && !svm_fifo_is_empty (s->tx_fifo))
    session_send_io_evt_to_thread (s->tx_fifo, SESSION_IO_EVT_TX);

  if (s->rx_fifo && !svm_fifo_is_empty (s->rx_fifo))
    app_worker_rx_notify (app_wrk, s);

  if (s->session_state >= SESSION_STATE_TRANSPORT_CLOSING)
    app_worker_close_notify (app_wrk, s);
}
683 :
/**
 * Relay an opaque rpc payload from one app worker to another by copying it
 * into an APP_WRK_RPC event on the destination worker's event queue.
 */
static void
session_mq_app_wrk_rpc_handler (void *data)
{
  session_app_wrk_rpc_msg_t *mp = (session_app_wrk_rpc_msg_t *) data;
  svm_msg_q_msg_t _msg, *msg = &_msg;
  session_app_wrk_rpc_msg_t *rmp;
  app_worker_t *app_wrk;
  session_event_t *evt;
  application_t *app;

  app = application_lookup (mp->client_index);
  if (!app)
    return;

  /* Destination worker of the rpc */
  app_wrk = application_get_worker (app, mp->wrk_index);

  svm_msg_q_lock_and_alloc_msg_w_ring (app_wrk->event_queue,
				       SESSION_MQ_CTRL_EVT_RING, SVM_Q_WAIT,
				       msg);
  evt = svm_msg_q_msg_data (app_wrk->event_queue, msg);
  clib_memset (evt, 0, sizeof (*evt));
  evt->event_type = SESSION_CTRL_EVT_APP_WRK_RPC;
  rmp = (session_app_wrk_rpc_msg_t *) evt->data;
  /* Payload is forwarded verbatim */
  clib_memcpy (rmp->data, mp->data, sizeof (mp->data));
  svm_msg_q_add_and_unlock (app_wrk->event_queue, msg);
}
710 :
/**
 * Handle an app's get/set request for a transport attribute of one of its
 * sessions and mail the result back on the app worker's event queue.
 */
static void
session_mq_transport_attr_handler (void *data)
{
  session_transport_attr_msg_t *mp = (session_transport_attr_msg_t *) data;
  session_transport_attr_reply_msg_t *rmp;
  svm_msg_q_msg_t _msg, *msg = &_msg;
  app_worker_t *app_wrk;
  session_event_t *evt;
  application_t *app;
  session_t *s;
  int rv;

  app = application_lookup (mp->client_index);
  if (!app)
    return;

  if (!(s = session_get_from_handle_if_valid (mp->handle)))
    {
      clib_warning ("invalid handle %llu", mp->handle);
      return;
    }
  /* Only the owning app may query/update session attributes */
  app_wrk = app_worker_get (s->app_wrk_index);
  if (app_wrk->app_index != app->app_index)
    {
      clib_warning ("app %u does not own session %llu", app->app_index,
		    mp->handle);
      return;
    }

  rv = session_transport_attribute (s, mp->is_get, &mp->attr);

  /* Build and post the reply, echoing the attribute on successful gets */
  svm_msg_q_lock_and_alloc_msg_w_ring (
    app_wrk->event_queue, SESSION_MQ_CTRL_EVT_RING, SVM_Q_WAIT, msg);
  evt = svm_msg_q_msg_data (app_wrk->event_queue, msg);
  clib_memset (evt, 0, sizeof (*evt));
  evt->event_type = SESSION_CTRL_EVT_TRANSPORT_ATTR_REPLY;
  rmp = (session_transport_attr_reply_msg_t *) evt->data;
  rmp->handle = mp->handle;
  rmp->retval = rv;
  rmp->is_get = mp->is_get;
  if (!rv && mp->is_get)
    rmp->attr = mp->attr;
  svm_msg_q_add_and_unlock (app_wrk->event_queue, msg);
}
755 :
/**
 * Rpc, run on main with the worker barrier held, that drains a worker's
 * list of control events pending on main and dispatches each to its
 * handler. @a args carries the originating worker's thread index.
 */
void
session_wrk_handle_evts_main_rpc (void *args)
{
  vlib_main_t *vm = vlib_get_main ();
  clib_llist_index_t ei, next_ei;
  session_evt_elt_t *he, *elt;
  session_worker_t *fwrk;
  u32 thread_index;

  /* Handlers below expect to run with the barrier held */
  vlib_worker_thread_barrier_sync (vm);

  thread_index = pointer_to_uword (args);
  fwrk = session_main_get_worker (thread_index);

  he = clib_llist_elt (fwrk->event_elts, fwrk->evts_pending_main);
  ei = clib_llist_next_index (he, evt_list);

  /* Iterate by index, not pointer, since handlers may grow the pool */
  while (ei != fwrk->evts_pending_main)
    {
      elt = clib_llist_elt (fwrk->event_elts, ei);
      next_ei = clib_llist_next_index (elt, evt_list);
      clib_llist_remove (fwrk->event_elts, evt_list, elt);
      switch (elt->evt.event_type)
	{
	case SESSION_CTRL_EVT_LISTEN:
	  session_mq_listen_handler (fwrk, elt);
	  break;
	case SESSION_CTRL_EVT_UNLISTEN:
	  session_mq_unlisten_handler (fwrk, elt);
	  break;
	case SESSION_CTRL_EVT_APP_DETACH:
	  app_mq_detach_handler (fwrk, elt);
	  break;
	case SESSION_CTRL_EVT_CONNECT_URI:
	  session_mq_connect_uri_handler (fwrk, elt);
	  break;
	case SESSION_CTRL_EVT_ACCEPTED_REPLY:
	  session_mq_accepted_reply_handler (fwrk, elt);
	  break;
	case SESSION_CTRL_EVT_CONNECT:
	  session_mq_connect_handler (fwrk, elt);
	  break;
	default:
	  clib_warning ("unhandled %u", elt->evt.event_type);
	  ALWAYS_ASSERT (0);
	  break;
	}

      /* Regrab element in case pool moved */
      elt = clib_llist_elt (fwrk->event_elts, ei);
      /* Handlers may requeue the element (e.g. postponed connects); only
       * free it if it is no longer linked anywhere */
      if (!clib_llist_elt_is_linked (elt, evt_list))
	{
	  session_evt_ctrl_data_free (fwrk, elt);
	  clib_llist_put (fwrk->event_elts, elt);
	}
      ei = next_ei;
    }

  vlib_worker_thread_barrier_release (vm);
}
816 :
/* Session queue graph node, registered below */
vlib_node_registration_t session_queue_node;

/* Per-buffer trace data captured by the session queue node */
typedef struct
{
  u32 session_index;		/* session pool index */
  u32 server_thread_index;	/* thread the session lives on */
} session_queue_trace_t;
824 :
825 : /* packet trace format function */
826 : static u8 *
827 0 : format_session_queue_trace (u8 * s, va_list * args)
828 : {
829 0 : CLIB_UNUSED (vlib_main_t * vm) = va_arg (*args, vlib_main_t *);
830 0 : CLIB_UNUSED (vlib_node_t * node) = va_arg (*args, vlib_node_t *);
831 0 : session_queue_trace_t *t = va_arg (*args, session_queue_trace_t *);
832 :
833 0 : s = format (s, "session index %d thread index %d",
834 : t->session_index, t->server_thread_index);
835 0 : return s;
836 : }
837 :
/* Error/counter definitions for the session queue node: each entry expands
 * to an enum value and a counter descriptor */
#define foreach_session_queue_error                                           \
  _ (TX, tx, INFO, "Packets transmitted")                                     \
  _ (TIMER, timer, INFO, "Timer events")                                      \
  _ (NO_BUFFER, no_buffer, ERROR, "Out of buffers")

typedef enum
{
#define _(f, n, s, d) SESSION_QUEUE_ERROR_##f,
  foreach_session_queue_error
#undef _
    SESSION_QUEUE_N_ERROR,
} session_queue_error_t;

/* Counter descriptors matching session_queue_error_t, in order */
static vlib_error_desc_t session_error_counters[] = {
#define _(f, n, s, d) { #n, d, VL_COUNTER_SEVERITY_##s },
  foreach_session_queue_error
#undef _
};
856 :
/* Return codes for the session tx path */
enum
{
  SESSION_TX_NO_BUFFERS = -2,	/* ran out of vlib buffers */
  SESSION_TX_NO_DATA,		/* nothing to send */
  SESSION_TX_OK			/* data dequeued and sent */
};
863 :
864 : static void
865 0 : session_tx_trace_frame (vlib_main_t *vm, vlib_node_runtime_t *node,
866 : u32 next_index, vlib_buffer_t **bufs, u16 n_segs,
867 : session_t *s, u32 n_trace)
868 : {
869 0 : vlib_buffer_t **b = bufs;
870 :
871 0 : while (n_trace && n_segs)
872 : {
873 0 : if (PREDICT_TRUE (vlib_trace_buffer (vm, node, next_index, b[0],
874 : 1 /* follow_chain */)))
875 : {
876 : session_queue_trace_t *t =
877 0 : vlib_add_trace (vm, node, b[0], sizeof (*t));
878 0 : t->session_index = s->session_index;
879 0 : t->server_thread_index = s->thread_index;
880 0 : n_trace--;
881 : }
882 0 : b++;
883 0 : n_segs--;
884 : }
885 0 : vlib_set_trace_count (vm, node, n_trace);
886 0 : }
887 :
/**
 * Program dma transfers that copy data for the first buffer of a tx chain
 * from the session's tx fifo into the buffer. Returns the number of bytes
 * scheduled for transfer.
 */
always_inline int
session_tx_fill_dma_transfers (session_worker_t *wrk,
			       session_tx_context_t *ctx, vlib_buffer_t *b)
{
  vlib_main_t *vm = wrk->vm;
  u32 len_to_deq;
  u8 *data0 = NULL;
  int n_bytes_read, len_write;
  svm_fifo_seg_t data_fs[2];

  /* At most two segments: fifo data may wrap around once */
  u32 n_segs = 2;
  u16 n_transfers = 0;
  /*
   * Start with the first buffer in chain
   */
  b->error = 0;
  b->flags = VNET_BUFFER_F_LOCALLY_ORIGINATED;
  b->current_data = 0;
  /* Leave room for transport headers to be prepended later */
  data0 = vlib_buffer_make_headroom (b, TRANSPORT_MAX_HDRS_LEN);
  len_to_deq = clib_min (ctx->left_to_snd, ctx->deq_per_first_buf);

  n_bytes_read = svm_fifo_segments (ctx->s->tx_fifo, ctx->sp.tx_offset,
				    data_fs, &n_segs, len_to_deq);

  len_write = n_bytes_read;
  ASSERT (n_bytes_read == len_to_deq);

  /* One dma transfer per fifo segment returned */
  while (n_bytes_read)
    {
      wrk->batch_num++;
      vlib_dma_batch_add (vm, wrk->batch, data0, data_fs[n_transfers].data,
			  data_fs[n_transfers].len);
      data0 += data_fs[n_transfers].len;
      n_bytes_read -= data_fs[n_transfers].len;
      n_transfers++;
    }
  return len_write;
}
926 :
/**
 * Program dma transfers for a chained (non-first) tx buffer: copy
 * len_to_deq bytes from the session's tx fifo to @a data. Returns the
 * number of bytes scheduled for transfer.
 */
always_inline int
session_tx_fill_dma_transfers_tail (session_worker_t *wrk,
				    session_tx_context_t *ctx,
				    vlib_buffer_t *b, u32 len_to_deq, u8 *data)
{
  vlib_main_t *vm = wrk->vm;
  int n_bytes_read, len_write;
  svm_fifo_seg_t data_fs[2];
  /* At most two segments: fifo data may wrap around once */
  u32 n_segs = 2;
  u16 n_transfers = 0;

  n_bytes_read = svm_fifo_segments (ctx->s->tx_fifo, ctx->sp.tx_offset,
				    data_fs, &n_segs, len_to_deq);

  len_write = n_bytes_read;

  ASSERT (n_bytes_read == len_to_deq);

  /* One dma transfer per fifo segment returned */
  while (n_bytes_read)
    {
      wrk->batch_num++;
      vlib_dma_batch_add (vm, wrk->batch, data, data_fs[n_transfers].data,
			  data_fs[n_transfers].len);
      data += data_fs[n_transfers].len;
      n_bytes_read -= data_fs[n_transfers].len;
      n_transfers++;
    }

  return len_write;
}
957 :
958 : always_inline int
959 1029090 : session_tx_copy_data (session_worker_t *wrk, session_tx_context_t *ctx,
960 : vlib_buffer_t *b, u32 len_to_deq, u8 *data0)
961 : {
962 : int n_bytes_read;
963 1029090 : if (PREDICT_TRUE (!wrk->dma_enabled))
964 : n_bytes_read =
965 1029090 : svm_fifo_peek (ctx->s->tx_fifo, ctx->sp.tx_offset, len_to_deq, data0);
966 : else
967 0 : n_bytes_read = session_tx_fill_dma_transfers (wrk, ctx, b);
968 1029090 : return n_bytes_read;
969 : }
970 :
971 : always_inline int
972 0 : session_tx_copy_data_tail (session_worker_t *wrk, session_tx_context_t *ctx,
973 : vlib_buffer_t *b, u32 len_to_deq, u8 *data)
974 : {
975 : int n_bytes_read;
976 0 : if (PREDICT_TRUE (!wrk->dma_enabled))
977 : n_bytes_read =
978 0 : svm_fifo_peek (ctx->s->tx_fifo, ctx->sp.tx_offset, len_to_deq, data);
979 : else
980 : n_bytes_read =
981 0 : session_tx_fill_dma_transfers_tail (wrk, ctx, b, len_to_deq, data);
982 0 : return n_bytes_read;
983 : }
984 :
/**
 * Fill the trailing buffers of a segment's buffer chain.
 *
 * The first buffer was already filled by session_tx_fill_buffer; this
 * dequeues/peeks the remainder of the segment (bounded by snd_mss and
 * left_to_snd) into up to n_bufs_per_seg - 1 extra buffers and links
 * them to @b. Buffer indices are consumed from the tail of
 * ctx->tx_buffers via *n_bufs.
 *
 * @param wrk        session worker (vm, dma state)
 * @param ctx        per-dispatch tx context (fifo, hdr, accounting)
 * @param b          head buffer of the chain; total length updated here
 * @param n_bufs     in/out count of unconsumed buffers in ctx->tx_buffers
 * @param peek_data  1: peek at sp.tx_offset; 0: dequeue (dgram aware)
 */
always_inline void
session_tx_fifo_chain_tail (session_worker_t *wrk, session_tx_context_t *ctx,
			    vlib_buffer_t *b, u16 *n_bufs, u8 peek_data)
{
  vlib_main_t *vm = wrk->vm;
  vlib_buffer_t *chain_b, *prev_b;
  u32 chain_bi0, to_deq, left_from_seg;
  int len_to_deq, n_bytes_read;
  u8 *data, j;

  b->flags |= VLIB_BUFFER_TOTAL_LENGTH_VALID;
  b->total_length_not_including_first_buffer = 0;

  chain_b = b;
  /* Bytes still owed for this segment, capped by what is left to send */
  left_from_seg = clib_min (ctx->sp.snd_mss - b->current_length,
			    ctx->left_to_snd);
  to_deq = left_from_seg;
  for (j = 1; j < ctx->n_bufs_per_seg; j++)
    {
      prev_b = chain_b;
      len_to_deq = clib_min (to_deq, ctx->deq_per_buf);

      /* Buffers are consumed from the tail of the allocated vector */
      *n_bufs -= 1;
      chain_bi0 = ctx->tx_buffers[*n_bufs];
      chain_b = vlib_get_buffer (vm, chain_bi0);
      chain_b->current_data = 0;
      data = vlib_buffer_get_current (chain_b);
      if (peek_data)
	{
	  n_bytes_read =
	    session_tx_copy_data_tail (wrk, ctx, b, len_to_deq, data);
	  /* Track peek progress locally; transport advances its own copy */
	  ctx->sp.tx_offset += n_bytes_read;
	}
      else
	{
	  if (ctx->transport_vft->transport_options.tx_type ==
	      TRANSPORT_TX_DGRAM)
	    {
	      svm_fifo_t *f = ctx->s->tx_fifo;
	      session_dgram_hdr_t *hdr = &ctx->hdr;
	      u16 deq_now;
	      u32 offset;

	      /* Never read past the end of the current datagram */
	      deq_now = clib_min (hdr->data_length - hdr->data_offset,
				  len_to_deq);
	      offset = hdr->data_offset + SESSION_CONN_HDR_LEN;
	      n_bytes_read = svm_fifo_peek (f, offset, deq_now, data);
	      ASSERT (n_bytes_read > 0);

	      hdr->data_offset += n_bytes_read;
	      if (hdr->data_offset == hdr->data_length)
		{
		  /* Dgram fully consumed: drop it (header + payload) and,
		   * if more data must be sent, load the next dgram header */
		  offset = hdr->data_length + SESSION_CONN_HDR_LEN;
		  svm_fifo_dequeue_drop (f, offset);
		  if (ctx->left_to_snd > n_bytes_read)
		    svm_fifo_peek (ctx->s->tx_fifo, 0, sizeof (ctx->hdr),
				   (u8 *) & ctx->hdr);
		}
	      else if (ctx->left_to_snd == n_bytes_read)
		/* Partially sent dgram: persist updated offset in the fifo */
		svm_fifo_overwrite_head (ctx->s->tx_fifo, (u8 *) & ctx->hdr,
					 sizeof (session_dgram_pre_hdr_t));
	    }
	  else
	    n_bytes_read = svm_fifo_dequeue (ctx->s->tx_fifo,
					     len_to_deq, data);
	}
      ASSERT (n_bytes_read == len_to_deq);
      chain_b->current_length = n_bytes_read;
      b->total_length_not_including_first_buffer += chain_b->current_length;

      /* update previous buffer */
      prev_b->next_buffer = chain_bi0;
      prev_b->flags |= VLIB_BUFFER_NEXT_PRESENT;

      /* update current buffer */
      chain_b->next_buffer = 0;

      to_deq -= n_bytes_read;
      if (to_deq == 0)
	break;
    }
  ASSERT (to_deq == 0
	  && b->total_length_not_including_first_buffer == left_from_seg);
  ctx->left_to_snd -= left_from_seg;
}
1070 :
/**
 * Fill one tx segment's head buffer (and chain, if needed) from the fifo.
 *
 * Initializes @b, reserves TRANSPORT_MAX_HDRS_LEN of headroom for the
 * transport header, then copies up to deq_per_first_buf bytes. For
 * peek-style transports it advances sp.tx_offset; for dequeue-style
 * (dgram) transports it maintains the in-fifo datagram header state.
 *
 * @param wrk        session worker
 * @param ctx        per-dispatch tx context; left_to_snd is decremented
 * @param b          buffer to fill (becomes head of a possible chain)
 * @param n_bufs     in/out count of unconsumed buffers in ctx->tx_buffers
 * @param peek_data  1: peek at sp.tx_offset; 0: dequeue (dgram aware)
 */
always_inline void
session_tx_fill_buffer (session_worker_t *wrk, session_tx_context_t *ctx,
			vlib_buffer_t *b, u16 *n_bufs, u8 peek_data)
{
  u32 len_to_deq;
  u8 *data0;
  int n_bytes_read;
  /*
   * Start with the first buffer in chain
   */
  b->error = 0;
  b->flags = VNET_BUFFER_F_LOCALLY_ORIGINATED;
  b->current_data = 0;

  /* Leave room in front of the payload for the transport to push headers */
  data0 = vlib_buffer_make_headroom (b, TRANSPORT_MAX_HDRS_LEN);
  len_to_deq = clib_min (ctx->left_to_snd, ctx->deq_per_first_buf);

  if (peek_data)
    {
      n_bytes_read = session_tx_copy_data (wrk, ctx, b, len_to_deq, data0);
      ASSERT (n_bytes_read > 0);
      /* Keep track of progress locally, transport is also supposed to
       * increment it independently when pushing the header */
      ctx->sp.tx_offset += n_bytes_read;
    }
  else
    {
      if (ctx->transport_vft->transport_options.tx_type == TRANSPORT_TX_DGRAM)
	{
	  session_dgram_hdr_t *hdr = &ctx->hdr;
	  svm_fifo_t *f = ctx->s->tx_fifo;
	  u16 deq_now;
	  u32 offset;

	  ASSERT (hdr->data_length > hdr->data_offset);
	  /* Never read past the end of the current datagram */
	  deq_now = clib_min (hdr->data_length - hdr->data_offset,
			      len_to_deq);
	  offset = hdr->data_offset + SESSION_CONN_HDR_LEN;
	  n_bytes_read = svm_fifo_peek (f, offset, deq_now, data0);
	  ASSERT (n_bytes_read > 0);

	  if (transport_connection_is_cless (ctx->tc))
	    {
	      /* Connection-less transports read the dgram header (peer
	       * address etc.) from just before the payload */
	      clib_memcpy_fast (data0 - sizeof (session_dgram_hdr_t), hdr,
				sizeof (*hdr));
	    }
	  hdr->data_offset += n_bytes_read;
	  if (hdr->data_offset == hdr->data_length)
	    {
	      /* Dgram fully consumed: drop it and preload the next header
	       * if more data remains to be sent this dispatch */
	      offset = hdr->data_length + SESSION_CONN_HDR_LEN;
	      svm_fifo_dequeue_drop (f, offset);
	      if (ctx->left_to_snd > n_bytes_read)
		svm_fifo_peek (ctx->s->tx_fifo, 0, sizeof (ctx->hdr),
			       (u8 *) & ctx->hdr);
	    }
	  else if (ctx->left_to_snd == n_bytes_read)
	    /* Partially sent dgram: persist updated offset in the fifo */
	    svm_fifo_overwrite_head (ctx->s->tx_fifo, (u8 *) & ctx->hdr,
				     sizeof (session_dgram_pre_hdr_t));
	}
      else
	{
	  n_bytes_read = svm_fifo_dequeue (ctx->s->tx_fifo,
					   len_to_deq, data0);
	  ASSERT (n_bytes_read > 0);
	}
    }

  b->current_length = n_bytes_read;
  ctx->left_to_snd -= n_bytes_read;

  /*
   * Fill in the remaining buffers in the chain, if any
   */
  if (PREDICT_FALSE (ctx->n_bufs_per_seg > 1 && ctx->left_to_snd))
    session_tx_fifo_chain_tail (wrk, ctx, b, n_bufs, peek_data);
}
1147 :
1148 : always_inline u8
1149 101925 : session_tx_not_ready (session_t * s, u8 peek_data)
1150 : {
1151 101925 : if (peek_data)
1152 : {
1153 83734 : if (PREDICT_TRUE (s->session_state == SESSION_STATE_READY))
1154 83459 : return 0;
1155 : /* Can retransmit for closed sessions but can't send new data if
1156 : * session is not ready or closed */
1157 275 : else if (s->session_state < SESSION_STATE_READY)
1158 : {
1159 : /* Allow accepting session to send custom packets.
1160 : * For instance, tcp want to send acks in established, but
1161 : * the app has not called accept() yet */
1162 0 : if (s->session_state == SESSION_STATE_ACCEPTING &&
1163 0 : (s->flags & SESSION_F_CUSTOM_TX))
1164 0 : return 0;
1165 0 : return 1;
1166 : }
1167 275 : else if (s->session_state >= SESSION_STATE_TRANSPORT_CLOSED)
1168 : {
1169 : /* Allow closed transports to still send custom packets.
1170 : * For instance, tcp may want to send acks in time-wait. */
1171 131 : if (s->session_state != SESSION_STATE_TRANSPORT_DELETED
1172 131 : && (s->flags & SESSION_F_CUSTOM_TX))
1173 129 : return 0;
1174 2 : return 2;
1175 : }
1176 : }
1177 : else
1178 : {
1179 18191 : if (s->session_state == SESSION_STATE_TRANSPORT_DELETED)
1180 0 : return 2;
1181 : }
1182 18335 : return 0;
1183 : }
1184 :
1185 : always_inline transport_connection_t *
1186 101923 : session_tx_get_transport (session_tx_context_t * ctx, u8 peek_data)
1187 : {
1188 101923 : if (peek_data)
1189 : {
1190 83732 : return ctx->transport_vft->get_connection (ctx->s->connection_index,
1191 83732 : ctx->s->thread_index);
1192 : }
1193 : else
1194 : {
1195 18191 : if (ctx->s->session_state == SESSION_STATE_LISTENING)
1196 0 : return ctx->transport_vft->get_listener (ctx->s->connection_index);
1197 : else
1198 : {
1199 18191 : return ctx->transport_vft->get_connection (ctx->s->connection_index,
1200 18191 : ctx->s->thread_index);
1201 : }
1202 : }
1203 : }
1204 :
/**
 * Compute how much data can be sent this dispatch and size the buffers.
 *
 * Fills in ctx->max_dequeue, max_len_to_snd, n_segs_per_evt,
 * n_bufs_per_seg, n_bufs_needed, deq_per_buf and deq_per_first_buf,
 * constrained by fifo occupancy, transport send space/mss and the
 * per-node segment budget @max_segs. Sets max_len_to_snd to 0 when
 * there is nothing (complete) to send.
 *
 * @param vm         vlib main, used for default buffer data size
 * @param ctx        tx context for the session being dispatched
 * @param max_segs   max segments this node dispatch may still produce
 * @param peek_data  1: peek-style transport; 0: dequeue/dgram-style
 */
always_inline void
session_tx_set_dequeue_params (vlib_main_t * vm, session_tx_context_t * ctx,
			       u32 max_segs, u8 peek_data)
{
  u32 n_bytes_per_buf, n_bytes_per_seg;

  n_bytes_per_buf = vlib_buffer_get_default_data_size (vm);
  ctx->max_dequeue = svm_fifo_max_dequeue_cons (ctx->s->tx_fifo);

  if (peek_data)
    {
      /* Offset in rx fifo from where to peek data */
      if (PREDICT_FALSE (ctx->sp.tx_offset >= ctx->max_dequeue))
	{
	  ctx->max_len_to_snd = 0;
	  return;
	}
      ctx->max_dequeue -= ctx->sp.tx_offset;
    }
  else
    {
      if (ctx->transport_vft->transport_options.tx_type == TRANSPORT_TX_DGRAM)
	{
	  u32 len, chain_limit;

	  /* Need at least one full dgram header before anything is sent */
	  if (ctx->max_dequeue <= sizeof (ctx->hdr))
	    {
	      ctx->max_len_to_snd = 0;
	      return;
	    }

	  svm_fifo_peek (ctx->s->tx_fifo, 0, sizeof (ctx->hdr),
			 (u8 *) & ctx->hdr);
	  /* Zero length dgrams not supported */
	  if (PREDICT_FALSE (ctx->hdr.data_length == 0))
	    {
	      svm_fifo_dequeue_drop (ctx->s->tx_fifo, sizeof (ctx->hdr));
	      ctx->max_len_to_snd = 0;
	      return;
	    }
	  /* We cannot be sure apps have not enqueued incomplete dgrams */
	  if (PREDICT_FALSE (ctx->max_dequeue <
			     ctx->hdr.data_length + sizeof (ctx->hdr)))
	    {
	      ctx->max_len_to_snd = 0;
	      return;
	    }
	  ASSERT (ctx->hdr.data_length > ctx->hdr.data_offset);
	  len = ctx->hdr.data_length - ctx->hdr.data_offset;

	  if (ctx->hdr.gso_size)
	    {
	      /* Segment size must not exceed the dgram's gso size */
	      ctx->sp.snd_mss = clib_min (ctx->sp.snd_mss, ctx->hdr.gso_size);
	    }

	  /* Process multiple dgrams if smaller than min (buf_space, mss).
	   * This avoids handling multiple dgrams if they require buffer
	   * chains */
	  chain_limit = clib_min (n_bytes_per_buf - TRANSPORT_MAX_HDRS_LEN,
				  ctx->sp.snd_mss);
	  if (ctx->hdr.data_length <= chain_limit)
	    {
	      u32 first_dgram_len, dgram_len, offset, max_offset;
	      session_dgram_hdr_t hdr;

	      ctx->sp.snd_mss = clib_min (ctx->sp.snd_mss, len);
	      offset = ctx->hdr.data_length + sizeof (session_dgram_hdr_t);
	      first_dgram_len = len;
	      /* Coalesce at most 16KB of equal-sized, complete dgrams */
	      max_offset = clib_min (ctx->max_dequeue, 16 << 10);

	      while (offset < max_offset)
		{
		  svm_fifo_peek (ctx->s->tx_fifo, offset, sizeof (ctx->hdr),
				 (u8 *) & hdr);
		  dgram_len = hdr.data_length - hdr.data_offset;
		  /* Stop at an incomplete dgram or a size change */
		  if (offset + sizeof (hdr) + hdr.data_length >
		      ctx->max_dequeue ||
		      first_dgram_len != dgram_len)
		    break;
		  /* Assert here to allow test above with zero length dgrams */
		  ASSERT (hdr.data_length > hdr.data_offset);
		  len += dgram_len;
		  offset += sizeof (hdr) + hdr.data_length;
		}
	    }

	  ctx->max_dequeue = len;
	}
    }
  ASSERT (ctx->max_dequeue > 0);

  /* Ensure we're not writing more than transport window allows */
  if (ctx->max_dequeue < ctx->sp.snd_space)
    {
      /* Constrained by tx queue. Try to send only fully formed segments */
      ctx->max_len_to_snd = (ctx->max_dequeue > ctx->sp.snd_mss) ?
	(ctx->max_dequeue - (ctx->max_dequeue % ctx->sp.snd_mss)) :
	ctx->max_dequeue;
      /* TODO Nagle ? */
    }
  else
    {
      /* Expectation is that snd_space0 is already a multiple of snd_mss */
      ctx->max_len_to_snd = ctx->sp.snd_space;
    }

  /* Check if we're tx constrained by the node */
  ctx->n_segs_per_evt = ceil ((f64) ctx->max_len_to_snd / ctx->sp.snd_mss);
  if (ctx->n_segs_per_evt > max_segs)
    {
      ctx->n_segs_per_evt = max_segs;
      ctx->max_len_to_snd = max_segs * ctx->sp.snd_mss;
    }

  ASSERT (n_bytes_per_buf > TRANSPORT_MAX_HDRS_LEN);
  if (ctx->n_segs_per_evt > 1)
    {
      u32 n_bytes_last_seg, n_bufs_last_seg;

      /* Last segment may be short; account for its buffers separately */
      n_bytes_per_seg = TRANSPORT_MAX_HDRS_LEN + ctx->sp.snd_mss;
      n_bytes_last_seg = TRANSPORT_MAX_HDRS_LEN + ctx->max_len_to_snd
	- ((ctx->n_segs_per_evt - 1) * ctx->sp.snd_mss);
      ctx->n_bufs_per_seg = ceil ((f64) n_bytes_per_seg / n_bytes_per_buf);
      n_bufs_last_seg = ceil ((f64) n_bytes_last_seg / n_bytes_per_buf);
      ctx->n_bufs_needed = ((ctx->n_segs_per_evt - 1) * ctx->n_bufs_per_seg)
	+ n_bufs_last_seg;
    }
  else
    {
      n_bytes_per_seg = TRANSPORT_MAX_HDRS_LEN + ctx->max_len_to_snd;
      ctx->n_bufs_per_seg = ceil ((f64) n_bytes_per_seg / n_bytes_per_buf);
      ctx->n_bufs_needed = ctx->n_bufs_per_seg;
    }

  ctx->deq_per_buf = clib_min (ctx->sp.snd_mss, n_bytes_per_buf);
  ctx->deq_per_first_buf = clib_min (ctx->sp.snd_mss,
				     n_bytes_per_buf -
				     TRANSPORT_MAX_HDRS_LEN);
}
1344 :
1345 : always_inline void
1346 54227 : session_tx_maybe_reschedule (session_worker_t * wrk,
1347 : session_tx_context_t * ctx,
1348 : session_evt_elt_t * elt)
1349 : {
1350 54227 : session_t *s = ctx->s;
1351 :
1352 54227 : svm_fifo_unset_event (s->tx_fifo);
1353 54227 : if (svm_fifo_max_dequeue_cons (s->tx_fifo) > ctx->sp.tx_offset)
1354 : {
1355 5324 : if (svm_fifo_set_event (s->tx_fifo))
1356 5324 : session_evt_add_head_old (wrk, elt);
1357 : }
1358 : else
1359 : {
1360 48903 : transport_connection_deschedule (ctx->tc);
1361 : }
1362 54227 : }
1363 :
1364 : always_inline void
1365 1088160 : session_tx_add_pending_buffer (session_worker_t *wrk, u32 bi, u32 next_index)
1366 : {
1367 1088160 : if (PREDICT_TRUE (!wrk->dma_enabled))
1368 : {
1369 1088160 : vec_add1 (wrk->pending_tx_buffers, bi);
1370 1088160 : vec_add1 (wrk->pending_tx_nexts, next_index);
1371 : }
1372 : else
1373 : {
1374 0 : session_dma_transfer *dma_transfer = &wrk->dma_trans[wrk->trans_tail];
1375 0 : vec_add1 (dma_transfer->pending_tx_buffers, bi);
1376 0 : vec_add1 (dma_transfer->pending_tx_nexts, next_index);
1377 : }
1378 1088160 : }
1379 :
/**
 * Core per-session tx dispatch: drain the tx fifo into vlib buffers.
 *
 * Checks readiness, runs any custom transport tx, applies pacing,
 * computes the dequeue budget, fills buffer (chains) two at a time,
 * asks the transport to push headers and queues the buffers for
 * flushing. Finally requeues or deschedules the session.
 *
 * @param wrk           session worker
 * @param node          session queue node (for counters/tracing)
 * @param elt           event element driving this session
 * @param n_tx_packets  in/out count of packets produced this dispatch
 * @param peek_data     1: peek-style transport; 0: dequeue/dgram-style
 * @return SESSION_TX_OK, SESSION_TX_NO_DATA or SESSION_TX_NO_BUFFERS
 */
always_inline int
session_tx_fifo_read_and_snd_i (session_worker_t * wrk,
				vlib_node_runtime_t * node,
				session_evt_elt_t * elt,
				int *n_tx_packets, u8 peek_data)
{
  u32 n_trace, n_left, pbi, next_index, max_burst;
  session_tx_context_t *ctx = &wrk->ctx;
  session_main_t *smm = &session_main;
  session_event_t *e = &elt->evt;
  vlib_main_t *vm = wrk->vm;
  transport_proto_t tp;
  vlib_buffer_t *pb;
  u16 n_bufs, rv;

  if (PREDICT_FALSE ((rv = session_tx_not_ready (ctx->s, peek_data))))
    {
      /* rv 1: not ready yet, requeue; rv 2: give up on the event */
      if (rv < 2)
	session_evt_add_old (wrk, elt);
      return SESSION_TX_NO_DATA;
    }

  next_index = smm->session_type_to_next[ctx->s->session_type];
  max_burst = SESSION_NODE_FRAME_SIZE - *n_tx_packets;

  tp = session_get_transport_proto (ctx->s);
  ctx->transport_vft = transport_protocol_get_vft (tp);
  ctx->tc = session_tx_get_transport (ctx, peek_data);

  if (PREDICT_FALSE (e->event_type == SESSION_IO_EVT_TX_FLUSH))
    {
      if (ctx->transport_vft->flush_data)
	ctx->transport_vft->flush_data (ctx->tc);
      /* Downgrade to a regular tx event for the rest of the dispatch */
      e->event_type = SESSION_IO_EVT_TX;
    }

  if (ctx->s->flags & SESSION_F_CUSTOM_TX)
    {
      u32 n_custom_tx;
      ctx->s->flags &= ~SESSION_F_CUSTOM_TX;
      ctx->sp.max_burst_size = max_burst;
      n_custom_tx = ctx->transport_vft->custom_tx (ctx->tc, &ctx->sp);
      *n_tx_packets += n_custom_tx;
      /* custom_tx may close the session; stop if transport closed */
      if (PREDICT_FALSE
	  (ctx->s->session_state >= SESSION_STATE_TRANSPORT_CLOSED))
	return SESSION_TX_OK;
      max_burst -= n_custom_tx;
      /* Budget used up, or transport asked for another custom tx pass */
      if (!max_burst || (ctx->s->flags & SESSION_F_CUSTOM_TX))
	{
	  session_evt_add_old (wrk, elt);
	  return SESSION_TX_OK;
	}
    }

  /* Connection previously descheduled because it had no data to send.
   * Clear descheduled flag and reset pacer if in use */
  if (transport_connection_is_descheduled (ctx->tc))
    transport_connection_clear_descheduled (ctx->tc);

  transport_connection_snd_params (ctx->tc, &ctx->sp);

  if (!ctx->sp.snd_space)
    {
      /* If the deschedule flag was set, remove session from scheduler.
       * Transport is responsible for rescheduling this session. */
      if (ctx->sp.flags & TRANSPORT_SND_F_DESCHED)
	transport_connection_deschedule (ctx->tc);
      /* Request to postpone the session, e.g., zero-wnd and transport
       * is not currently probing */
      else if (ctx->sp.flags & TRANSPORT_SND_F_POSTPONE)
	session_evt_add_old (wrk, elt);
      /* This flow queue is "empty" so it should be re-evaluated before
       * the ones that have data to send. */
      else
	session_evt_add_head_old (wrk, elt);

      return SESSION_TX_NO_DATA;
    }

  if (transport_connection_is_tx_paced (ctx->tc))
    {
      /* Clamp send space to the pacer's burst allowance, keeping whole
       * mss-sized segments where possible */
      u32 snd_space = transport_connection_tx_pacer_burst (ctx->tc);
      if (snd_space < TRANSPORT_PACER_MIN_BURST)
	{
	  session_evt_add_head_old (wrk, elt);
	  return SESSION_TX_NO_DATA;
	}
      snd_space = clib_min (ctx->sp.snd_space, snd_space);
      ctx->sp.snd_space = snd_space >= ctx->sp.snd_mss ?
	snd_space - snd_space % ctx->sp.snd_mss : snd_space;
    }

  /* Check how much we can pull. */
  session_tx_set_dequeue_params (vm, ctx, max_burst, peek_data);

  if (PREDICT_FALSE (!ctx->max_len_to_snd))
    {
      transport_connection_tx_pacer_reset_bucket (ctx->tc, 0);
      session_tx_maybe_reschedule (wrk, ctx, elt);
      return SESSION_TX_NO_DATA;
    }

  vec_validate_aligned (ctx->tx_buffers, ctx->n_bufs_needed - 1,
			CLIB_CACHE_LINE_BYTES);
  n_bufs = vlib_buffer_alloc (vm, ctx->tx_buffers, ctx->n_bufs_needed);
  if (PREDICT_FALSE (n_bufs < ctx->n_bufs_needed))
    {
      /* Out of buffers: free partial allocation and retry later */
      if (n_bufs)
	vlib_buffer_free (vm, ctx->tx_buffers, n_bufs);
      session_evt_add_head_old (wrk, elt);
      vlib_node_increment_counter (wrk->vm, node->node_index,
				   SESSION_QUEUE_ERROR_NO_BUFFER, 1);
      return SESSION_TX_NO_BUFFERS;
    }

  if (transport_connection_is_tx_paced (ctx->tc))
    transport_connection_tx_pacer_update_bytes (ctx->tc, ctx->max_len_to_snd);

  ctx->left_to_snd = ctx->max_len_to_snd;
  n_left = ctx->n_segs_per_evt;

  vec_validate (ctx->transport_pending_bufs, n_left);

  /* Fill segments two at a time, prefetching the next pair's headers */
  while (n_left >= 4)
    {
      vlib_buffer_t *b0, *b1;
      u32 bi0, bi1;

      pbi = ctx->tx_buffers[n_bufs - 3];
      pb = vlib_get_buffer (vm, pbi);
      vlib_prefetch_buffer_header (pb, STORE);
      pbi = ctx->tx_buffers[n_bufs - 4];
      pb = vlib_get_buffer (vm, pbi);
      vlib_prefetch_buffer_header (pb, STORE);

      bi0 = ctx->tx_buffers[--n_bufs];
      bi1 = ctx->tx_buffers[--n_bufs];

      b0 = vlib_get_buffer (vm, bi0);
      b1 = vlib_get_buffer (vm, bi1);

      session_tx_fill_buffer (wrk, ctx, b0, &n_bufs, peek_data);
      session_tx_fill_buffer (wrk, ctx, b1, &n_bufs, peek_data);

      ctx->transport_pending_bufs[ctx->n_segs_per_evt - n_left] = b0;
      ctx->transport_pending_bufs[ctx->n_segs_per_evt - n_left + 1] = b1;
      n_left -= 2;

      session_tx_add_pending_buffer (wrk, bi0, next_index);
      session_tx_add_pending_buffer (wrk, bi1, next_index);
    }
  while (n_left)
    {
      vlib_buffer_t *b0;
      u32 bi0;

      if (n_left > 1)
	{
	  pbi = ctx->tx_buffers[n_bufs - 2];
	  pb = vlib_get_buffer (vm, pbi);
	  vlib_prefetch_buffer_header (pb, STORE);
	}

      bi0 = ctx->tx_buffers[--n_bufs];
      b0 = vlib_get_buffer (vm, bi0);
      session_tx_fill_buffer (wrk, ctx, b0, &n_bufs, peek_data);

      ctx->transport_pending_bufs[ctx->n_segs_per_evt - n_left] = b0;
      n_left -= 1;

      session_tx_add_pending_buffer (wrk, bi0, next_index);
    }

  /* Ask transport to push headers */
  ctx->transport_vft->push_header (ctx->tc, ctx->transport_pending_bufs,
				   ctx->n_segs_per_evt);

  if (PREDICT_FALSE ((n_trace = vlib_get_trace_count (vm, node)) > 0))
    session_tx_trace_frame (vm, node, next_index, ctx->transport_pending_bufs,
			    ctx->n_segs_per_evt, ctx->s, n_trace);

  /* Free any buffers left over from chain fills */
  if (PREDICT_FALSE (n_bufs))
    vlib_buffer_free (vm, ctx->tx_buffers, n_bufs);

  *n_tx_packets += ctx->n_segs_per_evt;

  SESSION_EVT (SESSION_EVT_DEQ, ctx->s, ctx->max_len_to_snd, ctx->max_dequeue,
	       ctx->s->tx_fifo->shr->has_event, wrk->last_vlib_time);

  ASSERT (ctx->left_to_snd == 0);

  /* If we couldn't dequeue all bytes reschedule as old flow. Otherwise,
   * check if application enqueued more data and reschedule accordingly */
  if (ctx->max_len_to_snd < ctx->max_dequeue)
    session_evt_add_old (wrk, elt);
  else
    session_tx_maybe_reschedule (wrk, ctx, elt);

  if (!peek_data)
    {
      /* Notify app if enough fifo space was freed by this dispatch */
      u32 n_dequeued = ctx->max_len_to_snd;
      if (ctx->transport_vft->transport_options.tx_type == TRANSPORT_TX_DGRAM)
	n_dequeued += ctx->n_segs_per_evt * SESSION_CONN_HDR_LEN;
      if (svm_fifo_needs_deq_ntf (ctx->s->tx_fifo, n_dequeued))
	session_dequeue_notify (ctx->s);
    }
  return SESSION_TX_OK;
}
1588 :
/* Tx entry point for peek-style transports: data is read at sp.tx_offset
 * and left in the fifo (the transport dequeues it independently). */
int
session_tx_fifo_peek_and_snd (session_worker_t * wrk,
			      vlib_node_runtime_t * node,
			      session_evt_elt_t * e, int *n_tx_packets)
{
  return session_tx_fifo_read_and_snd_i (wrk, node, e, n_tx_packets, 1);
}
1596 :
/* Tx entry point for dequeue-style transports: data (dgram header aware)
 * is removed from the fifo as it is sent. */
int
session_tx_fifo_dequeue_and_snd (session_worker_t * wrk,
				 vlib_node_runtime_t * node,
				 session_evt_elt_t * e, int *n_tx_packets)
{
  return session_tx_fifo_read_and_snd_i (wrk, node, e, n_tx_packets, 0);
}
1604 :
/**
 * Tx entry point for transports that do all their tx via custom_tx
 * (internal/app transports). Delegates to transport_custom_tx and
 * requeues the event based on the flags the transport reports back.
 *
 * @return number of packets produced by the transport
 */
int
session_tx_fifo_dequeue_internal (session_worker_t * wrk,
				  vlib_node_runtime_t * node,
				  session_evt_elt_t * elt, int *n_tx_packets)
{
  transport_send_params_t *sp = &wrk->ctx.sp;
  session_t *s = wrk->ctx.s;
  clib_llist_index_t ei;
  u32 n_packets;

  /* Nothing to do for closed sessions or still-connecting half-opens */
  if (PREDICT_FALSE ((s->session_state >= SESSION_STATE_TRANSPORT_CLOSED) ||
		     (s->session_state == SESSION_STATE_CONNECTING &&
		      (s->flags & SESSION_F_HALF_OPEN))))
    return 0;

  /* Clear custom-tx flag used to request reschedule for tx */
  s->flags &= ~SESSION_F_CUSTOM_TX;

  sp->flags = 0;
  sp->bytes_dequeued = 0;
  sp->max_burst_size = clib_min (SESSION_NODE_FRAME_SIZE - *n_tx_packets,
				 TRANSPORT_PACER_MAX_BURST_PKTS);

  /* Grab elt index since app transports can enqueue events on tx */
  ei = clib_llist_entry_index (wrk->event_elts, elt);

  n_packets = transport_custom_tx (session_get_transport_proto (s), s, sp);
  *n_tx_packets += n_packets;

  /* Re-resolve elt: the pool may have moved while custom tx ran */
  elt = clib_llist_elt (wrk->event_elts, ei);

  if (s->flags & SESSION_F_CUSTOM_TX)
    {
      /* Transport wants another tx pass; requeue as old flow */
      session_evt_add_old (wrk, elt);
    }
  else if (!(sp->flags & TRANSPORT_SND_F_DESCHED))
    {
      /* Not descheduled: re-arm the io event if data is still pending */
      svm_fifo_unset_event (s->tx_fifo);
      if (svm_fifo_max_dequeue_cons (s->tx_fifo))
	if (svm_fifo_set_event (s->tx_fifo))
	  session_evt_add_head_old (wrk, elt);
    }

  if (sp->bytes_dequeued &&
      svm_fifo_needs_deq_ntf (s->tx_fifo, sp->bytes_dequeued))
    session_dequeue_notify (s);

  return n_packets;
}
1654 :
/* Resolve the session an io event refers to. Returns 0 if the session
 * was freed after the event was queued. */
always_inline session_t *
session_event_get_session (session_worker_t * wrk, session_event_t * e)
{
  if (PREDICT_FALSE (pool_is_free_index (wrk->sessions, e->session_index)))
    return 0;

  ASSERT (session_is_valid (e->session_index, wrk->vm->thread_index));
  return pool_elt_at_index (wrk->sessions, e->session_index);
}
1664 :
/**
 * Dispatch one control event (connect/listen/close/rpc/...) for a worker.
 *
 * After the handler runs, the element is re-resolved by index (handlers
 * may grow the pool and move it) and, unless the handler re-linked it,
 * its ctrl data is freed and the element returned to the pool.
 */
always_inline void
session_event_dispatch_ctrl (session_worker_t * wrk, session_evt_elt_t * elt)
{
  clib_llist_index_t ei;
  void (*fp) (void *);
  session_event_t *e;
  session_t *s;

  ei = clib_llist_entry_index (wrk->event_elts, elt);
  e = &elt->evt;

  switch (e->event_type)
    {
    case SESSION_CTRL_EVT_RPC:
      fp = e->rpc_args.fp;
      (*fp) (e->rpc_args.arg);
      break;
    case SESSION_CTRL_EVT_HALF_CLOSE:
      s = session_get_from_handle_if_valid (e->session_handle);
      if (PREDICT_FALSE (!s))
	break;
      session_transport_half_close (s);
      break;
    case SESSION_CTRL_EVT_CLOSE:
      s = session_get_from_handle_if_valid (e->session_handle);
      if (PREDICT_FALSE (!s))
	break;
      session_transport_close (s);
      break;
    case SESSION_CTRL_EVT_RESET:
      s = session_get_from_handle_if_valid (e->session_handle);
      if (PREDICT_FALSE (!s))
	break;
      session_transport_reset (s);
      break;
    case SESSION_CTRL_EVT_LISTEN:
      session_mq_listen_handler (wrk, elt);
      break;
    case SESSION_CTRL_EVT_LISTEN_URI:
      session_mq_listen_uri_handler (wrk, elt);
      break;
    case SESSION_CTRL_EVT_UNLISTEN:
      session_mq_unlisten_handler (wrk, elt);
      break;
    case SESSION_CTRL_EVT_CONNECT:
      session_mq_connect_handler (wrk, elt);
      break;
    case SESSION_CTRL_EVT_CONNECT_URI:
      session_mq_connect_uri_handler (wrk, elt);
      break;
    case SESSION_CTRL_EVT_SHUTDOWN:
      session_mq_shutdown_handler (session_evt_ctrl_data (wrk, elt));
      break;
    case SESSION_CTRL_EVT_DISCONNECT:
      session_mq_disconnect_handler (session_evt_ctrl_data (wrk, elt));
      break;
    case SESSION_CTRL_EVT_DISCONNECTED:
      session_mq_disconnected_handler (session_evt_ctrl_data (wrk, elt));
      break;
    case SESSION_CTRL_EVT_ACCEPTED_REPLY:
      session_mq_accepted_reply_handler (wrk, elt);
      break;
    case SESSION_CTRL_EVT_DISCONNECTED_REPLY:
      session_mq_disconnected_reply_handler (session_evt_ctrl_data (wrk,
								    elt));
      break;
    case SESSION_CTRL_EVT_RESET_REPLY:
      session_mq_reset_reply_handler (session_evt_ctrl_data (wrk, elt));
      break;
    case SESSION_CTRL_EVT_WORKER_UPDATE:
      session_mq_worker_update_handler (session_evt_ctrl_data (wrk, elt));
      break;
    case SESSION_CTRL_EVT_APP_DETACH:
      app_mq_detach_handler (wrk, elt);
      break;
    case SESSION_CTRL_EVT_APP_WRK_RPC:
      session_mq_app_wrk_rpc_handler (session_evt_ctrl_data (wrk, elt));
      break;
    case SESSION_CTRL_EVT_TRANSPORT_ATTR:
      session_mq_transport_attr_handler (session_evt_ctrl_data (wrk, elt));
      break;
    default:
      clib_warning ("unhandled event type %d", e->event_type);
    }

  /* Regrab elements in case pool moved */
  elt = clib_llist_elt (wrk->event_elts, ei);
  if (!clib_llist_elt_is_linked (elt, evt_list))
    {
      e = &elt->evt;
      /* Only events >= BOUND carry separately allocated ctrl data */
      if (e->event_type >= SESSION_CTRL_EVT_BOUND)
	session_evt_ctrl_data_free (wrk, elt);
      clib_llist_put (wrk->event_elts, elt);
    }
  SESSION_EVT (SESSION_EVT_COUNTS, CNT_CTRL_EVTS, 1, wrk);
}
1761 :
/**
 * Dispatch one io event (tx/rx notifications) for a worker.
 *
 * Looks up the target session, invokes the per-session-type tx function
 * or rx notification, then re-resolves the element (handlers may move
 * the pool) and frees it unless the handler re-linked it.
 */
always_inline void
session_event_dispatch_io (session_worker_t * wrk, vlib_node_runtime_t * node,
			   session_evt_elt_t * elt, int *n_tx_packets)
{
  session_main_t *smm = &session_main;
  app_worker_t *app_wrk;
  clib_llist_index_t ei;
  session_event_t *e;
  session_t *s;

  ei = clib_llist_entry_index (wrk->event_elts, elt);
  e = &elt->evt;

  switch (e->event_type)
    {
    case SESSION_IO_EVT_TX_FLUSH:
    case SESSION_IO_EVT_TX:
      s = session_event_get_session (wrk, e);
      if (PREDICT_FALSE (!s))
	break;
      CLIB_PREFETCH (s->tx_fifo, sizeof (*(s->tx_fifo)), LOAD);
      wrk->ctx.s = s;
      /* Spray packets in per session type frames, since they go to
       * different nodes */
      (smm->session_tx_fns[s->session_type]) (wrk, node, elt, n_tx_packets);
      break;
    case SESSION_IO_EVT_RX:
      s = session_event_get_session (wrk, e);
      if (!s || s->session_state >= SESSION_STATE_TRANSPORT_CLOSED)
	break;
      transport_app_rx_evt (session_get_transport_proto (s),
			    s->connection_index, s->thread_index);
      break;
    case SESSION_IO_EVT_BUILTIN_RX:
      s = session_event_get_session (wrk, e);
      if (PREDICT_FALSE (!s || s->session_state >= SESSION_STATE_CLOSING))
	break;
      svm_fifo_unset_event (s->rx_fifo);
      app_wrk = app_worker_get (s->app_wrk_index);
      app_worker_rx_notify (app_wrk, s);
      break;
    case SESSION_IO_EVT_TX_MAIN:
      /* Tx requested for a session owned by the main thread */
      s = session_get_if_valid (e->session_index, 0 /* main thread */);
      if (PREDICT_FALSE (!s))
	break;
      wrk->ctx.s = s;
      if (PREDICT_TRUE (s != 0))
	(smm->session_tx_fns[s->session_type]) (wrk, node, elt, n_tx_packets);
      break;
    default:
      clib_warning ("unhandled event type %d", e->event_type);
    }

  SESSION_EVT (SESSION_EVT_IO_EVT_COUNTS, e->event_type, 1, wrk);

  /* Regrab elements in case pool moved */
  elt = clib_llist_elt (wrk->event_elts, ei);
  if (!clib_llist_elt_is_linked (elt, evt_list))
    clib_llist_put (wrk->event_elts, elt);
}
1822 :
1823 : /* *INDENT-OFF* */
1824 : static const u32 session_evt_msg_sizes[] = {
1825 : #define _(symc, sym) \
1826 : [SESSION_CTRL_EVT_ ## symc] = sizeof (session_ ## sym ##_msg_t),
1827 : foreach_session_ctrl_evt
1828 : #undef _
1829 : };
1830 : /* *INDENT-ON* */
1831 :
1832 : always_inline void
1833 73706900 : session_update_time_subscribers (session_main_t *smm, clib_time_type_t now,
1834 : u32 thread_index)
1835 : {
1836 : session_update_time_fn *fn;
1837 :
1838 155430000 : vec_foreach (fn, smm->update_time_fns)
1839 81706700 : (*fn) (now, thread_index);
1840 73668200 : }
1841 :
1842 : always_inline void
1843 167705 : session_evt_add_to_list (session_worker_t * wrk, session_event_t * evt)
1844 : {
1845 : session_evt_elt_t *elt;
1846 :
1847 167705 : if (evt->event_type >= SESSION_CTRL_EVT_RPC)
1848 : {
1849 421 : elt = session_evt_alloc_ctrl (wrk);
1850 421 : if (evt->event_type >= SESSION_CTRL_EVT_BOUND)
1851 : {
1852 275 : elt->evt.ctrl_data_index = session_evt_ctrl_data_alloc (wrk);
1853 275 : elt->evt.event_type = evt->event_type;
1854 275 : clib_memcpy_fast (session_evt_ctrl_data (wrk, elt), evt->data,
1855 275 : session_evt_msg_sizes[evt->event_type]);
1856 : }
1857 : else
1858 : {
1859 : /* Internal control events fit into io events footprint */
1860 146 : clib_memcpy_fast (&elt->evt, evt, sizeof (elt->evt));
1861 : }
1862 : }
1863 : else
1864 : {
1865 167284 : elt = session_evt_alloc_new (wrk);
1866 167284 : clib_memcpy_fast (&elt->evt, evt, sizeof (elt->evt));
1867 : }
1868 167705 : }
1869 :
1870 : static void
1871 48962 : session_flush_pending_tx_buffers (session_worker_t * wrk,
1872 : vlib_node_runtime_t * node)
1873 : {
1874 48962 : vlib_buffer_enqueue_to_next_vec (wrk->vm, node, &wrk->pending_tx_buffers,
1875 : &wrk->pending_tx_nexts,
1876 48962 : vec_len (wrk->pending_tx_nexts));
1877 48962 : vec_reset_length (wrk->pending_tx_buffers);
1878 48962 : vec_reset_length (wrk->pending_tx_nexts);
1879 48962 : }
1880 :
1881 : int
1882 73662500 : session_wrk_handle_mq (session_worker_t *wrk, svm_msg_q_t *mq)
1883 : {
1884 73662500 : svm_msg_q_msg_t _msg, *msg = &_msg;
1885 73662500 : u32 i, n_to_dequeue = 0;
1886 : session_event_t *evt;
1887 :
1888 73662500 : n_to_dequeue = svm_msg_q_size (mq);
1889 73822300 : for (i = 0; i < n_to_dequeue; i++)
1890 : {
1891 167705 : svm_msg_q_sub_raw (mq, msg);
1892 167705 : evt = svm_msg_q_msg_data (mq, msg);
1893 167705 : session_evt_add_to_list (wrk, evt);
1894 167705 : svm_msg_q_free_msg (mq, msg);
1895 : }
1896 :
1897 73654600 : return n_to_dequeue;
1898 : }
1899 :
1900 : static void
1901 589313 : session_wrk_update_state (session_worker_t *wrk)
1902 : {
1903 589313 : vlib_main_t *vm = wrk->vm;
1904 :
1905 589313 : if (wrk->state == SESSION_WRK_POLLING)
1906 : {
1907 584276 : if (clib_llist_elts (wrk->event_elts) == 5 &&
1908 11 : vlib_last_vectors_per_main_loop (vm) < 1)
1909 : {
1910 11 : session_wrk_set_state (wrk, SESSION_WRK_INTERRUPT);
1911 11 : vlib_node_set_state (vm, session_queue_node.index,
1912 : VLIB_NODE_STATE_INTERRUPT);
1913 : }
1914 : }
1915 5048 : else if (wrk->state == SESSION_WRK_INTERRUPT)
1916 : {
1917 10079 : if (clib_llist_elts (wrk->event_elts) > 5 ||
1918 5035 : vlib_last_vectors_per_main_loop (vm) > 1)
1919 : {
1920 9 : session_wrk_set_state (wrk, SESSION_WRK_POLLING);
1921 9 : vlib_node_set_state (vm, session_queue_node.index,
1922 : VLIB_NODE_STATE_POLLING);
1923 : }
1924 5035 : else if (PREDICT_FALSE (!pool_elts (wrk->sessions)))
1925 : {
1926 4 : session_wrk_set_state (wrk, SESSION_WRK_IDLE);
1927 : }
1928 : }
1929 : else
1930 : {
1931 4 : if (clib_llist_elts (wrk->event_elts))
1932 : {
1933 4 : session_wrk_set_state (wrk, SESSION_WRK_INTERRUPT);
1934 : }
1935 : }
1936 589313 : }
1937 :
/*
 * Main dispatch function of the per-thread session-queue node.
 *
 * One pass: update time and notify subscribers, dequeue new mq events
 * into per-worker lists, dispatch all pending ctrl events, then io
 * events (new list first, then leftovers from previous passes), flush
 * any tx buffers queued during dispatch and, for adaptive workers,
 * re-evaluate polling vs interrupt mode.
 *
 * Returns the number of tx packets produced this pass.
 */
static uword
session_queue_node_fn (vlib_main_t * vm, vlib_node_runtime_t * node,
		       vlib_frame_t * frame)
{
  u32 thread_index = vm->thread_index, __clib_unused n_evts;
  session_evt_elt_t *elt, *ctrl_he, *new_he, *old_he;
  session_main_t *smm = vnet_get_session_main ();
  session_worker_t *wrk = &smm->wrk[thread_index];
  clib_llist_index_t ei, next_ei, old_ti;
  int n_tx_packets;

  SESSION_EVT (SESSION_EVT_DISPATCH_START, wrk);

  session_wrk_update_time (wrk, vlib_time_now (vm));

  /*
   * Update transport time
   */
  session_update_time_subscribers (smm, wrk->last_vlib_time, thread_index);
  /* Start from buffers still pending from a previous pass so the
   * SESSION_NODE_FRAME_SIZE budget below accounts for them */
  n_tx_packets = vec_len (wrk->pending_tx_buffers);
  SESSION_EVT (SESSION_EVT_DSP_CNTRS, UPDATE_TIME, wrk);

  if (PREDICT_FALSE (wrk->dma_enabled))
    {
      /* DMA transfer ring full (head == tail+1 mod size): skip pass */
      if (wrk->trans_head == ((wrk->trans_tail + 1) & (wrk->trans_size - 1)))
	return 0;
      wrk->batch = vlib_dma_batch_new (vm, wrk->config_index);
      if (!wrk->batch)
	return 0;
    }

  /*
   * Dequeue new internal mq events
   */

  n_evts = session_wrk_handle_mq (wrk, wrk->vpp_event_queue);
  SESSION_EVT (SESSION_EVT_DSP_CNTRS, MQ_DEQ, wrk, n_evts);

  /*
   * Handle control events
   */

  /* Snapshot the list tail (old_ti) so ctrl events enqueued during
   * dispatch are deferred to the next pass */
  ei = wrk->ctrl_head;
  ctrl_he = clib_llist_elt (wrk->event_elts, ei);
  next_ei = clib_llist_next_index (ctrl_he, evt_list);
  old_ti = clib_llist_prev_index (ctrl_he, evt_list);
  while (ei != old_ti)
    {
      ei = next_ei;
      elt = clib_llist_elt (wrk->event_elts, next_ei);
      next_ei = clib_llist_next_index (elt, evt_list);
      clib_llist_remove (wrk->event_elts, evt_list, elt);
      session_event_dispatch_ctrl (wrk, elt);
    }

  SESSION_EVT (SESSION_EVT_DSP_CNTRS, CTRL_EVTS, wrk);

  /*
   * Handle the new io events.
   */

  new_he = clib_llist_elt (wrk->event_elts, wrk->new_head);
  old_he = clib_llist_elt (wrk->event_elts, wrk->old_head);
  old_ti = clib_llist_prev_index (old_he, evt_list);

  /* Dispatch new io events until the list head is reached or the
   * per-pass tx packet budget is exhausted */
  ei = clib_llist_next_index (new_he, evt_list);
  while (ei != wrk->new_head && n_tx_packets < SESSION_NODE_FRAME_SIZE)
    {
      elt = clib_llist_elt (wrk->event_elts, ei);
      ei = clib_llist_next_index (elt, evt_list);
      clib_llist_remove (wrk->event_elts, evt_list, elt);
      session_event_dispatch_io (wrk, node, elt, &n_tx_packets);
    }

  SESSION_EVT (SESSION_EVT_DSP_CNTRS, NEW_IO_EVTS, wrk);

  /*
   * Handle the old io events, if we had any prior to processing the new ones
   */

  if (old_ti != wrk->old_head)
    {
      /* Re-grab old head: the element pool may have moved during the
       * dispatches above */
      old_he = clib_llist_elt (wrk->event_elts, wrk->old_head);
      ei = clib_llist_next_index (old_he, evt_list);

      while (n_tx_packets < SESSION_NODE_FRAME_SIZE)
	{
	  elt = clib_llist_elt (wrk->event_elts, ei);
	  next_ei = clib_llist_next_index (elt, evt_list);
	  clib_llist_remove (wrk->event_elts, evt_list, elt);

	  session_event_dispatch_io (wrk, node, elt, &n_tx_packets);

	  /* Stop after the element that was the tail on entry */
	  if (ei == old_ti)
	    break;

	  ei = next_ei;
	};
    }

  if (PREDICT_FALSE (wrk->dma_enabled))
    {
      if (wrk->batch_num)
	{
	  /* Record the slot for completion matching and advance the
	   * transfer ring tail (wrapping at trans_size) */
	  vlib_dma_batch_set_cookie (vm, wrk->batch, wrk->trans_tail);
	  wrk->batch_num = 0;
	  wrk->trans_tail++;
	  if (wrk->trans_tail == wrk->trans_size)
	    wrk->trans_tail = 0;
	}

      vlib_dma_batch_submit (vm, wrk->batch);
    }

  SESSION_EVT (SESSION_EVT_DSP_CNTRS, OLD_IO_EVTS, wrk);

  if (vec_len (wrk->pending_tx_buffers))
    session_flush_pending_tx_buffers (wrk, node);

  vlib_node_increment_counter (vm, session_queue_node.index,
			       SESSION_QUEUE_ERROR_TX, n_tx_packets);

  SESSION_EVT (SESSION_EVT_DISPATCH_END, wrk, n_tx_packets);

  /* Adaptive workers may switch between polling/interrupt mode */
  if (wrk->flags & SESSION_WRK_F_ADAPTIVE)
    session_wrk_update_state (wrk);

  return n_tx_packets;
}
2067 :
/* *INDENT-OFF* */
/* Per-thread input node that dispatches session ctrl and io events.
 * Disabled at startup; enabled when the session layer is turned on. */
VLIB_REGISTER_NODE (session_queue_node) = {
  .function = session_queue_node_fn,
  .flags = VLIB_NODE_FLAG_TRACE_SUPPORTED,
  .name = "session-queue",
  .format_trace = format_session_queue_trace,
  .type = VLIB_NODE_TYPE_INPUT,
  .n_errors = SESSION_QUEUE_N_ERROR,
  .error_counters = session_error_counters,
  .state = VLIB_NODE_STATE_DISABLED,
};
/* *INDENT-ON* */
2080 :
2081 : static clib_error_t *
2082 4996 : session_wrk_tfd_read_ready (clib_file_t *cf)
2083 : {
2084 4996 : session_worker_t *wrk = session_main_get_worker (cf->private_data);
2085 : u64 buf;
2086 : int rv;
2087 :
2088 4996 : vlib_node_set_interrupt_pending (wrk->vm, session_queue_node.index);
2089 4996 : rv = read (wrk->timerfd, &buf, sizeof (buf));
2090 4996 : if (rv < 0 && errno != EAGAIN)
2091 0 : clib_unix_warning ("failed");
2092 4996 : return 0;
2093 : }
2094 :
/* No-op write-ready handler; the worker timerfd is only ever read. */
static clib_error_t *
session_wrk_tfd_write_ready (clib_file_t *cf)
{
  return 0;
}
2100 :
2101 : void
2102 2 : session_wrk_enable_adaptive_mode (session_worker_t *wrk)
2103 : {
2104 2 : u32 thread_index = wrk->vm->thread_index;
2105 2 : clib_file_t template = { 0 };
2106 :
2107 2 : if ((wrk->timerfd = timerfd_create (CLOCK_MONOTONIC, TFD_NONBLOCK)) < 0)
2108 0 : clib_warning ("timerfd_create");
2109 :
2110 2 : template.read_function = session_wrk_tfd_read_ready;
2111 2 : template.write_function = session_wrk_tfd_write_ready;
2112 2 : template.file_descriptor = wrk->timerfd;
2113 2 : template.private_data = thread_index;
2114 2 : template.polling_thread_index = thread_index;
2115 2 : template.description = format (0, "session-wrk-tfd-%u", thread_index);
2116 :
2117 2 : wrk->timerfd_file = clib_file_add (&file_main, &template);
2118 2 : wrk->flags |= SESSION_WRK_F_ADAPTIVE;
2119 2 : }
2120 :
2121 : static clib_error_t *
2122 575 : session_queue_exit (vlib_main_t * vm)
2123 : {
2124 575 : if (vlib_get_n_threads () < 2)
2125 538 : return 0;
2126 :
2127 : /*
2128 : * Shut off (especially) worker-thread session nodes.
2129 : * Otherwise, vpp can crash as the main thread unmaps the
2130 : * API segment.
2131 : */
2132 37 : vlib_worker_thread_barrier_sync (vm);
2133 37 : session_node_enable_disable (0 /* is_enable */ );
2134 37 : vlib_worker_thread_barrier_release (vm);
2135 37 : return 0;
2136 : }
2137 :
2138 2301 : VLIB_MAIN_LOOP_EXIT_FUNCTION (session_queue_exit);
2139 :
2140 : static uword
2141 12 : session_queue_run_on_main (vlib_main_t * vm)
2142 : {
2143 : vlib_node_runtime_t *node;
2144 :
2145 12 : node = vlib_node_get_runtime (vm, session_queue_node.index);
2146 12 : return session_queue_node_fn (vm, node, 0);
2147 : }
2148 :
2149 : static uword
2150 21 : session_queue_process (vlib_main_t * vm, vlib_node_runtime_t * rt,
2151 : vlib_frame_t * f)
2152 : {
2153 21 : uword *event_data = 0;
2154 21 : f64 timeout = 1.0;
2155 : uword event_type;
2156 :
2157 : while (1)
2158 : {
2159 33 : vlib_process_wait_for_event_or_clock (vm, timeout);
2160 12 : event_type = vlib_process_get_events (vm, (uword **) & event_data);
2161 :
2162 12 : switch (event_type)
2163 : {
2164 0 : case SESSION_Q_PROCESS_RUN_ON_MAIN:
2165 : /* Run session queue node on main thread */
2166 0 : session_queue_run_on_main (vm);
2167 0 : break;
2168 0 : case SESSION_Q_PROCESS_STOP:
2169 0 : vlib_node_set_state (vm, session_queue_process_node.index,
2170 : VLIB_NODE_STATE_DISABLED);
2171 0 : timeout = 100000.0;
2172 0 : break;
2173 12 : case ~0:
2174 : /* Timed out. Run on main to ensure all events are handled */
2175 12 : session_queue_run_on_main (vm);
2176 12 : break;
2177 : }
2178 12 : vec_reset_length (event_data);
2179 : }
2180 : return 0;
2181 : }
2182 :
/* *INDENT-OFF* */
/* Process node wrapping session_queue_process; disabled by default. */
VLIB_REGISTER_NODE (session_queue_process_node) =
{
  .function = session_queue_process,
  .type = VLIB_NODE_TYPE_PROCESS,
  .name = "session-queue-process",
  .state = VLIB_NODE_STATE_DISABLED,
};
/* *INDENT-ON* */
2192 :
2193 : static_always_inline uword
2194 0 : session_queue_pre_input_inline (vlib_main_t * vm, vlib_node_runtime_t * node,
2195 : vlib_frame_t * frame)
2196 : {
2197 0 : session_main_t *sm = &session_main;
2198 0 : if (!sm->wrk[0].vpp_event_queue)
2199 0 : return 0;
2200 0 : node = vlib_node_get_runtime (vm, session_queue_node.index);
2201 0 : return session_queue_node_fn (vm, node, frame);
2202 : }
2203 :
/* *INDENT-OFF* */
/* Pre-input node running session dispatch on main; disabled by default. */
VLIB_REGISTER_NODE (session_queue_pre_input_node) =
{
  .function = session_queue_pre_input_inline,
  .type = VLIB_NODE_TYPE_PRE_INPUT,
  .name = "session-queue-main",
  .state = VLIB_NODE_STATE_DISABLED,
};
/* *INDENT-ON* */
2213 :
2214 : /*
2215 : * fd.io coding-style-patch-verification: ON
2216 : *
2217 : * Local Variables:
2218 : * eval: (c-set-style "gnu")
2219 : * End:
2220 : */
|