Line data Source code
1 : /*
2 : * Copyright (c) 2017-2019 Cisco and/or its affiliates.
3 : * Licensed under the Apache License, Version 2.0 (the "License");
4 : * you may not use this file except in compliance with the License.
5 : * You may obtain a copy of the License at:
6 : *
7 : * http://www.apache.org/licenses/LICENSE-2.0
8 : *
9 : * Unless required by applicable law or agreed to in writing, software
10 : * distributed under the License is distributed on an "AS IS" BASIS,
11 : * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 : * See the License for the specific language governing permissions and
13 : * limitations under the License.
14 : */
15 : /**
16 : * @file
17 : * @brief Session and session manager
18 : */
19 :
20 : #include <vnet/session/session.h>
21 : #include <vnet/session/application.h>
22 : #include <vnet/dpo/load_balance.h>
23 : #include <vnet/fib/ip4_fib.h>
24 : #include <vlib/stats/stats.h>
25 : #include <vlib/dma/dma.h>
26 :
27 : session_main_t session_main;
28 :
29 : static inline int
30 14070 : session_send_evt_to_thread (void *data, void *args, u32 thread_index,
31 : session_evt_type_t evt_type)
32 : {
33 14070 : session_worker_t *wrk = session_main_get_worker (thread_index);
34 : session_event_t *evt;
35 : svm_msg_q_msg_t msg;
36 : svm_msg_q_t *mq;
37 :
38 14070 : mq = wrk->vpp_event_queue;
39 14070 : if (PREDICT_FALSE (svm_msg_q_lock (mq)))
40 0 : return -1;
41 14070 : if (PREDICT_FALSE (svm_msg_q_or_ring_is_full (mq, SESSION_MQ_IO_EVT_RING)))
42 : {
43 0 : svm_msg_q_unlock (mq);
44 0 : return -2;
45 : }
46 14070 : switch (evt_type)
47 : {
48 148 : case SESSION_CTRL_EVT_RPC:
49 148 : msg = svm_msg_q_alloc_msg_w_ring (mq, SESSION_MQ_IO_EVT_RING);
50 148 : evt = (session_event_t *) svm_msg_q_msg_data (mq, &msg);
51 148 : evt->rpc_args.fp = data;
52 148 : evt->rpc_args.arg = args;
53 148 : break;
54 13922 : case SESSION_IO_EVT_RX:
55 : case SESSION_IO_EVT_TX:
56 : case SESSION_IO_EVT_TX_FLUSH:
57 : case SESSION_IO_EVT_BUILTIN_RX:
58 13922 : msg = svm_msg_q_alloc_msg_w_ring (mq, SESSION_MQ_IO_EVT_RING);
59 13922 : evt = (session_event_t *) svm_msg_q_msg_data (mq, &msg);
60 13922 : evt->session_index = *(u32 *) data;
61 13922 : break;
62 0 : case SESSION_IO_EVT_TX_MAIN:
63 : case SESSION_CTRL_EVT_CLOSE:
64 : case SESSION_CTRL_EVT_RESET:
65 0 : msg = svm_msg_q_alloc_msg_w_ring (mq, SESSION_MQ_IO_EVT_RING);
66 0 : evt = (session_event_t *) svm_msg_q_msg_data (mq, &msg);
67 0 : evt->session_handle = session_handle ((session_t *) data);
68 0 : break;
69 0 : default:
70 0 : clib_warning ("evt unhandled!");
71 0 : svm_msg_q_unlock (mq);
72 0 : return -1;
73 : }
74 14070 : evt->event_type = evt_type;
75 :
76 14070 : svm_msg_q_add_and_unlock (mq, &msg);
77 :
78 14070 : if (PREDICT_FALSE (wrk->state == SESSION_WRK_INTERRUPT))
79 11 : vlib_node_set_interrupt_pending (wrk->vm, session_queue_node.index);
80 :
81 14070 : return 0;
82 : }
83 :
84 : int
85 13913 : session_send_io_evt_to_thread (svm_fifo_t * f, session_evt_type_t evt_type)
86 : {
87 27826 : return session_send_evt_to_thread (&f->shr->master_session_index, 0,
88 13913 : f->master_thread_index, evt_type);
89 : }
90 :
91 : int
92 9 : session_send_io_evt_to_thread_custom (void *data, u32 thread_index,
93 : session_evt_type_t evt_type)
94 : {
95 9 : return session_send_evt_to_thread (data, 0, thread_index, evt_type);
96 : }
97 :
/**
 * Queue a ctrl event for @a s on its owner thread.
 *
 * NOTE(review): the ASSERT admits HALF_CLOSE but verify the dispatcher in
 * session_send_evt_to_thread actually handles it — otherwise the event is
 * dropped with a warning.
 */
int
session_send_ctrl_evt_to_thread (session_t * s, session_evt_type_t evt_type)
{
  /* only events supported are disconnect, shutdown and reset */
  ASSERT (evt_type == SESSION_CTRL_EVT_CLOSE ||
	  evt_type == SESSION_CTRL_EVT_HALF_CLOSE ||
	  evt_type == SESSION_CTRL_EVT_RESET);
  return session_send_evt_to_thread (s, 0, s->thread_index, evt_type);
}
107 :
/**
 * Queue an rpc event on @a thread_index, even if it is the current thread.
 *
 * The enqueue return value is ignored: if the message queue cannot be
 * locked or is full the rpc is silently dropped.
 */
void
session_send_rpc_evt_to_thread_force (u32 thread_index, void *fp,
				      void *rpc_args)
{
  session_send_evt_to_thread (fp, rpc_args, thread_index,
			      SESSION_CTRL_EVT_RPC);
}
115 :
116 : void
117 83 : session_send_rpc_evt_to_thread (u32 thread_index, void *fp, void *rpc_args)
118 : {
119 83 : if (thread_index != vlib_get_thread_index ())
120 8 : session_send_rpc_evt_to_thread_force (thread_index, fp, rpc_args);
121 : else
122 : {
123 75 : void (*fnp) (void *) = fp;
124 75 : fnp (rpc_args);
125 : }
126 83 : }
127 :
/**
 * Schedule a custom tx event for @a tc's session on its own thread.
 *
 * Marks the session for custom transport tx and, if no tx event is
 * already pending (or the transport was descheduled), queues a new io
 * event for the session queue node.
 */
void
session_add_self_custom_tx_evt (transport_connection_t * tc, u8 has_prio)
{
  session_t *s = session_get (tc->s_index, tc->thread_index);

  ASSERT (s->thread_index == vlib_get_thread_index ());
  ASSERT (s->session_state != SESSION_STATE_TRANSPORT_DELETED);

  if (!(s->flags & SESSION_F_CUSTOM_TX))
    {
      s->flags |= SESSION_F_CUSTOM_TX;
      /* Queue only if the fifo had no event set, or the transport was
       * descheduled and must be explicitly rescheduled */
      if (svm_fifo_set_event (s->tx_fifo)
	  || transport_connection_is_descheduled (tc))
	{
	  session_evt_elt_t *elt;
	  session_worker_t *wrk;

	  wrk = session_main_get_worker (tc->thread_index);
	  /* Priority events go on the new list, handled before old ones */
	  if (has_prio)
	    elt = session_evt_alloc_new (wrk);
	  else
	    elt = session_evt_alloc_old (wrk);
	  elt->evt.session_index = tc->s_index;
	  elt->evt.event_type = SESSION_IO_EVT_TX;
	  tc->flags &= ~TRANSPORT_CONNECTION_F_DESCHED;

	  /* Interrupt-mode workers must be poked to process the event */
	  if (PREDICT_FALSE (wrk->state == SESSION_WRK_INTERRUPT))
	    vlib_node_set_interrupt_pending (wrk->vm,
					     session_queue_node.index);
	}
    }
}
160 :
/**
 * Reschedule a tx event for @a tc's session on the current thread.
 *
 * NOTE(review): "sesssion" typo in the name is part of the exported
 * interface and is kept for compatibility with existing callers.
 */
void
sesssion_reschedule_tx (transport_connection_t * tc)
{
  session_worker_t *wrk = session_main_get_worker (tc->thread_index);
  session_evt_elt_t *elt;

  ASSERT (tc->thread_index == vlib_get_thread_index ());

  elt = session_evt_alloc_new (wrk);
  elt->evt.session_index = tc->s_index;
  elt->evt.event_type = SESSION_IO_EVT_TX;

  /* Interrupt-mode workers must be poked to process the event */
  if (PREDICT_FALSE (wrk->state == SESSION_WRK_INTERRUPT))
    vlib_node_set_interrupt_pending (wrk->vm, session_queue_node.index);
}
176 :
/**
 * Program a transport ctrl event for @a s.
 *
 * Appends directly to the owner worker's ctrl event list when safe to do
 * so, otherwise hands the event off over the owner thread's message queue.
 */
static void
session_program_transport_ctrl_evt (session_t * s, session_evt_type_t evt)
{
  u32 thread_index = vlib_get_thread_index ();
  session_evt_elt_t *elt;
  session_worker_t *wrk;

  /* If we are in the handler thread, or being called with the worker barrier
   * held, just append a new event to pending disconnects vector. */
  if (vlib_thread_is_main_w_barrier () || thread_index == s->thread_index)
    {
      wrk = session_main_get_worker (s->thread_index);
      elt = session_evt_alloc_ctrl (wrk);
      clib_memset (&elt->evt, 0, sizeof (session_event_t));
      elt->evt.session_handle = session_handle (s);
      elt->evt.event_type = evt;

      /* Interrupt-mode workers must be poked to process the event */
      if (PREDICT_FALSE (wrk->state == SESSION_WRK_INTERRUPT))
	vlib_node_set_interrupt_pending (wrk->vm, session_queue_node.index);
    }
  else
    session_send_ctrl_evt_to_thread (s, evt);
}
200 :
201 : session_t *
202 640 : session_alloc (u32 thread_index)
203 : {
204 640 : session_worker_t *wrk = &session_main.wrk[thread_index];
205 : session_t *s;
206 :
207 640 : pool_get_aligned_safe (wrk->sessions, s, CLIB_CACHE_LINE_BYTES);
208 640 : clib_memset (s, 0, sizeof (*s));
209 640 : s->session_index = s - wrk->sessions;
210 640 : s->thread_index = thread_index;
211 640 : s->app_index = APP_INVALID_INDEX;
212 :
213 640 : return s;
214 : }
215 :
216 : void
217 488 : session_free (session_t * s)
218 : {
219 488 : session_worker_t *wrk = &session_main.wrk[s->thread_index];
220 :
221 : SESSION_EVT (SESSION_EVT_FREE, s);
222 : if (CLIB_DEBUG)
223 488 : clib_memset (s, 0xFA, sizeof (*s));
224 488 : pool_put (wrk->sessions, s);
225 488 : }
226 :
/**
 * Debug helper: check consistency between session @a si on
 * @a thread_index and its transport connection.
 *
 * @return 1 if the session/transport cross-links are consistent,
 *         0 otherwise.
 */
u8
session_is_valid (u32 si, u8 thread_index)
{
  session_t *s;
  transport_connection_t *tc;

  s = pool_elt_at_index (session_main.wrk[thread_index].sessions, si);

  if (s->thread_index != thread_index || s->session_index != si)
    return 0;

  /* No transport to cross-check for these states */
  if (s->session_state == SESSION_STATE_TRANSPORT_DELETED
      || s->session_state <= SESSION_STATE_LISTENING)
    return 1;

  /* Half-open connecting sessions are not cross-checked either */
  if (s->session_state == SESSION_STATE_CONNECTING &&
      (s->flags & SESSION_F_HALF_OPEN))
    return 1;

  tc = session_get_transport (s);
  if (s->connection_index != tc->c_index
      || s->thread_index != tc->thread_index || tc->s_index != si)
    return 0;

  return 1;
}
253 :
254 : static void
255 518 : session_cleanup_notify (session_t * s, session_cleanup_ntf_t ntf)
256 : {
257 : app_worker_t *app_wrk;
258 :
259 518 : app_wrk = app_worker_get_if_valid (s->app_wrk_index);
260 518 : if (!app_wrk)
261 130 : return;
262 388 : app_worker_cleanup_notify (app_wrk, s, ntf);
263 : }
264 :
/**
 * Free @a s together with its rx/tx fifos.
 *
 * The app is notified first, while the fifos are still valid, then the
 * fifos and finally the session itself are released.
 */
void
session_free_w_fifos (session_t * s)
{
  session_cleanup_notify (s, SESSION_CLEANUP_SESSION);
  segment_manager_dealloc_fifos (s->rx_fifo, s->tx_fifo);
  session_free (s);
}
272 :
273 : /**
274 : * Cleans up session and lookup table.
275 : *
276 : * Transport connection must still be valid.
277 : */
static void
session_delete (session_t * s)
{
  int rv;

  /* Delete from the main lookup table. */
  if ((rv = session_lookup_del_session (s)))
    /* Lookup delete failure is not fatal: log and continue cleanup */
    clib_warning ("session %u hash delete rv %d", s->session_index, rv);

  session_free_w_fifos (s);
}
289 :
/**
 * Cleanup half-open session and associated transport state.
 *
 * Handles both migrating app transports and regular half-opens that may
 * still be present in the half-open lookup table.
 */
void
session_cleanup_half_open (session_handle_t ho_handle)
{
  session_t *ho = session_get_from_handle (ho_handle);

  /* App transports can migrate their half-opens */
  if (ho->flags & SESSION_F_IS_MIGRATING)
    {
      /* Session still migrating, move to closed state to signal that the
       * session should be removed. */
      if (ho->connection_index == ~0)
	{
	  session_set_state (ho, SESSION_STATE_CLOSED);
	  return;
	}
      /* Migrated transports are no longer half-opens */
      transport_cleanup (session_get_transport_proto (ho),
			 ho->connection_index, ho->app_index /* overloaded */);
    }
  else
    {
      /* Cleanup half-open session lookup table if need be */
      if (ho->session_state != SESSION_STATE_TRANSPORT_CLOSED)
	{
	  transport_connection_t *tc;
	  tc = transport_get_half_open (session_get_transport_proto (ho),
					ho->connection_index);
	  if (tc && !(tc->flags & TRANSPORT_CONNECTION_F_NO_LOOKUP))
	    session_lookup_del_half_open (tc);
	}
      transport_cleanup_half_open (session_get_transport_proto (ho),
				   ho->connection_index);
    }
  session_free (ho);
}
325 :
326 : static void
327 147 : session_half_open_free (session_t *ho)
328 : {
329 : app_worker_t *app_wrk;
330 :
331 147 : ASSERT (vlib_get_thread_index () <= transport_cl_thread ());
332 147 : app_wrk = app_worker_get_if_valid (ho->app_wrk_index);
333 147 : if (app_wrk)
334 147 : app_worker_del_half_open (app_wrk, ho);
335 147 : session_free (ho);
336 147 : }
337 :
338 : static void
339 0 : session_half_open_free_rpc (void *args)
340 : {
341 0 : session_t *ho = ho_session_get (pointer_to_uword (args));
342 0 : session_half_open_free (ho);
343 0 : }
344 :
/**
 * Transport notification that a half-open connection is being deleted.
 *
 * The half-open is freed on the connects (ctrl) thread, via rpc when the
 * notification arrives from a different thread.
 */
void
session_half_open_delete_notify (transport_connection_t *tc)
{
  session_t *ho = ho_session_get (tc->s_index);

  /* Cleanup half-open lookup table if need be */
  if (ho->session_state != SESSION_STATE_TRANSPORT_CLOSED)
    {
      if (!(tc->flags & TRANSPORT_CONNECTION_F_NO_LOOKUP))
	session_lookup_del_half_open (tc);
    }

  /* Notification from ctrl thread accepted without rpc */
  if (tc->thread_index == transport_cl_thread ())
    {
      session_half_open_free (ho);
    }
  else
    {
      void *args = uword_to_pointer ((uword) tc->s_index, void *);
      session_send_rpc_evt_to_thread_force (transport_cl_thread (),
					    session_half_open_free_rpc, args);
    }
}
369 :
/**
 * Transport notification that a half-open is about to migrate threads.
 */
void
session_half_open_migrate_notify (transport_connection_t *tc)
{
  session_t *ho;

  /* Support half-open migrations only for transports with no lookup */
  ASSERT (tc->flags & TRANSPORT_CONNECTION_F_NO_LOOKUP);

  ho = ho_session_get (tc->s_index);
  ho->flags |= SESSION_F_IS_MIGRATING;
  /* Invalid connection index marks migration as in progress */
  ho->connection_index = ~0;
}
382 :
/**
 * Transport notification that a half-open finished migrating.
 *
 * @return 0 on success, -1 if the half-open was closed in the meantime
 *         and has been deleted.
 */
int
session_half_open_migrated_notify (transport_connection_t *tc)
{
  session_t *ho;

  ho = ho_session_get (tc->s_index);

  /* App probably detached so the half-open must be cleaned up */
  if (ho->session_state == SESSION_STATE_CLOSED)
    {
      session_half_open_delete_notify (tc);
      return -1;
    }
  ho->connection_index = tc->c_index;
  /* Overload app index for half-open with new thread */
  ho->app_index = tc->thread_index;
  return 0;
}
401 :
402 : session_t *
403 311 : session_alloc_for_connection (transport_connection_t * tc)
404 : {
405 : session_t *s;
406 311 : u32 thread_index = tc->thread_index;
407 :
408 311 : ASSERT (thread_index == vlib_get_thread_index ()
409 : || transport_protocol_is_cl (tc->proto));
410 :
411 311 : s = session_alloc (thread_index);
412 311 : s->session_type = session_type_from_proto_and_ip (tc->proto, tc->is_ip4);
413 311 : session_set_state (s, SESSION_STATE_CLOSED);
414 :
415 : /* Attach transport to session and vice versa */
416 311 : s->connection_index = tc->c_index;
417 311 : tc->s_index = s->session_index;
418 311 : return s;
419 : }
420 :
421 : session_t *
422 148 : session_alloc_for_half_open (transport_connection_t *tc)
423 : {
424 : session_t *s;
425 :
426 148 : s = ho_session_alloc ();
427 148 : s->session_type = session_type_from_proto_and_ip (tc->proto, tc->is_ip4);
428 148 : s->connection_index = tc->c_index;
429 148 : tc->s_index = s->session_index;
430 148 : return s;
431 : }
432 :
433 : /**
434 : * Discards bytes from buffer chain
435 : *
436 : * It discards n_bytes_to_drop starting at first buffer after chain_b
437 : */
always_inline void
session_enqueue_discard_chain_bytes (vlib_main_t * vm, vlib_buffer_t * b,
				     vlib_buffer_t ** chain_b,
				     u32 n_bytes_to_drop)
{
  vlib_buffer_t *next = *chain_b;
  u32 to_drop = n_bytes_to_drop;
  ASSERT (b->flags & VLIB_BUFFER_NEXT_PRESENT);
  while (to_drop && (next->flags & VLIB_BUFFER_NEXT_PRESENT))
    {
      next = vlib_get_buffer (vm, next->next_buffer);
      if (next->current_length > to_drop)
	{
	  /* Drop lands inside this buffer: advance past dropped bytes */
	  vlib_buffer_advance (next, to_drop);
	  to_drop = 0;
	}
      else
	{
	  /* Drop this buffer's entire payload and keep walking the chain */
	  to_drop -= next->current_length;
	  next->current_length = 0;
	}
    }
  /* Return the last buffer visited to the caller */
  *chain_b = next;

  /* Only adjust the chain total if all requested bytes were dropped.
   * NOTE(review): if the chain holds fewer than n_bytes_to_drop bytes the
   * total is left unchanged — confirm callers never over-request. */
  if (to_drop == 0)
    b->total_length_not_including_first_buffer -= n_bytes_to_drop;
}
465 :
466 : /**
467 : * Enqueue buffer chain tail
468 : */
always_inline int
session_enqueue_chain_tail (session_t * s, vlib_buffer_t * b,
			    u32 offset, u8 is_in_order)
{
  vlib_buffer_t *chain_b;
  u32 chain_bi, len, diff;
  vlib_main_t *vm = vlib_get_main ();
  u8 *data;
  u32 written = 0;
  int rv = 0;

  if (is_in_order && offset)
    {
      /* Caller already consumed offset bytes (beyond the first buffer);
       * skip them in the chain before enqueueing */
      diff = offset - b->current_length;
      if (diff > b->total_length_not_including_first_buffer)
	return 0;
      chain_b = b;
      session_enqueue_discard_chain_bytes (vm, b, &chain_b, diff);
      chain_bi = vlib_get_buffer_index (vm, chain_b);
    }
  else
    chain_bi = b->next_buffer;

  do
    {
      chain_b = vlib_get_buffer (vm, chain_bi);
      data = vlib_buffer_get_current (chain_b);
      len = chain_b->current_length;
      /* Skip empty buffers in the chain */
      if (!len)
	continue;
      if (is_in_order)
	{
	  rv = svm_fifo_enqueue (s->rx_fifo, len, data);
	  if (rv == len)
	    {
	      written += rv;
	    }
	  else if (rv < len)
	    {
	      /* Fifo full: report what was written so far */
	      return (rv > 0) ? (written + rv) : written;
	    }
	  else if (rv > len)
	    {
	      /* Fifo accepted more than this segment: previously queued
	       * out-of-order data became in-order */
	      written += rv;

	      /* written more than what was left in chain */
	      if (written > b->total_length_not_including_first_buffer)
		return written;

	      /* drop the bytes that have already been delivered */
	      session_enqueue_discard_chain_bytes (vm, b, &chain_b, rv - len);
	    }
	}
      else
	{
	  rv = svm_fifo_enqueue_with_offset (s->rx_fifo, offset, len, data);
	  if (rv)
	    {
	      clib_warning ("failed to enqueue multi-buffer seg");
	      return -1;
	    }
	  offset += len;
	}
    }
  while ((chain_bi = (chain_b->flags & VLIB_BUFFER_NEXT_PRESENT)
	  ? chain_b->next_buffer : 0));

  if (is_in_order)
    return written;

  return 0;
}
541 :
/**
 * Give apps with custom fifo tuning a chance to resize fifos after
 * enqueue/dequeue actions. No-op unless SESSION_F_CUSTOM_FIFO_TUNING
 * is set on the session.
 */
void
session_fifo_tuning (session_t * s, svm_fifo_t * f,
		     session_ft_action_t act, u32 len)
{
  if (s->flags & SESSION_F_CUSTOM_FIFO_TUNING)
    {
      app_worker_t *app_wrk = app_worker_get (s->app_wrk_index);
      app_worker_session_fifo_tuning (app_wrk, s, f, act, len);
      /* Debug builds: fifo size must stay within sane bounds */
      if (CLIB_ASSERT_ENABLE)
	{
	  segment_manager_t *sm;
	  sm = segment_manager_get (f->segment_manager);
	  ASSERT (f->shr->size >= 4096);
	  ASSERT (f->shr->size <= sm->max_fifo_size);
	}
    }
}
559 :
560 : /*
561 : * Enqueue data for delivery to session peer. Does not notify peer of enqueue
562 : * event but on request can queue notification events for later delivery by
563 : * calling stream_server_flush_enqueue_events().
564 : *
565 : * @param tc Transport connection which is to be enqueued data
566 : * @param b Buffer to be enqueued
567 : * @param offset Offset at which to start enqueueing if out-of-order
568 : * @param queue_event Flag to indicate if peer is to be notified or if event
569 : * is to be queued. The former is useful when more data is
570 : * enqueued and only one event is to be generated.
571 : * @param is_in_order Flag to indicate if data is in order
572 : * @return Number of bytes enqueued or a negative value if enqueueing failed.
573 : */
int
session_enqueue_stream_connection (transport_connection_t * tc,
				   vlib_buffer_t * b, u32 offset,
				   u8 queue_event, u8 is_in_order)
{
  session_t *s;
  int enqueued = 0, rv, in_order_off;

  s = session_get (tc->s_index, tc->thread_index);

  if (is_in_order)
    {
      enqueued = svm_fifo_enqueue (s->rx_fifo,
				   b->current_length,
				   vlib_buffer_get_current (b));
      if (PREDICT_FALSE ((b->flags & VLIB_BUFFER_NEXT_PRESENT)
			 && enqueued >= 0))
	{
	  /* Fifo may report more than the first buffer's length if
	   * out-of-order data became in-order; start the chain enqueue
	   * past those bytes */
	  in_order_off = enqueued > b->current_length ? enqueued : 0;
	  rv = session_enqueue_chain_tail (s, b, in_order_off, 1);
	  if (rv > 0)
	    enqueued += rv;
	}
    }
  else
    {
      rv = svm_fifo_enqueue_with_offset (s->rx_fifo, offset,
					 b->current_length,
					 vlib_buffer_get_current (b));
      if (PREDICT_FALSE ((b->flags & VLIB_BUFFER_NEXT_PRESENT) && !rv))
	session_enqueue_chain_tail (s, b, offset + b->current_length, 0);
      /* if something was enqueued, report even this as success for ooo
       * segment handling */
      return rv;
    }

  if (queue_event)
    {
      /* Queue RX event on this fifo. Eventually these will need to be flushed
       * by calling stream_server_flush_enqueue_events () */
      session_worker_t *wrk;

      wrk = session_main_get_worker (s->thread_index);
      /* Only register the session once per flush cycle */
      if (!(s->flags & SESSION_F_RX_EVT))
	{
	  s->flags |= SESSION_F_RX_EVT;
	  vec_add1 (wrk->session_to_enqueue[tc->proto], s->session_index);
	}

      session_fifo_tuning (s, s->rx_fifo, SESSION_FT_ACTION_ENQUEUED, 0);
    }

  return enqueued;
}
628 :
/**
 * Enqueue a datagram and its session_dgram_hdr_t on the session's rx
 * fifo as one atomic multi-segment write.
 *
 * @return number of bytes enqueued (header included) or 0 on failure.
 */
int
session_enqueue_dgram_connection (session_t * s,
				  session_dgram_hdr_t * hdr,
				  vlib_buffer_t * b, u8 proto, u8 queue_event)
{
  int rv;

  /* Caller must have verified there is room for header + payload */
  ASSERT (svm_fifo_max_enqueue_prod (s->rx_fifo)
	  >= b->current_length + sizeof (*hdr));

  if (PREDICT_TRUE (!(b->flags & VLIB_BUFFER_NEXT_PRESENT)))
    {
      /* *INDENT-OFF* */
      svm_fifo_seg_t segs[2] = {
	  { (u8 *) hdr, sizeof (*hdr) },
	  { vlib_buffer_get_current (b), b->current_length }
      };
      /* *INDENT-ON* */

      rv = svm_fifo_enqueue_segments (s->rx_fifo, segs, 2,
				      0 /* allow_partial */ );
    }
  else
    {
      /* Chained buffer: build a segment vector covering the header and
       * every buffer in the chain */
      vlib_main_t *vm = vlib_get_main ();
      svm_fifo_seg_t *segs = 0, *seg;
      vlib_buffer_t *it = b;
      u32 n_segs = 1;

      vec_add2 (segs, seg, 1);
      seg->data = (u8 *) hdr;
      seg->len = sizeof (*hdr);
      while (it)
	{
	  vec_add2 (segs, seg, 1);
	  seg->data = vlib_buffer_get_current (it);
	  seg->len = it->current_length;
	  n_segs++;
	  if (!(it->flags & VLIB_BUFFER_NEXT_PRESENT))
	    break;
	  it = vlib_get_buffer (vm, it->next_buffer);
	}
      rv = svm_fifo_enqueue_segments (s->rx_fifo, segs, n_segs,
				      0 /* allow partial */ );
      vec_free (segs);
    }

  if (queue_event && rv > 0)
    {
      /* Queue RX event on this fifo. Eventually these will need to be flushed
       * by calling stream_server_flush_enqueue_events () */
      session_worker_t *wrk;

      wrk = session_main_get_worker (s->thread_index);
      /* Only register the session once per flush cycle */
      if (!(s->flags & SESSION_F_RX_EVT))
	{
	  s->flags |= SESSION_F_RX_EVT;
	  vec_add1 (wrk->session_to_enqueue[proto], s->session_index);
	}

      session_fifo_tuning (s, s->rx_fifo, SESSION_FT_ACTION_ENQUEUED, 0);
    }
  return rv > 0 ? rv : 0;
}
693 :
694 : int
695 0 : session_tx_fifo_peek_bytes (transport_connection_t * tc, u8 * buffer,
696 : u32 offset, u32 max_bytes)
697 : {
698 0 : session_t *s = session_get (tc->s_index, tc->thread_index);
699 0 : return svm_fifo_peek (s->tx_fifo, offset, max_bytes, buffer);
700 : }
701 :
/**
 * Drop up to @a max_bytes from the connection's tx fifo and notify the
 * app if it requested a dequeue notification.
 *
 * @return number of bytes actually dropped.
 */
u32
session_tx_fifo_dequeue_drop (transport_connection_t * tc, u32 max_bytes)
{
  session_t *s = session_get (tc->s_index, tc->thread_index);
  u32 rv;

  rv = svm_fifo_dequeue_drop (s->tx_fifo, max_bytes);
  session_fifo_tuning (s, s->tx_fifo, SESSION_FT_ACTION_DEQUEUED, rv);

  if (svm_fifo_needs_deq_ntf (s->tx_fifo, max_bytes))
    session_dequeue_notify (s);

  return rv;
}
716 :
717 : static inline int
718 0 : session_notify_subscribers (u32 app_index, session_t * s,
719 : svm_fifo_t * f, session_evt_type_t evt_type)
720 : {
721 : app_worker_t *app_wrk;
722 : application_t *app;
723 : int i;
724 :
725 0 : app = application_get (app_index);
726 0 : if (!app)
727 0 : return -1;
728 :
729 0 : for (i = 0; i < f->shr->n_subscribers; i++)
730 : {
731 0 : app_wrk = application_get_worker (app, f->shr->subscribers[i]);
732 0 : if (!app_wrk)
733 0 : continue;
734 0 : if (app_worker_lock_and_send_event (app_wrk, s, evt_type))
735 0 : return -1;
736 : }
737 :
738 0 : return 0;
739 : }
740 :
741 : /**
742 : * Notify session peer that new data has been enqueued.
743 : *
744 : * @param s Stream session for which the event is to be generated.
745 : * @param lock Flag to indicate if call should lock message queue.
746 : *
747 : * @return 0 on success or negative number if failed to send notification.
748 : */
static inline int
session_enqueue_notify_inline (session_t * s)
{
  app_worker_t *app_wrk;
  u32 session_index;
  u8 n_subscribers;
  u32 thread_index;

  /* Cache identifiers: s is re-fetched below after sending the event */
  session_index = s->session_index;
  thread_index = s->thread_index;
  n_subscribers = svm_fifo_n_subscribers (s->rx_fifo);

  app_wrk = app_worker_get_if_valid (s->app_wrk_index);
  if (PREDICT_FALSE (!app_wrk))
    {
      SESSION_DBG ("invalid s->app_index = %d", s->app_wrk_index);
      return 0;
    }

  SESSION_EVT (SESSION_EVT_ENQ, s, svm_fifo_max_dequeue_prod (s->rx_fifo));

  /* Clear pending rx event flag; a new one may now be queued */
  s->flags &= ~SESSION_F_RX_EVT;

  /* Application didn't confirm accept yet */
  if (PREDICT_FALSE (s->session_state == SESSION_STATE_ACCEPTING))
    return 0;

  if (PREDICT_FALSE (app_worker_lock_and_send_event (app_wrk, s,
						     SESSION_IO_EVT_RX)))
    return -1;

  if (PREDICT_FALSE (n_subscribers))
    {
      /* Re-fetch by index; the earlier pointer may be stale */
      s = session_get (session_index, thread_index);
      return session_notify_subscribers (app_wrk->app_index, s,
					 s->rx_fifo, SESSION_IO_EVT_RX);
    }

  return 0;
}
789 :
/** Notify app of newly enqueued rx data. See
 *  session_enqueue_notify_inline for details. */
int
session_enqueue_notify (session_t * s)
{
  return session_enqueue_notify_inline (s);
}
795 :
796 : static void
797 0 : session_enqueue_notify_rpc (void *arg)
798 : {
799 0 : u32 session_index = pointer_to_uword (arg);
800 : session_t *s;
801 :
802 0 : s = session_get_if_valid (session_index, vlib_get_thread_index ());
803 0 : if (!s)
804 0 : return;
805 :
806 0 : session_enqueue_notify (s);
807 : }
808 :
809 : /**
810 : * Like session_enqueue_notify, but can be called from a thread that does not
811 : * own the session.
812 : */
void
session_enqueue_notify_thread (session_handle_t sh)
{
  u32 thread_index = session_thread_from_handle (sh);
  u32 session_index = session_index_from_handle (sh);

  /*
   * Pass session index (u32) as opposed to handle (u64) in case pointers
   * are not 64-bit.
   */
  session_send_rpc_evt_to_thread (thread_index,
				  session_enqueue_notify_rpc,
				  uword_to_pointer (session_index, void *));
}
827 :
/**
 * Notify app that data was dequeued from its tx fifo.
 *
 * Clears the pending dequeue notification flag, then forwards a TX event
 * to the owning app worker and to any fifo subscribers.
 *
 * @return 0 on success, -1 if the app worker is gone or a send fails.
 */
int
session_dequeue_notify (session_t * s)
{
  app_worker_t *app_wrk;

  svm_fifo_clear_deq_ntf (s->tx_fifo);

  app_wrk = app_worker_get_if_valid (s->app_wrk_index);
  if (PREDICT_FALSE (!app_wrk))
    return -1;

  if (PREDICT_FALSE (app_worker_lock_and_send_event (app_wrk, s,
						     SESSION_IO_EVT_TX)))
    return -1;

  if (PREDICT_FALSE (s->tx_fifo->shr->n_subscribers))
    return session_notify_subscribers (app_wrk->app_index, s,
				       s->tx_fifo, SESSION_IO_EVT_TX);

  return 0;
}
849 :
850 : /**
851 : * Flushes queue of sessions that are to be notified of new data
852 : * enqueued events.
853 : *
854 : * @param thread_index Thread index for which the flush is to be performed.
855 : * @return 0 on success or a positive number indicating the number of
856 : * failures due to API queue being full.
857 : */
int
session_main_flush_enqueue_events (u8 transport_proto, u32 thread_index)
{
  session_worker_t *wrk = session_main_get_worker (thread_index);
  session_t *s;
  int i, errors = 0;
  u32 *indices;

  indices = wrk->session_to_enqueue[transport_proto];

  for (i = 0; i < vec_len (indices); i++)
    {
      s = session_get_if_valid (indices[i], thread_index);
      /* Session may have been closed since it was registered */
      if (PREDICT_FALSE (!s))
	{
	  errors++;
	  continue;
	}

      session_fifo_tuning (s, s->rx_fifo, SESSION_FT_ACTION_ENQUEUED,
			   0 /* TODO/not needed */ );

      if (PREDICT_FALSE (session_enqueue_notify_inline (s)))
	errors++;
    }

  /* Reuse the vector storage for the next batch */
  vec_reset_length (indices);
  wrk->session_to_enqueue[transport_proto] = indices;

  return errors;
}
889 :
890 : int
891 0 : session_main_flush_all_enqueue_events (u8 transport_proto)
892 : {
893 0 : vlib_thread_main_t *vtm = vlib_get_thread_main ();
894 0 : int i, errors = 0;
895 0 : for (i = 0; i < 1 + vtm->n_threads; i++)
896 0 : errors += session_main_flush_enqueue_events (transport_proto, i);
897 0 : return errors;
898 : }
899 :
/**
 * Transport notification that a stream connect completed.
 *
 * Cleans up the half-open state, allocates an established session on
 * success and notifies the app worker of the outcome (or of @a err).
 *
 * @return 0 on success, -1 if the app is gone, init fails or the app
 *         rejects the session.
 */
int
session_stream_connect_notify (transport_connection_t * tc,
			       session_error_t err)
{
  u32 opaque = 0, new_ti, new_si;
  app_worker_t *app_wrk;
  session_t *s = 0, *ho;

  /*
   * Cleanup half-open table
   */
  session_lookup_del_half_open (tc);

  ho = ho_session_get (tc->s_index);
  session_set_state (ho, SESSION_STATE_TRANSPORT_CLOSED);
  opaque = ho->opaque;
  app_wrk = app_worker_get_if_valid (ho->app_wrk_index);
  if (!app_wrk)
    return -1;

  /* Connect failed: notify app with no session */
  if (err)
    return app_worker_connect_notify (app_wrk, s, err, opaque);

  s = session_alloc_for_connection (tc);
  session_set_state (s, SESSION_STATE_CONNECTING);
  s->app_wrk_index = app_wrk->wrk_index;
  new_si = s->session_index;
  new_ti = s->thread_index;

  if ((err = app_worker_init_connected (app_wrk, s)))
    {
      session_free (s);
      app_worker_connect_notify (app_wrk, 0, err, opaque);
      return -1;
    }

  /* Re-fetch by index; pointer may be stale after init */
  s = session_get (new_si, new_ti);
  session_set_state (s, SESSION_STATE_READY);
  session_lookup_add_connection (tc, session_handle (s));

  if (app_worker_connect_notify (app_wrk, s, SESSION_E_NONE, opaque))
    {
      session_lookup_del_connection (tc);
      /* Avoid notifying app about rejected session cleanup */
      s = session_get (new_si, new_ti);
      segment_manager_dealloc_fifos (s->rx_fifo, s->tx_fifo);
      session_free (s);
      return -1;
    }

  return 0;
}
952 :
/** Arguments for the switch-pool reply rpc, packed into a single uword
 * so they fit in the rpc's pointer argument. */
typedef union session_switch_pool_reply_args_
{
  struct
  {
    u32 session_index;
    u16 thread_index;
    u8 is_closed;
  };
  u64 as_u64;
} session_switch_pool_reply_args_t;

STATIC_ASSERT (sizeof (session_switch_pool_reply_args_t) <= sizeof (uword),
	       "switch pool reply args size");
966 :
/**
 * Rpc run on the new owner thread after a session pool switch.
 *
 * Finishes cleanup if the session closed mid-migration, otherwise
 * notifies the app of pending rx data on the new session.
 */
static void
session_switch_pool_reply (void *arg)
{
  session_switch_pool_reply_args_t rargs;
  session_t *s;

  rargs.as_u64 = pointer_to_uword (arg);
  s = session_get_if_valid (rargs.session_index, rargs.thread_index);
  if (!s)
    return;

  /* Session closed during migration. Clean everything up */
  if (rargs.is_closed)
    {
      transport_cleanup (session_get_transport_proto (s), s->connection_index,
			 s->thread_index);
      segment_manager_dealloc_fifos (s->rx_fifo, s->tx_fifo);
      session_free (s);
      return;
    }

  /* Notify app that it has data on the new session */
  session_enqueue_notify (s);
}
991 :
/** Arguments for the switch-pool rpc sent to a session's old thread */
typedef struct _session_switch_pool_args
{
  u32 session_index;		/**< session index on the old thread */
  u32 thread_index;		/**< old owner thread */
  u32 new_thread_index;		/**< new owner thread */
  u32 new_session_index;	/**< session index on the new thread */
} session_switch_pool_args_t;
999 :
1000 : /**
1001 : * Notify old thread of the session pool switch
1002 : */
static void
session_switch_pool (void *cb_args)
{
  session_switch_pool_args_t *args = (session_switch_pool_args_t *) cb_args;
  session_switch_pool_reply_args_t rargs;
  session_handle_t new_sh;
  segment_manager_t *sm;
  app_worker_t *app_wrk;
  session_t *s;

  /* Runs as rpc on the session's old owner thread */
  ASSERT (args->thread_index == vlib_get_thread_index ());
  s = session_get (args->session_index, args->thread_index);

  /* Check if session closed during migration */
  rargs.is_closed = s->session_state >= SESSION_STATE_TRANSPORT_CLOSING;

  transport_cleanup (session_get_transport_proto (s), s->connection_index,
		     s->thread_index);

  app_wrk = app_worker_get_if_valid (s->app_wrk_index);
  if (app_wrk)
    {
      /* Cleanup fifo segment slice state for fifos */
      sm = app_worker_get_connect_segment_manager (app_wrk);
      segment_manager_detach_fifo (sm, &s->rx_fifo);
      segment_manager_detach_fifo (sm, &s->tx_fifo);

      /* Notify app, using old session, about the migration event */
      if (!rargs.is_closed)
	{
	  new_sh = session_make_handle (args->new_session_index,
					args->new_thread_index);
	  app_worker_migrate_notify (app_wrk, s, new_sh);
	}
    }

  /* Trigger app read and fifo updates on the new thread */
  rargs.session_index = args->new_session_index;
  rargs.thread_index = args->new_thread_index;
  session_send_rpc_evt_to_thread (args->new_thread_index,
				  session_switch_pool_reply,
				  uword_to_pointer (rargs.as_u64, void *));

  /* Old-thread session is gone; cb_args were heap-allocated by sender */
  session_free (s);
  clib_mem_free (cb_args);
}
1049 :
1050 : /**
1051 : * Move dgram session to the right thread
1052 : */
1053 : int
1054 0 : session_dgram_connect_notify (transport_connection_t * tc,
1055 : u32 old_thread_index, session_t ** new_session)
1056 : {
1057 : session_t *new_s;
1058 : session_switch_pool_args_t *rpc_args;
1059 : segment_manager_t *sm;
1060 : app_worker_t *app_wrk;
1061 :
1062 : /*
1063 : * Clone half-open session to the right thread.
1064 : */
1065 0 : new_s = session_clone_safe (tc->s_index, old_thread_index);
1066 0 : new_s->connection_index = tc->c_index;
1067 0 : session_set_state (new_s, SESSION_STATE_READY);
1068 0 : new_s->flags |= SESSION_F_IS_MIGRATING;
1069 :
1070 0 : if (!(tc->flags & TRANSPORT_CONNECTION_F_NO_LOOKUP))
1071 0 : session_lookup_add_connection (tc, session_handle (new_s));
1072 :
1073 0 : app_wrk = app_worker_get_if_valid (new_s->app_wrk_index);
1074 0 : if (app_wrk)
1075 : {
1076 : /* New set of fifos attached to the same shared memory */
1077 0 : sm = app_worker_get_connect_segment_manager (app_wrk);
1078 0 : segment_manager_attach_fifo (sm, &new_s->rx_fifo, new_s);
1079 0 : segment_manager_attach_fifo (sm, &new_s->tx_fifo, new_s);
1080 : }
1081 :
1082 : /*
1083 : * Ask thread owning the old session to clean it up and make us the tx
1084 : * fifo owner
1085 : */
1086 0 : rpc_args = clib_mem_alloc (sizeof (*rpc_args));
1087 0 : rpc_args->new_session_index = new_s->session_index;
1088 0 : rpc_args->new_thread_index = new_s->thread_index;
1089 0 : rpc_args->session_index = tc->s_index;
1090 0 : rpc_args->thread_index = old_thread_index;
1091 0 : session_send_rpc_evt_to_thread (rpc_args->thread_index, session_switch_pool,
1092 : rpc_args);
1093 :
1094 0 : tc->s_index = new_s->session_index;
1095 0 : new_s->connection_index = tc->c_index;
1096 0 : *new_session = new_s;
1097 0 : return 0;
1098 : }
1099 :
1100 : /**
1101 : * Notification from transport that connection is being closed.
1102 : *
1103 : * A disconnect is sent to application but state is not removed. Once
1104 : * disconnect is acknowledged by application, session disconnect is called.
1105 : * Ultimately this leads to close being called on transport (passive close).
1106 : */
1107 : void
1108 158 : session_transport_closing_notify (transport_connection_t * tc)
1109 : {
1110 : app_worker_t *app_wrk;
1111 : session_t *s;
1112 :
1113 158 : s = session_get (tc->s_index, tc->thread_index);
1114 158 : if (s->session_state >= SESSION_STATE_TRANSPORT_CLOSING)
1115 6 : return;
1116 :
1117 : /* Wait for reply from app before sending notification as the
1118 : * accept might be rejected */
1119 152 : if (s->session_state == SESSION_STATE_ACCEPTING)
1120 : {
1121 0 : session_set_state (s, SESSION_STATE_TRANSPORT_CLOSING);
1122 0 : return;
1123 : }
1124 :
1125 152 : session_set_state (s, SESSION_STATE_TRANSPORT_CLOSING);
1126 152 : app_wrk = app_worker_get (s->app_wrk_index);
1127 152 : app_worker_close_notify (app_wrk, s);
1128 : }
1129 :
1130 : /**
1131 : * Notification from transport that connection is being deleted
1132 : *
1133 : * This removes the session if it is still valid. It should be called only on
1134 : * previously fully established sessions. For instance failed connects should
1135 : * call stream_session_connect_notify and indicate that the connect has
1136 : * failed.
1137 : */
void
session_transport_delete_notify (transport_connection_t * tc)
{
  session_t *s;

  /* App might've been removed already */
  if (!(s = session_get_if_valid (tc->s_index, tc->thread_index)))
    return;

  /* Action depends on how far along the close handshake the session is */
  switch (s->session_state)
    {
    case SESSION_STATE_CREATED:
      /* Session was created but accept notification was not yet sent to the
       * app. Cleanup everything. */
      session_lookup_del_session (s);
      segment_manager_dealloc_fifos (s->rx_fifo, s->tx_fifo);
      session_free (s);
      break;
    case SESSION_STATE_ACCEPTING:
    case SESSION_STATE_TRANSPORT_CLOSING:
    case SESSION_STATE_CLOSING:
    case SESSION_STATE_TRANSPORT_CLOSED:
      /* If transport finishes or times out before we get a reply
       * from the app, mark transport as closed and wait for reply
       * before removing the session. Cleanup session table in advance
       * because transport will soon be closed and closed sessions
       * are assumed to have been removed from the lookup table */
      session_lookup_del_session (s);
      session_set_state (s, SESSION_STATE_TRANSPORT_DELETED);
      session_cleanup_notify (s, SESSION_CLEANUP_TRANSPORT);
      svm_fifo_dequeue_drop_all (s->tx_fifo);
      break;
    case SESSION_STATE_APP_CLOSED:
      /* Cleanup lookup table as transport needs to still be valid.
       * Program transport close to ensure that all session events
       * have been cleaned up. Once transport close is called, the
       * session is just removed because both transport and app have
       * confirmed the close*/
      session_lookup_del_session (s);
      session_set_state (s, SESSION_STATE_TRANSPORT_DELETED);
      session_cleanup_notify (s, SESSION_CLEANUP_TRANSPORT);
      svm_fifo_dequeue_drop_all (s->tx_fifo);
      session_program_transport_ctrl_evt (s, SESSION_CTRL_EVT_CLOSE);
      break;
    case SESSION_STATE_TRANSPORT_DELETED:
      /* Delete already handled, nothing left to do */
      break;
    case SESSION_STATE_CLOSED:
      /* Both sides confirmed the close; free transport state and session */
      session_cleanup_notify (s, SESSION_CLEANUP_TRANSPORT);
      session_delete (s);
      break;
    default:
      clib_warning ("session state %u", s->session_state);
      session_cleanup_notify (s, SESSION_CLEANUP_TRANSPORT);
      session_delete (s);
      break;
    }
}
1195 :
1196 : /**
1197 : * Notification from transport that it is closed
1198 : *
1199 : * Should be called by transport, prior to calling delete notify, once it
1200 : * knows that no more data will be exchanged. This could serve as an
1201 : * early acknowledgment of an active close especially if transport delete
1202 : * can be delayed a long time, e.g., tcp time-wait.
1203 : */
void
session_transport_closed_notify (transport_connection_t * tc)
{
  app_worker_t *app_wrk;
  session_t *s;

  /* Session might've been cleaned up already */
  if (!(s = session_get_if_valid (tc->s_index, tc->thread_index)))
    return;

  /* Transport thinks that app requested close but it actually didn't.
   * Can happen for tcp:
   * 1)if fin and rst are received in close succession.
   * 2)if app shutdown the connection.  */
  if (s->session_state == SESSION_STATE_READY)
    {
      session_transport_closing_notify (tc);
      svm_fifo_dequeue_drop_all (s->tx_fifo);
      session_set_state (s, SESSION_STATE_TRANSPORT_CLOSED);
    }
  /* If app close has not been received or has not yet resulted in
   * a transport close, only mark the session transport as closed */
  else if (s->session_state <= SESSION_STATE_CLOSING)
    session_set_state (s, SESSION_STATE_TRANSPORT_CLOSED);
  /* If app also closed, switch to closed */
  else if (s->session_state == SESSION_STATE_APP_CLOSED)
    session_set_state (s, SESSION_STATE_CLOSED);

  /* Let the app worker know the transport side is done */
  app_wrk = app_worker_get_if_valid (s->app_wrk_index);
  if (app_wrk)
    app_worker_transport_closed_notify (app_wrk, s);
}
1235 :
1236 : /**
1237 : * Notify application that connection has been reset.
1238 : */
1239 : void
1240 5 : session_transport_reset_notify (transport_connection_t * tc)
1241 : {
1242 : app_worker_t *app_wrk;
1243 : session_t *s;
1244 :
1245 5 : s = session_get (tc->s_index, tc->thread_index);
1246 5 : svm_fifo_dequeue_drop_all (s->tx_fifo);
1247 5 : if (s->session_state >= SESSION_STATE_TRANSPORT_CLOSING)
1248 0 : return;
1249 5 : if (s->session_state == SESSION_STATE_ACCEPTING)
1250 : {
1251 0 : session_set_state (s, SESSION_STATE_TRANSPORT_CLOSING);
1252 0 : return;
1253 : }
1254 5 : session_set_state (s, SESSION_STATE_TRANSPORT_CLOSING);
1255 5 : app_wrk = app_worker_get (s->app_wrk_index);
1256 5 : app_worker_reset_notify (app_wrk, s);
1257 : }
1258 :
1259 : int
1260 132 : session_stream_accept_notify (transport_connection_t * tc)
1261 : {
1262 : app_worker_t *app_wrk;
1263 : session_t *s;
1264 :
1265 132 : s = session_get (tc->s_index, tc->thread_index);
1266 132 : app_wrk = app_worker_get_if_valid (s->app_wrk_index);
1267 132 : if (!app_wrk)
1268 0 : return -1;
1269 132 : if (s->session_state != SESSION_STATE_CREATED)
1270 0 : return 0;
1271 132 : session_set_state (s, SESSION_STATE_ACCEPTING);
1272 132 : if (app_worker_accept_notify (app_wrk, s))
1273 : {
1274 : /* On transport delete, no notifications should be sent. Unless, the
1275 : * accept is retried and successful. */
1276 0 : session_set_state (s, SESSION_STATE_CREATED);
1277 0 : return -1;
1278 : }
1279 132 : return 0;
1280 : }
1281 :
1282 : /**
1283 : * Accept a stream session. Optionally ping the server by callback.
1284 : */
int
session_stream_accept (transport_connection_t * tc, u32 listener_index,
		       u32 thread_index, u8 notify)
{
  session_t *s;
  int rv;

  s = session_alloc_for_connection (tc);
  /* Pack listener thread and index into the 64-bit listener handle */
  s->listener_handle = ((u64) thread_index << 32) | (u64) listener_index;
  session_set_state (s, SESSION_STATE_CREATED);

  if ((rv = app_worker_init_accepted (s)))
    {
      session_free (s);
      return rv;
    }

  session_lookup_add_connection (tc, session_handle (s));

  /* Shoulder-tap the server */
  if (notify)
    {
      app_worker_t *app_wrk = app_worker_get (s->app_wrk_index);
      if ((rv = app_worker_accept_notify (app_wrk, s)))
	{
	  /* Accept rejected: unwind lookup entry, fifos and session */
	  session_lookup_del_session (s);
	  segment_manager_dealloc_fifos (s->rx_fifo, s->tx_fifo);
	  session_free (s);
	  return rv;
	}
    }

  return 0;
}
1319 :
/**
 * Accept a dgram session. Unlike the stream variant, the app worker is
 * always notified.
 *
 * @param tc		 transport connection to accept
 * @param listener_index index of the listener session
 * @param thread_index	 thread owning the listener
 * @return 0 on success, app worker error otherwise
 */
int
session_dgram_accept (transport_connection_t * tc, u32 listener_index,
		      u32 thread_index)
{
  app_worker_t *app_wrk;
  session_t *s;
  int rv;

  s = session_alloc_for_connection (tc);
  /* Pack listener thread and index into the 64-bit listener handle */
  s->listener_handle = ((u64) thread_index << 32) | (u64) listener_index;

  if ((rv = app_worker_init_accepted (s)))
    {
      session_free (s);
      return rv;
    }

  session_lookup_add_connection (tc, session_handle (s));
  session_set_state (s, SESSION_STATE_ACCEPTING);

  app_wrk = app_worker_get (s->app_wrk_index);
  if ((rv = app_worker_accept_notify (app_wrk, s)))
    {
      /* Accept rejected: unwind lookup entry, fifos and session */
      session_lookup_del_session (s);
      segment_manager_dealloc_fifos (s->rx_fifo, s->tx_fifo);
      session_free (s);
      return rv;
    }

  return 0;
}
1351 :
/**
 * Connect for transports with connection-less (dgram) service.
 *
 * Session and fifos are allocated immediately, since there is no
 * handshake to wait for, and the app is notified right away.
 *
 * @param rmt remote session endpoint config
 * @param rsh output: handle of the newly allocated session
 * @return transport/app-worker error code or 0 on success
 */
int
session_open_cl (session_endpoint_cfg_t *rmt, session_handle_t *rsh)
{
  transport_connection_t *tc;
  transport_endpoint_cfg_t *tep;
  app_worker_t *app_wrk;
  session_handle_t sh;
  session_t *s;
  int rv;

  tep = session_endpoint_to_transport_cfg (rmt);
  rv = transport_connect (rmt->transport_proto, tep);
  if (rv < 0)
    {
      SESSION_DBG ("Transport failed to open connection.");
      return rv;
    }

  /* On success, transport_connect returns the half-open index */
  tc = transport_get_half_open (rmt->transport_proto, (u32) rv);

  /* For dgram type of service, allocate session and fifos now */
  app_wrk = app_worker_get (rmt->app_wrk_index);
  s = session_alloc_for_connection (tc);
  s->app_wrk_index = app_wrk->wrk_index;
  session_set_state (s, SESSION_STATE_OPENED);
  if (app_worker_init_connected (app_wrk, s))
    {
      session_free (s);
      return -1;
    }

  sh = session_handle (s);
  *rsh = sh;

  session_lookup_add_connection (tc, sh);
  return app_worker_connect_notify (app_wrk, s, SESSION_E_NONE, rmt->opaque);
}
1389 :
/**
 * Connect for transports with virtual-circuit (stream) service.
 *
 * Only a half-open session is allocated here; the established session is
 * created once the transport reports that the connection is up.
 *
 * @param rmt remote session endpoint config
 * @param rsh output: handle of the half-open session
 * @return transport connect error (< 0) or 0
 */
int
session_open_vc (session_endpoint_cfg_t *rmt, session_handle_t *rsh)
{
  transport_connection_t *tc;
  transport_endpoint_cfg_t *tep;
  app_worker_t *app_wrk;
  session_t *ho;
  int rv;

  tep = session_endpoint_to_transport_cfg (rmt);
  rv = transport_connect (rmt->transport_proto, tep);
  if (rv < 0)
    {
      SESSION_DBG ("Transport failed to open connection.");
      return rv;
    }

  /* On success, transport_connect returns the half-open index */
  tc = transport_get_half_open (rmt->transport_proto, (u32) rv);

  app_wrk = app_worker_get (rmt->app_wrk_index);

  /* If transport offers a vc service, only allocate established
   * session once the connection has been established.
   * In the meantime allocate half-open session for tracking purposes
   * associate half-open connection to it and add session to app-worker
   * half-open table. These are needed to allocate the established
   * session on transport notification, and to cleanup the half-open
   * session if the app detaches before connection establishment.
   */
  ho = session_alloc_for_half_open (tc);
  ho->app_wrk_index = app_wrk->wrk_index;
  ho->ho_index = app_worker_add_half_open (app_wrk, session_handle (ho));
  ho->opaque = rmt->opaque;
  *rsh = session_handle (ho);

  if (!(tc->flags & TRANSPORT_CONNECTION_F_NO_LOOKUP))
    session_lookup_add_half_open (tc, tc->c_index);

  return 0;
}
1430 :
1431 : int
1432 36 : session_open_app (session_endpoint_cfg_t *rmt, session_handle_t *rsh)
1433 : {
1434 36 : transport_endpoint_cfg_t *tep_cfg = session_endpoint_to_transport_cfg (rmt);
1435 :
1436 : /* Not supported for now */
1437 36 : *rsh = SESSION_INVALID_HANDLE;
1438 36 : return transport_connect (rmt->transport_proto, tep_cfg);
1439 : }
1440 :
/* Connect function signature, selected by transport service type in
 * session_open below */
typedef int (*session_open_service_fn) (session_endpoint_cfg_t *,
					session_handle_t *);

/* *INDENT-OFF* */
/* Indexed by the transport's service type: vc, cl, app respectively */
static session_open_service_fn session_open_srv_fns[TRANSPORT_N_SERVICES] = {
  session_open_vc,
  session_open_cl,
  session_open_app,
};
/* *INDENT-ON* */
1451 :
1452 : /**
1453 : * Ask transport to open connection to remote transport endpoint.
1454 : *
1455 : * Stores handle for matching request with reply since the call can be
1456 : * asynchronous. For instance, for TCP the 3-way handshake must complete
1457 : * before reply comes. Session is only created once connection is established.
1458 : *
1459 : * @param app_index Index of the application requesting the connect
1460 : * @param st Session type requested.
1461 : * @param tep Remote transport endpoint
1462 : * @param opaque Opaque data (typically, api_context) the application expects
1463 : * on open completion.
1464 : */
1465 : int
1466 213 : session_open (session_endpoint_cfg_t *rmt, session_handle_t *rsh)
1467 : {
1468 : transport_service_type_t tst;
1469 213 : tst = transport_protocol_service_type (rmt->transport_proto);
1470 213 : return session_open_srv_fns[tst](rmt, rsh);
1471 : }
1472 :
1473 : /**
1474 : * Ask transport to listen on session endpoint.
1475 : *
1476 : * @param s Session for which listen will be called. Note that unlike
1477 : * established sessions, listen sessions are not associated to a
1478 : * thread.
1479 : * @param sep Local endpoint to be listened on.
1480 : */
int
session_listen (session_t * ls, session_endpoint_cfg_t * sep)
{
  transport_endpoint_cfg_t *tep;
  int tc_index;
  u32 s_index;

  /* Transport bind/listen */
  tep = session_endpoint_to_transport_cfg (sep);
  s_index = ls->session_index;
  tc_index = transport_start_listen (session_get_transport_proto (ls),
				     s_index, tep);

  /* Negative return is the transport's error code */
  if (tc_index < 0)
    return tc_index;

  /* Attach transport to session. Lookup tables are populated by the app
   * worker because local tables (for ct sessions) are not backed by a fib */
  ls = listen_session_get (s_index);
  ls->connection_index = tc_index;
  ls->opaque = sep->opaque;

  return 0;
}
1505 :
1506 : /**
1507 : * Ask transport to stop listening on local transport endpoint.
1508 : *
1509 : * @param s Session to stop listening on. It must be in state LISTENING.
1510 : */
int
session_stop_listen (session_t * s)
{
  transport_proto_t tp = session_get_transport_proto (s);
  transport_connection_t *tc;

  /* Only listening sessions can stop listening */
  if (s->session_state != SESSION_STATE_LISTENING)
    return SESSION_E_NOLISTEN;

  tc = transport_get_listener (tp, s->connection_index);

  /* If no transport, assume everything was cleaned up already */
  if (!tc)
    return SESSION_E_NONE;

  /* Remove lookup entry unless transport opted out of the lookup table */
  if (!(tc->flags & TRANSPORT_CONNECTION_F_NO_LOOKUP))
    session_lookup_del_connection (tc);

  transport_stop_listen (tp, s->connection_index);
  return 0;
}
1532 :
1533 : /**
1534 : * Initialize session half-closing procedure.
1535 : *
1536 : * Note that half-closing will not change the state of the session.
1537 : */
1538 : void
1539 0 : session_half_close (session_t *s)
1540 : {
1541 0 : if (!s)
1542 0 : return;
1543 :
1544 0 : session_program_transport_ctrl_evt (s, SESSION_CTRL_EVT_HALF_CLOSE);
1545 : }
1546 :
1547 : /**
1548 : * Initialize session closing procedure.
1549 : *
1550 : * Request is always sent to session node to ensure that all outstanding
1551 : * requests are served before transport is notified.
1552 : */
void
session_close (session_t * s)
{
  /* Ignore repeated app closes */
  if (!s || (s->flags & SESSION_F_APP_CLOSED))
    return;

  /* Transports can close and delete their state independent of app closes
   * and transport initiated state transitions can hide app closes. Instead
   * of extending the state machine to support separate tracking of app and
   * transport initiated closes, use a flag. */
  s->flags |= SESSION_F_APP_CLOSED;

  if (s->session_state >= SESSION_STATE_CLOSING)
    {
      /* Session will only be removed once both app and transport
       * acknowledge the close */
      if (s->session_state == SESSION_STATE_TRANSPORT_CLOSED
	  || s->session_state == SESSION_STATE_TRANSPORT_DELETED)
	session_program_transport_ctrl_evt (s, SESSION_CTRL_EVT_CLOSE);
      return;
    }

  /* App closed so stop propagating dequeue notifications.
   * App might disconnect session before connected, in this case,
   * tx_fifo may not be setup yet, so clear only it's inited. */
  if (s->tx_fifo)
    svm_fifo_clear_deq_ntf (s->tx_fifo);
  session_set_state (s, SESSION_STATE_CLOSING);
  session_program_transport_ctrl_evt (s, SESSION_CTRL_EVT_CLOSE);
}
1583 :
1584 : /**
1585 : * Force a close without waiting for data to be flushed
1586 : */
void
session_reset (session_t * s)
{
  /* Close/reset already in progress */
  if (s->session_state >= SESSION_STATE_CLOSING)
    return;
  /* Drop all outstanding tx data
   * App might disconnect session before connected, in this case,
   * tx_fifo may not be setup yet, so clear only it's inited. */
  if (s->tx_fifo)
    svm_fifo_dequeue_drop_all (s->tx_fifo);
  session_set_state (s, SESSION_STATE_CLOSING);
  session_program_transport_ctrl_evt (s, SESSION_CTRL_EVT_RESET);
}
1600 :
1601 : /**
1602 : * Notify transport the session can be half-disconnected.
1603 : *
1604 : * Must be called from the session's thread.
1605 : */
1606 : void
1607 0 : session_transport_half_close (session_t *s)
1608 : {
1609 : /* Only READY session can be half-closed */
1610 0 : if (s->session_state != SESSION_STATE_READY)
1611 : {
1612 0 : return;
1613 : }
1614 :
1615 0 : transport_half_close (session_get_transport_proto (s), s->connection_index,
1616 0 : s->thread_index);
1617 : }
1618 :
1619 : /**
1620 : * Notify transport the session can be disconnected. This should eventually
1621 : * result in a delete notification that allows us to cleanup session state.
1622 : * Called for both active/passive disconnects.
1623 : *
1624 : * Must be called from the session's thread.
1625 : */
void
session_transport_close (session_t * s)
{
  /* App already closed; close/free depending on how far along the
   * transport side is */
  if (s->session_state >= SESSION_STATE_APP_CLOSED)
    {
      if (s->session_state == SESSION_STATE_TRANSPORT_CLOSED)
	session_set_state (s, SESSION_STATE_CLOSED);
      /* If transport is already deleted, just free the session */
      else if (s->session_state >= SESSION_STATE_TRANSPORT_DELETED)
	session_free_w_fifos (s);
      return;
    }

  /* If the tx queue wasn't drained, the transport can continue to try
   * sending the outstanding data (in closed state it cannot). It MUST however
   * at one point, either after sending everything or after a timeout, call
   * delete notify. This will finally lead to the complete cleanup of the
   * session.
   */
  session_set_state (s, SESSION_STATE_APP_CLOSED);

  transport_close (session_get_transport_proto (s), s->connection_index,
		   s->thread_index);
}
1650 :
1651 : /**
1652 : * Force transport close
1653 : */
1654 : void
1655 0 : session_transport_reset (session_t * s)
1656 : {
1657 0 : if (s->session_state >= SESSION_STATE_APP_CLOSED)
1658 : {
1659 0 : if (s->session_state == SESSION_STATE_TRANSPORT_CLOSED)
1660 0 : session_set_state (s, SESSION_STATE_CLOSED);
1661 0 : else if (s->session_state >= SESSION_STATE_TRANSPORT_DELETED)
1662 0 : session_free_w_fifos (s);
1663 0 : return;
1664 : }
1665 :
1666 0 : session_set_state (s, SESSION_STATE_APP_CLOSED);
1667 0 : transport_reset (session_get_transport_proto (s), s->connection_index,
1668 0 : s->thread_index);
1669 : }
1670 :
1671 : /**
1672 : * Cleanup transport and session state.
1673 : *
1674 : * Notify transport of the cleanup and free the session. This should
1675 : * be called only if transport reported some error and is already
1676 : * closed.
1677 : */
void
session_transport_cleanup (session_t * s)
{
  /* Delete from main lookup table before we axe the transport */
  session_lookup_del_session (s);
  /* Skip transport cleanup if it already deleted its state */
  if (s->session_state != SESSION_STATE_TRANSPORT_DELETED)
    transport_cleanup (session_get_transport_proto (s), s->connection_index,
		       s->thread_index);
  /* Since we called cleanup, no delete notification will come. So, make
   * sure the session is properly freed. */
  segment_manager_dealloc_fifos (s->rx_fifo, s->tx_fifo);
  session_free (s);
}
1691 :
1692 : /**
1693 : * Allocate worker mqs in share-able segment
1694 : *
1695 : * That can only be a newly created memfd segment, that must be mapped
1696 : * by all apps/stack users unless private rx mqs are enabled.
1697 : */
1698 : void
1699 49 : session_vpp_wrk_mqs_alloc (session_main_t *smm)
1700 : {
1701 49 : u32 mq_q_length = 2048, evt_size = sizeof (session_event_t);
1702 49 : fifo_segment_t *mqs_seg = &smm->wrk_mqs_segment;
1703 49 : svm_msg_q_cfg_t _cfg, *cfg = &_cfg;
1704 : uword mqs_seg_size;
1705 : int i;
1706 :
1707 49 : mq_q_length = clib_max (mq_q_length, smm->configured_wrk_mq_length);
1708 :
1709 49 : svm_msg_q_ring_cfg_t rc[SESSION_MQ_N_RINGS] = {
1710 49 : { mq_q_length, evt_size, 0 }, { mq_q_length >> 1, 256, 0 }
1711 : };
1712 49 : cfg->consumer_pid = 0;
1713 49 : cfg->n_rings = 2;
1714 49 : cfg->q_nitems = mq_q_length;
1715 49 : cfg->ring_cfgs = rc;
1716 :
1717 : /*
1718 : * Compute mqs segment size based on rings config and leave space
1719 : * for passing extended configuration messages, i.e., data allocated
1720 : * outside of the rings. If provided with a config value, accept it
1721 : * if larger than minimum size.
1722 : */
1723 49 : mqs_seg_size = svm_msg_q_size_to_alloc (cfg) * vec_len (smm->wrk);
1724 49 : mqs_seg_size = mqs_seg_size + (1 << 20);
1725 49 : mqs_seg_size = clib_max (mqs_seg_size, smm->wrk_mqs_segment_size);
1726 :
1727 49 : mqs_seg->ssvm.ssvm_size = mqs_seg_size;
1728 49 : mqs_seg->ssvm.my_pid = getpid ();
1729 49 : mqs_seg->ssvm.name = format (0, "%s%c", "session: wrk-mqs-segment", 0);
1730 :
1731 49 : if (ssvm_server_init (&mqs_seg->ssvm, SSVM_SEGMENT_MEMFD))
1732 : {
1733 0 : clib_warning ("failed to initialize queue segment");
1734 0 : return;
1735 : }
1736 :
1737 49 : fifo_segment_init (mqs_seg);
1738 :
1739 : /* Special fifo segment that's filled only with mqs */
1740 49 : mqs_seg->h->n_mqs = vec_len (smm->wrk);
1741 :
1742 119 : for (i = 0; i < vec_len (smm->wrk); i++)
1743 70 : smm->wrk[i].vpp_event_queue = fifo_segment_msg_q_alloc (mqs_seg, i, cfg);
1744 : }
1745 :
1746 : fifo_segment_t *
1747 260 : session_main_get_wrk_mqs_segment (void)
1748 : {
1749 260 : return &session_main.wrk_mqs_segment;
1750 : }
1751 :
1752 : u64
1753 113 : session_segment_handle (session_t * s)
1754 : {
1755 : svm_fifo_t *f;
1756 :
1757 113 : if (!s->rx_fifo)
1758 0 : return SESSION_INVALID_HANDLE;
1759 :
1760 113 : f = s->rx_fifo;
1761 113 : return segment_manager_make_segment_handle (f->segment_manager,
1762 : f->segment_index);
1763 : }
1764 :
/* *INDENT-OFF* */
/* Tx functions indexed by vft->transport_options.tx_type (see
 * session_register_transport below) */
static session_fifo_rx_fn *session_tx_fns[TRANSPORT_TX_N_FNS] = {
  session_tx_fifo_peek_and_snd,
  session_tx_fifo_dequeue_and_snd,
  session_tx_fifo_dequeue_internal,
  session_tx_fifo_dequeue_and_snd
};
/* *INDENT-ON* */
1772 : /* *INDENT-ON* */
1773 :
/**
 * Register a transport protocol with the session layer.
 *
 * Records the tx function for the resulting session type and, if the
 * transport provides an output node, wires it up as a next node of the
 * session queue node.
 *
 * @param transport_proto transport protocol being registered
 * @param vft		  transport's virtual function table
 * @param is_ip4	  1 for the ip4 session type, 0 for ip6
 * @param output_node	  transport output node index, or ~0 if none
 */
void
session_register_transport (transport_proto_t transport_proto,
			    const transport_proto_vft_t * vft, u8 is_ip4,
			    u32 output_node)
{
  session_main_t *smm = &session_main;
  session_type_t session_type;
  u32 next_index = ~0;

  session_type = session_type_from_proto_and_ip (transport_proto, is_ip4);

  vec_validate (smm->session_type_to_next, session_type);
  vec_validate (smm->session_tx_fns, session_type);

  if (output_node != ~0)
    next_index = vlib_node_add_next (vlib_get_main (),
				     session_queue_node.index, output_node);

  smm->session_type_to_next[session_type] = next_index;
  smm->session_tx_fns[session_type] =
    session_tx_fns[vft->transport_options.tx_type];
}
1796 :
1797 : void
1798 81 : session_register_update_time_fn (session_update_time_fn fn, u8 is_add)
1799 : {
1800 81 : session_main_t *smm = &session_main;
1801 : session_update_time_fn *fi;
1802 81 : u32 fi_pos = ~0;
1803 81 : u8 found = 0;
1804 :
1805 105 : vec_foreach (fi, smm->update_time_fns)
1806 : {
1807 32 : if (*fi == fn)
1808 : {
1809 8 : fi_pos = fi - smm->update_time_fns;
1810 8 : found = 1;
1811 8 : break;
1812 : }
1813 : }
1814 :
1815 81 : if (is_add)
1816 : {
1817 73 : if (found)
1818 : {
1819 0 : clib_warning ("update time fn %p already registered", fn);
1820 0 : return;
1821 : }
1822 73 : vec_add1 (smm->update_time_fns, fn);
1823 : }
1824 : else
1825 : {
1826 8 : vec_del1 (smm->update_time_fns, fi_pos);
1827 : }
1828 : }
1829 :
1830 : transport_proto_t
1831 0 : session_add_transport_proto (void)
1832 : {
1833 0 : session_main_t *smm = &session_main;
1834 : session_worker_t *wrk;
1835 : u32 thread;
1836 :
1837 0 : smm->last_transport_proto_type += 1;
1838 :
1839 0 : for (thread = 0; thread < vec_len (smm->wrk); thread++)
1840 : {
1841 0 : wrk = session_main_get_worker (thread);
1842 0 : vec_validate (wrk->session_to_enqueue, smm->last_transport_proto_type);
1843 : }
1844 :
1845 0 : return smm->last_transport_proto_type;
1846 : }
1847 :
1848 : transport_connection_t *
1849 6469730 : session_get_transport (session_t * s)
1850 : {
1851 6469730 : if (s->session_state != SESSION_STATE_LISTENING)
1852 6469550 : return transport_get_connection (session_get_transport_proto (s),
1853 6469550 : s->connection_index, s->thread_index);
1854 : else
1855 181 : return transport_get_listener (session_get_transport_proto (s),
1856 : s->connection_index);
1857 : }
1858 :
1859 : void
1860 172 : session_get_endpoint (session_t * s, transport_endpoint_t * tep, u8 is_lcl)
1861 : {
1862 172 : if (s->session_state != SESSION_STATE_LISTENING)
1863 120 : return transport_get_endpoint (session_get_transport_proto (s),
1864 120 : s->connection_index, s->thread_index, tep,
1865 : is_lcl);
1866 : else
1867 52 : return transport_get_listener_endpoint (session_get_transport_proto (s),
1868 : s->connection_index, tep, is_lcl);
1869 : }
1870 :
/**
 * Get or set a transport attribute for an established session.
 *
 * @param s	 session whose transport is queried
 * @param is_get 1 for get, 0 for set
 * @param attr	 attribute to read or write
 * @return -1 if session is not yet ready, else transport's result
 */
int
session_transport_attribute (session_t *s, u8 is_get,
			     transport_endpt_attr_t *attr)
{
  if (s->session_state < SESSION_STATE_READY)
    return -1;

  return transport_connection_attribute (session_get_transport_proto (s),
					 s->connection_index, s->thread_index,
					 is_get, attr);
}
1882 :
1883 : transport_connection_t *
1884 17 : listen_session_get_transport (session_t * s)
1885 : {
1886 17 : return transport_get_listener (session_get_transport_proto (s),
1887 : s->connection_index);
1888 : }
1889 :
/**
 * Request an interrupt dispatch of the session queue node on main.
 *
 * Must be called from the main thread (asserted).
 */
void
session_queue_run_on_main_thread (vlib_main_t * vm)
{
  ASSERT (vlib_get_thread_index () == 0);
  vlib_node_set_interrupt_pending (vm, session_queue_node.index);
}
1896 :
1897 : static void
1898 25 : session_stats_collector_fn (vlib_stats_collector_data_t *d)
1899 : {
1900 25 : u32 i, n_workers, n_wrk_sessions, n_sessions = 0;
1901 25 : session_main_t *smm = &session_main;
1902 : session_worker_t *wrk;
1903 : counter_t **counters;
1904 : counter_t *cb;
1905 :
1906 25 : n_workers = vec_len (smm->wrk);
1907 25 : vlib_stats_validate (d->entry_index, 0, n_workers - 1);
1908 25 : counters = d->entry->data;
1909 25 : cb = counters[0];
1910 :
1911 71 : for (i = 0; i < vec_len (smm->wrk); i++)
1912 : {
1913 46 : wrk = session_main_get_worker (i);
1914 46 : n_wrk_sessions = pool_elts (wrk->sessions);
1915 46 : cb[i] = n_wrk_sessions;
1916 46 : n_sessions += n_wrk_sessions;
1917 : }
1918 :
1919 25 : vlib_stats_set_gauge (d->private_data, n_sessions);
1920 25 : }
1921 :
1922 : static void
1923 49 : session_stats_collector_init (void)
1924 : {
1925 49 : vlib_stats_collector_reg_t reg = {};
1926 :
1927 49 : reg.entry_index =
1928 49 : vlib_stats_add_counter_vector ("/sys/session/sessions_per_worker");
1929 49 : reg.private_data = vlib_stats_add_gauge ("/sys/session/sessions_total");
1930 49 : reg.collect_fn = session_stats_collector_fn;
1931 49 : vlib_stats_register_collector_fn (®);
1932 49 : vlib_stats_validate (reg.entry_index, 0, vlib_get_n_threads ());
1933 49 : }
1934 :
/**
 * One-time bring-up of the session layer.
 *
 * Allocates per-thread worker contexts, the vpp/app message queues segment,
 * lookup tables and app namespaces, then enables all transports. On
 * subsequent calls only transports are re-enabled (see the is_initialized
 * short-circuit below).
 *
 * @param vm	vlib main for the calling (main) thread
 * @return	0 on success, clib error otherwise
 */
static clib_error_t *
session_manager_main_enable (vlib_main_t * vm)
{
  session_main_t *smm = &session_main;
  vlib_thread_main_t *vtm = vlib_get_thread_main ();
  u32 num_threads, preallocated_sessions_per_worker;
  session_worker_t *wrk;
  int i;

  /* We only initialize once and do not de-initialize on disable */
  if (smm->is_initialized)
    goto done;

  num_threads = 1 /* main thread */ + vtm->n_threads;

  if (num_threads < 1)
    return clib_error_return (0, "n_thread_stacks not set");

  /* Allocate cache line aligned worker contexts */
  vec_validate_aligned (smm->wrk, num_threads - 1, CLIB_CACHE_LINE_BYTES);
  clib_spinlock_init (&session_main.pool_realloc_lock);

  for (i = 0; i < num_threads; i++)
    {
      wrk = &smm->wrk[i];
      /* Per-worker event lists share one elt pool: ctrl, new and old io
       * events, pending connects and events deferred to main */
      wrk->ctrl_head = clib_llist_make_head (wrk->event_elts, evt_list);
      wrk->new_head = clib_llist_make_head (wrk->event_elts, evt_list);
      wrk->old_head = clib_llist_make_head (wrk->event_elts, evt_list);
      wrk->pending_connects = clib_llist_make_head (wrk->event_elts, evt_list);
      wrk->evts_pending_main =
	clib_llist_make_head (wrk->event_elts, evt_list);
      wrk->vm = vlib_get_main_by_index (i);
      wrk->last_vlib_time = vlib_time_now (vm);
      wrk->last_vlib_us_time = wrk->last_vlib_time * CLIB_US_TIME_FREQ;
      /* -1 marks the adaptive-mode timerfd as not yet created */
      wrk->timerfd = -1;
      vec_validate (wrk->session_to_enqueue, smm->last_transport_proto_type);

      if (!smm->no_adaptive && smm->use_private_rx_mqs)
	session_wrk_enable_adaptive_mode (wrk);
    }

  /* Allocate vpp event queues segment and queue */
  session_vpp_wrk_mqs_alloc (smm);

  /* Initialize segment manager properties */
  segment_manager_main_init ();

  /* Preallocate sessions */
  if (smm->preallocated_sessions)
    {
      if (num_threads == 1)
	{
	  pool_init_fixed (smm->wrk[0].sessions, smm->preallocated_sessions);
	}
      else
	{
	  int j;
	  /* Spread sessions over workers with ~10% headroom per pool */
	  preallocated_sessions_per_worker =
	    (1.1 * (f64) smm->preallocated_sessions /
	     (f64) (num_threads - 1));

	  for (j = 1; j < num_threads; j++)
	    {
	      pool_init_fixed (smm->wrk[j].sessions,
			       preallocated_sessions_per_worker);
	    }
	}
    }

  session_lookup_init ();
  app_namespaces_init ();
  transport_init ();
  session_stats_collector_init ();
  smm->is_initialized = 1;

done:

  smm->is_enabled = 1;

  /* Enable transports */
  transport_enable_disable (vm, 1);
  session_debug_init ();

  return 0;
}
2020 :
/**
 * Disable the session layer's transports.
 *
 * Counterpart of session_manager_main_enable; session state itself is
 * kept allocated and re-used on re-enable.
 */
static void
session_manager_main_disable (vlib_main_t * vm)
{
  transport_enable_disable (vm, 0 /* is_en */ );
}
2026 :
/* DMA completion callback; the batch cookie hints the completed transfer's
 * ring index */
2028 : void
2029 0 : session_dma_completion_cb (vlib_main_t *vm, struct vlib_dma_batch *batch)
2030 : {
2031 : session_worker_t *wrk;
2032 0 : wrk = session_main_get_worker (vm->thread_index);
2033 : session_dma_transfer *dma_transfer;
2034 :
2035 0 : dma_transfer = &wrk->dma_trans[wrk->trans_head];
2036 0 : vec_add (wrk->pending_tx_buffers, dma_transfer->pending_tx_buffers,
2037 : vec_len (dma_transfer->pending_tx_buffers));
2038 0 : vec_add (wrk->pending_tx_nexts, dma_transfer->pending_tx_nexts,
2039 : vec_len (dma_transfer->pending_tx_nexts));
2040 0 : vec_reset_length (dma_transfer->pending_tx_buffers);
2041 0 : vec_reset_length (dma_transfer->pending_tx_nexts);
2042 0 : wrk->trans_head++;
2043 0 : if (wrk->trans_head == wrk->trans_size)
2044 0 : wrk->trans_head = 0;
2045 0 : return;
2046 : }
2047 :
2048 : static void
2049 0 : session_prepare_dma_args (vlib_dma_config_t *args)
2050 : {
2051 0 : args->max_batches = 16;
2052 0 : args->max_transfers = DMA_TRANS_SIZE;
2053 0 : args->max_transfer_size = 65536;
2054 0 : args->features = 0;
2055 0 : args->sw_fallback = 1;
2056 0 : args->barrier_before_last = 1;
2057 0 : args->callback_fn = session_dma_completion_cb;
2058 0 : }
2059 :
2060 : static void
2061 0 : session_node_enable_dma (u8 is_en, int n_vlibs)
2062 : {
2063 : vlib_dma_config_t args;
2064 0 : session_prepare_dma_args (&args);
2065 : session_worker_t *wrk;
2066 : vlib_main_t *vm;
2067 :
2068 0 : int config_index = -1;
2069 :
2070 0 : if (is_en)
2071 : {
2072 0 : vm = vlib_get_main_by_index (0);
2073 0 : config_index = vlib_dma_config_add (vm, &args);
2074 : }
2075 : else
2076 : {
2077 0 : vm = vlib_get_main_by_index (0);
2078 0 : wrk = session_main_get_worker (0);
2079 0 : if (wrk->config_index >= 0)
2080 0 : vlib_dma_config_del (vm, wrk->config_index);
2081 : }
2082 : int i;
2083 0 : for (i = 0; i < n_vlibs; i++)
2084 : {
2085 0 : vm = vlib_get_main_by_index (i);
2086 0 : wrk = session_main_get_worker (vm->thread_index);
2087 0 : wrk->config_index = config_index;
2088 0 : if (is_en)
2089 : {
2090 0 : if (config_index >= 0)
2091 0 : wrk->dma_enabled = true;
2092 0 : wrk->dma_trans = (session_dma_transfer *) clib_mem_alloc (
2093 : sizeof (session_dma_transfer) * DMA_TRANS_SIZE);
2094 0 : bzero (wrk->dma_trans,
2095 : sizeof (session_dma_transfer) * DMA_TRANS_SIZE);
2096 : }
2097 : else
2098 : {
2099 0 : if (wrk->dma_trans)
2100 0 : clib_mem_free (wrk->dma_trans);
2101 : }
2102 0 : wrk->trans_head = 0;
2103 0 : wrk->trans_tail = 0;
2104 0 : wrk->trans_size = DMA_TRANS_SIZE;
2105 : }
2106 0 : }
2107 :
/**
 * Set the per-thread session queue node states.
 *
 * Workers poll (or are disabled); the main thread runs in interrupt mode
 * when workers exist unless poll-main is configured. The session queue
 * process node is started/stopped alongside. Also toggles private rx mq
 * nodes and DMA when those features are configured.
 *
 * @param is_en	 1 to enable, 0 to disable
 */
void
session_node_enable_disable (u8 is_en)
{
  /* Main with workers defaults to interrupt mode; everything else polls */
  u8 mstate = is_en ? VLIB_NODE_STATE_INTERRUPT : VLIB_NODE_STATE_DISABLED;
  u8 state = is_en ? VLIB_NODE_STATE_POLLING : VLIB_NODE_STATE_DISABLED;
  session_main_t *sm = &session_main;
  vlib_main_t *vm;
  vlib_node_t *n;
  int n_vlibs, i;

  n_vlibs = vlib_get_n_threads ();
  for (i = 0; i < n_vlibs; i++)
    {
      vm = vlib_get_main_by_index (i);
      /* main thread with workers and not polling */
      if (i == 0 && n_vlibs > 1)
	{
	  vlib_node_set_state (vm, session_queue_node.index, mstate);
	  if (is_en)
	    {
	      session_main_get_worker (0)->state = SESSION_WRK_INTERRUPT;
	      vlib_node_set_state (vm, session_queue_process_node.index,
				   state);
	      n = vlib_get_node (vm, session_queue_process_node.index);
	      vlib_start_process (vm, n->runtime_index);
	    }
	  else
	    {
	      vlib_process_signal_event_mt (vm,
					    session_queue_process_node.index,
					    SESSION_Q_PROCESS_STOP, 0);
	    }
	  /* Unless poll-main is set, main stays in interrupt mode and
	   * skips the polling-state assignment below */
	  if (!sm->poll_main)
	    continue;
	}
      vlib_node_set_state (vm, session_queue_node.index, state);
    }

  if (sm->use_private_rx_mqs)
    application_enable_rx_mqs_nodes (is_en);

  if (sm->dma_enabled)
    session_node_enable_dma (is_en, n_vlibs);
}
2152 :
2153 : clib_error_t *
2154 117 : vnet_session_enable_disable (vlib_main_t * vm, u8 is_en)
2155 : {
2156 117 : clib_error_t *error = 0;
2157 117 : if (is_en)
2158 : {
2159 109 : if (session_main.is_enabled)
2160 60 : return 0;
2161 :
2162 49 : error = session_manager_main_enable (vm);
2163 49 : session_node_enable_disable (is_en);
2164 : }
2165 : else
2166 : {
2167 8 : session_main.is_enabled = 0;
2168 8 : session_manager_main_disable (vm);
2169 8 : session_node_enable_disable (is_en);
2170 : }
2171 :
2172 57 : return error;
2173 : }
2174 :
2175 : clib_error_t *
2176 559 : session_main_init (vlib_main_t * vm)
2177 : {
2178 559 : session_main_t *smm = &session_main;
2179 :
2180 559 : smm->is_enabled = 0;
2181 559 : smm->session_enable_asap = 0;
2182 559 : smm->poll_main = 0;
2183 559 : smm->use_private_rx_mqs = 0;
2184 559 : smm->no_adaptive = 0;
2185 559 : smm->last_transport_proto_type = TRANSPORT_PROTO_HTTP;
2186 :
2187 559 : return 0;
2188 : }
2189 :
2190 : static clib_error_t *
2191 559 : session_main_loop_init (vlib_main_t * vm)
2192 : {
2193 559 : session_main_t *smm = &session_main;
2194 559 : if (smm->session_enable_asap)
2195 : {
2196 24 : vlib_worker_thread_barrier_sync (vm);
2197 24 : vnet_session_enable_disable (vm, 1 /* is_en */ );
2198 24 : vlib_worker_thread_barrier_release (vm);
2199 : }
2200 559 : return 0;
2201 : }
2202 :
/* Register init (defaults) and main-loop-enter (optional auto-enable) hooks */
VLIB_INIT_FUNCTION (session_main_init);
VLIB_MAIN_LOOP_ENTER_FUNCTION (session_main_loop_init);
2205 :
/**
 * Parse the "session" stanza of startup.conf.
 *
 * Populates session_main's configured sizes/flags; actual allocation
 * happens later in session_manager_main_enable. Table memory sizes are
 * capped below 4GB; undersized mq lengths are warned about and ignored.
 */
static clib_error_t *
session_config_fn (vlib_main_t * vm, unformat_input_t * input)
{
  session_main_t *smm = &session_main;
  u32 nitems;
  uword tmp;

  while (unformat_check_input (input) != UNFORMAT_END_OF_INPUT)
    {
      if (unformat (input, "wrk-mq-length %d", &nitems))
	{
	  /* mq must be large enough to be useful; ignore tiny values */
	  if (nitems >= 2048)
	    smm->configured_wrk_mq_length = nitems;
	  else
	    clib_warning ("event queue length %d too small, ignored", nitems);
	}
      else if (unformat (input, "wrk-mqs-segment-size %U",
			 unformat_memory_size, &smm->wrk_mqs_segment_size))
	;
      else if (unformat (input, "preallocated-sessions %d",
			 &smm->preallocated_sessions))
	;
      else if (unformat (input, "v4-session-table-buckets %d",
			 &smm->configured_v4_session_table_buckets))
	;
      else if (unformat (input, "v4-halfopen-table-buckets %d",
			 &smm->configured_v4_halfopen_table_buckets))
	;
      else if (unformat (input, "v6-session-table-buckets %d",
			 &smm->configured_v6_session_table_buckets))
	;
      else if (unformat (input, "v6-halfopen-table-buckets %d",
			 &smm->configured_v6_halfopen_table_buckets))
	;
      else if (unformat (input, "v4-session-table-memory %U",
			 unformat_memory_size, &tmp))
	{
	  /* Lookup table memory sizes are stored in 32 bits */
	  if (tmp >= 0x100000000)
	    return clib_error_return (0, "memory size %llx (%lld) too large",
				      tmp, tmp);
	  smm->configured_v4_session_table_memory = tmp;
	}
      else if (unformat (input, "v4-halfopen-table-memory %U",
			 unformat_memory_size, &tmp))
	{
	  if (tmp >= 0x100000000)
	    return clib_error_return (0, "memory size %llx (%lld) too large",
				      tmp, tmp);
	  smm->configured_v4_halfopen_table_memory = tmp;
	}
      else if (unformat (input, "v6-session-table-memory %U",
			 unformat_memory_size, &tmp))
	{
	  if (tmp >= 0x100000000)
	    return clib_error_return (0, "memory size %llx (%lld) too large",
				      tmp, tmp);
	  smm->configured_v6_session_table_memory = tmp;
	}
      else if (unformat (input, "v6-halfopen-table-memory %U",
			 unformat_memory_size, &tmp))
	{
	  if (tmp >= 0x100000000)
	    return clib_error_return (0, "memory size %llx (%lld) too large",
				      tmp, tmp);
	  smm->configured_v6_halfopen_table_memory = tmp;
	}
      else if (unformat (input, "local-endpoints-table-memory %U",
			 unformat_memory_size, &tmp))
	{
	  if (tmp >= 0x100000000)
	    return clib_error_return (0, "memory size %llx (%lld) too large",
				      tmp, tmp);
	  smm->local_endpoints_table_memory = tmp;
	}
      else if (unformat (input, "local-endpoints-table-buckets %d",
			 &smm->local_endpoints_table_buckets))
	;
      else if (unformat (input, "enable"))
	smm->session_enable_asap = 1;
      else if (unformat (input, "use-app-socket-api"))
	(void) appns_sapi_enable_disable (1 /* is_enable */);
      else if (unformat (input, "poll-main"))
	smm->poll_main = 1;
      else if (unformat (input, "use-private-rx-mqs"))
	smm->use_private_rx_mqs = 1;
      else if (unformat (input, "no-adaptive"))
	smm->no_adaptive = 1;
      else if (unformat (input, "use-dma"))
	smm->dma_enabled = 1;
      /*
       * Deprecated but maintained for compatibility
       */
      else if (unformat (input, "evt_qs_memfd_seg"))
	;
      else if (unformat (input, "segment-baseva 0x%lx", &tmp))
	;
      else if (unformat (input, "evt_qs_seg_size %U", unformat_memory_size,
			 &smm->wrk_mqs_segment_size))
	;
      else if (unformat (input, "event-queue-length %d", &nitems))
	{
	  if (nitems >= 2048)
	    smm->configured_wrk_mq_length = nitems;
	  else
	    clib_warning ("event queue length %d too small, ignored", nitems);
	}
      else
	return clib_error_return (0, "unknown input `%U'",
				  format_unformat_error, input);
    }
  return 0;
}
2318 :
2319 7306 : VLIB_CONFIG_FUNCTION (session_config_fn, "session");
2320 :
2321 : /*
2322 : * fd.io coding-style-patch-verification: ON
2323 : *
2324 : * Local Variables:
2325 : * eval: (c-set-style "gnu")
2326 : * End:
2327 : */
|