Line data Source code
1 : /*
2 : * Copyright (c) 2019 Cisco and/or its affiliates.
3 : * Licensed under the Apache License, Version 2.0 (the "License");
4 : * you may not use this file except in compliance with the License.
5 : * You may obtain a copy of the License at:
6 : *
7 : * http://www.apache.org/licenses/LICENSE-2.0
8 : *
9 : * Unless required by applicable law or agreed to in writing, software
10 : * distributed under the License is distributed on an "AS IS" BASIS,
11 : * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 : * See the License for the specific language governing permissions and
13 : * limitations under the License.
14 : */
15 :
16 : #include <vnet/session/application.h>
17 : #include <vnet/session/application_interface.h>
18 : #include <vnet/session/session.h>
19 :
20 : /**
21 : * Pool of workers associated to apps
22 : */
23 : static app_worker_t *app_workers;
24 :
25 : app_worker_t *
26 216 : app_worker_alloc (application_t * app)
27 : {
28 : app_worker_t *app_wrk;
29 216 : pool_get (app_workers, app_wrk);
30 216 : clib_memset (app_wrk, 0, sizeof (*app_wrk));
31 216 : app_wrk->wrk_index = app_wrk - app_workers;
32 216 : app_wrk->app_index = app->app_index;
33 216 : app_wrk->wrk_map_index = ~0;
34 216 : app_wrk->connects_seg_manager = APP_INVALID_SEGMENT_MANAGER_INDEX;
35 216 : clib_spinlock_init (&app_wrk->detached_seg_managers_lock);
36 216 : clib_spinlock_init (&app_wrk->postponed_mq_msgs_lock);
37 : APP_DBG ("New app %v worker %u", app->name, app_wrk->wrk_index);
38 216 : return app_wrk;
39 : }
40 :
41 : app_worker_t *
42 13659 : app_worker_get (u32 wrk_index)
43 : {
44 13659 : return pool_elt_at_index (app_workers, wrk_index);
45 : }
46 :
47 : app_worker_t *
48 252242 : app_worker_get_if_valid (u32 wrk_index)
49 : {
50 252242 : if (pool_is_free_index (app_workers, wrk_index))
51 185 : return 0;
52 252057 : return pool_elt_at_index (app_workers, wrk_index);
53 : }
54 :
55 : void
56 87 : app_worker_free (app_worker_t * app_wrk)
57 : {
58 87 : application_t *app = application_get (app_wrk->app_index);
59 87 : vnet_unlisten_args_t _a, *a = &_a;
60 87 : u64 handle, *handles = 0, *sm_indices = 0;
61 : segment_manager_t *sm;
62 : session_handle_t *sh;
63 : session_t *ls;
64 : u32 sm_index;
65 : int i;
66 :
67 : /*
68 : * Listener cleanup
69 : */
70 :
71 : /* *INDENT-OFF* */
72 5680 : hash_foreach (handle, sm_index, app_wrk->listeners_table, ({
73 : ls = listen_session_get_from_handle (handle);
74 : vec_add1 (handles, app_listen_session_handle (ls));
75 : vec_add1 (sm_indices, sm_index);
76 : sm = segment_manager_get (sm_index);
77 : }));
78 : /* *INDENT-ON* */
79 :
80 112 : for (i = 0; i < vec_len (handles); i++)
81 : {
82 : /* Cleanup listener */
83 25 : a->app_index = app->app_index;
84 25 : a->wrk_map_index = app_wrk->wrk_map_index;
85 25 : a->handle = handles[i];
86 25 : (void) vnet_unlisten (a);
87 :
88 25 : sm = segment_manager_get_if_valid (sm_indices[i]);
89 25 : if (sm && !segment_manager_app_detached (sm))
90 : {
91 0 : sm->first_is_protected = 0;
92 0 : segment_manager_init_free (sm);
93 : }
94 : }
95 87 : vec_reset_length (handles);
96 87 : vec_free (sm_indices);
97 87 : hash_free (app_wrk->listeners_table);
98 :
99 : /*
100 : * Connects segment manager cleanup
101 : */
102 :
103 87 : if (app_wrk->connects_seg_manager != APP_INVALID_SEGMENT_MANAGER_INDEX)
104 : {
105 87 : sm = segment_manager_get (app_wrk->connects_seg_manager);
106 87 : sm->app_wrk_index = SEGMENT_MANAGER_INVALID_APP_INDEX;
107 87 : sm->first_is_protected = 0;
108 87 : segment_manager_init_free (sm);
109 : }
110 :
111 : /*
112 : * Half-open cleanup
113 : */
114 :
115 88 : pool_foreach (sh, app_wrk->half_open_table)
116 1 : session_cleanup_half_open (*sh);
117 :
118 87 : pool_free (app_wrk->half_open_table);
119 :
120 : /*
121 : * Detached listener segment managers cleanup
122 : */
123 112 : for (i = 0; i < vec_len (app_wrk->detached_seg_managers); i++)
124 : {
125 25 : sm = segment_manager_get (app_wrk->detached_seg_managers[i]);
126 25 : segment_manager_init_free (sm);
127 : }
128 87 : vec_free (app_wrk->detached_seg_managers);
129 87 : clib_spinlock_free (&app_wrk->detached_seg_managers_lock);
130 87 : clib_spinlock_free (&app_wrk->postponed_mq_msgs_lock);
131 :
132 : if (CLIB_DEBUG)
133 87 : clib_memset (app_wrk, 0xfe, sizeof (*app_wrk));
134 87 : pool_put (app_workers, app_wrk);
135 87 : }
136 :
137 : application_t *
138 0 : app_worker_get_app (u32 wrk_index)
139 : {
140 : app_worker_t *app_wrk;
141 0 : app_wrk = app_worker_get_if_valid (wrk_index);
142 0 : if (!app_wrk)
143 0 : return 0;
144 0 : return application_get_if_valid (app_wrk->app_index);
145 : }
146 :
147 : static segment_manager_t *
148 78 : app_worker_alloc_segment_manager (app_worker_t * app_wrk)
149 : {
150 : segment_manager_t *sm;
151 :
152 78 : sm = segment_manager_alloc ();
153 78 : sm->app_wrk_index = app_wrk->wrk_index;
154 78 : segment_manager_init (sm);
155 78 : return sm;
156 : }
157 :
158 : static int
159 404 : app_worker_alloc_session_fifos (segment_manager_t * sm, session_t * s)
160 : {
161 404 : svm_fifo_t *rx_fifo = 0, *tx_fifo = 0;
162 : int rv;
163 :
164 404 : if ((rv = segment_manager_alloc_session_fifos (sm, s->thread_index,
165 : &rx_fifo, &tx_fifo)))
166 0 : return rv;
167 :
168 404 : rx_fifo->shr->master_session_index = s->session_index;
169 404 : rx_fifo->master_thread_index = s->thread_index;
170 :
171 404 : tx_fifo->shr->master_session_index = s->session_index;
172 404 : tx_fifo->master_thread_index = s->thread_index;
173 :
174 404 : s->rx_fifo = rx_fifo;
175 404 : s->tx_fifo = tx_fifo;
176 404 : return 0;
177 : }
178 :
179 : int
180 78 : app_worker_init_listener (app_worker_t * app_wrk, session_t * ls)
181 : {
182 : segment_manager_t *sm;
183 :
184 : /* Allocate segment manager. All sessions derived out of a listen session
185 : * have fifos allocated by the same segment manager. */
186 78 : if (!(sm = app_worker_alloc_segment_manager (app_wrk)))
187 0 : return SESSION_E_ALLOC;
188 :
189 : /* Once the first segment is mapped, don't remove it until unlisten */
190 78 : sm->first_is_protected = 1;
191 :
192 : /* Keep track of the segment manager for the listener or this worker */
193 78 : hash_set (app_wrk->listeners_table, listen_session_get_handle (ls),
194 : segment_manager_index (sm));
195 :
196 78 : if (transport_connection_is_cless (session_get_transport (ls)))
197 : {
198 3 : if (ls->rx_fifo)
199 0 : return SESSION_E_NOSUPPORT;
200 3 : return app_worker_alloc_session_fifos (sm, ls);
201 : }
202 75 : return 0;
203 : }
204 :
205 : int
206 75 : app_worker_start_listen (app_worker_t * app_wrk,
207 : app_listener_t * app_listener)
208 : {
209 : session_t *ls;
210 : int rv;
211 :
212 75 : if (clib_bitmap_get (app_listener->workers, app_wrk->wrk_map_index))
213 2 : return SESSION_E_ALREADY_LISTENING;
214 :
215 146 : app_listener->workers = clib_bitmap_set (app_listener->workers,
216 73 : app_wrk->wrk_map_index, 1);
217 :
218 73 : if (app_listener->session_index != SESSION_INVALID_INDEX)
219 : {
220 60 : ls = session_get (app_listener->session_index, 0);
221 60 : if ((rv = app_worker_init_listener (app_wrk, ls)))
222 0 : return rv;
223 : }
224 :
225 73 : if (app_listener->local_index != SESSION_INVALID_INDEX)
226 : {
227 18 : ls = session_get (app_listener->local_index, 0);
228 18 : if ((rv = app_worker_init_listener (app_wrk, ls)))
229 0 : return rv;
230 : }
231 :
232 73 : return 0;
233 : }
234 :
235 : static void
236 31 : app_worker_add_detached_sm (app_worker_t * app_wrk, u32 sm_index)
237 : {
238 31 : vec_add1 (app_wrk->detached_seg_managers, sm_index);
239 31 : }
240 :
241 : void
242 6 : app_worker_del_detached_sm (app_worker_t * app_wrk, u32 sm_index)
243 : {
244 : u32 i;
245 :
246 6 : clib_spinlock_lock (&app_wrk->detached_seg_managers_lock);
247 6 : for (i = 0; i < vec_len (app_wrk->detached_seg_managers); i++)
248 : {
249 6 : if (app_wrk->detached_seg_managers[i] == sm_index)
250 : {
251 6 : vec_del1 (app_wrk->detached_seg_managers, i);
252 6 : break;
253 : }
254 : }
255 6 : clib_spinlock_unlock (&app_wrk->detached_seg_managers_lock);
256 6 : }
257 :
258 : static void
259 62 : app_worker_stop_listen_session (app_worker_t * app_wrk, session_t * ls)
260 : {
261 : session_handle_t handle;
262 : segment_manager_t *sm;
263 : uword *sm_indexp;
264 62 : session_state_t *states = 0;
265 :
266 62 : handle = listen_session_get_handle (ls);
267 62 : sm_indexp = hash_get (app_wrk->listeners_table, handle);
268 62 : if (PREDICT_FALSE (!sm_indexp))
269 0 : return;
270 :
271 : /* Dealloc fifos, if any (dgram listeners) */
272 62 : if (ls->rx_fifo)
273 : {
274 3 : segment_manager_dealloc_fifos (ls->rx_fifo, ls->tx_fifo);
275 3 : ls->tx_fifo = ls->rx_fifo = 0;
276 : }
277 :
278 : /* Try to cleanup segment manager */
279 62 : sm = segment_manager_get (*sm_indexp);
280 62 : if (sm)
281 : {
282 62 : sm->first_is_protected = 0;
283 62 : segment_manager_app_detach (sm);
284 62 : if (!segment_manager_has_fifos (sm))
285 : {
286 : /* Empty segment manager, cleanup it up */
287 31 : segment_manager_free (sm);
288 : }
289 : else
290 : {
291 : /* Delete sessions in CREATED state */
292 31 : vec_add1 (states, SESSION_STATE_CREATED);
293 31 : segment_manager_del_sessions_filter (sm, states);
294 31 : vec_free (states);
295 :
296 : /* Track segment manager in case app detaches and all the
297 : * outstanding sessions need to be closed */
298 31 : app_worker_add_detached_sm (app_wrk, *sm_indexp);
299 31 : sm->flags |= SEG_MANAGER_F_DETACHED_LISTENER;
300 : }
301 : }
302 :
303 62 : hash_unset (app_wrk->listeners_table, handle);
304 : }
305 :
306 : int
307 57 : app_worker_stop_listen (app_worker_t * app_wrk, app_listener_t * al)
308 : {
309 : session_t *ls;
310 :
311 57 : if (!clib_bitmap_get (al->workers, app_wrk->wrk_map_index))
312 0 : return 0;
313 :
314 57 : if (al->session_index != SESSION_INVALID_INDEX)
315 : {
316 44 : ls = listen_session_get (al->session_index);
317 44 : app_worker_stop_listen_session (app_wrk, ls);
318 : }
319 :
320 57 : if (al->local_index != SESSION_INVALID_INDEX)
321 : {
322 18 : ls = listen_session_get (al->local_index);
323 18 : app_worker_stop_listen_session (app_wrk, ls);
324 : }
325 :
326 57 : clib_bitmap_set_no_check (al->workers, app_wrk->wrk_map_index, 0);
327 57 : if (clib_bitmap_is_zero (al->workers))
328 57 : app_listener_cleanup (al);
329 :
330 57 : return 0;
331 : }
332 :
333 : int
334 176 : app_worker_init_accepted (session_t * s)
335 : {
336 : app_worker_t *app_wrk;
337 : segment_manager_t *sm;
338 : session_t *listener;
339 : application_t *app;
340 :
341 176 : listener = listen_session_get_from_handle (s->listener_handle);
342 176 : app_wrk = application_listener_select_worker (listener);
343 176 : if (PREDICT_FALSE (app_wrk->mq_congested))
344 0 : return -1;
345 :
346 176 : s->app_wrk_index = app_wrk->wrk_index;
347 176 : app = application_get (app_wrk->app_index);
348 176 : if (app->cb_fns.fifo_tuning_callback)
349 0 : s->flags |= SESSION_F_CUSTOM_FIFO_TUNING;
350 :
351 176 : sm = app_worker_get_listen_segment_manager (app_wrk, listener);
352 176 : if (app_worker_alloc_session_fifos (sm, s))
353 0 : return -1;
354 :
355 176 : return 0;
356 : }
357 :
358 : int
359 204 : app_worker_accept_notify (app_worker_t * app_wrk, session_t * s)
360 : {
361 204 : application_t *app = application_get (app_wrk->app_index);
362 204 : return app->cb_fns.session_accept_callback (s);
363 : }
364 :
365 : int
366 225 : app_worker_init_connected (app_worker_t * app_wrk, session_t * s)
367 : {
368 225 : application_t *app = application_get (app_wrk->app_index);
369 : segment_manager_t *sm;
370 :
371 225 : if (app->cb_fns.fifo_tuning_callback)
372 0 : s->flags |= SESSION_F_CUSTOM_FIFO_TUNING;
373 :
374 : /* Allocate fifos for session, unless the app is a builtin proxy */
375 225 : if (application_is_builtin_proxy (app))
376 0 : return 0;
377 :
378 225 : sm = app_worker_get_connect_segment_manager (app_wrk);
379 225 : return app_worker_alloc_session_fifos (sm, s);
380 : }
381 :
382 : int
383 207 : app_worker_connect_notify (app_worker_t * app_wrk, session_t * s,
384 : session_error_t err, u32 opaque)
385 : {
386 207 : application_t *app = application_get (app_wrk->app_index);
387 207 : return app->cb_fns.session_connected_callback (app_wrk->wrk_index, opaque,
388 : s, err);
389 : }
390 :
391 : int
392 148 : app_worker_add_half_open (app_worker_t *app_wrk, session_handle_t sh)
393 : {
394 : session_handle_t *shp;
395 :
396 148 : ASSERT (session_vlib_thread_is_cl_thread ());
397 148 : pool_get (app_wrk->half_open_table, shp);
398 148 : *shp = sh;
399 :
400 148 : return (shp - app_wrk->half_open_table);
401 : }
402 :
403 : int
404 147 : app_worker_del_half_open (app_worker_t *app_wrk, session_t *s)
405 : {
406 147 : application_t *app = application_get (app_wrk->app_index);
407 147 : ASSERT (session_vlib_thread_is_cl_thread ());
408 147 : pool_put_index (app_wrk->half_open_table, s->ho_index);
409 147 : if (app->cb_fns.half_open_cleanup_callback)
410 2 : app->cb_fns.half_open_cleanup_callback (s);
411 147 : return 0;
412 : }
413 :
414 : int
415 152 : app_worker_close_notify (app_worker_t * app_wrk, session_t * s)
416 : {
417 152 : application_t *app = application_get (app_wrk->app_index);
418 152 : app->cb_fns.session_disconnect_callback (s);
419 152 : return 0;
420 : }
421 :
422 : int
423 264 : app_worker_transport_closed_notify (app_worker_t * app_wrk, session_t * s)
424 : {
425 264 : application_t *app = application_get (app_wrk->app_index);
426 264 : if (app->cb_fns.session_transport_closed_callback)
427 0 : app->cb_fns.session_transport_closed_callback (s);
428 264 : return 0;
429 : }
430 :
431 : int
432 5 : app_worker_reset_notify (app_worker_t * app_wrk, session_t * s)
433 : {
434 5 : application_t *app = application_get (app_wrk->app_index);
435 5 : app->cb_fns.session_reset_callback (s);
436 5 : return 0;
437 : }
438 :
439 : int
440 420 : app_worker_cleanup_notify (app_worker_t * app_wrk, session_t * s,
441 : session_cleanup_ntf_t ntf)
442 : {
443 420 : application_t *app = application_get (app_wrk->app_index);
444 420 : if (app->cb_fns.session_cleanup_callback)
445 154 : app->cb_fns.session_cleanup_callback (s, ntf);
446 420 : return 0;
447 : }
448 :
449 : int
450 79264 : app_worker_builtin_rx (app_worker_t * app_wrk, session_t * s)
451 : {
452 79264 : application_t *app = application_get (app_wrk->app_index);
453 79264 : app->cb_fns.builtin_app_rx_callback (s);
454 79264 : return 0;
455 : }
456 :
457 : int
458 134 : app_worker_builtin_tx (app_worker_t * app_wrk, session_t * s)
459 : {
460 134 : application_t *app = application_get (app_wrk->app_index);
461 :
462 134 : if (!app->cb_fns.builtin_app_tx_callback)
463 0 : return 0;
464 :
465 134 : app->cb_fns.builtin_app_tx_callback (s);
466 134 : return 0;
467 : }
468 :
469 : int
470 0 : app_worker_migrate_notify (app_worker_t * app_wrk, session_t * s,
471 : session_handle_t new_sh)
472 : {
473 0 : application_t *app = application_get (app_wrk->app_index);
474 0 : app->cb_fns.session_migrate_callback (s, new_sh);
475 0 : return 0;
476 : }
477 :
478 : int
479 0 : app_worker_own_session (app_worker_t * app_wrk, session_t * s)
480 : {
481 : segment_manager_t *sm;
482 : svm_fifo_t *rxf, *txf;
483 : int rv;
484 :
485 0 : if (s->session_state == SESSION_STATE_LISTENING)
486 0 : return application_change_listener_owner (s, app_wrk);
487 :
488 0 : s->app_wrk_index = app_wrk->wrk_index;
489 :
490 0 : rxf = s->rx_fifo;
491 0 : txf = s->tx_fifo;
492 :
493 0 : if (!rxf || !txf)
494 0 : return 0;
495 :
496 0 : s->rx_fifo = 0;
497 0 : s->tx_fifo = 0;
498 :
499 0 : sm = app_worker_get_connect_segment_manager (app_wrk);
500 0 : if ((rv = app_worker_alloc_session_fifos (sm, s)))
501 0 : return rv;
502 :
503 0 : if (!svm_fifo_is_empty_cons (rxf))
504 0 : svm_fifo_clone (s->rx_fifo, rxf);
505 :
506 0 : if (!svm_fifo_is_empty_cons (txf))
507 0 : svm_fifo_clone (s->tx_fifo, txf);
508 :
509 0 : segment_manager_dealloc_fifos (rxf, txf);
510 :
511 0 : return 0;
512 : }
513 :
514 : int
515 213 : app_worker_connect_session (app_worker_t *app_wrk, session_endpoint_cfg_t *sep,
516 : session_handle_t *rsh)
517 : {
518 213 : if (PREDICT_FALSE (app_wrk->mq_congested))
519 0 : return SESSION_E_REFUSED;
520 :
521 213 : sep->app_wrk_index = app_wrk->wrk_index;
522 :
523 213 : return session_open (sep, rsh);
524 : }
525 :
526 : int
527 0 : app_worker_session_fifo_tuning (app_worker_t * app_wrk, session_t * s,
528 : svm_fifo_t * f,
529 : session_ft_action_t act, u32 len)
530 : {
531 0 : application_t *app = application_get (app_wrk->app_index);
532 0 : return app->cb_fns.fifo_tuning_callback (s, f, act, len);
533 : }
534 :
535 : segment_manager_t *
536 230 : app_worker_get_connect_segment_manager (app_worker_t * app)
537 : {
538 230 : ASSERT (app->connects_seg_manager != (u32) ~ 0);
539 230 : return segment_manager_get (app->connects_seg_manager);
540 : }
541 :
542 : segment_manager_t *
543 189 : app_worker_get_listen_segment_manager (app_worker_t * app,
544 : session_t * listener)
545 : {
546 : uword *smp;
547 189 : smp = hash_get (app->listeners_table, listen_session_get_handle (listener));
548 189 : ALWAYS_ASSERT (smp != 0);
549 189 : return segment_manager_get (*smp);
550 : }
551 :
552 : session_t *
553 8 : app_worker_first_listener (app_worker_t * app_wrk, u8 fib_proto,
554 : u8 transport_proto)
555 : {
556 : session_t *listener;
557 : u64 handle;
558 : u32 sm_index;
559 : u8 sst;
560 :
561 8 : sst = session_type_from_proto_and_ip (transport_proto,
562 : fib_proto == FIB_PROTOCOL_IP4);
563 :
564 : /* *INDENT-OFF* */
565 353 : hash_foreach (handle, sm_index, app_wrk->listeners_table, ({
566 : listener = listen_session_get_from_handle (handle);
567 : if (listener->session_type == sst
568 : && !(listener->flags & SESSION_F_PROXY))
569 : return listener;
570 : }));
571 : /* *INDENT-ON* */
572 :
573 2 : return 0;
574 : }
575 :
576 : session_t *
577 2 : app_worker_proxy_listener (app_worker_t * app_wrk, u8 fib_proto,
578 : u8 transport_proto)
579 : {
580 : session_t *listener;
581 : u64 handle;
582 : u32 sm_index;
583 : u8 sst;
584 :
585 2 : sst = session_type_from_proto_and_ip (transport_proto,
586 : fib_proto == FIB_PROTOCOL_IP4);
587 :
588 : /* *INDENT-OFF* */
589 41 : hash_foreach (handle, sm_index, app_wrk->listeners_table, ({
590 : listener = listen_session_get_from_handle (handle);
591 : if (listener->session_type == sst && (listener->flags & SESSION_F_PROXY))
592 : return listener;
593 : }));
594 : /* *INDENT-ON* */
595 :
596 0 : return 0;
597 : }
598 :
599 : /**
600 : * Send an API message to the external app, to map new segment
601 : */
602 : int
603 88 : app_worker_add_segment_notify (app_worker_t * app_wrk, u64 segment_handle)
604 : {
605 88 : application_t *app = application_get (app_wrk->app_index);
606 :
607 88 : return app->cb_fns.add_segment_callback (app_wrk->wrk_index,
608 : segment_handle);
609 : }
610 :
611 : int
612 6 : app_worker_del_segment_notify (app_worker_t * app_wrk, u64 segment_handle)
613 : {
614 6 : application_t *app = application_get (app_wrk->app_index);
615 6 : return app->cb_fns.del_segment_callback (app_wrk->wrk_index,
616 : segment_handle);
617 : }
618 :
619 : static inline u8
620 251906 : app_worker_application_is_builtin (app_worker_t * app_wrk)
621 : {
622 251906 : return app_wrk->app_is_builtin;
623 : }
624 :
625 : static int
626 40 : app_wrk_send_fd (app_worker_t *app_wrk, int fd)
627 : {
628 40 : if (!appns_sapi_enabled ())
629 : {
630 : vl_api_registration_t *reg;
631 : clib_error_t *error;
632 :
633 : reg =
634 31 : vl_mem_api_client_index_to_registration (app_wrk->api_client_index);
635 31 : if (!reg)
636 : {
637 0 : clib_warning ("no api registration for client: %u",
638 : app_wrk->api_client_index);
639 0 : return -1;
640 : }
641 :
642 31 : if (vl_api_registration_file_index (reg) == VL_API_INVALID_FI)
643 0 : return -1;
644 :
645 31 : error = vl_api_send_fd_msg (reg, &fd, 1);
646 31 : if (error)
647 : {
648 0 : clib_error_report (error);
649 0 : return -1;
650 : }
651 :
652 31 : return 0;
653 : }
654 :
655 9 : app_sapi_msg_t smsg = { 0 };
656 : app_namespace_t *app_ns;
657 : clib_error_t *error;
658 : application_t *app;
659 : clib_socket_t *cs;
660 : u32 cs_index;
661 :
662 9 : app = application_get (app_wrk->app_index);
663 9 : app_ns = app_namespace_get (app->ns_index);
664 9 : cs_index = appns_sapi_handle_sock_index (app_wrk->api_client_index);
665 9 : cs = appns_sapi_get_socket (app_ns, cs_index);
666 9 : if (PREDICT_FALSE (!cs))
667 0 : return -1;
668 :
669 : /* There's no payload for the message only the type */
670 9 : smsg.type = APP_SAPI_MSG_TYPE_SEND_FDS;
671 9 : error = clib_socket_sendmsg (cs, &smsg, sizeof (smsg), &fd, 1);
672 9 : if (error)
673 : {
674 0 : clib_error_report (error);
675 0 : return -1;
676 : }
677 :
678 9 : return 0;
679 : }
680 :
681 : static int
682 116667 : mq_try_lock_and_alloc_msg (svm_msg_q_t *mq, session_mq_rings_e ring,
683 : svm_msg_q_msg_t *msg)
684 : {
685 116667 : int rv, n_try = 0;
686 :
687 116720 : while (n_try < 75)
688 : {
689 116720 : rv = svm_msg_q_lock_and_alloc_msg_w_ring (mq, ring, SVM_Q_NOWAIT, msg);
690 116720 : if (!rv)
691 116667 : return 0;
692 : /*
693 : * Break the loop if mq is full, usually this is because the
694 : * app has crashed or is hanging on somewhere.
695 : */
696 53 : if (rv != -1)
697 0 : break;
698 53 : n_try += 1;
699 53 : usleep (1);
700 : }
701 :
702 0 : return -1;
703 : }
704 :
705 : typedef union app_wrk_mq_rpc_args_
706 : {
707 : struct
708 : {
709 : u32 thread_index;
710 : u32 app_wrk_index;
711 : };
712 : uword as_uword;
713 : } app_wrk_mq_rpc_ags_t;
714 :
715 : static int
716 0 : app_wrk_handle_mq_postponed_msgs (void *arg)
717 : {
718 0 : svm_msg_q_msg_t _mq_msg, *mq_msg = &_mq_msg;
719 : app_wrk_postponed_msg_t *pm;
720 : app_wrk_mq_rpc_ags_t args;
721 0 : u32 max_msg, n_msg = 0;
722 : app_worker_t *app_wrk;
723 : session_event_t *evt;
724 : svm_msg_q_t *mq;
725 :
726 0 : args.as_uword = pointer_to_uword (arg);
727 0 : app_wrk = app_worker_get_if_valid (args.app_wrk_index);
728 0 : if (!app_wrk)
729 0 : return 0;
730 :
731 0 : mq = app_wrk->event_queue;
732 :
733 0 : clib_spinlock_lock (&app_wrk->postponed_mq_msgs_lock);
734 :
735 0 : max_msg = clib_min (32, clib_fifo_elts (app_wrk->postponed_mq_msgs));
736 :
737 0 : while (n_msg < max_msg)
738 : {
739 0 : pm = clib_fifo_head (app_wrk->postponed_mq_msgs);
740 0 : if (mq_try_lock_and_alloc_msg (mq, pm->ring, mq_msg))
741 0 : break;
742 :
743 0 : evt = svm_msg_q_msg_data (mq, mq_msg);
744 0 : clib_memset (evt, 0, sizeof (*evt));
745 0 : evt->event_type = pm->event_type;
746 0 : clib_memcpy_fast (evt->data, pm->data, pm->len);
747 :
748 0 : if (pm->fd != -1)
749 0 : app_wrk_send_fd (app_wrk, pm->fd);
750 :
751 0 : svm_msg_q_add_and_unlock (mq, mq_msg);
752 :
753 0 : clib_fifo_advance_head (app_wrk->postponed_mq_msgs, 1);
754 0 : n_msg += 1;
755 : }
756 :
757 0 : if (!clib_fifo_elts (app_wrk->postponed_mq_msgs))
758 : {
759 0 : app_wrk->mq_congested = 0;
760 : }
761 : else
762 : {
763 0 : session_send_rpc_evt_to_thread_force (
764 : args.thread_index, app_wrk_handle_mq_postponed_msgs,
765 0 : uword_to_pointer (args.as_uword, void *));
766 : }
767 :
768 0 : clib_spinlock_unlock (&app_wrk->postponed_mq_msgs_lock);
769 :
770 0 : return 0;
771 : }
772 :
773 : static void
774 0 : app_wrk_add_mq_postponed_msg (app_worker_t *app_wrk, session_mq_rings_e ring,
775 : u8 evt_type, void *msg, u32 msg_len, int fd)
776 : {
777 : app_wrk_postponed_msg_t *pm;
778 :
779 0 : clib_spinlock_lock (&app_wrk->postponed_mq_msgs_lock);
780 :
781 0 : app_wrk->mq_congested = 1;
782 :
783 0 : clib_fifo_add2 (app_wrk->postponed_mq_msgs, pm);
784 0 : clib_memcpy_fast (pm->data, msg, msg_len);
785 0 : pm->event_type = evt_type;
786 0 : pm->ring = ring;
787 0 : pm->len = msg_len;
788 0 : pm->fd = fd;
789 :
790 0 : if (clib_fifo_elts (app_wrk->postponed_mq_msgs) == 1)
791 : {
792 0 : app_wrk_mq_rpc_ags_t args = { .thread_index = vlib_get_thread_index (),
793 0 : .app_wrk_index = app_wrk->wrk_index };
794 :
795 0 : session_send_rpc_evt_to_thread_force (
796 : args.thread_index, app_wrk_handle_mq_postponed_msgs,
797 0 : uword_to_pointer (args.as_uword, void *));
798 : }
799 :
800 0 : clib_spinlock_unlock (&app_wrk->postponed_mq_msgs_lock);
801 0 : }
802 :
803 : always_inline void
804 290 : app_wrk_send_ctrl_evt_inline (app_worker_t *app_wrk, u8 evt_type, void *msg,
805 : u32 msg_len, int fd)
806 : {
807 290 : svm_msg_q_msg_t _mq_msg, *mq_msg = &_mq_msg;
808 290 : svm_msg_q_t *mq = app_wrk->event_queue;
809 : session_event_t *evt;
810 : int rv;
811 :
812 290 : if (PREDICT_FALSE (app_wrk->mq_congested))
813 0 : goto handle_congestion;
814 :
815 290 : rv = mq_try_lock_and_alloc_msg (mq, SESSION_MQ_CTRL_EVT_RING, mq_msg);
816 290 : if (PREDICT_FALSE (rv))
817 0 : goto handle_congestion;
818 :
819 290 : evt = svm_msg_q_msg_data (mq, mq_msg);
820 290 : clib_memset (evt, 0, sizeof (*evt));
821 290 : evt->event_type = evt_type;
822 290 : clib_memcpy_fast (evt->data, msg, msg_len);
823 :
824 290 : if (fd != -1)
825 40 : app_wrk_send_fd (app_wrk, fd);
826 :
827 290 : svm_msg_q_add_and_unlock (mq, mq_msg);
828 :
829 290 : return;
830 :
831 0 : handle_congestion:
832 :
833 0 : app_wrk_add_mq_postponed_msg (app_wrk, SESSION_MQ_CTRL_EVT_RING, evt_type,
834 : msg, msg_len, fd);
835 : }
836 :
837 : void
838 40 : app_wrk_send_ctrl_evt_fd (app_worker_t *app_wrk, u8 evt_type, void *msg,
839 : u32 msg_len, int fd)
840 : {
841 40 : app_wrk_send_ctrl_evt_inline (app_wrk, evt_type, msg, msg_len, fd);
842 40 : }
843 :
844 : void
845 250 : app_wrk_send_ctrl_evt (app_worker_t *app_wrk, u8 evt_type, void *msg,
846 : u32 msg_len)
847 : {
848 250 : app_wrk_send_ctrl_evt_inline (app_wrk, evt_type, msg, msg_len, -1);
849 250 : }
850 :
851 : static inline int
852 229503 : app_send_io_evt_rx (app_worker_t * app_wrk, session_t * s)
853 : {
854 229503 : svm_msg_q_msg_t _mq_msg = { 0 }, *mq_msg = &_mq_msg;
855 : session_event_t *evt;
856 : svm_msg_q_t *mq;
857 : u32 app_session;
858 : int rv;
859 :
860 229503 : if (app_worker_application_is_builtin (app_wrk))
861 69114 : return app_worker_builtin_rx (app_wrk, s);
862 :
863 160389 : if (svm_fifo_has_event (s->rx_fifo))
864 66281 : return 0;
865 :
866 94108 : app_session = s->rx_fifo->shr->client_session_index;
867 94108 : mq = app_wrk->event_queue;
868 :
869 94108 : if (PREDICT_FALSE (app_wrk->mq_congested))
870 0 : goto handle_congestion;
871 :
872 94108 : rv = mq_try_lock_and_alloc_msg (mq, SESSION_MQ_IO_EVT_RING, mq_msg);
873 :
874 94108 : if (PREDICT_FALSE (rv))
875 0 : goto handle_congestion;
876 :
877 94108 : evt = svm_msg_q_msg_data (mq, mq_msg);
878 94108 : evt->event_type = SESSION_IO_EVT_RX;
879 94108 : evt->session_index = app_session;
880 :
881 94108 : (void) svm_fifo_set_event (s->rx_fifo);
882 :
883 94108 : svm_msg_q_add_and_unlock (mq, mq_msg);
884 :
885 94108 : return 0;
886 :
887 0 : handle_congestion:
888 :
889 0 : app_wrk_add_mq_postponed_msg (app_wrk, SESSION_MQ_IO_EVT_RING,
890 : SESSION_IO_EVT_RX, &app_session,
891 : sizeof (app_session), -1);
892 0 : return -1;
893 : }
894 :
895 : static inline int
896 22403 : app_send_io_evt_tx (app_worker_t * app_wrk, session_t * s)
897 : {
898 22403 : svm_msg_q_msg_t _mq_msg = { 0 }, *mq_msg = &_mq_msg;
899 : session_event_t *evt;
900 : svm_msg_q_t *mq;
901 : u32 app_session;
902 : int rv;
903 :
904 22403 : if (app_worker_application_is_builtin (app_wrk))
905 134 : return app_worker_builtin_tx (app_wrk, s);
906 :
907 22269 : app_session = s->tx_fifo->shr->client_session_index;
908 22269 : mq = app_wrk->event_queue;
909 :
910 22269 : if (PREDICT_FALSE (app_wrk->mq_congested))
911 0 : goto handle_congestion;
912 :
913 22269 : rv = mq_try_lock_and_alloc_msg (mq, SESSION_MQ_IO_EVT_RING, mq_msg);
914 :
915 22269 : if (PREDICT_FALSE (rv))
916 0 : goto handle_congestion;
917 :
918 22269 : evt = svm_msg_q_msg_data (mq, mq_msg);
919 22269 : evt->event_type = SESSION_IO_EVT_TX;
920 22269 : evt->session_index = app_session;
921 :
922 22269 : svm_msg_q_add_and_unlock (mq, mq_msg);
923 :
924 22269 : return 0;
925 :
926 0 : handle_congestion:
927 :
928 0 : app_wrk_add_mq_postponed_msg (app_wrk, SESSION_MQ_IO_EVT_RING,
929 : SESSION_IO_EVT_TX, &app_session,
930 : sizeof (app_session), -1);
931 0 : return -1;
932 : }
933 :
934 : /* *INDENT-OFF* */
935 : typedef int (app_send_evt_handler_fn) (app_worker_t *app,
936 : session_t *s);
937 : static app_send_evt_handler_fn * const app_send_evt_handler_fns[2] = {
938 : app_send_io_evt_rx,
939 : app_send_io_evt_tx,
940 : };
941 : /* *INDENT-ON* */
942 :
943 : /**
944 : * Send event to application
945 : *
946 : * Logic from queue perspective is blocking. However, if queue is full,
947 : * we return.
948 : */
949 : int
950 251906 : app_worker_lock_and_send_event (app_worker_t * app, session_t * s,
951 : u8 evt_type)
952 : {
953 251906 : return app_send_evt_handler_fns[evt_type] (app, s);
954 : }
955 :
956 : u8 *
957 5 : format_app_worker_listener (u8 * s, va_list * args)
958 : {
959 5 : app_worker_t *app_wrk = va_arg (*args, app_worker_t *);
960 5 : u64 handle = va_arg (*args, u64);
961 5 : u32 sm_index = va_arg (*args, u32);
962 5 : int verbose = va_arg (*args, int);
963 : session_t *listener;
964 : const u8 *app_name;
965 : u8 *str;
966 :
967 5 : if (!app_wrk)
968 : {
969 5 : if (verbose)
970 0 : s = format (s, "%-" SESSION_CLI_ID_LEN "s%-25s%-10s%-15s%-15s%-10s",
971 : "Connection", "App", "Wrk", "API Client", "ListenerID",
972 : "SegManager");
973 : else
974 5 : s = format (s, "%-" SESSION_CLI_ID_LEN "s%-25s%-10s", "Connection",
975 : "App", "Wrk");
976 :
977 5 : return s;
978 : }
979 :
980 0 : app_name = application_name_from_index (app_wrk->app_index);
981 0 : listener = listen_session_get_from_handle (handle);
982 0 : str = format (0, "%U", format_session, listener, verbose);
983 :
984 0 : if (verbose)
985 : {
986 : u8 *buf;
987 0 : buf = format (0, "%u(%u)", app_wrk->wrk_map_index, app_wrk->wrk_index);
988 0 : s = format (s, "%-" SESSION_CLI_ID_LEN "v%-25v%-10v%-15u%-15u%-10u", str,
989 : app_name, buf, app_wrk->api_client_index, handle, sm_index);
990 0 : vec_free (buf);
991 : }
992 : else
993 0 : s = format (s, "%-" SESSION_CLI_ID_LEN "v%-25v%=10u", str, app_name,
994 : app_wrk->wrk_map_index);
995 :
996 0 : vec_free (str);
997 :
998 0 : return s;
999 : }
1000 :
1001 : u8 *
1002 0 : format_app_worker (u8 * s, va_list * args)
1003 : {
1004 0 : app_worker_t *app_wrk = va_arg (*args, app_worker_t *);
1005 0 : u32 indent = 1;
1006 :
1007 0 : s = format (s,
1008 : "%U wrk-index %u app-index %u map-index %u "
1009 : "api-client-index %d mq-cong %u\n",
1010 : format_white_space, indent, app_wrk->wrk_index,
1011 : app_wrk->app_index, app_wrk->wrk_map_index,
1012 0 : app_wrk->api_client_index, app_wrk->mq_congested);
1013 0 : return s;
1014 : }
1015 :
1016 : void
1017 0 : app_worker_format_connects (app_worker_t * app_wrk, int verbose)
1018 : {
1019 : segment_manager_t *sm;
1020 :
1021 : /* Header */
1022 0 : if (!app_wrk)
1023 : {
1024 0 : segment_manager_format_sessions (0, verbose);
1025 0 : return;
1026 : }
1027 :
1028 0 : if (app_wrk->connects_seg_manager == (u32) ~ 0)
1029 0 : return;
1030 :
1031 0 : sm = segment_manager_get (app_wrk->connects_seg_manager);
1032 0 : segment_manager_format_sessions (sm, verbose);
1033 : }
1034 :
1035 : /*
1036 : * fd.io coding-style-patch-verification: ON
1037 : *
1038 : * Local Variables:
1039 : * eval: (c-set-style "gnu")
1040 : * End:
1041 : */
|