Line data Source code
1 : /*
2 : * Copyright (c) 2017-2019 Cisco and/or its affiliates.
3 : * Licensed under the Apache License, Version 2.0 (the "License");
4 : * you may not use this file except in compliance with the License.
5 : * You may obtain a copy of the License at:
6 : *
7 : * http://www.apache.org/licenses/LICENSE-2.0
8 : *
9 : * Unless required by applicable law or agreed to in writing, software
10 : * distributed under the License is distributed on an "AS IS" BASIS,
11 : * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 : * See the License for the specific language governing permissions and
13 : * limitations under the License.
14 : */
15 : /**
16 : * @file
17 : * @brief Session and session manager
18 : */
19 :
20 : #include <vnet/plugin/plugin.h>
21 : #include <vnet/session/session.h>
22 : #include <vnet/session/application.h>
23 : #include <vnet/dpo/load_balance.h>
24 : #include <vnet/fib/ip4_fib.h>
25 : #include <vlib/stats/stats.h>
26 : #include <vlib/dma/dma.h>
27 :
28 : session_main_t session_main;
29 :
30 : static inline int
31 7895 : session_send_evt_to_thread (void *data, void *args, u32 thread_index,
32 : session_evt_type_t evt_type)
33 : {
34 7895 : session_worker_t *wrk = session_main_get_worker (thread_index);
35 : session_event_t *evt;
36 : svm_msg_q_msg_t msg;
37 : svm_msg_q_t *mq;
38 :
39 7895 : mq = wrk->vpp_event_queue;
40 7895 : if (PREDICT_FALSE (svm_msg_q_lock (mq)))
41 0 : return -1;
42 7895 : if (PREDICT_FALSE (svm_msg_q_or_ring_is_full (mq, SESSION_MQ_IO_EVT_RING)))
43 : {
44 0 : svm_msg_q_unlock (mq);
45 0 : return -2;
46 : }
47 7895 : switch (evt_type)
48 : {
49 146 : case SESSION_CTRL_EVT_RPC:
50 146 : msg = svm_msg_q_alloc_msg_w_ring (mq, SESSION_MQ_IO_EVT_RING);
51 146 : evt = (session_event_t *) svm_msg_q_msg_data (mq, &msg);
52 146 : evt->rpc_args.fp = data;
53 146 : evt->rpc_args.arg = args;
54 146 : break;
55 7749 : case SESSION_IO_EVT_RX:
56 : case SESSION_IO_EVT_TX:
57 : case SESSION_IO_EVT_TX_FLUSH:
58 : case SESSION_IO_EVT_BUILTIN_RX:
59 7749 : msg = svm_msg_q_alloc_msg_w_ring (mq, SESSION_MQ_IO_EVT_RING);
60 7749 : evt = (session_event_t *) svm_msg_q_msg_data (mq, &msg);
61 7749 : evt->session_index = *(u32 *) data;
62 7749 : break;
63 0 : case SESSION_IO_EVT_TX_MAIN:
64 : case SESSION_CTRL_EVT_CLOSE:
65 : case SESSION_CTRL_EVT_RESET:
66 0 : msg = svm_msg_q_alloc_msg_w_ring (mq, SESSION_MQ_IO_EVT_RING);
67 0 : evt = (session_event_t *) svm_msg_q_msg_data (mq, &msg);
68 0 : evt->session_handle = session_handle ((session_t *) data);
69 0 : break;
70 0 : default:
71 0 : clib_warning ("evt unhandled!");
72 0 : svm_msg_q_unlock (mq);
73 0 : return -1;
74 : }
75 7895 : evt->event_type = evt_type;
76 :
77 7895 : svm_msg_q_add_and_unlock (mq, &msg);
78 :
79 7895 : if (PREDICT_FALSE (wrk->state == SESSION_WRK_INTERRUPT))
80 11 : vlib_node_set_interrupt_pending (wrk->vm, session_queue_node.index);
81 :
82 7895 : return 0;
83 : }
84 :
85 : int
86 7744 : session_send_io_evt_to_thread (svm_fifo_t * f, session_evt_type_t evt_type)
87 : {
88 15488 : return session_send_evt_to_thread (&f->shr->master_session_index, 0,
89 7744 : f->master_thread_index, evt_type);
90 : }
91 :
92 : int
93 5 : session_send_io_evt_to_thread_custom (void *data, u32 thread_index,
94 : session_evt_type_t evt_type)
95 : {
96 5 : return session_send_evt_to_thread (data, 0, thread_index, evt_type);
97 : }
98 :
99 : int
100 0 : session_send_ctrl_evt_to_thread (session_t * s, session_evt_type_t evt_type)
101 : {
102 : /* only events supported are disconnect, shutdown and reset */
103 0 : ASSERT (evt_type == SESSION_CTRL_EVT_CLOSE ||
104 : evt_type == SESSION_CTRL_EVT_HALF_CLOSE ||
105 : evt_type == SESSION_CTRL_EVT_RESET);
106 0 : return session_send_evt_to_thread (s, 0, s->thread_index, evt_type);
107 : }
108 :
109 : void
110 146 : session_send_rpc_evt_to_thread_force (u32 thread_index, void *fp,
111 : void *rpc_args)
112 : {
113 146 : session_send_evt_to_thread (fp, rpc_args, thread_index,
114 : SESSION_CTRL_EVT_RPC);
115 146 : }
116 :
117 : void
118 83 : session_send_rpc_evt_to_thread (u32 thread_index, void *fp, void *rpc_args)
119 : {
120 83 : if (thread_index != vlib_get_thread_index ())
121 8 : session_send_rpc_evt_to_thread_force (thread_index, fp, rpc_args);
122 : else
123 : {
124 75 : void (*fnp) (void *) = fp;
125 75 : fnp (rpc_args);
126 : }
127 83 : }
128 :
/**
 * Schedule a custom tx event for @a tc's session on the caller's thread.
 *
 * Must run on the session's owner thread. SESSION_F_CUSTOM_TX debounces
 * repeat requests until the pending event is consumed.
 */
129 : void
130 41829 : session_add_self_custom_tx_evt (transport_connection_t * tc, u8 has_prio)
131 : {
132 41829 : session_t *s = session_get (tc->s_index, tc->thread_index);
133 :
134 41829 : ASSERT (s->thread_index == vlib_get_thread_index ());
135 41829 : ASSERT (s->session_state != SESSION_STATE_TRANSPORT_DELETED);
136 :
137 41829 : if (!(s->flags & SESSION_F_CUSTOM_TX))
138 : {
139 41829 : s->flags |= SESSION_F_CUSTOM_TX;
/* NOTE(review): the || below relies on svm_fifo_set_event's side effect
 * running before the desched check — do not reorder */
140 41829 : if (svm_fifo_set_event (s->tx_fifo)
141 16380 : || transport_connection_is_descheduled (tc))
142 : {
143 : session_evt_elt_t *elt;
144 : session_worker_t *wrk;
145 :
146 32682 : wrk = session_main_get_worker (tc->thread_index);
/* Priority requests go on the new (head) list, others on the old list */
147 32682 : if (has_prio)
148 32682 : elt = session_evt_alloc_new (wrk);
149 : else
150 0 : elt = session_evt_alloc_old (wrk);
151 32682 : elt->evt.session_index = tc->s_index;
152 32682 : elt->evt.event_type = SESSION_IO_EVT_TX;
153 32682 : tc->flags &= ~TRANSPORT_CONNECTION_F_DESCHED;
154 :
/* Interrupt-mode workers need an explicit kick */
155 32682 : if (PREDICT_FALSE (wrk->state == SESSION_WRK_INTERRUPT))
156 24 : vlib_node_set_interrupt_pending (wrk->vm,
157 : session_queue_node.index);
158 : }
159 : }
160 41829 : }
161 :
162 : void
163 9052 : sesssion_reschedule_tx (transport_connection_t * tc)
164 : {
165 9052 : session_worker_t *wrk = session_main_get_worker (tc->thread_index);
166 : session_evt_elt_t *elt;
167 :
168 9052 : ASSERT (tc->thread_index == vlib_get_thread_index ());
169 :
170 9052 : elt = session_evt_alloc_new (wrk);
171 9052 : elt->evt.session_index = tc->s_index;
172 9052 : elt->evt.event_type = SESSION_IO_EVT_TX;
173 :
174 9052 : if (PREDICT_FALSE (wrk->state == SESSION_WRK_INTERRUPT))
175 18 : vlib_node_set_interrupt_pending (wrk->vm, session_queue_node.index);
176 9052 : }
177 :
178 : static void
179 507 : session_program_transport_ctrl_evt (session_t * s, session_evt_type_t evt)
180 : {
181 507 : u32 thread_index = vlib_get_thread_index ();
182 : session_evt_elt_t *elt;
183 : session_worker_t *wrk;
184 :
185 : /* If we are in the handler thread, or being called with the worker barrier
186 : * held, just append a new event to pending disconnects vector. */
187 507 : if (vlib_thread_is_main_w_barrier () || thread_index == s->thread_index)
188 : {
189 507 : wrk = session_main_get_worker (s->thread_index);
190 507 : elt = session_evt_alloc_ctrl (wrk);
191 507 : clib_memset (&elt->evt, 0, sizeof (session_event_t));
192 507 : elt->evt.session_handle = session_handle (s);
193 507 : elt->evt.event_type = evt;
194 :
195 507 : if (PREDICT_FALSE (wrk->state == SESSION_WRK_INTERRUPT))
196 5 : vlib_node_set_interrupt_pending (wrk->vm, session_queue_node.index);
197 : }
198 : else
199 0 : session_send_ctrl_evt_to_thread (s, evt);
200 507 : }
201 :
202 : session_t *
203 640 : session_alloc (u32 thread_index)
204 : {
205 640 : session_worker_t *wrk = &session_main.wrk[thread_index];
206 : session_t *s;
207 :
208 640 : pool_get_aligned_safe (wrk->sessions, s, CLIB_CACHE_LINE_BYTES);
209 640 : clib_memset (s, 0, sizeof (*s));
210 640 : s->session_index = s - wrk->sessions;
211 640 : s->thread_index = thread_index;
212 640 : s->app_index = APP_INVALID_INDEX;
213 :
214 640 : return s;
215 : }
216 :
217 : void
218 487 : session_free (session_t * s)
219 : {
220 487 : session_worker_t *wrk = &session_main.wrk[s->thread_index];
221 :
222 : SESSION_EVT (SESSION_EVT_FREE, s);
223 : if (CLIB_DEBUG)
224 487 : clib_memset (s, 0xFA, sizeof (*s));
225 487 : pool_put (wrk->sessions, s);
226 487 : }
227 :
/**
 * Sanity-check cross links between a session and its transport.
 *
 * Returns 1 when consistent, or when the session's state makes the
 * transport cross-check inapplicable; 0 on any index/thread mismatch.
 */
228 : u8
229 6668650 : session_is_valid (u32 si, u8 thread_index)
230 : {
231 : session_t *s;
232 : transport_connection_t *tc;
233 :
234 6668650 : s = pool_elt_at_index (session_main.wrk[thread_index].sessions, si);
235 :
236 6668650 : if (s->thread_index != thread_index || s->session_index != si)
237 0 : return 0;
238 :
/* Listeners and deleted-transport sessions have no live transport to
 * validate against */
239 6668650 : if (s->session_state == SESSION_STATE_TRANSPORT_DELETED
240 6667880 : || s->session_state <= SESSION_STATE_LISTENING)
241 2274 : return 1;
242 :
/* Half-open sessions are tracked separately; skip the transport check */
243 6666380 : if ((s->session_state == SESSION_STATE_CONNECTING ||
244 6666010 : s->session_state == SESSION_STATE_TRANSPORT_CLOSED) &&
245 609 : (s->flags & SESSION_F_HALF_OPEN))
246 305 : return 1;
247 :
248 6666070 : tc = session_get_transport (s);
249 6666070 : if (s->connection_index != tc->c_index ||
250 6666070 : s->thread_index != tc->thread_index || tc->s_index != si)
251 0 : return 0;
252 :
253 6666070 : return 1;
254 : }
255 :
256 : void
257 265 : session_cleanup (session_t *s)
258 : {
259 265 : segment_manager_dealloc_fifos (s->rx_fifo, s->tx_fifo);
260 265 : session_free (s);
261 265 : }
262 :
263 : static void
264 518 : session_cleanup_notify (session_t * s, session_cleanup_ntf_t ntf)
265 : {
266 : app_worker_t *app_wrk;
267 :
268 518 : app_wrk = app_worker_get_if_valid (s->app_wrk_index);
269 518 : if (!app_wrk)
270 : {
271 126 : if (ntf == SESSION_CLEANUP_TRANSPORT)
272 61 : return;
273 :
274 65 : session_cleanup (s);
275 65 : return;
276 : }
277 392 : app_worker_cleanup_notify (app_wrk, s, ntf);
278 : }
279 :
280 : void
281 265 : session_program_cleanup (session_t *s)
282 : {
283 265 : ASSERT (s->session_state == SESSION_STATE_TRANSPORT_DELETED);
284 265 : session_cleanup_notify (s, SESSION_CLEANUP_SESSION);
285 265 : }
286 :
287 : /**
288 : * Cleans up session and lookup table.
289 : *
290 : * Transport connection must still be valid.
291 : */
292 : static void
293 138 : session_delete (session_t * s)
294 : {
295 : int rv;
296 :
297 : /* Delete from the main lookup table. */
298 138 : if ((rv = session_lookup_del_session (s)))
299 0 : clib_warning ("session %u hash delete rv %d", s->session_index, rv);
300 :
301 138 : session_program_cleanup (s);
302 138 : }
303 :
/**
 * Clean up a half-open session, accounting for in-flight migrations.
 *
 * Called with a half-open handle; frees the half-open unless it is still
 * mid-migration, in which case it is only marked closed so the migration
 * completion path removes it.
 */
304 : void
305 1 : session_cleanup_half_open (session_handle_t ho_handle)
306 : {
307 1 : session_t *ho = session_get_from_handle (ho_handle);
308 :
309 : /* App transports can migrate their half-opens */
310 1 : if (ho->flags & SESSION_F_IS_MIGRATING)
311 : {
312 : /* Session still migrating, move to closed state to signal that the
313 : * session should be removed. */
314 1 : if (ho->connection_index == ~0)
315 : {
316 0 : session_set_state (ho, SESSION_STATE_CLOSED);
317 0 : return;
318 : }
319 : /* Migrated transports are no longer half-opens */
320 1 : transport_cleanup (session_get_transport_proto (ho),
/* app_index holds the new owner thread after migration (see
 * session_half_open_migrated_notify) */
321 1 : ho->connection_index, ho->app_index /* overloaded */);
322 : }
323 0 : else if (ho->session_state != SESSION_STATE_TRANSPORT_DELETED)
324 : {
325 : /* Cleanup half-open session lookup table if need be */
326 0 : if (ho->session_state != SESSION_STATE_TRANSPORT_CLOSED)
327 : {
328 : transport_connection_t *tc;
329 0 : tc = transport_get_half_open (session_get_transport_proto (ho),
330 : ho->connection_index);
331 0 : if (tc && !(tc->flags & TRANSPORT_CONNECTION_F_NO_LOOKUP))
332 0 : session_lookup_del_half_open (tc);
333 : }
334 0 : transport_cleanup_half_open (session_get_transport_proto (ho),
335 : ho->connection_index);
336 : }
337 1 : session_free (ho);
338 : }
339 :
340 : static void
341 147 : session_half_open_free (session_t *ho)
342 : {
343 : app_worker_t *app_wrk;
344 :
345 147 : ASSERT (vlib_get_thread_index () <= transport_cl_thread ());
346 147 : app_wrk = app_worker_get_if_valid (ho->app_wrk_index);
347 147 : if (app_wrk)
348 147 : app_worker_del_half_open (app_wrk, ho);
349 : else
350 0 : session_free (ho);
351 147 : }
352 :
353 : static void
354 0 : session_half_open_free_rpc (void *args)
355 : {
356 0 : session_t *ho = ho_session_get (pointer_to_uword (args));
357 0 : session_half_open_free (ho);
358 0 : }
359 :
/**
 * Transport notification that a half-open connection was deleted.
 *
 * Frees the half-open directly when called on the ctrl thread, otherwise
 * bounces the free to the ctrl thread via RPC.
 */
360 : void
361 147 : session_half_open_delete_notify (transport_connection_t *tc)
362 : {
363 147 : session_t *ho = ho_session_get (tc->s_index);
364 :
365 : /* Cleanup half-open lookup table if need be */
366 147 : if (ho->session_state != SESSION_STATE_TRANSPORT_CLOSED)
367 : {
368 15 : if (!(tc->flags & TRANSPORT_CONNECTION_F_NO_LOOKUP))
369 0 : session_lookup_del_half_open (tc);
370 : }
371 147 : session_set_state (ho, SESSION_STATE_TRANSPORT_DELETED);
372 :
373 : /* Notification from ctrl thread accepted without rpc */
374 147 : if (tc->thread_index == transport_cl_thread ())
375 : {
376 147 : session_half_open_free (ho);
377 : }
378 : else
379 : {
/* Only the session index crosses threads; the pointer may be stale by
 * the time the RPC runs */
380 0 : void *args = uword_to_pointer ((uword) tc->s_index, void *);
381 0 : session_send_rpc_evt_to_thread_force (transport_cl_thread (),
382 : session_half_open_free_rpc, args);
383 : }
384 147 : }
385 :
386 : void
387 13 : session_half_open_migrate_notify (transport_connection_t *tc)
388 : {
389 : session_t *ho;
390 :
391 : /* Support half-open migrations only for transports with no lookup */
392 13 : ASSERT (tc->flags & TRANSPORT_CONNECTION_F_NO_LOOKUP);
393 :
394 13 : ho = ho_session_get (tc->s_index);
395 13 : ho->flags |= SESSION_F_IS_MIGRATING;
396 13 : ho->connection_index = ~0;
397 13 : }
398 :
399 : int
400 13 : session_half_open_migrated_notify (transport_connection_t *tc)
401 : {
402 : session_t *ho;
403 :
404 13 : ho = ho_session_get (tc->s_index);
405 :
406 : /* App probably detached so the half-open must be cleaned up */
407 13 : if (ho->session_state == SESSION_STATE_CLOSED)
408 : {
409 0 : session_half_open_delete_notify (tc);
410 0 : return -1;
411 : }
412 13 : ho->connection_index = tc->c_index;
413 : /* Overload app index for half-open with new thread */
414 13 : ho->app_index = tc->thread_index;
415 13 : return 0;
416 : }
417 :
418 : session_t *
419 311 : session_alloc_for_connection (transport_connection_t * tc)
420 : {
421 : session_t *s;
422 311 : u32 thread_index = tc->thread_index;
423 :
424 311 : ASSERT (thread_index == vlib_get_thread_index ()
425 : || transport_protocol_is_cl (tc->proto));
426 :
427 311 : s = session_alloc (thread_index);
428 311 : s->session_type = session_type_from_proto_and_ip (tc->proto, tc->is_ip4);
429 311 : session_set_state (s, SESSION_STATE_CLOSED);
430 :
431 : /* Attach transport to session and vice versa */
432 311 : s->connection_index = tc->c_index;
433 311 : tc->s_index = s->session_index;
434 311 : return s;
435 : }
436 :
437 : session_t *
438 148 : session_alloc_for_half_open (transport_connection_t *tc)
439 : {
440 : session_t *s;
441 :
442 148 : s = ho_session_alloc ();
443 148 : s->session_type = session_type_from_proto_and_ip (tc->proto, tc->is_ip4);
444 148 : s->connection_index = tc->c_index;
445 148 : tc->s_index = s->session_index;
446 148 : return s;
447 : }
448 :
449 : /**
450 : * Discards bytes from buffer chain
451 : *
452 : * It discards n_bytes_to_drop starting at first buffer after chain_b
453 : */
454 : always_inline void
455 0 : session_enqueue_discard_chain_bytes (vlib_main_t * vm, vlib_buffer_t * b,
456 : vlib_buffer_t ** chain_b,
457 : u32 n_bytes_to_drop)
458 : {
459 0 : vlib_buffer_t *next = *chain_b;
460 0 : u32 to_drop = n_bytes_to_drop;
461 0 : ASSERT (b->flags & VLIB_BUFFER_NEXT_PRESENT);
/* Walk the chain, zeroing whole buffers until the drop count fits inside
 * one buffer, which is then advanced past the remainder */
462 0 : while (to_drop && (next->flags & VLIB_BUFFER_NEXT_PRESENT))
463 : {
464 0 : next = vlib_get_buffer (vm, next->next_buffer);
465 0 : if (next->current_length > to_drop)
466 : {
467 0 : vlib_buffer_advance (next, to_drop);
468 0 : to_drop = 0;
469 : }
470 : else
471 : {
472 0 : to_drop -= next->current_length;
473 0 : next->current_length = 0;
474 : }
475 : }
/* Report the buffer where dropping stopped back to the caller */
476 0 : *chain_b = next;
477 :
/* Only adjust the chain total when everything requested was dropped */
478 0 : if (to_drop == 0)
479 0 : b->total_length_not_including_first_buffer -= n_bytes_to_drop;
480 0 : }
481 :
482 : /**
483 : * Enqueue buffer chain tail
484 : */
/**
 * Enqueue the tail of a buffer chain into @a s's rx fifo.
 *
 * For in-order data, returns the number of bytes written from the chain
 * (rv > len presumably means the fifo also drained previously buffered
 * ooo data — TODO confirm against svm_fifo_enqueue semantics). For ooo
 * data, returns 0 on success or -1 if any segment fails.
 */
485 : always_inline int
486 0 : session_enqueue_chain_tail (session_t * s, vlib_buffer_t * b,
487 : u32 offset, u8 is_in_order)
488 : {
489 : vlib_buffer_t *chain_b;
490 : u32 chain_bi, len, diff;
491 0 : vlib_main_t *vm = vlib_get_main ();
492 : u8 *data;
493 0 : u32 written = 0;
494 0 : int rv = 0;
495 :
/* In-order with an offset: part of the chain was already consumed, so
 * discard those bytes before starting */
496 0 : if (is_in_order && offset)
497 : {
498 0 : diff = offset - b->current_length;
499 0 : if (diff > b->total_length_not_including_first_buffer)
500 0 : return 0;
501 0 : chain_b = b;
502 0 : session_enqueue_discard_chain_bytes (vm, b, &chain_b, diff);
503 0 : chain_bi = vlib_get_buffer_index (vm, chain_b);
504 : }
505 : else
506 0 : chain_bi = b->next_buffer;
507 :
508 : do
509 : {
510 0 : chain_b = vlib_get_buffer (vm, chain_bi);
511 0 : data = vlib_buffer_get_current (chain_b);
512 0 : len = chain_b->current_length;
/* Skip empty buffers; the while condition advances chain_bi */
513 0 : if (!len)
514 0 : continue;
515 0 : if (is_in_order)
516 : {
517 0 : rv = svm_fifo_enqueue (s->rx_fifo, len, data);
518 0 : if (rv == len)
519 : {
520 0 : written += rv;
521 : }
522 0 : else if (rv < len)
523 : {
/* Partial (or failed) enqueue terminates the walk */
524 0 : return (rv > 0) ? (written + rv) : written;
525 : }
526 0 : else if (rv > len)
527 : {
528 0 : written += rv;
529 :
530 : /* written more than what was left in chain */
531 0 : if (written > b->total_length_not_including_first_buffer)
532 0 : return written;
533 :
534 : /* drop the bytes that have already been delivered */
535 0 : session_enqueue_discard_chain_bytes (vm, b, &chain_b, rv - len);
536 : }
537 : }
538 : else
539 : {
540 0 : rv = svm_fifo_enqueue_with_offset (s->rx_fifo, offset, len, data);
541 0 : if (rv)
542 : {
543 0 : clib_warning ("failed to enqueue multi-buffer seg");
544 0 : return -1;
545 : }
546 0 : offset += len;
547 : }
548 : }
549 0 : while ((chain_bi = (chain_b->flags & VLIB_BUFFER_NEXT_PRESENT)
550 0 : ? chain_b->next_buffer : 0));
551 :
552 0 : if (is_in_order)
553 0 : return written;
554 :
555 0 : return 0;
556 : }
557 :
558 : void
559 1186960 : session_fifo_tuning (session_t * s, svm_fifo_t * f,
560 : session_ft_action_t act, u32 len)
561 : {
562 1186960 : if (s->flags & SESSION_F_CUSTOM_FIFO_TUNING)
563 : {
564 0 : app_worker_t *app_wrk = app_worker_get (s->app_wrk_index);
565 0 : app_worker_session_fifo_tuning (app_wrk, s, f, act, len);
566 : if (CLIB_ASSERT_ENABLE)
567 : {
568 : segment_manager_t *sm;
569 0 : sm = segment_manager_get (f->segment_manager);
570 0 : ASSERT (f->shr->size >= 4096);
571 0 : ASSERT (f->shr->size <= sm->max_fifo_size);
572 : }
573 : }
574 1186960 : }
575 :
576 : void
577 215752 : session_wrk_program_app_wrk_evts (session_worker_t *wrk, u32 app_wrk_index)
578 : {
579 : u8 need_interrupt;
580 :
581 215752 : ASSERT ((wrk - session_main.wrk) == vlib_get_thread_index ());
582 215752 : need_interrupt = clib_bitmap_is_zero (wrk->app_wrks_pending_ntf);
583 215752 : wrk->app_wrks_pending_ntf =
584 215752 : clib_bitmap_set (wrk->app_wrks_pending_ntf, app_wrk_index, 1);
585 :
586 215752 : if (need_interrupt)
587 187668 : vlib_node_set_interrupt_pending (wrk->vm, session_input_node.index);
588 215752 : }
589 :
/**
 * Program an rx/tx IO event for @a s on @a app_wrk.
 *
 * Connectionless sessions get the builtin variant of the event and carry
 * a full session handle instead of a bare index.
 */
590 : always_inline void
591 222868 : session_program_io_event (app_worker_t *app_wrk, session_t *s,
592 : session_evt_type_t et, u8 is_cl)
593 : {
594 222868 : if (is_cl)
595 : {
596 : /* Special events for connectionless sessions */
/* Shift RX->BUILTIN_RX (and TX by the same delta); assumes the event
 * enum keeps these entries equally spaced */
597 0 : et += SESSION_IO_EVT_BUILTIN_RX - SESSION_IO_EVT_RX;
598 :
599 0 : ASSERT (s->thread_index == 0 || et == SESSION_IO_EVT_TX_MAIN);
600 0 : session_event_t evt = {
601 : .event_type = et,
602 0 : .session_handle = session_handle (s),
603 : };
604 :
605 0 : app_worker_add_event_custom (app_wrk, vlib_get_thread_index (), &evt);
606 : }
607 : else
608 : {
609 222868 : app_worker_add_event (app_wrk, s, et);
610 : }
611 222868 : }
612 :
613 : static inline int
614 0 : session_notify_subscribers (u32 app_index, session_t *s, svm_fifo_t *f,
615 : session_evt_type_t evt_type)
616 : {
617 : app_worker_t *app_wrk;
618 : application_t *app;
619 : u8 is_cl;
620 : int i;
621 :
622 0 : app = application_get (app_index);
623 0 : if (!app)
624 0 : return -1;
625 :
626 0 : is_cl = s->thread_index != vlib_get_thread_index ();
627 0 : for (i = 0; i < f->shr->n_subscribers; i++)
628 : {
629 0 : app_wrk = application_get_worker (app, f->shr->subscribers[i]);
630 0 : if (!app_wrk)
631 0 : continue;
632 0 : session_program_io_event (app_wrk, s, evt_type, is_cl ? 1 : 0);
633 : }
634 :
635 0 : return 0;
636 : }
637 :
638 : always_inline int
639 195478 : session_enqueue_notify_inline (session_t *s, u8 is_cl)
640 : {
641 : app_worker_t *app_wrk;
642 :
643 195478 : app_wrk = app_worker_get_if_valid (s->app_wrk_index);
644 195478 : if (PREDICT_FALSE (!app_wrk))
645 0 : return -1;
646 :
647 195478 : session_program_io_event (app_wrk, s, SESSION_IO_EVT_RX, is_cl);
648 :
649 195478 : if (PREDICT_FALSE (svm_fifo_n_subscribers (s->rx_fifo)))
650 0 : return session_notify_subscribers (app_wrk->app_index, s, s->rx_fifo,
651 : SESSION_IO_EVT_RX);
652 :
653 195478 : return 0;
654 : }
655 :
656 : int
657 138156 : session_enqueue_notify (session_t *s)
658 : {
659 138156 : return session_enqueue_notify_inline (s, 0 /* is_cl */);
660 : }
661 :
662 : int
663 0 : session_enqueue_notify_cl (session_t *s)
664 : {
665 0 : return session_enqueue_notify_inline (s, 1 /* is_cl */);
666 : }
667 :
668 : int
669 27390 : session_dequeue_notify (session_t *s)
670 : {
671 : app_worker_t *app_wrk;
672 : u8 is_cl;
673 :
674 : /* Unset as soon as event is requested */
675 27390 : svm_fifo_clear_deq_ntf (s->tx_fifo);
676 :
677 27390 : app_wrk = app_worker_get_if_valid (s->app_wrk_index);
678 27390 : if (PREDICT_FALSE (!app_wrk))
679 0 : return -1;
680 :
681 54780 : is_cl = s->session_state == SESSION_STATE_LISTENING ||
682 27390 : s->session_state == SESSION_STATE_OPENED;
683 27390 : session_program_io_event (app_wrk, s, SESSION_IO_EVT_TX, is_cl ? 1 : 0);
684 :
685 27390 : if (PREDICT_FALSE (svm_fifo_n_subscribers (s->tx_fifo)))
686 0 : return session_notify_subscribers (app_wrk->app_index, s, s->tx_fifo,
687 : SESSION_IO_EVT_TX);
688 :
689 27390 : return 0;
690 : }
691 :
692 : /**
693 : * Flushes queue of sessions that are to be notified of new data
694 : * enqueued events.
695 : *
696 : * @param transport_proto transport protocol for which queue to be flushed
697 : * @param thread_index Thread index for which the flush is to be performed.
698 : * @note This function returns void; enqueue notification failures are
699 : * not reported back to the caller.
700 : */
701 : void
702 48923 : session_main_flush_enqueue_events (transport_proto_t transport_proto,
703 : u32 thread_index)
704 : {
705 48923 : session_worker_t *wrk = session_main_get_worker (thread_index);
706 : session_handle_t *handles;
707 : session_t *s;
708 : u32 i;
709 :
710 48923 : handles = wrk->session_to_enqueue[transport_proto];
711 :
712 106245 : for (i = 0; i < vec_len (handles); i++)
713 : {
714 57322 : s = session_get_from_handle (handles[i]);
715 57322 : session_fifo_tuning (s, s->rx_fifo, SESSION_FT_ACTION_ENQUEUED,
716 : 0 /* TODO/not needed */);
/* Sessions owned by another thread are notified connectionless-style */
717 57322 : session_enqueue_notify_inline (s,
718 57322 : s->thread_index != thread_index ? 1 : 0);
719 : }
720 :
/* NOTE(review): handles is written back after reset in case the
 * notifications above grew/reallocated the vector — verify growth during
 * iteration is actually safe here */
721 48923 : vec_reset_length (handles);
722 48923 : wrk->session_to_enqueue[transport_proto] = handles;
723 : }
724 :
725 : /*
726 : * Enqueue data for delivery to app. If requested, it queues app notification
727 : * event for later delivery.
728 : *
729 : * @param tc Transport connection which is to be enqueued data
730 : * @param b Buffer to be enqueued
731 : * @param offset Offset at which to start enqueueing if out-of-order
732 : * @param queue_event Flag to indicate if peer is to be notified or if event
733 : * is to be queued. The former is useful when more data is
734 : * enqueued and only one event is to be generated.
735 : * @param is_in_order Flag to indicate if data is in order
736 : * @return Number of bytes enqueued or a negative value if enqueueing failed.
737 : */
738 : int
739 1029010 : session_enqueue_stream_connection (transport_connection_t * tc,
740 : vlib_buffer_t * b, u32 offset,
741 : u8 queue_event, u8 is_in_order)
742 : {
743 : session_t *s;
744 1029010 : int enqueued = 0, rv, in_order_off;
745 :
746 1029010 : s = session_get (tc->s_index, tc->thread_index);
747 :
748 1029010 : if (is_in_order)
749 : {
750 1029010 : enqueued = svm_fifo_enqueue (s->rx_fifo,
751 1029010 : b->current_length,
752 1029010 : vlib_buffer_get_current (b));
/* Chained buffers are the slow path and only followed when the first
 * buffer was accepted */
753 1029010 : if (PREDICT_FALSE ((b->flags & VLIB_BUFFER_NEXT_PRESENT)
754 : && enqueued >= 0))
755 : {
/* enqueued > current_length presumably means ooo data was drained as
 * well; pass it as offset so the tail skips already-delivered bytes */
756 0 : in_order_off = enqueued > b->current_length ? enqueued : 0;
757 0 : rv = session_enqueue_chain_tail (s, b, in_order_off, 1);
758 0 : if (rv > 0)
759 0 : enqueued += rv;
760 : }
761 : }
762 : else
763 : {
764 0 : rv = svm_fifo_enqueue_with_offset (s->rx_fifo, offset,
765 0 : b->current_length,
766 0 : vlib_buffer_get_current (b));
767 0 : if (PREDICT_FALSE ((b->flags & VLIB_BUFFER_NEXT_PRESENT) && !rv))
768 0 : session_enqueue_chain_tail (s, b, offset + b->current_length, 0);
769 : /* if something was enqueued, report even this as success for ooo
770 : * segment handling */
771 0 : return rv;
772 : }
773 :
774 1029010 : if (queue_event)
775 : {
776 : /* Queue RX event on this fifo. Eventually these will need to be
777 : * flushed by calling @ref session_main_flush_enqueue_events () */
778 1029010 : if (!(s->flags & SESSION_F_RX_EVT))
779 : {
780 41661 : session_worker_t *wrk = session_main_get_worker (s->thread_index);
781 41661 : ASSERT (s->thread_index == vlib_get_thread_index ());
/* SESSION_F_RX_EVT debounces: one pending notification per session */
782 41661 : s->flags |= SESSION_F_RX_EVT;
783 41661 : vec_add1 (wrk->session_to_enqueue[tc->proto], session_handle (s));
784 : }
785 :
786 1029010 : session_fifo_tuning (s, s->rx_fifo, SESSION_FT_ACTION_ENQUEUED, 0);
787 : }
788 :
789 1029010 : return enqueued;
790 : }
791 :
/**
 * Enqueue a datagram (header + payload) into @a s's rx fifo and,
 * optionally, queue an rx notification for later flushing.
 *
 * Caller must ensure the fifo has room for header and payload; enqueue is
 * all-or-nothing (allow_partial is 0).
 */
792 : always_inline int
793 59064 : session_enqueue_dgram_connection_inline (session_t *s,
794 : session_dgram_hdr_t *hdr,
795 : vlib_buffer_t *b, u8 proto,
796 : u8 queue_event, u32 is_cl)
797 : {
798 : int rv;
799 :
800 59064 : ASSERT (svm_fifo_max_enqueue_prod (s->rx_fifo)
801 : >= b->current_length + sizeof (*hdr));
802 :
/* Fast path: single buffer, two segments (header + data) on the stack */
803 59064 : if (PREDICT_TRUE (!(b->flags & VLIB_BUFFER_NEXT_PRESENT)))
804 : {
805 118128 : svm_fifo_seg_t segs[2] = {
806 : { (u8 *) hdr, sizeof (*hdr) },
807 59064 : { vlib_buffer_get_current (b), b->current_length }
808 : };
809 :
810 59064 : rv = svm_fifo_enqueue_segments (s->rx_fifo, segs, 2,
811 : 0 /* allow_partial */ );
812 : }
813 : else
814 : {
/* Slow path: build a segment vector spanning the whole buffer chain */
815 0 : vlib_main_t *vm = vlib_get_main ();
816 0 : svm_fifo_seg_t *segs = 0, *seg;
817 0 : vlib_buffer_t *it = b;
818 0 : u32 n_segs = 1;
819 :
820 0 : vec_add2 (segs, seg, 1);
821 0 : seg->data = (u8 *) hdr;
822 0 : seg->len = sizeof (*hdr);
823 0 : while (it)
824 : {
825 0 : vec_add2 (segs, seg, 1);
826 0 : seg->data = vlib_buffer_get_current (it);
827 0 : seg->len = it->current_length;
828 0 : n_segs++;
829 0 : if (!(it->flags & VLIB_BUFFER_NEXT_PRESENT))
830 0 : break;
831 0 : it = vlib_get_buffer (vm, it->next_buffer);
832 : }
833 0 : rv = svm_fifo_enqueue_segments (s->rx_fifo, segs, n_segs,
834 : 0 /* allow partial */ );
835 0 : vec_free (segs);
836 : }
837 :
838 59064 : if (queue_event && rv > 0)
839 : {
840 : /* Queue RX event on this fifo. Eventually these will need to be
841 : * flushed by calling @ref session_main_flush_enqueue_events () */
842 59064 : if (!(s->flags & SESSION_F_RX_EVT))
843 : {
/* Connectionless sessions are notified on the enqueuing thread, not
 * the session owner's */
844 15661 : u32 thread_index =
845 15661 : is_cl ? vlib_get_thread_index () : s->thread_index;
846 15661 : session_worker_t *wrk = session_main_get_worker (thread_index);
847 15661 : ASSERT (s->thread_index == vlib_get_thread_index () || is_cl);
848 15661 : s->flags |= SESSION_F_RX_EVT;
849 15661 : vec_add1 (wrk->session_to_enqueue[proto], session_handle (s));
850 : }
851 :
852 59064 : session_fifo_tuning (s, s->rx_fifo, SESSION_FT_ACTION_ENQUEUED, 0);
853 : }
/* Failures are reported as 0 bytes enqueued */
854 59064 : return rv > 0 ? rv : 0;
855 : }
856 :
857 : int
858 59064 : session_enqueue_dgram_connection (session_t *s, session_dgram_hdr_t *hdr,
859 : vlib_buffer_t *b, u8 proto, u8 queue_event)
860 : {
861 59064 : return session_enqueue_dgram_connection_inline (s, hdr, b, proto,
862 : queue_event, 0 /* is_cl */);
863 : }
864 :
865 : int
866 0 : session_enqueue_dgram_connection_cl (session_t *s, session_dgram_hdr_t *hdr,
867 : vlib_buffer_t *b, u8 proto,
868 : u8 queue_event)
869 : {
870 0 : return session_enqueue_dgram_connection_inline (s, hdr, b, proto,
871 : queue_event, 1 /* is_cl */);
872 : }
873 :
874 : int
875 0 : session_tx_fifo_peek_bytes (transport_connection_t * tc, u8 * buffer,
876 : u32 offset, u32 max_bytes)
877 : {
878 0 : session_t *s = session_get (tc->s_index, tc->thread_index);
879 0 : return svm_fifo_peek (s->tx_fifo, offset, max_bytes, buffer);
880 : }
881 :
882 : u32
883 41571 : session_tx_fifo_dequeue_drop (transport_connection_t * tc, u32 max_bytes)
884 : {
885 41571 : session_t *s = session_get (tc->s_index, tc->thread_index);
886 : u32 rv;
887 :
888 41571 : rv = svm_fifo_dequeue_drop (s->tx_fifo, max_bytes);
889 41571 : session_fifo_tuning (s, s->tx_fifo, SESSION_FT_ACTION_DEQUEUED, rv);
890 :
891 41571 : if (svm_fifo_needs_deq_ntf (s->tx_fifo, max_bytes))
892 21993 : session_dequeue_notify (s);
893 :
894 41571 : return rv;
895 : }
896 :
/**
 * Transport notification that a stream connect completed (or failed).
 *
 * Converts the half-open into a full session, initializes it with the app
 * worker and notifies the app. Returns 0 on success, -1 on failure.
 */
897 : int
898 132 : session_stream_connect_notify (transport_connection_t * tc,
899 : session_error_t err)
900 : {
901 132 : u32 opaque = 0, new_ti, new_si;
902 : app_worker_t *app_wrk;
903 132 : session_t *s = 0, *ho;
904 :
905 : /*
906 : * Cleanup half-open table
907 : */
908 132 : session_lookup_del_half_open (tc);
909 :
910 132 : ho = ho_session_get (tc->s_index);
911 132 : session_set_state (ho, SESSION_STATE_TRANSPORT_CLOSED);
/* App's connect opaque is preserved across the half-open -> session move */
912 132 : opaque = ho->opaque;
913 132 : app_wrk = app_worker_get_if_valid (ho->app_wrk_index);
914 132 : if (!app_wrk)
915 0 : return -1;
916 :
/* s is still 0 here: failed connects notify with no session */
917 132 : if (err)
918 0 : return app_worker_connect_notify (app_wrk, s, err, opaque);
919 :
920 132 : s = session_alloc_for_connection (tc);
921 132 : session_set_state (s, SESSION_STATE_CONNECTING);
922 132 : s->app_wrk_index = app_wrk->wrk_index;
923 132 : s->opaque = opaque;
/* Keep index/thread; pointers may be invalidated below */
924 132 : new_si = s->session_index;
925 132 : new_ti = s->thread_index;
926 :
927 132 : if ((err = app_worker_init_connected (app_wrk, s)))
928 : {
929 0 : session_free (s);
930 0 : app_worker_connect_notify (app_wrk, 0, err, opaque);
931 0 : return -1;
932 : }
933 :
/* Re-fetch: init above may have moved the session pool */
934 132 : s = session_get (new_si, new_ti);
935 132 : session_set_state (s, SESSION_STATE_READY);
936 132 : session_lookup_add_connection (tc, session_handle (s));
937 :
938 132 : if (app_worker_connect_notify (app_wrk, s, SESSION_E_NONE, opaque))
939 : {
940 0 : session_lookup_del_connection (tc);
941 : /* Avoid notifying app about rejected session cleanup */
942 0 : s = session_get (new_si, new_ti);
943 0 : segment_manager_dealloc_fifos (s->rx_fifo, s->tx_fifo);
944 0 : session_free (s);
945 0 : return -1;
946 : }
947 :
948 132 : return 0;
949 : }
950 :
951 : static void
952 0 : session_switch_pool_closed_rpc (void *arg)
953 : {
954 : session_handle_t sh;
955 : session_t *s;
956 :
957 0 : sh = pointer_to_uword (arg);
958 0 : s = session_get_from_handle_if_valid (sh);
959 0 : if (!s)
960 0 : return;
961 :
962 0 : transport_cleanup (session_get_transport_proto (s), s->connection_index,
963 0 : s->thread_index);
964 0 : session_cleanup (s);
965 : }
966 :
/** Arguments for the cross-thread session pool switch RPC */
967 : typedef struct _session_switch_pool_args
968 : {
969 : u32 session_index; /**< session index on the old thread */
970 : u32 thread_index; /**< old owner thread */
971 : u32 new_thread_index; /**< thread the session migrated to */
972 : u32 new_session_index; /**< session index on the new thread */
973 : } session_switch_pool_args_t;
974 :
975 : /**
976 : * Notify old thread of the session pool switch
977 : */
978 : static void
979 0 : session_switch_pool (void *cb_args)
980 : {
981 0 : session_switch_pool_args_t *args = (session_switch_pool_args_t *) cb_args;
982 : session_handle_t sh, new_sh;
983 : segment_manager_t *sm;
984 : app_worker_t *app_wrk;
985 : session_t *s;
986 :
987 0 : ASSERT (args->thread_index == vlib_get_thread_index ());
988 0 : s = session_get (args->session_index, args->thread_index);
989 :
990 0 : app_wrk = app_worker_get_if_valid (s->app_wrk_index);
991 0 : if (!app_wrk)
992 0 : goto app_closed;
993 :
994 : /* Cleanup fifo segment slice state for fifos */
995 0 : sm = app_worker_get_connect_segment_manager (app_wrk);
996 0 : segment_manager_detach_fifo (sm, &s->rx_fifo);
997 0 : segment_manager_detach_fifo (sm, &s->tx_fifo);
998 :
999 : /* Check if session closed during migration */
1000 0 : if (s->session_state >= SESSION_STATE_TRANSPORT_CLOSING)
1001 0 : goto app_closed;
1002 :
1003 : new_sh =
1004 0 : session_make_handle (args->new_session_index, args->new_thread_index);
1005 0 : app_worker_migrate_notify (app_wrk, s, new_sh);
1006 :
1007 0 : clib_mem_free (cb_args);
1008 0 : return;
1009 :
1010 0 : app_closed:
1011 : /* Session closed during migration. Clean everything up */
1012 0 : sh = session_handle (s);
1013 0 : session_send_rpc_evt_to_thread (args->new_thread_index,
1014 : session_switch_pool_closed_rpc,
1015 : uword_to_pointer (sh, void *));
1016 0 : clib_mem_free (cb_args);
1017 : }
1018 :
1019 : /**
1020 : * Move dgram session to the right thread
1021 : */
1022 : int
1023 0 : session_dgram_connect_notify (transport_connection_t * tc,
1024 : u32 old_thread_index, session_t ** new_session)
1025 : {
1026 : session_t *new_s;
1027 : session_switch_pool_args_t *rpc_args;
1028 : segment_manager_t *sm;
1029 : app_worker_t *app_wrk;
1030 :
1031 : /*
1032 : * Clone half-open session to the right thread.
1033 : */
1034 0 : new_s = session_clone_safe (tc->s_index, old_thread_index);
1035 0 : new_s->connection_index = tc->c_index;
1036 0 : session_set_state (new_s, SESSION_STATE_READY);
1037 0 : new_s->flags |= SESSION_F_IS_MIGRATING;
1038 :
1039 0 : if (!(tc->flags & TRANSPORT_CONNECTION_F_NO_LOOKUP))
1040 0 : session_lookup_add_connection (tc, session_handle (new_s));
1041 :
1042 0 : app_wrk = app_worker_get_if_valid (new_s->app_wrk_index);
1043 0 : if (app_wrk)
1044 : {
1045 : /* New set of fifos attached to the same shared memory */
1046 0 : sm = app_worker_get_connect_segment_manager (app_wrk);
1047 0 : segment_manager_attach_fifo (sm, &new_s->rx_fifo, new_s);
1048 0 : segment_manager_attach_fifo (sm, &new_s->tx_fifo, new_s);
1049 : }
1050 :
1051 : /*
1052 : * Ask thread owning the old session to clean it up and make us the tx
1053 : * fifo owner
1054 : */
1055 0 : rpc_args = clib_mem_alloc (sizeof (*rpc_args));
1056 0 : rpc_args->new_session_index = new_s->session_index;
1057 0 : rpc_args->new_thread_index = new_s->thread_index;
1058 0 : rpc_args->session_index = tc->s_index;
1059 0 : rpc_args->thread_index = old_thread_index;
1060 0 : session_send_rpc_evt_to_thread (rpc_args->thread_index, session_switch_pool,
1061 : rpc_args);
1062 :
1063 0 : tc->s_index = new_s->session_index;
1064 0 : new_s->connection_index = tc->c_index;
1065 0 : *new_session = new_s;
1066 0 : return 0;
1067 : }
1068 :
1069 : /**
1070 : * Notification from transport that connection is being closed.
1071 : *
1072 : * A disconnect is sent to application but state is not removed. Once
1073 : * disconnect is acknowledged by application, session disconnect is called.
1074 : * Ultimately this leads to close being called on transport (passive close).
1075 : */
1076 : void
1077 193 : session_transport_closing_notify (transport_connection_t * tc)
1078 : {
1079 : app_worker_t *app_wrk;
1080 : session_t *s;
1081 :
1082 193 : s = session_get (tc->s_index, tc->thread_index);
1083 193 : if (s->session_state >= SESSION_STATE_TRANSPORT_CLOSING)
1084 24 : return;
1085 :
1086 : /* Wait for reply from app before sending notification as the
1087 : * accept might be rejected */
1088 169 : if (s->session_state == SESSION_STATE_ACCEPTING)
1089 : {
1090 0 : session_set_state (s, SESSION_STATE_TRANSPORT_CLOSING);
1091 0 : return;
1092 : }
1093 :
1094 169 : session_set_state (s, SESSION_STATE_TRANSPORT_CLOSING);
1095 169 : app_wrk = app_worker_get (s->app_wrk_index);
1096 169 : app_worker_close_notify (app_wrk, s);
1097 : }
1098 :
1099 : /**
1100 : * Notification from transport that connection is being deleted
1101 : *
1102 : * This removes the session if it is still valid. It should be called only on
1103 : * previously fully established sessions. For instance failed connects should
1104 : * call stream_session_connect_notify and indicate that the connect has
1105 : * failed.
1106 : */
1107 : void
1108 253 : session_transport_delete_notify (transport_connection_t * tc)
1109 : {
1110 : session_t *s;
1111 :
1112 : /* App might've been removed already */
1113 253 : if (!(s = session_get_if_valid (tc->s_index, tc->thread_index)))
1114 0 : return;
1115 :
1116 253 : switch (s->session_state)
1117 : {
1118 0 : case SESSION_STATE_CREATED:
1119 : /* Session was created but accept notification was not yet sent to the
1120 : * app. Cleanup everything. */
1121 0 : session_lookup_del_session (s);
1122 0 : segment_manager_dealloc_fifos (s->rx_fifo, s->tx_fifo);
1123 0 : session_free (s);
1124 0 : break;
1125 18 : case SESSION_STATE_ACCEPTING:
1126 : case SESSION_STATE_TRANSPORT_CLOSING:
1127 : case SESSION_STATE_CLOSING:
1128 : case SESSION_STATE_TRANSPORT_CLOSED:
1129 : /* If transport finishes or times out before we get a reply
1130 : * from the app, mark transport as closed and wait for reply
1131 : * before removing the session. Cleanup session table in advance
1132 : * because transport will soon be closed and closed sessions
1133 : * are assumed to have been removed from the lookup table */
1134 18 : session_lookup_del_session (s);
1135 18 : session_set_state (s, SESSION_STATE_TRANSPORT_DELETED);
1136 18 : session_cleanup_notify (s, SESSION_CLEANUP_TRANSPORT);
1137 18 : svm_fifo_dequeue_drop_all (s->tx_fifo);
1138 18 : break;
1139 97 : case SESSION_STATE_APP_CLOSED:
1140 : /* Cleanup lookup table as transport needs to still be valid.
1141 : * Program transport close to ensure that all session events
1142 : * have been cleaned up. Once transport close is called, the
1143 : * session is just removed because both transport and app have
1144 : * confirmed the close*/
1145 97 : session_lookup_del_session (s);
1146 97 : session_set_state (s, SESSION_STATE_TRANSPORT_DELETED);
1147 97 : session_cleanup_notify (s, SESSION_CLEANUP_TRANSPORT);
1148 97 : svm_fifo_dequeue_drop_all (s->tx_fifo);
1149 97 : session_program_transport_ctrl_evt (s, SESSION_CTRL_EVT_CLOSE);
1150 97 : break;
1151 0 : case SESSION_STATE_TRANSPORT_DELETED:
1152 0 : break;
1153 138 : case SESSION_STATE_CLOSED:
1154 138 : session_cleanup_notify (s, SESSION_CLEANUP_TRANSPORT);
1155 138 : session_set_state (s, SESSION_STATE_TRANSPORT_DELETED);
1156 138 : session_delete (s);
1157 138 : break;
1158 0 : default:
1159 0 : clib_warning ("session state %u", s->session_state);
1160 0 : session_cleanup_notify (s, SESSION_CLEANUP_TRANSPORT);
1161 0 : session_delete (s);
1162 0 : break;
1163 : }
1164 : }
1165 :
1166 : /**
1167 : * Notification from transport that it is closed
1168 : *
1169 : * Should be called by transport, prior to calling delete notify, once it
1170 : * knows that no more data will be exchanged. This could serve as an
1171 : * early acknowledgment of an active close especially if transport delete
1172 : * can be delayed a long time, e.g., tcp time-wait.
1173 : */
1174 : void
1175 295 : session_transport_closed_notify (transport_connection_t * tc)
1176 : {
1177 : app_worker_t *app_wrk;
1178 : session_t *s;
1179 :
1180 295 : if (!(s = session_get_if_valid (tc->s_index, tc->thread_index)))
1181 0 : return;
1182 :
1183 : /* Transport thinks that app requested close but it actually didn't.
1184 : * Can happen for tcp:
1185 : * 1)if fin and rst are received in close succession.
1186 : * 2)if app shutdown the connection. */
1187 295 : if (s->session_state == SESSION_STATE_READY)
1188 : {
1189 0 : session_transport_closing_notify (tc);
1190 0 : svm_fifo_dequeue_drop_all (s->tx_fifo);
1191 0 : session_set_state (s, SESSION_STATE_TRANSPORT_CLOSED);
1192 : }
1193 : /* If app close has not been received or has not yet resulted in
1194 : * a transport close, only mark the session transport as closed */
1195 295 : else if (s->session_state <= SESSION_STATE_CLOSING)
1196 4 : session_set_state (s, SESSION_STATE_TRANSPORT_CLOSED);
1197 : /* If app also closed, switch to closed */
1198 291 : else if (s->session_state == SESSION_STATE_APP_CLOSED)
1199 291 : session_set_state (s, SESSION_STATE_CLOSED);
1200 :
1201 295 : app_wrk = app_worker_get_if_valid (s->app_wrk_index);
1202 295 : if (app_wrk)
1203 264 : app_worker_transport_closed_notify (app_wrk, s);
1204 : }
1205 :
1206 : /**
1207 : * Notify application that connection has been reset.
1208 : */
1209 : void
1210 6 : session_transport_reset_notify (transport_connection_t * tc)
1211 : {
1212 : app_worker_t *app_wrk;
1213 : session_t *s;
1214 :
1215 6 : s = session_get (tc->s_index, tc->thread_index);
1216 6 : svm_fifo_dequeue_drop_all (s->tx_fifo);
1217 6 : if (s->session_state >= SESSION_STATE_TRANSPORT_CLOSING)
1218 0 : return;
1219 6 : if (s->session_state == SESSION_STATE_ACCEPTING)
1220 : {
1221 0 : session_set_state (s, SESSION_STATE_TRANSPORT_CLOSING);
1222 0 : return;
1223 : }
1224 6 : session_set_state (s, SESSION_STATE_TRANSPORT_CLOSING);
1225 6 : app_wrk = app_worker_get (s->app_wrk_index);
1226 6 : app_worker_reset_notify (app_wrk, s);
1227 : }
1228 :
1229 : int
1230 132 : session_stream_accept_notify (transport_connection_t * tc)
1231 : {
1232 : app_worker_t *app_wrk;
1233 : session_t *s;
1234 :
1235 132 : s = session_get (tc->s_index, tc->thread_index);
1236 132 : app_wrk = app_worker_get_if_valid (s->app_wrk_index);
1237 132 : if (!app_wrk)
1238 0 : return -1;
1239 132 : if (s->session_state != SESSION_STATE_CREATED)
1240 0 : return 0;
1241 132 : session_set_state (s, SESSION_STATE_ACCEPTING);
1242 132 : if (app_worker_accept_notify (app_wrk, s))
1243 : {
1244 : /* On transport delete, no notifications should be sent. Unless, the
1245 : * accept is retried and successful. */
1246 0 : session_set_state (s, SESSION_STATE_CREATED);
1247 0 : return -1;
1248 : }
1249 132 : return 0;
1250 : }
1251 :
1252 : /**
1253 : * Accept a stream session. Optionally ping the server by callback.
1254 : */
1255 : int
1256 135 : session_stream_accept (transport_connection_t * tc, u32 listener_index,
1257 : u32 thread_index, u8 notify)
1258 : {
1259 : session_t *s;
1260 : int rv;
1261 :
1262 135 : s = session_alloc_for_connection (tc);
1263 135 : s->listener_handle = ((u64) thread_index << 32) | (u64) listener_index;
1264 135 : session_set_state (s, SESSION_STATE_CREATED);
1265 :
1266 135 : if ((rv = app_worker_init_accepted (s)))
1267 : {
1268 0 : session_free (s);
1269 0 : return rv;
1270 : }
1271 :
1272 135 : session_lookup_add_connection (tc, session_handle (s));
1273 :
1274 : /* Shoulder-tap the server */
1275 135 : if (notify)
1276 : {
1277 0 : app_worker_t *app_wrk = app_worker_get (s->app_wrk_index);
1278 0 : if ((rv = app_worker_accept_notify (app_wrk, s)))
1279 : {
1280 0 : session_lookup_del_session (s);
1281 0 : segment_manager_dealloc_fifos (s->rx_fifo, s->tx_fifo);
1282 0 : session_free (s);
1283 0 : return rv;
1284 : }
1285 : }
1286 :
1287 135 : return 0;
1288 : }
1289 :
1290 : int
1291 20 : session_dgram_accept (transport_connection_t * tc, u32 listener_index,
1292 : u32 thread_index)
1293 : {
1294 : app_worker_t *app_wrk;
1295 : session_t *s;
1296 : int rv;
1297 :
1298 20 : s = session_alloc_for_connection (tc);
1299 20 : s->listener_handle = ((u64) thread_index << 32) | (u64) listener_index;
1300 :
1301 20 : if ((rv = app_worker_init_accepted (s)))
1302 : {
1303 0 : session_free (s);
1304 0 : return rv;
1305 : }
1306 :
1307 20 : session_lookup_add_connection (tc, session_handle (s));
1308 20 : session_set_state (s, SESSION_STATE_ACCEPTING);
1309 :
1310 20 : app_wrk = app_worker_get (s->app_wrk_index);
1311 20 : if ((rv = app_worker_accept_notify (app_wrk, s)))
1312 : {
1313 0 : session_lookup_del_session (s);
1314 0 : segment_manager_dealloc_fifos (s->rx_fifo, s->tx_fifo);
1315 0 : session_free (s);
1316 0 : return rv;
1317 : }
1318 :
1319 20 : return 0;
1320 : }
1321 :
1322 : int
1323 24 : session_open_cl (session_endpoint_cfg_t *rmt, session_handle_t *rsh)
1324 : {
1325 : transport_connection_t *tc;
1326 : transport_endpoint_cfg_t *tep;
1327 : app_worker_t *app_wrk;
1328 : session_handle_t sh;
1329 : session_t *s;
1330 : int rv;
1331 :
1332 24 : tep = session_endpoint_to_transport_cfg (rmt);
1333 24 : rv = transport_connect (rmt->transport_proto, tep);
1334 24 : if (rv < 0)
1335 : {
1336 : SESSION_DBG ("Transport failed to open connection.");
1337 0 : return rv;
1338 : }
1339 :
1340 24 : tc = transport_get_half_open (rmt->transport_proto, (u32) rv);
1341 :
1342 : /* For dgram type of service, allocate session and fifos now */
1343 24 : app_wrk = app_worker_get (rmt->app_wrk_index);
1344 24 : s = session_alloc_for_connection (tc);
1345 24 : s->app_wrk_index = app_wrk->wrk_index;
1346 24 : s->opaque = rmt->opaque;
1347 24 : session_set_state (s, SESSION_STATE_OPENED);
1348 24 : if (app_worker_init_connected (app_wrk, s))
1349 : {
1350 0 : session_free (s);
1351 0 : return -1;
1352 : }
1353 :
1354 24 : sh = session_handle (s);
1355 24 : *rsh = sh;
1356 :
1357 24 : session_lookup_add_connection (tc, sh);
1358 24 : return app_worker_connect_notify (app_wrk, s, SESSION_E_NONE, rmt->opaque);
1359 : }
1360 :
/**
 * Open a virtual-circuit (stream) session.
 *
 * Only a half-open session is allocated here; the established session is
 * allocated later, on transport connect notification.
 *
 * @param rmt	remote endpoint configuration
 * @param rsh	output; set to the half-open session's handle
 * @return 0 or negative transport error
 */
int
session_open_vc (session_endpoint_cfg_t *rmt, session_handle_t *rsh)
{
  transport_connection_t *tc;
  transport_endpoint_cfg_t *tep;
  app_worker_t *app_wrk;
  session_t *ho;
  int rv;

  tep = session_endpoint_to_transport_cfg (rmt);
  rv = transport_connect (rmt->transport_proto, tep);
  if (rv < 0)
    {
      SESSION_DBG ("Transport failed to open connection.");
      return rv;
    }

  /* On success, rv is the half-open connection index */
  tc = transport_get_half_open (rmt->transport_proto, (u32) rv);

  app_wrk = app_worker_get (rmt->app_wrk_index);

  /* If transport offers a vc service, only allocate established
   * session once the connection has been established.
   * In the meantime allocate half-open session for tracking purposes
   * associate half-open connection to it and add session to app-worker
   * half-open table. These are needed to allocate the established
   * session on transport notification, and to cleanup the half-open
   * session if the app detaches before connection establishment.
   */
  ho = session_alloc_for_half_open (tc);
  ho->app_wrk_index = app_wrk->wrk_index;
  ho->ho_index = app_worker_add_half_open (app_wrk, session_handle (ho));
  ho->opaque = rmt->opaque;
  *rsh = session_handle (ho);

  if (!(tc->flags & TRANSPORT_CONNECTION_F_NO_LOOKUP))
    session_lookup_add_half_open (tc, tc->c_index);

  return 0;
}
1401 :
1402 : int
1403 36 : session_open_app (session_endpoint_cfg_t *rmt, session_handle_t *rsh)
1404 : {
1405 36 : transport_endpoint_cfg_t *tep_cfg = session_endpoint_to_transport_cfg (rmt);
1406 :
1407 : /* Not supported for now */
1408 36 : *rsh = SESSION_INVALID_HANDLE;
1409 36 : return transport_connect (rmt->transport_proto, tep_cfg);
1410 : }
1411 :
typedef int (*session_open_service_fn) (session_endpoint_cfg_t *,
					session_handle_t *);

/* *INDENT-OFF* */
/* Connect handlers indexed by transport service type (see session_open).
 * Order must match the values returned by
 * transport_protocol_service_type(): vc, cl, app. */
static session_open_service_fn session_open_srv_fns[TRANSPORT_N_SERVICES] = {
  session_open_vc,
  session_open_cl,
  session_open_app,
};
/* *INDENT-ON* */
1422 :
1423 : /**
1424 : * Ask transport to open connection to remote transport endpoint.
1425 : *
1426 : * Stores handle for matching request with reply since the call can be
1427 : * asynchronous. For instance, for TCP the 3-way handshake must complete
1428 : * before reply comes. Session is only created once connection is established.
1429 : *
1430 : * @param app_index Index of the application requesting the connect
1431 : * @param st Session type requested.
1432 : * @param tep Remote transport endpoint
1433 : * @param opaque Opaque data (typically, api_context) the application expects
1434 : * on open completion.
1435 : */
1436 : int
1437 213 : session_open (session_endpoint_cfg_t *rmt, session_handle_t *rsh)
1438 : {
1439 : transport_service_type_t tst;
1440 213 : tst = transport_protocol_service_type (rmt->transport_proto);
1441 213 : return session_open_srv_fns[tst](rmt, rsh);
1442 : }
1443 :
1444 : /**
1445 : * Ask transport to listen on session endpoint.
1446 : *
1447 : * @param s Session for which listen will be called. Note that unlike
1448 : * established sessions, listen sessions are not associated to a
1449 : * thread.
1450 : * @param sep Local endpoint to be listened on.
1451 : */
1452 : int
1453 78 : session_listen (session_t * ls, session_endpoint_cfg_t * sep)
1454 : {
1455 : transport_endpoint_cfg_t *tep;
1456 : int tc_index;
1457 : u32 s_index;
1458 :
1459 : /* Transport bind/listen */
1460 78 : tep = session_endpoint_to_transport_cfg (sep);
1461 78 : s_index = ls->session_index;
1462 78 : tc_index = transport_start_listen (session_get_transport_proto (ls),
1463 : s_index, tep);
1464 :
1465 78 : if (tc_index < 0)
1466 0 : return tc_index;
1467 :
1468 : /* Attach transport to session. Lookup tables are populated by the app
1469 : * worker because local tables (for ct sessions) are not backed by a fib */
1470 78 : ls = listen_session_get (s_index);
1471 78 : ls->connection_index = tc_index;
1472 78 : ls->opaque = sep->opaque;
1473 :
1474 78 : return 0;
1475 : }
1476 :
1477 : /**
1478 : * Ask transport to stop listening on local transport endpoint.
1479 : *
1480 : * @param s Session to stop listening on. It must be in state LISTENING.
1481 : */
1482 : int
1483 62 : session_stop_listen (session_t * s)
1484 : {
1485 62 : transport_proto_t tp = session_get_transport_proto (s);
1486 : transport_connection_t *tc;
1487 :
1488 62 : if (s->session_state != SESSION_STATE_LISTENING)
1489 0 : return SESSION_E_NOLISTEN;
1490 :
1491 62 : tc = transport_get_listener (tp, s->connection_index);
1492 :
1493 : /* If no transport, assume everything was cleaned up already */
1494 62 : if (!tc)
1495 0 : return SESSION_E_NONE;
1496 :
1497 62 : if (!(tc->flags & TRANSPORT_CONNECTION_F_NO_LOOKUP))
1498 59 : session_lookup_del_connection (tc);
1499 :
1500 62 : transport_stop_listen (tp, s->connection_index);
1501 62 : return 0;
1502 : }
1503 :
1504 : /**
1505 : * Initialize session half-closing procedure.
1506 : *
1507 : * Note that half-closing will not change the state of the session.
1508 : */
1509 : void
1510 0 : session_half_close (session_t *s)
1511 : {
1512 0 : if (!s)
1513 0 : return;
1514 :
1515 0 : session_program_transport_ctrl_evt (s, SESSION_CTRL_EVT_HALF_CLOSE);
1516 : }
1517 :
1518 : /**
1519 : * Initialize session closing procedure.
1520 : *
1521 : * Request is always sent to session node to ensure that all outstanding
1522 : * requests are served before transport is notified.
1523 : */
1524 : void
1525 571 : session_close (session_t * s)
1526 : {
1527 571 : if (!s || (s->flags & SESSION_F_APP_CLOSED))
1528 161 : return;
1529 :
1530 : /* Transports can close and delete their state independent of app closes
1531 : * and transport initiated state transitions can hide app closes. Instead
1532 : * of extending the state machine to support separate tracking of app and
1533 : * transport initiated closes, use a flag. */
1534 410 : s->flags |= SESSION_F_APP_CLOSED;
1535 :
1536 410 : if (s->session_state >= SESSION_STATE_CLOSING)
1537 : {
1538 : /* Session will only be removed once both app and transport
1539 : * acknowledge the close */
1540 22 : if (s->session_state == SESSION_STATE_TRANSPORT_CLOSED
1541 18 : || s->session_state == SESSION_STATE_TRANSPORT_DELETED)
1542 22 : session_program_transport_ctrl_evt (s, SESSION_CTRL_EVT_CLOSE);
1543 22 : return;
1544 : }
1545 :
1546 : /* App closed so stop propagating dequeue notifications.
1547 : * App might disconnect session before connected, in this case,
1548 : * tx_fifo may not be setup yet, so clear only it's inited. */
1549 388 : if (s->tx_fifo)
1550 388 : svm_fifo_clear_deq_ntf (s->tx_fifo);
1551 388 : session_set_state (s, SESSION_STATE_CLOSING);
1552 388 : session_program_transport_ctrl_evt (s, SESSION_CTRL_EVT_CLOSE);
1553 : }
1554 :
1555 : /**
1556 : * Force a close without waiting for data to be flushed
1557 : */
1558 : void
1559 0 : session_reset (session_t * s)
1560 : {
1561 0 : if (s->session_state >= SESSION_STATE_CLOSING)
1562 0 : return;
1563 : /* Drop all outstanding tx data
1564 : * App might disconnect session before connected, in this case,
1565 : * tx_fifo may not be setup yet, so clear only it's inited. */
1566 0 : if (s->tx_fifo)
1567 0 : svm_fifo_dequeue_drop_all (s->tx_fifo);
1568 0 : session_set_state (s, SESSION_STATE_CLOSING);
1569 0 : session_program_transport_ctrl_evt (s, SESSION_CTRL_EVT_RESET);
1570 : }
1571 :
1572 : /**
1573 : * Notify transport the session can be half-disconnected.
1574 : *
1575 : * Must be called from the session's thread.
1576 : */
1577 : void
1578 0 : session_transport_half_close (session_t *s)
1579 : {
1580 : /* Only READY session can be half-closed */
1581 0 : if (s->session_state != SESSION_STATE_READY)
1582 : {
1583 0 : return;
1584 : }
1585 :
1586 0 : transport_half_close (session_get_transport_proto (s), s->connection_index,
1587 0 : s->thread_index);
1588 : }
1589 :
1590 : /**
1591 : * Notify transport the session can be disconnected. This should eventually
1592 : * result in a delete notification that allows us to cleanup session state.
1593 : * Called for both active/passive disconnects.
1594 : *
1595 : * Must be called from the session's thread.
1596 : */
1597 : void
1598 507 : session_transport_close (session_t * s)
1599 : {
1600 507 : if (s->session_state >= SESSION_STATE_APP_CLOSED)
1601 : {
1602 119 : if (s->session_state == SESSION_STATE_TRANSPORT_CLOSED)
1603 4 : session_set_state (s, SESSION_STATE_CLOSED);
1604 : /* If transport is already deleted, just free the session */
1605 115 : else if (s->session_state >= SESSION_STATE_TRANSPORT_DELETED)
1606 115 : session_program_cleanup (s);
1607 119 : return;
1608 : }
1609 :
1610 : /* If the tx queue wasn't drained, the transport can continue to try
1611 : * sending the outstanding data (in closed state it cannot). It MUST however
1612 : * at one point, either after sending everything or after a timeout, call
1613 : * delete notify. This will finally lead to the complete cleanup of the
1614 : * session.
1615 : */
1616 388 : session_set_state (s, SESSION_STATE_APP_CLOSED);
1617 :
1618 388 : transport_close (session_get_transport_proto (s), s->connection_index,
1619 388 : s->thread_index);
1620 : }
1621 :
1622 : /**
1623 : * Force transport close
1624 : */
1625 : void
1626 0 : session_transport_reset (session_t * s)
1627 : {
1628 0 : if (s->session_state >= SESSION_STATE_APP_CLOSED)
1629 : {
1630 0 : if (s->session_state == SESSION_STATE_TRANSPORT_CLOSED)
1631 0 : session_set_state (s, SESSION_STATE_CLOSED);
1632 0 : else if (s->session_state >= SESSION_STATE_TRANSPORT_DELETED)
1633 0 : session_program_cleanup (s);
1634 0 : return;
1635 : }
1636 :
1637 0 : session_set_state (s, SESSION_STATE_APP_CLOSED);
1638 0 : transport_reset (session_get_transport_proto (s), s->connection_index,
1639 0 : s->thread_index);
1640 : }
1641 :
1642 : /**
1643 : * Cleanup transport and session state.
1644 : *
1645 : * Notify transport of the cleanup and free the session. This should
1646 : * be called only if transport reported some error and is already
1647 : * closed.
1648 : */
1649 : void
1650 0 : session_transport_cleanup (session_t * s)
1651 : {
1652 : /* Delete from main lookup table before we axe the the transport */
1653 0 : session_lookup_del_session (s);
1654 0 : if (s->session_state != SESSION_STATE_TRANSPORT_DELETED)
1655 0 : transport_cleanup (session_get_transport_proto (s), s->connection_index,
1656 0 : s->thread_index);
1657 : /* Since we called cleanup, no delete notification will come. So, make
1658 : * sure the session is properly freed. */
1659 0 : segment_manager_dealloc_fifos (s->rx_fifo, s->tx_fifo);
1660 0 : session_free (s);
1661 0 : }
1662 :
1663 : /**
1664 : * Allocate worker mqs in share-able segment
1665 : *
1666 : * That can only be a newly created memfd segment, that must be mapped
1667 : * by all apps/stack users unless private rx mqs are enabled.
1668 : */
1669 : void
1670 49 : session_vpp_wrk_mqs_alloc (session_main_t *smm)
1671 : {
1672 49 : u32 mq_q_length = 2048, evt_size = sizeof (session_event_t);
1673 49 : fifo_segment_t *mqs_seg = &smm->wrk_mqs_segment;
1674 49 : svm_msg_q_cfg_t _cfg, *cfg = &_cfg;
1675 : uword mqs_seg_size;
1676 : int i;
1677 :
1678 49 : mq_q_length = clib_max (mq_q_length, smm->configured_wrk_mq_length);
1679 :
1680 49 : svm_msg_q_ring_cfg_t rc[SESSION_MQ_N_RINGS] = {
1681 49 : { mq_q_length, evt_size, 0 }, { mq_q_length >> 1, 256, 0 }
1682 : };
1683 49 : cfg->consumer_pid = 0;
1684 49 : cfg->n_rings = 2;
1685 49 : cfg->q_nitems = mq_q_length;
1686 49 : cfg->ring_cfgs = rc;
1687 :
1688 : /*
1689 : * Compute mqs segment size based on rings config and leave space
1690 : * for passing extended configuration messages, i.e., data allocated
1691 : * outside of the rings. If provided with a config value, accept it
1692 : * if larger than minimum size.
1693 : */
1694 49 : mqs_seg_size = svm_msg_q_size_to_alloc (cfg) * vec_len (smm->wrk);
1695 49 : mqs_seg_size = mqs_seg_size + (1 << 20);
1696 49 : mqs_seg_size = clib_max (mqs_seg_size, smm->wrk_mqs_segment_size);
1697 :
1698 49 : mqs_seg->ssvm.ssvm_size = mqs_seg_size;
1699 49 : mqs_seg->ssvm.my_pid = getpid ();
1700 49 : mqs_seg->ssvm.name = format (0, "%s%c", "session: wrk-mqs-segment", 0);
1701 :
1702 49 : if (ssvm_server_init (&mqs_seg->ssvm, SSVM_SEGMENT_MEMFD))
1703 : {
1704 0 : clib_warning ("failed to initialize queue segment");
1705 0 : return;
1706 : }
1707 :
1708 49 : fifo_segment_init (mqs_seg);
1709 :
1710 : /* Special fifo segment that's filled only with mqs */
1711 49 : mqs_seg->h->n_mqs = vec_len (smm->wrk);
1712 :
1713 119 : for (i = 0; i < vec_len (smm->wrk); i++)
1714 70 : smm->wrk[i].vpp_event_queue = fifo_segment_msg_q_alloc (mqs_seg, i, cfg);
1715 : }
1716 :
1717 : fifo_segment_t *
1718 260 : session_main_get_wrk_mqs_segment (void)
1719 : {
1720 260 : return &session_main.wrk_mqs_segment;
1721 : }
1722 :
1723 : u64
1724 113 : session_segment_handle (session_t * s)
1725 : {
1726 : svm_fifo_t *f;
1727 :
1728 113 : if (!s->rx_fifo)
1729 0 : return SESSION_INVALID_HANDLE;
1730 :
1731 113 : f = s->rx_fifo;
1732 113 : return segment_manager_make_segment_handle (f->segment_manager,
1733 : f->segment_index);
1734 : }
1735 :
1736 : void
1737 0 : session_get_original_dst (transport_endpoint_t *i2o_src,
1738 : transport_endpoint_t *i2o_dst,
1739 : transport_proto_t transport_proto, u32 *original_dst,
1740 : u16 *original_dst_port)
1741 : {
1742 0 : session_main_t *smm = vnet_get_session_main ();
1743 0 : ip_protocol_t proto =
1744 0 : (transport_proto == TRANSPORT_PROTO_TCP ? IPPROTO_TCP : IPPROTO_UDP);
1745 0 : if (!smm->original_dst_lookup || !i2o_dst->is_ip4)
1746 0 : return;
1747 0 : smm->original_dst_lookup (&i2o_src->ip.ip4, i2o_src->port, &i2o_dst->ip.ip4,
1748 0 : i2o_dst->port, proto, original_dst,
1749 : original_dst_port);
1750 : }
1751 :
/* *INDENT-OFF* */
/* Tx functions indexed by the transport's transport_options.tx_type
 * (see session_register_transport). */
static session_fifo_rx_fn *session_tx_fns[TRANSPORT_TX_N_FNS] = {
  session_tx_fifo_peek_and_snd,
  session_tx_fifo_dequeue_and_snd,
  session_tx_fifo_dequeue_internal,
  session_tx_fifo_dequeue_and_snd
};
/* *INDENT-ON* */
1760 :
1761 : void
1762 6950 : session_register_transport (transport_proto_t transport_proto,
1763 : const transport_proto_vft_t * vft, u8 is_ip4,
1764 : u32 output_node)
1765 : {
1766 6950 : session_main_t *smm = &session_main;
1767 : session_type_t session_type;
1768 6950 : u32 next_index = ~0;
1769 :
1770 6950 : session_type = session_type_from_proto_and_ip (transport_proto, is_ip4);
1771 :
1772 6950 : vec_validate (smm->session_type_to_next, session_type);
1773 6950 : vec_validate (smm->session_tx_fns, session_type);
1774 :
1775 6950 : if (output_node != ~0)
1776 2300 : next_index = vlib_node_add_next (vlib_get_main (),
1777 2300 : session_queue_node.index, output_node);
1778 :
1779 6950 : smm->session_type_to_next[session_type] = next_index;
1780 6950 : smm->session_tx_fns[session_type] =
1781 6950 : session_tx_fns[vft->transport_options.tx_type];
1782 6950 : }
1783 :
1784 : void
1785 81 : session_register_update_time_fn (session_update_time_fn fn, u8 is_add)
1786 : {
1787 81 : session_main_t *smm = &session_main;
1788 : session_update_time_fn *fi;
1789 81 : u32 fi_pos = ~0;
1790 81 : u8 found = 0;
1791 :
1792 105 : vec_foreach (fi, smm->update_time_fns)
1793 : {
1794 32 : if (*fi == fn)
1795 : {
1796 8 : fi_pos = fi - smm->update_time_fns;
1797 8 : found = 1;
1798 8 : break;
1799 : }
1800 : }
1801 :
1802 81 : if (is_add)
1803 : {
1804 73 : if (found)
1805 : {
1806 0 : clib_warning ("update time fn %p already registered", fn);
1807 0 : return;
1808 : }
1809 73 : vec_add1 (smm->update_time_fns, fn);
1810 : }
1811 : else
1812 : {
1813 8 : vec_del1 (smm->update_time_fns, fi_pos);
1814 : }
1815 : }
1816 :
1817 : transport_proto_t
1818 0 : session_add_transport_proto (void)
1819 : {
1820 0 : session_main_t *smm = &session_main;
1821 : session_worker_t *wrk;
1822 : u32 thread;
1823 :
1824 0 : smm->last_transport_proto_type += 1;
1825 :
1826 0 : for (thread = 0; thread < vec_len (smm->wrk); thread++)
1827 : {
1828 0 : wrk = session_main_get_worker (thread);
1829 0 : vec_validate (wrk->session_to_enqueue, smm->last_transport_proto_type);
1830 : }
1831 :
1832 0 : return smm->last_transport_proto_type;
1833 : }
1834 :
1835 : transport_connection_t *
1836 6913180 : session_get_transport (session_t * s)
1837 : {
1838 6913180 : if (s->session_state != SESSION_STATE_LISTENING)
1839 6913000 : return transport_get_connection (session_get_transport_proto (s),
1840 6913000 : s->connection_index, s->thread_index);
1841 : else
1842 182 : return transport_get_listener (session_get_transport_proto (s),
1843 : s->connection_index);
1844 : }
1845 :
1846 : void
1847 172 : session_get_endpoint (session_t * s, transport_endpoint_t * tep, u8 is_lcl)
1848 : {
1849 172 : if (s->session_state != SESSION_STATE_LISTENING)
1850 123 : return transport_get_endpoint (session_get_transport_proto (s),
1851 123 : s->connection_index, s->thread_index, tep,
1852 : is_lcl);
1853 : else
1854 49 : return transport_get_listener_endpoint (session_get_transport_proto (s),
1855 : s->connection_index, tep, is_lcl);
1856 : }
1857 :
1858 : int
1859 5 : session_transport_attribute (session_t *s, u8 is_get,
1860 : transport_endpt_attr_t *attr)
1861 : {
1862 5 : if (s->session_state < SESSION_STATE_READY)
1863 0 : return -1;
1864 :
1865 5 : return transport_connection_attribute (session_get_transport_proto (s),
1866 5 : s->connection_index, s->thread_index,
1867 : is_get, attr);
1868 : }
1869 :
1870 : transport_connection_t *
1871 17 : listen_session_get_transport (session_t * s)
1872 : {
1873 17 : return transport_get_listener (session_get_transport_proto (s),
1874 : s->connection_index);
1875 : }
1876 :
/**
 * Trigger an interrupt-mode run of the session queue node on main thread.
 * Must be called from the main thread (asserted).
 */
void
session_queue_run_on_main_thread (vlib_main_t * vm)
{
  ASSERT (vlib_get_thread_index () == 0);
  vlib_node_set_interrupt_pending (vm, session_queue_node.index);
}
1883 :
1884 : static void
1885 25 : session_stats_collector_fn (vlib_stats_collector_data_t *d)
1886 : {
1887 25 : u32 i, n_workers, n_wrk_sessions, n_sessions = 0;
1888 25 : session_main_t *smm = &session_main;
1889 : session_worker_t *wrk;
1890 : counter_t **counters;
1891 : counter_t *cb;
1892 :
1893 25 : n_workers = vec_len (smm->wrk);
1894 25 : vlib_stats_validate (d->entry_index, 0, n_workers - 1);
1895 25 : counters = d->entry->data;
1896 25 : cb = counters[0];
1897 :
1898 71 : for (i = 0; i < vec_len (smm->wrk); i++)
1899 : {
1900 46 : wrk = session_main_get_worker (i);
1901 46 : n_wrk_sessions = pool_elts (wrk->sessions);
1902 46 : cb[i] = n_wrk_sessions;
1903 46 : n_sessions += n_wrk_sessions;
1904 : }
1905 :
1906 25 : vlib_stats_set_gauge (d->private_data, n_sessions);
1907 25 : }
1908 :
/**
 * Register the session stats collector.
 *
 * Allocates the per-worker counter vector and the total-sessions gauge,
 * then registers session_stats_collector_fn to refresh both.  The gauge's
 * entry index is smuggled to the callback through reg.private_data.
 */
static void
session_stats_collector_init (void)
{
  vlib_stats_collector_reg_t reg = {};

  reg.entry_index =
    vlib_stats_add_counter_vector ("/sys/session/sessions_per_worker");
  reg.private_data = vlib_stats_add_gauge ("/sys/session/sessions_total");
  reg.collect_fn = session_stats_collector_fn;
  vlib_stats_register_collector_fn (&reg);
  /* Pre-size the counter vector for all threads up front. */
  vlib_stats_validate (reg.entry_index, 0, vlib_get_n_threads ());
}
1921 :
/**
 * One-time initialization and enabling of the session layer.
 *
 * Allocates per-thread worker contexts, the vpp worker message queues,
 * segment manager state, lookup tables and (optionally) preallocated
 * session pools, then enables all registered transports.
 *
 * @return 0 on success, a clib error if the thread count is invalid.
 */
static clib_error_t *
session_manager_main_enable (vlib_main_t * vm)
{
  session_main_t *smm = &session_main;
  vlib_thread_main_t *vtm = vlib_get_thread_main ();
  u32 num_threads, preallocated_sessions_per_worker;
  session_worker_t *wrk;
  int i;

  /* We only initialize once and do not de-initialize on disable */
  if (smm->is_initialized)
    goto done;

  num_threads = 1 /* main thread */ + vtm->n_threads;

  if (num_threads < 1)
    return clib_error_return (0, "n_thread_stacks not set");

  /* Allocate cache line aligned worker contexts */
  vec_validate_aligned (smm->wrk, num_threads - 1, CLIB_CACHE_LINE_BYTES);
  clib_spinlock_init (&session_main.pool_realloc_lock);

  for (i = 0; i < num_threads; i++)
    {
      wrk = &smm->wrk[i];
      /* Event lists share one element pool, partitioned into control,
       * new, old and pending-connect queues plus events destined for
       * the main thread. */
      wrk->ctrl_head = clib_llist_make_head (wrk->event_elts, evt_list);
      wrk->new_head = clib_llist_make_head (wrk->event_elts, evt_list);
      wrk->old_head = clib_llist_make_head (wrk->event_elts, evt_list);
      wrk->pending_connects = clib_llist_make_head (wrk->event_elts, evt_list);
      wrk->evts_pending_main =
	clib_llist_make_head (wrk->event_elts, evt_list);
      wrk->vm = vlib_get_main_by_index (i);
      wrk->last_vlib_time = vlib_time_now (vm);
      wrk->last_vlib_us_time = wrk->last_vlib_time * CLIB_US_TIME_FREQ;
      /* No adaptive-mode timer fd until adaptive mode is enabled. */
      wrk->timerfd = -1;
      vec_validate (wrk->session_to_enqueue, smm->last_transport_proto_type);

      if (!smm->no_adaptive && smm->use_private_rx_mqs)
	session_wrk_enable_adaptive_mode (wrk);
    }

  /* Allocate vpp event queues segment and queue */
  session_vpp_wrk_mqs_alloc (smm);

  /* Initialize segment manager properties */
  segment_manager_main_init ();

  /* Preallocate sessions */
  if (smm->preallocated_sessions)
    {
      if (num_threads == 1)
	{
	  pool_init_fixed (smm->wrk[0].sessions, smm->preallocated_sessions);
	}
      else
	{
	  int j;
	  /* Spread preallocation across workers with 10% slack since
	   * connections do not balance perfectly. */
	  preallocated_sessions_per_worker =
	    (1.1 * (f64) smm->preallocated_sessions /
	     (f64) (num_threads - 1));

	  for (j = 1; j < num_threads; j++)
	    {
	      pool_init_fixed (smm->wrk[j].sessions,
			       preallocated_sessions_per_worker);
	    }
	}
    }

  session_lookup_init ();
  app_namespaces_init ();
  transport_init ();
  session_stats_collector_init ();
  smm->is_initialized = 1;

done:

  smm->is_enabled = 1;

  /* Enable transports */
  transport_enable_disable (vm, 1);
  session_debug_init ();

  return 0;
}
2007 :
/* Disable all transports; worker/session state is intentionally kept so
 * a subsequent enable does not re-initialize (see
 * session_manager_main_enable). */
static void
session_manager_main_disable (vlib_main_t * vm)
{
  transport_enable_disable (vm, 0 /* is_en */ );
}
2013 :
2014 : /* in this new callback, cookie hint the index */
2015 : void
2016 0 : session_dma_completion_cb (vlib_main_t *vm, struct vlib_dma_batch *batch)
2017 : {
2018 : session_worker_t *wrk;
2019 0 : wrk = session_main_get_worker (vm->thread_index);
2020 : session_dma_transfer *dma_transfer;
2021 :
2022 0 : dma_transfer = &wrk->dma_trans[wrk->trans_head];
2023 0 : vec_add (wrk->pending_tx_buffers, dma_transfer->pending_tx_buffers,
2024 : vec_len (dma_transfer->pending_tx_buffers));
2025 0 : vec_add (wrk->pending_tx_nexts, dma_transfer->pending_tx_nexts,
2026 : vec_len (dma_transfer->pending_tx_nexts));
2027 0 : vec_reset_length (dma_transfer->pending_tx_buffers);
2028 0 : vec_reset_length (dma_transfer->pending_tx_nexts);
2029 0 : wrk->trans_head++;
2030 0 : if (wrk->trans_head == wrk->trans_size)
2031 0 : wrk->trans_head = 0;
2032 0 : return;
2033 : }
2034 :
2035 : static void
2036 0 : session_prepare_dma_args (vlib_dma_config_t *args)
2037 : {
2038 0 : args->max_batches = 16;
2039 0 : args->max_transfers = DMA_TRANS_SIZE;
2040 0 : args->max_transfer_size = 65536;
2041 0 : args->features = 0;
2042 0 : args->sw_fallback = 1;
2043 0 : args->barrier_before_last = 1;
2044 0 : args->callback_fn = session_dma_completion_cb;
2045 0 : }
2046 :
2047 : static void
2048 0 : session_node_enable_dma (u8 is_en, int n_vlibs)
2049 : {
2050 : vlib_dma_config_t args;
2051 0 : session_prepare_dma_args (&args);
2052 : session_worker_t *wrk;
2053 : vlib_main_t *vm;
2054 :
2055 0 : int config_index = -1;
2056 :
2057 0 : if (is_en)
2058 : {
2059 0 : vm = vlib_get_main_by_index (0);
2060 0 : config_index = vlib_dma_config_add (vm, &args);
2061 : }
2062 : else
2063 : {
2064 0 : vm = vlib_get_main_by_index (0);
2065 0 : wrk = session_main_get_worker (0);
2066 0 : if (wrk->config_index >= 0)
2067 0 : vlib_dma_config_del (vm, wrk->config_index);
2068 : }
2069 : int i;
2070 0 : for (i = 0; i < n_vlibs; i++)
2071 : {
2072 0 : vm = vlib_get_main_by_index (i);
2073 0 : wrk = session_main_get_worker (vm->thread_index);
2074 0 : wrk->config_index = config_index;
2075 0 : if (is_en)
2076 : {
2077 0 : if (config_index >= 0)
2078 0 : wrk->dma_enabled = true;
2079 0 : wrk->dma_trans = (session_dma_transfer *) clib_mem_alloc (
2080 : sizeof (session_dma_transfer) * DMA_TRANS_SIZE);
2081 0 : bzero (wrk->dma_trans,
2082 : sizeof (session_dma_transfer) * DMA_TRANS_SIZE);
2083 : }
2084 : else
2085 : {
2086 0 : if (wrk->dma_trans)
2087 0 : clib_mem_free (wrk->dma_trans);
2088 : }
2089 0 : wrk->trans_head = 0;
2090 0 : wrk->trans_tail = 0;
2091 0 : wrk->trans_size = DMA_TRANS_SIZE;
2092 : }
2093 0 : }
2094 :
/**
 * Enable or disable the session layer's input/queue nodes on all threads.
 *
 * When workers exist, the main thread's queue node runs in interrupt mode
 * (or stays disabled) unless poll_main is configured, and the session queue
 * process is started/stopped accordingly.  Worker threads always get the
 * polling (or disabled) state.  Also toggles private rx mqs nodes and DMA
 * support when those features are configured.
 */
void
session_node_enable_disable (u8 is_en)
{
  /* Interrupt mode for main-with-workers, polling for everyone else. */
  u8 mstate = is_en ? VLIB_NODE_STATE_INTERRUPT : VLIB_NODE_STATE_DISABLED;
  u8 state = is_en ? VLIB_NODE_STATE_POLLING : VLIB_NODE_STATE_DISABLED;
  session_main_t *sm = &session_main;
  vlib_main_t *vm;
  vlib_node_t *n;
  int n_vlibs, i;

  n_vlibs = vlib_get_n_threads ();
  for (i = 0; i < n_vlibs; i++)
    {
      vm = vlib_get_main_by_index (i);
      /* main thread with workers and not polling */
      if (i == 0 && n_vlibs > 1)
	{
	  vlib_node_set_state (vm, session_queue_node.index, mstate);
	  if (is_en)
	    {
	      session_main_get_worker (0)->state = SESSION_WRK_INTERRUPT;
	      vlib_node_set_state (vm, session_queue_process_node.index,
				   state);
	      n = vlib_get_node (vm, session_queue_process_node.index);
	      /* Kick the queue process so it starts servicing events. */
	      vlib_start_process (vm, n->runtime_index);
	    }
	  else
	    {
	      /* Tell the (possibly running) queue process to stop. */
	      vlib_process_signal_event_mt (vm,
					    session_queue_process_node.index,
					    SESSION_Q_PROCESS_STOP, 0);
	    }
	  /* Unless main is configured to poll, skip the worker-style
	   * node state updates below for thread 0. */
	  if (!sm->poll_main)
	    continue;
	}
      vlib_node_set_state (vm, session_input_node.index, mstate);
      vlib_node_set_state (vm, session_queue_node.index, state);
    }

  if (sm->use_private_rx_mqs)
    application_enable_rx_mqs_nodes (is_en);

  if (sm->dma_enabled)
    session_node_enable_dma (is_en, n_vlibs);
}
2140 :
2141 : clib_error_t *
2142 117 : vnet_session_enable_disable (vlib_main_t * vm, u8 is_en)
2143 : {
2144 117 : clib_error_t *error = 0;
2145 117 : if (is_en)
2146 : {
2147 109 : if (session_main.is_enabled)
2148 60 : return 0;
2149 :
2150 49 : error = session_manager_main_enable (vm);
2151 49 : session_node_enable_disable (is_en);
2152 : }
2153 : else
2154 : {
2155 8 : session_main.is_enabled = 0;
2156 8 : session_manager_main_disable (vm);
2157 8 : session_node_enable_disable (is_en);
2158 : }
2159 :
2160 57 : return error;
2161 : }
2162 :
2163 : clib_error_t *
2164 575 : session_main_init (vlib_main_t * vm)
2165 : {
2166 575 : session_main_t *smm = &session_main;
2167 :
2168 575 : smm->is_enabled = 0;
2169 575 : smm->session_enable_asap = 0;
2170 575 : smm->poll_main = 0;
2171 575 : smm->use_private_rx_mqs = 0;
2172 575 : smm->no_adaptive = 0;
2173 575 : smm->last_transport_proto_type = TRANSPORT_PROTO_HTTP;
2174 575 : smm->port_allocator_min_src_port = 1024;
2175 575 : smm->port_allocator_max_src_port = 65535;
2176 :
2177 575 : return 0;
2178 : }
2179 :
2180 : static clib_error_t *
2181 575 : session_main_loop_init (vlib_main_t * vm)
2182 : {
2183 575 : session_main_t *smm = &session_main;
2184 575 : if (smm->session_enable_asap)
2185 : {
2186 24 : vlib_worker_thread_barrier_sync (vm);
2187 24 : vnet_session_enable_disable (vm, 1 /* is_en */ );
2188 24 : vlib_worker_thread_barrier_release (vm);
2189 : }
2190 575 : return 0;
2191 : }
2192 :
/* Register default-setting init and the conditional main-loop enable hook. */
VLIB_INIT_FUNCTION (session_main_init);
VLIB_MAIN_LOOP_ENTER_FUNCTION (session_main_loop_init);
2195 :
2196 : static clib_error_t *
2197 575 : session_config_fn (vlib_main_t * vm, unformat_input_t * input)
2198 : {
2199 575 : session_main_t *smm = &session_main;
2200 : u32 nitems;
2201 : uword tmp;
2202 :
2203 855 : while (unformat_check_input (input) != UNFORMAT_END_OF_INPUT)
2204 : {
2205 280 : if (unformat (input, "wrk-mq-length %d", &nitems))
2206 : {
2207 0 : if (nitems >= 2048)
2208 0 : smm->configured_wrk_mq_length = nitems;
2209 : else
2210 0 : clib_warning ("event queue length %d too small, ignored", nitems);
2211 : }
2212 280 : else if (unformat (input, "wrk-mqs-segment-size %U",
2213 : unformat_memory_size, &smm->wrk_mqs_segment_size))
2214 : ;
2215 259 : else if (unformat (input, "preallocated-sessions %d",
2216 : &smm->preallocated_sessions))
2217 : ;
2218 238 : else if (unformat (input, "v4-session-table-buckets %d",
2219 : &smm->configured_v4_session_table_buckets))
2220 : ;
2221 217 : else if (unformat (input, "v4-halfopen-table-buckets %d",
2222 : &smm->configured_v4_halfopen_table_buckets))
2223 : ;
2224 196 : else if (unformat (input, "v6-session-table-buckets %d",
2225 : &smm->configured_v6_session_table_buckets))
2226 : ;
2227 196 : else if (unformat (input, "v6-halfopen-table-buckets %d",
2228 : &smm->configured_v6_halfopen_table_buckets))
2229 : ;
2230 196 : else if (unformat (input, "v4-session-table-memory %U",
2231 : unformat_memory_size, &tmp))
2232 : {
2233 21 : if (tmp >= 0x100000000)
2234 0 : return clib_error_return (0, "memory size %llx (%lld) too large",
2235 : tmp, tmp);
2236 21 : smm->configured_v4_session_table_memory = tmp;
2237 : }
2238 175 : else if (unformat (input, "v4-halfopen-table-memory %U",
2239 : unformat_memory_size, &tmp))
2240 : {
2241 21 : if (tmp >= 0x100000000)
2242 0 : return clib_error_return (0, "memory size %llx (%lld) too large",
2243 : tmp, tmp);
2244 21 : smm->configured_v4_halfopen_table_memory = tmp;
2245 : }
2246 154 : else if (unformat (input, "v6-session-table-memory %U",
2247 : unformat_memory_size, &tmp))
2248 : {
2249 0 : if (tmp >= 0x100000000)
2250 0 : return clib_error_return (0, "memory size %llx (%lld) too large",
2251 : tmp, tmp);
2252 0 : smm->configured_v6_session_table_memory = tmp;
2253 : }
2254 154 : else if (unformat (input, "v6-halfopen-table-memory %U",
2255 : unformat_memory_size, &tmp))
2256 : {
2257 0 : if (tmp >= 0x100000000)
2258 0 : return clib_error_return (0, "memory size %llx (%lld) too large",
2259 : tmp, tmp);
2260 0 : smm->configured_v6_halfopen_table_memory = tmp;
2261 : }
2262 154 : else if (unformat (input, "local-endpoints-table-memory %U",
2263 : unformat_memory_size, &tmp))
2264 : {
2265 21 : if (tmp >= 0x100000000)
2266 0 : return clib_error_return (0, "memory size %llx (%lld) too large",
2267 : tmp, tmp);
2268 21 : smm->local_endpoints_table_memory = tmp;
2269 : }
2270 133 : else if (unformat (input, "local-endpoints-table-buckets %d",
2271 : &smm->local_endpoints_table_buckets))
2272 : ;
2273 112 : else if (unformat (input, "min-src-port %d", &tmp))
2274 0 : smm->port_allocator_min_src_port = tmp;
2275 112 : else if (unformat (input, "max-src-port %d", &tmp))
2276 0 : smm->port_allocator_max_src_port = tmp;
2277 112 : else if (unformat (input, "enable"))
2278 24 : smm->session_enable_asap = 1;
2279 88 : else if (unformat (input, "use-app-socket-api"))
2280 25 : (void) appns_sapi_enable_disable (1 /* is_enable */);
2281 63 : else if (unformat (input, "poll-main"))
2282 40 : smm->poll_main = 1;
2283 23 : else if (unformat (input, "use-private-rx-mqs"))
2284 2 : smm->use_private_rx_mqs = 1;
2285 21 : else if (unformat (input, "no-adaptive"))
2286 0 : smm->no_adaptive = 1;
2287 21 : else if (unformat (input, "use-dma"))
2288 0 : smm->dma_enabled = 1;
2289 21 : else if (unformat (input, "nat44-original-dst-enable"))
2290 : {
2291 0 : smm->original_dst_lookup = vlib_get_plugin_symbol (
2292 : "nat_plugin.so", "nat44_original_dst_lookup");
2293 : }
2294 : /*
2295 : * Deprecated but maintained for compatibility
2296 : */
2297 21 : else if (unformat (input, "evt_qs_memfd_seg"))
2298 : ;
2299 21 : else if (unformat (input, "segment-baseva 0x%lx", &tmp))
2300 : ;
2301 21 : else if (unformat (input, "evt_qs_seg_size %U", unformat_memory_size,
2302 : &smm->wrk_mqs_segment_size))
2303 : ;
2304 21 : else if (unformat (input, "event-queue-length %d", &nitems))
2305 : {
2306 21 : if (nitems >= 2048)
2307 21 : smm->configured_wrk_mq_length = nitems;
2308 : else
2309 0 : clib_warning ("event queue length %d too small, ignored", nitems);
2310 : }
2311 : else
2312 0 : return clib_error_return (0, "unknown input `%U'",
2313 : format_unformat_error, input);
2314 : }
2315 575 : return 0;
2316 : }
2317 :
/* Wire session_config_fn to the "session" startup.conf section. */
VLIB_CONFIG_FUNCTION (session_config_fn, "session");
2319 :
2320 : /*
2321 : * fd.io coding-style-patch-verification: ON
2322 : *
2323 : * Local Variables:
2324 : * eval: (c-set-style "gnu")
2325 : * End:
2326 : */
|