Line data Source code
1 : /*
2 : * Copyright (c) 2019 Cisco and/or its affiliates.
3 : * Licensed under the Apache License, Version 2.0 (the "License");
4 : * you may not use this file except in compliance with the License.
5 : * You may obtain a copy of the License at:
6 : *
7 : * http://www.apache.org/licenses/LICENSE-2.0
8 : *
9 : * Unless required by applicable law or agreed to in writing, software
10 : * distributed under the License is distributed on an "AS IS" BASIS,
11 : * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 : * See the License for the specific language governing permissions and
13 : * limitations under the License.
14 : */
15 :
16 : #include <vnet/session/application.h>
17 : #include <vnet/session/application_interface.h>
18 : #include <vnet/session/session.h>
19 :
20 : /**
21 : * Pool of workers associated to apps
22 : */
23 : static app_worker_t *app_workers;
24 :
25 : app_worker_t *
26 216 : app_worker_alloc (application_t * app)
27 : {
28 : app_worker_t *app_wrk;
29 :
30 216 : pool_get (app_workers, app_wrk);
31 216 : clib_memset (app_wrk, 0, sizeof (*app_wrk));
32 216 : app_wrk->wrk_index = app_wrk - app_workers;
33 216 : app_wrk->app_index = app->app_index;
34 216 : app_wrk->wrk_map_index = ~0;
35 216 : app_wrk->connects_seg_manager = APP_INVALID_SEGMENT_MANAGER_INDEX;
36 216 : clib_spinlock_init (&app_wrk->detached_seg_managers_lock);
37 216 : vec_validate (app_wrk->wrk_evts, vlib_num_workers ());
38 216 : vec_validate (app_wrk->wrk_mq_congested, vlib_num_workers ());
39 : APP_DBG ("New app %v worker %u", app->name, app_wrk->wrk_index);
40 216 : return app_wrk;
41 : }
42 :
43 : app_worker_t *
44 98538 : app_worker_get (u32 wrk_index)
45 : {
46 98538 : return pool_elt_at_index (app_workers, wrk_index);
47 : }
48 :
49 : app_worker_t *
50 487728 : app_worker_get_if_valid (u32 wrk_index)
51 : {
52 487728 : if (pool_is_free_index (app_workers, wrk_index))
53 178 : return 0;
54 487550 : return pool_elt_at_index (app_workers, wrk_index);
55 : }
56 :
57 : void
58 87 : app_worker_free (app_worker_t * app_wrk)
59 : {
60 87 : application_t *app = application_get (app_wrk->app_index);
61 87 : vnet_unlisten_args_t _a, *a = &_a;
62 87 : u64 handle, *handles = 0, *sm_indices = 0;
63 : segment_manager_t *sm;
64 : session_handle_t *sh;
65 : session_t *ls;
66 : u32 sm_index;
67 : int i;
68 :
69 : /*
70 : * Cleanup vpp wrk events
71 : */
72 87 : app_worker_del_all_events (app_wrk);
73 178 : for (i = 0; i < vec_len (app_wrk->wrk_evts); i++)
74 91 : clib_fifo_free (app_wrk->wrk_evts[i]);
75 :
76 87 : vec_free (app_wrk->wrk_evts);
77 87 : vec_free (app_wrk->wrk_mq_congested);
78 :
79 : /*
80 : * Listener cleanup
81 : */
82 :
83 5681 : hash_foreach (handle, sm_index, app_wrk->listeners_table, ({
84 : ls = listen_session_get_from_handle (handle);
85 : vec_add1 (handles, app_listen_session_handle (ls));
86 : vec_add1 (sm_indices, sm_index);
87 : sm = segment_manager_get (sm_index);
88 : }));
89 :
90 113 : for (i = 0; i < vec_len (handles); i++)
91 : {
92 : /* Cleanup listener */
93 26 : a->app_index = app->app_index;
94 26 : a->wrk_map_index = app_wrk->wrk_map_index;
95 26 : a->handle = handles[i];
96 26 : (void) vnet_unlisten (a);
97 :
98 26 : sm = segment_manager_get_if_valid (sm_indices[i]);
99 26 : if (sm && !segment_manager_app_detached (sm))
100 : {
101 0 : sm->first_is_protected = 0;
102 0 : segment_manager_init_free (sm);
103 : }
104 : }
105 87 : vec_reset_length (handles);
106 87 : vec_free (sm_indices);
107 87 : hash_free (app_wrk->listeners_table);
108 :
109 : /*
110 : * Connects segment manager cleanup
111 : */
112 :
113 87 : if (app_wrk->connects_seg_manager != APP_INVALID_SEGMENT_MANAGER_INDEX)
114 : {
115 87 : sm = segment_manager_get (app_wrk->connects_seg_manager);
116 87 : sm->app_wrk_index = SEGMENT_MANAGER_INVALID_APP_INDEX;
117 87 : sm->first_is_protected = 0;
118 87 : segment_manager_init_free (sm);
119 : }
120 :
121 : /*
122 : * Half-open cleanup
123 : */
124 :
125 88 : pool_foreach (sh, app_wrk->half_open_table)
126 1 : session_cleanup_half_open (*sh);
127 :
128 87 : pool_free (app_wrk->half_open_table);
129 :
130 : /*
131 : * Detached listener segment managers cleanup
132 : */
133 110 : for (i = 0; i < vec_len (app_wrk->detached_seg_managers); i++)
134 : {
135 23 : sm = segment_manager_get (app_wrk->detached_seg_managers[i]);
136 23 : segment_manager_init_free (sm);
137 : }
138 87 : vec_free (app_wrk->detached_seg_managers);
139 87 : clib_spinlock_free (&app_wrk->detached_seg_managers_lock);
140 :
141 : if (CLIB_DEBUG)
142 87 : clib_memset (app_wrk, 0xfe, sizeof (*app_wrk));
143 87 : pool_put (app_workers, app_wrk);
144 87 : }
145 :
146 : application_t *
147 0 : app_worker_get_app (u32 wrk_index)
148 : {
149 : app_worker_t *app_wrk;
150 0 : app_wrk = app_worker_get_if_valid (wrk_index);
151 0 : if (!app_wrk)
152 0 : return 0;
153 0 : return application_get_if_valid (app_wrk->app_index);
154 : }
155 :
156 : static segment_manager_t *
157 78 : app_worker_alloc_segment_manager (app_worker_t * app_wrk)
158 : {
159 : segment_manager_t *sm;
160 :
161 78 : sm = segment_manager_alloc ();
162 78 : sm->app_wrk_index = app_wrk->wrk_index;
163 78 : segment_manager_init (sm);
164 78 : return sm;
165 : }
166 :
167 : static int
168 404 : app_worker_alloc_session_fifos (segment_manager_t * sm, session_t * s)
169 : {
170 404 : svm_fifo_t *rx_fifo = 0, *tx_fifo = 0;
171 : int rv;
172 :
173 404 : if ((rv = segment_manager_alloc_session_fifos (sm, s->thread_index,
174 : &rx_fifo, &tx_fifo)))
175 0 : return rv;
176 :
177 404 : rx_fifo->shr->master_session_index = s->session_index;
178 404 : rx_fifo->master_thread_index = s->thread_index;
179 :
180 404 : tx_fifo->shr->master_session_index = s->session_index;
181 404 : tx_fifo->master_thread_index = s->thread_index;
182 :
183 404 : s->rx_fifo = rx_fifo;
184 404 : s->tx_fifo = tx_fifo;
185 404 : return 0;
186 : }
187 :
188 : int
189 78 : app_worker_init_listener (app_worker_t * app_wrk, session_t * ls)
190 : {
191 : segment_manager_t *sm;
192 :
193 : /* Allocate segment manager. All sessions derived out of a listen session
194 : * have fifos allocated by the same segment manager. */
195 78 : if (!(sm = app_worker_alloc_segment_manager (app_wrk)))
196 0 : return SESSION_E_ALLOC;
197 :
198 : /* Once the first segment is mapped, don't remove it until unlisten */
199 78 : sm->first_is_protected = 1;
200 :
201 : /* Keep track of the segment manager for the listener or this worker */
202 78 : hash_set (app_wrk->listeners_table, listen_session_get_handle (ls),
203 : segment_manager_index (sm));
204 :
205 78 : if (transport_connection_is_cless (session_get_transport (ls)))
206 : {
207 3 : if (ls->rx_fifo)
208 0 : return SESSION_E_NOSUPPORT;
209 3 : return app_worker_alloc_session_fifos (sm, ls);
210 : }
211 75 : return 0;
212 : }
213 :
214 : session_error_t
215 75 : app_worker_start_listen (app_worker_t *app_wrk, app_listener_t *app_listener)
216 : {
217 : session_t *ls;
218 : int rv;
219 :
220 75 : if (clib_bitmap_get (app_listener->workers, app_wrk->wrk_map_index))
221 2 : return SESSION_E_ALREADY_LISTENING;
222 :
223 146 : app_listener->workers = clib_bitmap_set (app_listener->workers,
224 73 : app_wrk->wrk_map_index, 1);
225 :
226 73 : if (app_listener->session_index != SESSION_INVALID_INDEX)
227 : {
228 60 : ls = session_get (app_listener->session_index, 0);
229 60 : if ((rv = app_worker_init_listener (app_wrk, ls)))
230 0 : return rv;
231 : }
232 :
233 73 : if (app_listener->local_index != SESSION_INVALID_INDEX)
234 : {
235 18 : ls = session_get (app_listener->local_index, 0);
236 18 : if ((rv = app_worker_init_listener (app_wrk, ls)))
237 0 : return rv;
238 : }
239 :
240 73 : return 0;
241 : }
242 :
243 : static void
244 27 : app_worker_add_detached_sm (app_worker_t * app_wrk, u32 sm_index)
245 : {
246 27 : vec_add1 (app_wrk->detached_seg_managers, sm_index);
247 27 : }
248 :
249 : void
250 4 : app_worker_del_detached_sm (app_worker_t * app_wrk, u32 sm_index)
251 : {
252 : u32 i;
253 :
254 4 : clib_spinlock_lock (&app_wrk->detached_seg_managers_lock);
255 4 : for (i = 0; i < vec_len (app_wrk->detached_seg_managers); i++)
256 : {
257 4 : if (app_wrk->detached_seg_managers[i] == sm_index)
258 : {
259 4 : vec_del1 (app_wrk->detached_seg_managers, i);
260 4 : break;
261 : }
262 : }
263 4 : clib_spinlock_unlock (&app_wrk->detached_seg_managers_lock);
264 4 : }
265 :
266 : static void
267 62 : app_worker_stop_listen_session (app_worker_t * app_wrk, session_t * ls)
268 : {
269 : session_handle_t handle;
270 : segment_manager_t *sm;
271 : uword *sm_indexp;
272 62 : session_state_t *states = 0;
273 :
274 62 : handle = listen_session_get_handle (ls);
275 62 : sm_indexp = hash_get (app_wrk->listeners_table, handle);
276 62 : if (PREDICT_FALSE (!sm_indexp))
277 0 : return;
278 :
279 : /* Dealloc fifos, if any (dgram listeners) */
280 62 : if (ls->rx_fifo)
281 : {
282 3 : segment_manager_dealloc_fifos (ls->rx_fifo, ls->tx_fifo);
283 3 : ls->tx_fifo = ls->rx_fifo = 0;
284 : }
285 :
286 : /* Try to cleanup segment manager */
287 62 : sm = segment_manager_get (*sm_indexp);
288 62 : if (sm)
289 : {
290 62 : sm->first_is_protected = 0;
291 62 : segment_manager_app_detach (sm);
292 62 : if (!segment_manager_has_fifos (sm))
293 : {
294 : /* Empty segment manager, cleanup it up */
295 35 : segment_manager_free (sm);
296 : }
297 : else
298 : {
299 : /* Delete sessions in CREATED state */
300 27 : vec_add1 (states, SESSION_STATE_CREATED);
301 27 : segment_manager_del_sessions_filter (sm, states);
302 27 : vec_free (states);
303 :
304 : /* Track segment manager in case app detaches and all the
305 : * outstanding sessions need to be closed */
306 27 : app_worker_add_detached_sm (app_wrk, *sm_indexp);
307 27 : sm->flags |= SEG_MANAGER_F_DETACHED_LISTENER;
308 : }
309 : }
310 :
311 62 : hash_unset (app_wrk->listeners_table, handle);
312 : }
313 :
314 : int
315 57 : app_worker_stop_listen (app_worker_t * app_wrk, app_listener_t * al)
316 : {
317 : session_t *ls;
318 :
319 57 : if (!clib_bitmap_get (al->workers, app_wrk->wrk_map_index))
320 0 : return 0;
321 :
322 57 : if (al->session_index != SESSION_INVALID_INDEX)
323 : {
324 44 : ls = listen_session_get (al->session_index);
325 44 : app_worker_stop_listen_session (app_wrk, ls);
326 : }
327 :
328 57 : if (al->local_index != SESSION_INVALID_INDEX)
329 : {
330 18 : ls = listen_session_get (al->local_index);
331 18 : app_worker_stop_listen_session (app_wrk, ls);
332 : }
333 :
334 57 : clib_bitmap_set_no_check (al->workers, app_wrk->wrk_map_index, 0);
335 57 : if (clib_bitmap_is_zero (al->workers))
336 57 : app_listener_cleanup (al);
337 :
338 57 : return 0;
339 : }
340 :
341 : int
342 176 : app_worker_init_accepted (session_t * s)
343 : {
344 : app_worker_t *app_wrk;
345 : segment_manager_t *sm;
346 : session_t *listener;
347 : application_t *app;
348 :
349 176 : listener = listen_session_get_from_handle (s->listener_handle);
350 176 : app_wrk = application_listener_select_worker (listener);
351 176 : if (PREDICT_FALSE (app_worker_mq_is_congested (app_wrk)))
352 0 : return -1;
353 :
354 176 : s->app_wrk_index = app_wrk->wrk_index;
355 176 : app = application_get (app_wrk->app_index);
356 176 : if (app->cb_fns.fifo_tuning_callback)
357 0 : s->flags |= SESSION_F_CUSTOM_FIFO_TUNING;
358 :
359 176 : sm = app_worker_get_listen_segment_manager (app_wrk, listener);
360 176 : if (app_worker_alloc_session_fifos (sm, s))
361 0 : return -1;
362 :
363 176 : return 0;
364 : }
365 :
366 : int
367 43 : app_worker_listened_notify (app_worker_t *app_wrk, session_handle_t alsh,
368 : u32 opaque, session_error_t err)
369 : {
370 43 : session_event_t evt = { .event_type = SESSION_CTRL_EVT_BOUND,
371 : .as_u64[0] = alsh,
372 43 : .as_u64[1] = (u64) opaque << 32 | err };
373 :
374 43 : app_worker_add_event_custom (app_wrk, 0 /* thread index */, &evt);
375 :
376 43 : return 0;
377 : }
378 :
379 : int
380 23 : app_worker_unlisten_reply (app_worker_t *app_wrk, session_handle_t sh,
381 : u32 opaque, session_error_t err)
382 : {
383 23 : session_event_t evt = { .event_type = SESSION_CTRL_EVT_UNLISTEN_REPLY,
384 : .as_u64[0] = sh,
385 23 : .as_u64[1] = (u64) opaque << 32 | (u32) err };
386 :
387 23 : app_worker_add_event_custom (app_wrk, 0 /* thread index */, &evt);
388 23 : return 0;
389 : }
390 :
391 : int
392 204 : app_worker_accept_notify (app_worker_t * app_wrk, session_t * s)
393 : {
394 204 : app_worker_add_event (app_wrk, s, SESSION_CTRL_EVT_ACCEPTED);
395 204 : return 0;
396 : }
397 :
398 : int
399 225 : app_worker_init_connected (app_worker_t * app_wrk, session_t * s)
400 : {
401 225 : application_t *app = application_get (app_wrk->app_index);
402 : segment_manager_t *sm;
403 :
404 225 : if (app->cb_fns.fifo_tuning_callback)
405 0 : s->flags |= SESSION_F_CUSTOM_FIFO_TUNING;
406 :
407 : /* Allocate fifos for session, unless the app is a builtin proxy */
408 225 : if (application_is_builtin_proxy (app))
409 0 : return app->cb_fns.proxy_alloc_session_fifos (s);
410 :
411 225 : sm = app_worker_get_connect_segment_manager (app_wrk);
412 225 : return app_worker_alloc_session_fifos (sm, s);
413 : }
414 :
415 : int
416 207 : app_worker_connect_notify (app_worker_t * app_wrk, session_t * s,
417 : session_error_t err, u32 opaque)
418 : {
419 414 : session_event_t evt = { .event_type = SESSION_CTRL_EVT_CONNECTED,
420 207 : .as_u64[0] = s ? s->session_index : ~0,
421 207 : .as_u64[1] = (u64) opaque << 32 | (u32) err };
422 207 : u32 thread_index = s ? s->thread_index : vlib_get_thread_index ();
423 :
424 207 : app_worker_add_event_custom (app_wrk, thread_index, &evt);
425 207 : return 0;
426 : }
427 :
428 : int
429 148 : app_worker_add_half_open (app_worker_t *app_wrk, session_handle_t sh)
430 : {
431 : session_handle_t *shp;
432 :
433 148 : ASSERT (session_vlib_thread_is_cl_thread ());
434 148 : pool_get (app_wrk->half_open_table, shp);
435 148 : *shp = sh;
436 :
437 148 : return (shp - app_wrk->half_open_table);
438 : }
439 :
440 : int
441 147 : app_worker_del_half_open (app_worker_t *app_wrk, session_t *s)
442 : {
443 147 : app_worker_add_event (app_wrk, s, SESSION_CTRL_EVT_HALF_CLEANUP);
444 147 : return 0;
445 : }
446 :
447 : int
448 169 : app_worker_close_notify (app_worker_t * app_wrk, session_t * s)
449 : {
450 169 : app_worker_add_event (app_wrk, s, SESSION_CTRL_EVT_DISCONNECTED);
451 169 : return 0;
452 : }
453 :
454 : int
455 264 : app_worker_transport_closed_notify (app_worker_t * app_wrk, session_t * s)
456 : {
457 264 : app_worker_add_event (app_wrk, s, SESSION_CTRL_EVT_TRANSPORT_CLOSED);
458 264 : return 0;
459 : }
460 :
461 : int
462 6 : app_worker_reset_notify (app_worker_t * app_wrk, session_t * s)
463 : {
464 6 : app_worker_add_event (app_wrk, s, SESSION_CTRL_EVT_RESET);
465 6 : return 0;
466 : }
467 :
468 : int
469 412 : app_worker_cleanup_notify (app_worker_t * app_wrk, session_t * s,
470 : session_cleanup_ntf_t ntf)
471 : {
472 824 : session_event_t evt = { .event_type = SESSION_CTRL_EVT_CLEANUP,
473 412 : .as_u64[0] = (u64) ntf << 32 | s->session_index,
474 412 : .as_u64[1] = pointer_to_uword (session_cleanup) };
475 :
476 412 : app_worker_add_event_custom (app_wrk, s->thread_index, &evt);
477 :
478 412 : return 0;
479 : }
480 :
481 : int
482 12 : app_worker_cleanup_notify_custom (app_worker_t *app_wrk, session_t *s,
483 : session_cleanup_ntf_t ntf,
484 : void (*cleanup_cb) (session_t *s))
485 : {
486 24 : session_event_t evt = { .event_type = SESSION_CTRL_EVT_CLEANUP,
487 12 : .as_u64[0] = (u64) ntf << 32 | s->session_index,
488 12 : .as_u64[1] = pointer_to_uword (cleanup_cb) };
489 :
490 12 : app_worker_add_event_custom (app_wrk, s->thread_index, &evt);
491 :
492 12 : return 0;
493 : }
494 :
495 : int
496 48670 : app_worker_rx_notify (app_worker_t *app_wrk, session_t *s)
497 : {
498 48670 : app_worker_add_event (app_wrk, s, SESSION_IO_EVT_RX);
499 48670 : return 0;
500 : }
501 :
502 : int
503 0 : app_worker_migrate_notify (app_worker_t * app_wrk, session_t * s,
504 : session_handle_t new_sh)
505 : {
506 0 : session_event_t evt = { .event_type = SESSION_CTRL_EVT_MIGRATED,
507 0 : .as_u64[0] = s->session_index,
508 : .as_u64[1] = new_sh };
509 :
510 0 : app_worker_add_event_custom (app_wrk, s->thread_index, &evt);
511 0 : return 0;
512 : }
513 :
514 : int
515 0 : app_worker_own_session (app_worker_t * app_wrk, session_t * s)
516 : {
517 : segment_manager_t *sm;
518 : svm_fifo_t *rxf, *txf;
519 : int rv;
520 :
521 0 : if (s->session_state == SESSION_STATE_LISTENING)
522 0 : return application_change_listener_owner (s, app_wrk);
523 :
524 0 : s->app_wrk_index = app_wrk->wrk_index;
525 :
526 0 : rxf = s->rx_fifo;
527 0 : txf = s->tx_fifo;
528 :
529 0 : if (!rxf || !txf)
530 0 : return 0;
531 :
532 0 : s->rx_fifo = 0;
533 0 : s->tx_fifo = 0;
534 :
535 0 : sm = app_worker_get_connect_segment_manager (app_wrk);
536 0 : if ((rv = app_worker_alloc_session_fifos (sm, s)))
537 0 : return rv;
538 :
539 0 : if (!svm_fifo_is_empty_cons (rxf))
540 0 : svm_fifo_clone (s->rx_fifo, rxf);
541 :
542 0 : if (!svm_fifo_is_empty_cons (txf))
543 0 : svm_fifo_clone (s->tx_fifo, txf);
544 :
545 0 : segment_manager_dealloc_fifos (rxf, txf);
546 :
547 0 : return 0;
548 : }
549 :
550 : int
551 213 : app_worker_connect_session (app_worker_t *app_wrk, session_endpoint_cfg_t *sep,
552 : session_handle_t *rsh)
553 : {
554 213 : if (PREDICT_FALSE (app_worker_mq_is_congested (app_wrk)))
555 0 : return SESSION_E_REFUSED;
556 :
557 213 : sep->app_wrk_index = app_wrk->wrk_index;
558 :
559 213 : return session_open (sep, rsh);
560 : }
561 :
562 : int
563 0 : app_worker_session_fifo_tuning (app_worker_t * app_wrk, session_t * s,
564 : svm_fifo_t * f,
565 : session_ft_action_t act, u32 len)
566 : {
567 0 : application_t *app = application_get (app_wrk->app_index);
568 0 : return app->cb_fns.fifo_tuning_callback (s, f, act, len);
569 : }
570 :
571 : segment_manager_t *
572 230 : app_worker_get_connect_segment_manager (app_worker_t * app)
573 : {
574 230 : ASSERT (app->connects_seg_manager != (u32) ~ 0);
575 230 : return segment_manager_get (app->connects_seg_manager);
576 : }
577 :
578 : segment_manager_t *
579 189 : app_worker_get_listen_segment_manager (app_worker_t * app,
580 : session_t * listener)
581 : {
582 : uword *smp;
583 189 : smp = hash_get (app->listeners_table, listen_session_get_handle (listener));
584 189 : ALWAYS_ASSERT (smp != 0);
585 189 : return segment_manager_get (*smp);
586 : }
587 :
588 : session_t *
589 8 : app_worker_first_listener (app_worker_t * app_wrk, u8 fib_proto,
590 : u8 transport_proto)
591 : {
592 : session_t *listener;
593 : u64 handle;
594 : u32 sm_index;
595 : u8 sst;
596 :
597 8 : sst = session_type_from_proto_and_ip (transport_proto,
598 : fib_proto == FIB_PROTOCOL_IP4);
599 :
600 : /* *INDENT-OFF* */
601 353 : hash_foreach (handle, sm_index, app_wrk->listeners_table, ({
602 : listener = listen_session_get_from_handle (handle);
603 : if (listener->session_type == sst
604 : && !(listener->flags & SESSION_F_PROXY))
605 : return listener;
606 : }));
607 : /* *INDENT-ON* */
608 :
609 2 : return 0;
610 : }
611 :
612 : session_t *
613 2 : app_worker_proxy_listener (app_worker_t * app_wrk, u8 fib_proto,
614 : u8 transport_proto)
615 : {
616 : session_t *listener;
617 : u64 handle;
618 : u32 sm_index;
619 : u8 sst;
620 :
621 2 : sst = session_type_from_proto_and_ip (transport_proto,
622 : fib_proto == FIB_PROTOCOL_IP4);
623 :
624 : /* *INDENT-OFF* */
625 41 : hash_foreach (handle, sm_index, app_wrk->listeners_table, ({
626 : listener = listen_session_get_from_handle (handle);
627 : if (listener->session_type == sst && (listener->flags & SESSION_F_PROXY))
628 : return listener;
629 : }));
630 : /* *INDENT-ON* */
631 :
632 0 : return 0;
633 : }
634 :
635 : /**
636 : * Send an API message to the external app, to map new segment
637 : */
638 : int
639 88 : app_worker_add_segment_notify (app_worker_t * app_wrk, u64 segment_handle)
640 : {
641 88 : session_event_t evt = { .event_type = SESSION_CTRL_EVT_APP_ADD_SEGMENT,
642 : .as_u64[1] = segment_handle };
643 :
644 88 : app_worker_add_event_custom (app_wrk, vlib_get_thread_index (), &evt);
645 :
646 88 : return 0;
647 : }
648 :
649 : int
650 6 : app_worker_del_segment_notify (app_worker_t * app_wrk, u64 segment_handle)
651 : {
652 6 : session_event_t evt = { .event_type = SESSION_CTRL_EVT_APP_DEL_SEGMENT,
653 : .as_u64[1] = segment_handle };
654 :
655 6 : app_worker_add_event_custom (app_wrk, vlib_get_thread_index (), &evt);
656 :
657 6 : return 0;
658 : }
659 :
660 : static int
661 40 : app_wrk_send_fd (app_worker_t *app_wrk, int fd)
662 : {
663 40 : if (!appns_sapi_enabled ())
664 : {
665 : vl_api_registration_t *reg;
666 : clib_error_t *error;
667 :
668 : reg =
669 31 : vl_mem_api_client_index_to_registration (app_wrk->api_client_index);
670 31 : if (!reg)
671 : {
672 0 : clib_warning ("no api registration for client: %u",
673 : app_wrk->api_client_index);
674 0 : return -1;
675 : }
676 :
677 31 : if (vl_api_registration_file_index (reg) == VL_API_INVALID_FI)
678 0 : return -1;
679 :
680 31 : error = vl_api_send_fd_msg (reg, &fd, 1);
681 31 : if (error)
682 : {
683 0 : clib_error_report (error);
684 0 : return -1;
685 : }
686 :
687 31 : return 0;
688 : }
689 :
690 9 : app_sapi_msg_t smsg = { 0 };
691 : app_namespace_t *app_ns;
692 : clib_error_t *error;
693 : application_t *app;
694 : clib_socket_t *cs;
695 : u32 cs_index;
696 :
697 9 : app = application_get (app_wrk->app_index);
698 9 : app_ns = app_namespace_get (app->ns_index);
699 9 : cs_index = appns_sapi_handle_sock_index (app_wrk->api_client_index);
700 9 : cs = appns_sapi_get_socket (app_ns, cs_index);
701 9 : if (PREDICT_FALSE (!cs))
702 0 : return -1;
703 :
704 : /* There's no payload for the message only the type */
705 9 : smsg.type = APP_SAPI_MSG_TYPE_SEND_FDS;
706 9 : error = clib_socket_sendmsg (cs, &smsg, sizeof (smsg), &fd, 1);
707 9 : if (error)
708 : {
709 0 : clib_error_report (error);
710 0 : return -1;
711 : }
712 :
713 9 : return 0;
714 : }
715 :
716 : void
717 273365 : app_worker_add_event (app_worker_t *app_wrk, session_t *s,
718 : session_evt_type_t evt_type)
719 : {
720 : session_event_t *evt;
721 :
722 273365 : ASSERT (s->thread_index == vlib_get_thread_index ());
723 273365 : clib_fifo_add2 (app_wrk->wrk_evts[s->thread_index], evt);
724 273365 : evt->session_index = s->session_index;
725 273365 : evt->event_type = evt_type;
726 273365 : evt->postponed = 0;
727 :
728 : /* First event for this app_wrk. Schedule it for handling in session input */
729 273365 : if (clib_fifo_elts (app_wrk->wrk_evts[s->thread_index]) == 1)
730 : {
731 215488 : session_worker_t *wrk = session_main_get_worker (s->thread_index);
732 215488 : session_wrk_program_app_wrk_evts (wrk, app_wrk->wrk_index);
733 : }
734 273365 : }
735 :
736 : void
737 791 : app_worker_add_event_custom (app_worker_t *app_wrk, u32 thread_index,
738 : session_event_t *evt)
739 : {
740 791 : clib_fifo_add1 (app_wrk->wrk_evts[thread_index], *evt);
741 :
742 : /* First event for this app_wrk. Schedule it for handling in session input */
743 791 : if (clib_fifo_elts (app_wrk->wrk_evts[thread_index]) == 1)
744 : {
745 264 : session_worker_t *wrk = session_main_get_worker (thread_index);
746 264 : session_wrk_program_app_wrk_evts (wrk, app_wrk->wrk_index);
747 : }
748 791 : }
749 :
750 : always_inline void
751 319 : app_wrk_send_ctrl_evt_inline (app_worker_t *app_wrk, u8 evt_type, void *msg,
752 : u32 msg_len, int fd)
753 : {
754 319 : svm_msg_q_msg_t _mq_msg, *mq_msg = &_mq_msg;
755 319 : svm_msg_q_t *mq = app_wrk->event_queue;
756 : session_event_t *evt;
757 :
758 319 : ASSERT (!svm_msg_q_or_ring_is_full (mq, SESSION_MQ_CTRL_EVT_RING));
759 319 : *mq_msg = svm_msg_q_alloc_msg_w_ring (mq, SESSION_MQ_CTRL_EVT_RING);
760 :
761 319 : evt = svm_msg_q_msg_data (mq, mq_msg);
762 319 : clib_memset (evt, 0, sizeof (*evt));
763 319 : evt->event_type = evt_type;
764 319 : clib_memcpy_fast (evt->data, msg, msg_len);
765 :
766 319 : if (fd != -1)
767 40 : app_wrk_send_fd (app_wrk, fd);
768 :
769 319 : svm_msg_q_add_raw (mq, mq_msg);
770 319 : }
771 :
772 : void
773 40 : app_wrk_send_ctrl_evt_fd (app_worker_t *app_wrk, u8 evt_type, void *msg,
774 : u32 msg_len, int fd)
775 : {
776 40 : app_wrk_send_ctrl_evt_inline (app_wrk, evt_type, msg, msg_len, fd);
777 40 : }
778 :
779 : void
780 279 : app_wrk_send_ctrl_evt (app_worker_t *app_wrk, u8 evt_type, void *msg,
781 : u32 msg_len)
782 : {
783 279 : app_wrk_send_ctrl_evt_inline (app_wrk, evt_type, msg, msg_len, -1);
784 279 : }
785 :
786 : u8
787 0 : app_worker_mq_wrk_is_congested (app_worker_t *app_wrk, u32 thread_index)
788 : {
789 0 : return app_wrk->wrk_mq_congested[thread_index] > 0;
790 : }
791 :
792 : void
793 0 : app_worker_set_mq_wrk_congested (app_worker_t *app_wrk, u32 thread_index)
794 : {
795 0 : clib_atomic_fetch_add_relax (&app_wrk->mq_congested, 1);
796 0 : ASSERT (thread_index == vlib_get_thread_index ());
797 0 : app_wrk->wrk_mq_congested[thread_index] = 1;
798 0 : }
799 :
800 : void
801 0 : app_worker_unset_wrk_mq_congested (app_worker_t *app_wrk, u32 thread_index)
802 : {
803 0 : clib_atomic_fetch_sub_relax (&app_wrk->mq_congested, 1);
804 0 : ASSERT (thread_index == vlib_get_thread_index ());
805 0 : app_wrk->wrk_mq_congested[thread_index] = 0;
806 0 : }
807 :
808 : u8 *
809 5 : format_app_worker_listener (u8 * s, va_list * args)
810 : {
811 5 : app_worker_t *app_wrk = va_arg (*args, app_worker_t *);
812 5 : u64 handle = va_arg (*args, u64);
813 5 : u32 sm_index = va_arg (*args, u32);
814 5 : int verbose = va_arg (*args, int);
815 : session_t *listener;
816 : const u8 *app_name;
817 : u8 *str;
818 :
819 5 : if (!app_wrk)
820 : {
821 5 : if (verbose)
822 0 : s = format (s, "%-" SESSION_CLI_ID_LEN "s%-25s%-10s%-15s%-15s%-10s",
823 : "Connection", "App", "Wrk", "API Client", "ListenerID",
824 : "SegManager");
825 : else
826 5 : s = format (s, "%-" SESSION_CLI_ID_LEN "s%-25s%-10s", "Connection",
827 : "App", "Wrk");
828 :
829 5 : return s;
830 : }
831 :
832 0 : app_name = application_name_from_index (app_wrk->app_index);
833 0 : listener = listen_session_get_from_handle (handle);
834 0 : str = format (0, "%U", format_session, listener, verbose);
835 :
836 0 : if (verbose)
837 : {
838 : u8 *buf;
839 0 : buf = format (0, "%u(%u)", app_wrk->wrk_map_index, app_wrk->wrk_index);
840 0 : s = format (s, "%-" SESSION_CLI_ID_LEN "v%-25v%-10v%-15u%-15u%-10u", str,
841 : app_name, buf, app_wrk->api_client_index, handle, sm_index);
842 0 : vec_free (buf);
843 : }
844 : else
845 0 : s = format (s, "%-" SESSION_CLI_ID_LEN "v%-25v%=10u", str, app_name,
846 : app_wrk->wrk_map_index);
847 :
848 0 : vec_free (str);
849 :
850 0 : return s;
851 : }
852 :
853 : u8 *
854 0 : format_app_worker (u8 * s, va_list * args)
855 : {
856 0 : app_worker_t *app_wrk = va_arg (*args, app_worker_t *);
857 0 : u32 indent = 1;
858 :
859 0 : s = format (s,
860 : "%U wrk-index %u app-index %u map-index %u "
861 : "api-client-index %d mq-cong %u\n",
862 : format_white_space, indent, app_wrk->wrk_index,
863 : app_wrk->app_index, app_wrk->wrk_map_index,
864 0 : app_wrk->api_client_index, app_wrk->mq_congested);
865 0 : return s;
866 : }
867 :
868 : void
869 0 : app_worker_format_connects (app_worker_t * app_wrk, int verbose)
870 : {
871 : segment_manager_t *sm;
872 :
873 : /* Header */
874 0 : if (!app_wrk)
875 : {
876 0 : segment_manager_format_sessions (0, verbose);
877 0 : return;
878 : }
879 :
880 0 : if (app_wrk->connects_seg_manager == (u32) ~ 0)
881 0 : return;
882 :
883 0 : sm = segment_manager_get (app_wrk->connects_seg_manager);
884 0 : segment_manager_format_sessions (sm, verbose);
885 : }
886 :
887 : /*
888 : * fd.io coding-style-patch-verification: ON
889 : *
890 : * Local Variables:
891 : * eval: (c-set-style "gnu")
892 : * End:
893 : */
|