Line data Source code
1 : /*
2 : * Copyright (c) 2016 Cisco and/or its affiliates.
3 : * Licensed under the Apache License, Version 2.0 (the "License");
4 : * you may not use this file except in compliance with the License.
5 : * You may obtain a copy of the License at:
6 : *
7 : * http://www.apache.org/licenses/LICENSE-2.0
8 : *
9 : * Unless required by applicable law or agreed to in writing, software
10 : * distributed under the License is distributed on an "AS IS" BASIS,
11 : * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 : * See the License for the specific language governing permissions and
13 : * limitations under the License.
14 : */
15 : #include <assert.h>
16 : #include <stdio.h>
17 : #include <stdlib.h>
18 : #include <stddef.h>
19 : #include <sys/types.h>
20 : #include <sys/socket.h>
21 : #include <sys/mman.h>
22 : #include <sys/stat.h>
23 : #include <netinet/in.h>
24 : #include <netdb.h>
25 : #include <signal.h>
26 : #include <stdbool.h>
27 : #include <vnet/vnet.h>
28 : #include <vlib/vlib.h>
29 : #include <vlib/unix/unix.h>
30 : #include <vlibapi/api.h>
31 : #include <vlibmemory/api.h>
32 :
33 : #include <vlibmemory/memclnt.api_enum.h>
34 : #include <vlibmemory/memclnt.api_types.h>
35 :
36 : #include "vppapiclient.h"
37 :
/*
 * Flags shared between the API caller and the helper threads.
 * timeout_cancelled / timeout_in_progress are written by set_timeout() and
 * unset_timeout() while holding vac_main.timeout_lock and are read by the
 * timeout thread under the same lock.
 * rx_thread_done is set by the RX thread (under vac_main.queue_lock) just
 * before it exits and is polled by vac_disconnect().
 */
bool timeout_cancelled;
bool timeout_in_progress;
bool rx_thread_done;
41 :
42 : /*
43 : * Asynchronous mode:
44 : * Client registers a callback. All messages are sent to the callback.
45 : * Synchronous mode:
46 : * Client calls blocking read().
47 : * Clients are expected to collate events on a queue.
48 : * vac_write() -> suspends RX thread
49 : * vac_read() -> resumes RX thread
50 : */
51 :
52 : typedef struct {
53 : u8 connected_to_vlib;
54 : pthread_t rx_thread_handle;
55 : pthread_t timeout_thread_handle;
56 : pthread_mutex_t queue_lock;
57 : pthread_cond_t suspend_cv;
58 : pthread_cond_t resume_cv;
59 : pthread_mutex_t timeout_lock;
60 : u8 timeout_loop;
61 : pthread_cond_t timeout_cv;
62 : pthread_cond_t timeout_cancel_cv;
63 : pthread_cond_t terminate_cv;
64 : } vac_main_t;
65 :
66 : vac_main_t vac_main;
67 : vac_callback_t vac_callback;
68 : u16 read_timeout = 0;
69 : bool rx_is_running = false;
70 : bool timeout_thread_cancelled = false;
71 :
72 : /* Only ever allocate one heap */
73 : bool mem_initialized = false;
74 :
75 : static void
76 0 : init (void)
77 : {
78 0 : vac_main_t *pm = &vac_main;
79 0 : clib_memset(pm, 0, sizeof(*pm));
80 0 : pthread_mutex_init(&pm->queue_lock, NULL);
81 0 : pthread_cond_init(&pm->suspend_cv, NULL);
82 0 : pthread_cond_init(&pm->resume_cv, NULL);
83 0 : pthread_mutex_init(&pm->timeout_lock, NULL);
84 0 : pm->timeout_loop = 1;
85 0 : pthread_cond_init(&pm->timeout_cv, NULL);
86 0 : pthread_cond_init(&pm->timeout_cancel_cv, NULL);
87 0 : pthread_cond_init(&pm->terminate_cv, NULL);
88 0 : }
89 :
90 : static void
91 0 : cleanup (void)
92 : {
93 0 : vac_main_t *pm = &vac_main;
94 0 : pthread_mutex_destroy(&pm->queue_lock);
95 0 : pthread_cond_destroy(&pm->suspend_cv);
96 0 : pthread_cond_destroy(&pm->resume_cv);
97 0 : pthread_mutex_destroy(&pm->timeout_lock);
98 0 : pthread_cond_destroy(&pm->timeout_cv);
99 0 : pthread_cond_destroy(&pm->timeout_cancel_cv);
100 0 : pthread_cond_destroy(&pm->terminate_cv);
101 0 : clib_memset(pm, 0, sizeof(*pm));
102 0 : }
103 :
/*
 * Release an API message buffer.
 * Thin public wrapper that hands msg to vl_msg_api_free().
 */
void
vac_free (void * msg)
{
  vl_msg_api_free (msg);
}
109 :
110 : static void
111 0 : vac_api_handler (void *msg)
112 : {
113 0 : u16 id = ntohs(*((u16 *)msg));
114 0 : msgbuf_t *msgbuf = (msgbuf_t *)(((u8 *)msg) - offsetof(msgbuf_t, data));
115 0 : int l = ntohl(msgbuf->data_len);
116 0 : if (l == 0)
117 0 : clib_warning("Message ID %d has wrong length: %d\n", id, l);
118 :
119 : /* Call Python callback */
120 0 : ASSERT(vac_callback);
121 0 : (vac_callback)(msg, l);
122 0 : vac_free(msg);
123 0 : }
124 :
125 : static void *
126 0 : vac_rx_thread_fn (void *arg)
127 : {
128 : svm_queue_t *q;
129 : vl_api_memclnt_keepalive_t *mp;
130 : vl_api_memclnt_keepalive_reply_t *rmp;
131 0 : vac_main_t *pm = &vac_main;
132 0 : api_main_t *am = vlibapi_get_main();
133 : vl_shmem_hdr_t *shmem_hdr;
134 : uword msg;
135 :
136 0 : q = am->vl_input_queue;
137 :
138 : while (1)
139 0 : while (!svm_queue_sub(q, (u8 *)&msg, SVM_Q_WAIT, 0))
140 : {
141 0 : VL_MSG_API_UNPOISON((void *)msg);
142 0 : u16 id = ntohs(*((u16 *)msg));
143 0 : switch (id) {
144 0 : case VL_API_RX_THREAD_EXIT:
145 0 : vl_msg_api_free((void *) msg);
146 : /* signal waiting threads that this thread is about to terminate */
147 0 : pthread_mutex_lock(&pm->queue_lock);
148 0 : rx_thread_done = true;
149 0 : pthread_cond_signal(&pm->terminate_cv);
150 0 : pthread_mutex_unlock(&pm->queue_lock);
151 0 : pthread_exit(0);
152 : return 0;
153 : break;
154 :
155 0 : case VL_API_MEMCLNT_RX_THREAD_SUSPEND:
156 0 : vl_msg_api_free((void * )msg);
157 : /* Suspend thread and signal reader */
158 0 : pthread_mutex_lock(&pm->queue_lock);
159 0 : pthread_cond_signal(&pm->suspend_cv);
160 : /* Wait for the resume signal */
161 0 : pthread_cond_wait (&pm->resume_cv, &pm->queue_lock);
162 0 : pthread_mutex_unlock(&pm->queue_lock);
163 0 : break;
164 :
165 0 : case VL_API_MEMCLNT_READ_TIMEOUT:
166 0 : clib_warning("Received read timeout in async thread\n");
167 0 : vl_msg_api_free((void *) msg);
168 0 : break;
169 :
170 0 : case VL_API_MEMCLNT_KEEPALIVE:
171 0 : mp = (void *)msg;
172 0 : rmp = vl_msg_api_alloc (sizeof (*rmp));
173 0 : clib_memset (rmp, 0, sizeof (*rmp));
174 0 : rmp->_vl_msg_id = ntohs(VL_API_MEMCLNT_KEEPALIVE_REPLY);
175 0 : rmp->context = mp->context;
176 0 : shmem_hdr = am->shmem_hdr;
177 0 : vl_msg_api_send_shmem(shmem_hdr->vl_input_queue, (u8 *)&rmp);
178 0 : vl_msg_api_free((void *) msg);
179 0 : break;
180 :
181 0 : default:
182 0 : vac_api_handler((void *)msg);
183 : }
184 : }
185 : }
186 :
187 : static void *
188 0 : vac_timeout_thread_fn (void *arg)
189 : {
190 : vl_api_memclnt_read_timeout_t *ep;
191 0 : vac_main_t *pm = &vac_main;
192 0 : api_main_t *am = vlibapi_get_main();
193 : struct timespec ts;
194 : struct timeval tv;
195 : int rv;
196 :
197 0 : while (pm->timeout_loop)
198 : {
199 : /* Wait for poke */
200 0 : pthread_mutex_lock(&pm->timeout_lock);
201 0 : while (!timeout_in_progress)
202 0 : pthread_cond_wait (&pm->timeout_cv, &pm->timeout_lock);
203 :
204 : /* Starting timer */
205 0 : gettimeofday(&tv, NULL);
206 0 : ts.tv_sec = tv.tv_sec + read_timeout;
207 0 : ts.tv_nsec = 0;
208 :
209 0 : if (!timeout_cancelled) {
210 0 : rv = pthread_cond_timedwait (&pm->timeout_cancel_cv,
211 : &pm->timeout_lock, &ts);
212 0 : if (rv == ETIMEDOUT && !timeout_thread_cancelled) {
213 0 : ep = vl_msg_api_alloc (sizeof (*ep));
214 0 : ep->_vl_msg_id = ntohs(VL_API_MEMCLNT_READ_TIMEOUT);
215 0 : vl_msg_api_send_shmem(am->vl_input_queue, (u8 *)&ep);
216 : }
217 : }
218 :
219 0 : pthread_mutex_unlock(&pm->timeout_lock);
220 : }
221 0 : pthread_exit(0);
222 : }
223 :
224 : void
225 0 : vac_rx_suspend (void)
226 : {
227 0 : api_main_t *am = vlibapi_get_main();
228 0 : vac_main_t *pm = &vac_main;
229 : vl_api_memclnt_rx_thread_suspend_t *ep;
230 :
231 0 : if (!pm->rx_thread_handle) return;
232 0 : pthread_mutex_lock(&pm->queue_lock);
233 0 : if (rx_is_running)
234 : {
235 0 : ep = vl_msg_api_alloc (sizeof (*ep));
236 0 : ep->_vl_msg_id = ntohs(VL_API_MEMCLNT_RX_THREAD_SUSPEND);
237 0 : vl_msg_api_send_shmem(am->vl_input_queue, (u8 *)&ep);
238 : /* Wait for RX thread to tell us it has suspended */
239 0 : pthread_cond_wait(&pm->suspend_cv, &pm->queue_lock);
240 0 : rx_is_running = false;
241 : }
242 0 : pthread_mutex_unlock(&pm->queue_lock);
243 : }
244 :
245 : void
246 0 : vac_rx_resume (void)
247 : {
248 0 : vac_main_t *pm = &vac_main;
249 0 : if (!pm->rx_thread_handle) return;
250 0 : pthread_mutex_lock(&pm->queue_lock);
251 0 : if (rx_is_running) goto unlock;
252 0 : pthread_cond_signal(&pm->resume_cv);
253 0 : rx_is_running = true;
254 0 : unlock:
255 0 : pthread_mutex_unlock(&pm->queue_lock);
256 : }
257 :
258 : static uword *
259 0 : vac_msg_table_get_hash (void)
260 : {
261 0 : api_main_t *am = vlibapi_get_main();
262 0 : return (am->msg_index_by_name_and_crc);
263 : }
264 :
265 : int
266 0 : vac_msg_table_size(void)
267 : {
268 0 : api_main_t *am = vlibapi_get_main();
269 0 : return hash_elts(am->msg_index_by_name_and_crc);
270 : }
271 :
272 : int
273 0 : vac_connect (char * name, char * chroot_prefix, vac_callback_t cb,
274 : int rx_qlen)
275 : {
276 0 : rx_thread_done = false;
277 0 : int rv = 0;
278 0 : vac_main_t *pm = &vac_main;
279 :
280 0 : assert (clib_mem_get_heap ());
281 0 : init();
282 0 : if (chroot_prefix != NULL)
283 0 : vl_set_memory_root_path (chroot_prefix);
284 :
285 0 : if ((rv = vl_client_api_map("/vpe-api"))) {
286 0 : clib_warning ("vl_client_api_map returned %d", rv);
287 0 : return rv;
288 : }
289 :
290 0 : if (vl_client_connect(name, 0, rx_qlen) < 0) {
291 0 : vl_client_api_unmap();
292 0 : return (-1);
293 : }
294 :
295 0 : if (cb) {
296 : /* Start the rx queue thread */
297 0 : rv = pthread_create(&pm->rx_thread_handle, NULL, vac_rx_thread_fn, 0);
298 0 : if (rv) {
299 0 : clib_warning("pthread_create returned %d", rv);
300 0 : vl_client_api_unmap();
301 0 : return (-1);
302 : }
303 0 : vac_callback = cb;
304 0 : rx_is_running = true;
305 : }
306 :
307 : /* Start read timeout thread */
308 0 : rv = pthread_create(&pm->timeout_thread_handle, NULL,
309 : vac_timeout_thread_fn, 0);
310 0 : if (rv) {
311 0 : clib_warning("pthread_create returned %d", rv);
312 0 : vl_client_api_unmap();
313 0 : return (-1);
314 : }
315 :
316 0 : pm->connected_to_vlib = 1;
317 :
318 0 : return (0);
319 : }
320 : static void
321 0 : set_timeout (unsigned short timeout)
322 : {
323 0 : vac_main_t *pm = &vac_main;
324 0 : pthread_mutex_lock(&pm->timeout_lock);
325 0 : read_timeout = timeout;
326 0 : timeout_in_progress = true;
327 0 : timeout_cancelled = false;
328 0 : pthread_cond_signal(&pm->timeout_cv);
329 0 : pthread_mutex_unlock(&pm->timeout_lock);
330 0 : }
331 :
332 : static void
333 0 : unset_timeout (void)
334 : {
335 0 : vac_main_t *pm = &vac_main;
336 0 : pthread_mutex_lock(&pm->timeout_lock);
337 0 : timeout_in_progress = false;
338 0 : timeout_cancelled = true;
339 0 : pthread_cond_signal(&pm->timeout_cancel_cv);
340 0 : pthread_mutex_unlock(&pm->timeout_lock);
341 0 : }
342 :
343 : int
344 0 : vac_disconnect (void)
345 : {
346 0 : api_main_t *am = vlibapi_get_main();
347 0 : vac_main_t *pm = &vac_main;
348 : uword junk;
349 0 : int rv = 0;
350 :
351 0 : if (!pm->connected_to_vlib) return 0;
352 :
353 0 : if (pm->rx_thread_handle) {
354 : vl_api_rx_thread_exit_t *ep;
355 0 : ep = vl_msg_api_alloc (sizeof (*ep));
356 0 : ep->_vl_msg_id = ntohs(VL_API_RX_THREAD_EXIT);
357 0 : vl_msg_api_send_shmem(am->vl_input_queue, (u8 *)&ep);
358 :
359 : /* wait (with timeout) until RX thread has finished */
360 : struct timespec ts;
361 : struct timeval tv;
362 0 : gettimeofday(&tv, NULL);
363 0 : ts.tv_sec = tv.tv_sec + 5;
364 0 : ts.tv_nsec = 0;
365 :
366 0 : pthread_mutex_lock(&pm->queue_lock);
367 0 : if (rx_thread_done == false)
368 0 : rv = pthread_cond_timedwait(&pm->terminate_cv, &pm->queue_lock, &ts);
369 0 : pthread_mutex_unlock(&pm->queue_lock);
370 :
371 : /* now join so we wait until thread has -really- finished */
372 0 : if (rv == ETIMEDOUT)
373 0 : pthread_cancel(pm->rx_thread_handle);
374 : else
375 0 : pthread_join(pm->rx_thread_handle, (void **) &junk);
376 : }
377 0 : if (pm->timeout_thread_handle) {
378 : /* cancel, wake then join the timeout thread */
379 0 : pm->timeout_loop = 0;
380 0 : timeout_thread_cancelled = true;
381 0 : set_timeout(0);
382 0 : pthread_join(pm->timeout_thread_handle, (void **) &junk);
383 : }
384 :
385 0 : vl_client_disconnect();
386 0 : vl_client_api_unmap();
387 : //vac_callback = 0;
388 :
389 0 : cleanup();
390 :
391 0 : return (0);
392 : }
393 :
394 : int
395 0 : vac_read (char **p, int *l, u16 timeout)
396 : {
397 : svm_queue_t *q;
398 0 : api_main_t *am = vlibapi_get_main();
399 0 : vac_main_t *pm = &vac_main;
400 : vl_api_memclnt_keepalive_t *mp;
401 : vl_api_memclnt_keepalive_reply_t *rmp;
402 : uword msg;
403 : msgbuf_t *msgbuf;
404 : int rv;
405 : vl_shmem_hdr_t *shmem_hdr;
406 :
407 : /* svm_queue_sub(below) returns {-1, -2} */
408 0 : if (!pm->connected_to_vlib)
409 0 : return VAC_NOT_CONNECTED;
410 :
411 0 : *l = 0;
412 :
413 : /* svm_queue_sub(below) returns {-1, -2} */
414 0 : if (am->our_pid == 0)
415 0 : return (VAC_SHM_NOT_READY);
416 :
417 : /* Poke timeout thread */
418 0 : if (timeout)
419 0 : set_timeout(timeout);
420 :
421 0 : q = am->vl_input_queue;
422 :
423 0 : again:
424 0 : rv = svm_queue_sub(q, (u8 *)&msg, SVM_Q_WAIT, 0);
425 :
426 0 : if (rv == 0) {
427 0 : VL_MSG_API_UNPOISON((void *)msg);
428 0 : u16 msg_id = ntohs(*((u16 *)msg));
429 0 : switch (msg_id) {
430 0 : case VL_API_RX_THREAD_EXIT:
431 0 : vl_msg_api_free((void *) msg);
432 0 : goto error;
433 0 : case VL_API_MEMCLNT_RX_THREAD_SUSPEND:
434 0 : goto error;
435 0 : case VL_API_MEMCLNT_READ_TIMEOUT:
436 0 : goto error;
437 0 : case VL_API_MEMCLNT_KEEPALIVE:
438 : /* Handle an alive-check ping from vpp. */
439 0 : mp = (void *)msg;
440 0 : rmp = vl_msg_api_alloc (sizeof (*rmp));
441 0 : clib_memset (rmp, 0, sizeof (*rmp));
442 0 : rmp->_vl_msg_id = ntohs(VL_API_MEMCLNT_KEEPALIVE_REPLY);
443 0 : rmp->context = mp->context;
444 0 : shmem_hdr = am->shmem_hdr;
445 0 : vl_msg_api_send_shmem(shmem_hdr->vl_input_queue, (u8 *)&rmp);
446 0 : vl_msg_api_free((void *) msg);
447 : /*
448 : * Python code is blissfully unaware of these pings, so
449 : * act as if it never happened...
450 : */
451 0 : goto again;
452 :
453 0 : default:
454 0 : msgbuf = (msgbuf_t *)(((u8 *)msg) - offsetof(msgbuf_t, data));
455 0 : *l = ntohl(msgbuf->data_len);
456 0 : if (*l == 0) {
457 0 : fprintf(stderr, "Unregistered API message: %d\n", msg_id);
458 0 : goto error;
459 : }
460 : }
461 0 : *p = (char *)msg;
462 :
463 :
464 : } else {
465 0 : fprintf(stderr, "Read failed with %d\n", rv);
466 : }
467 : /* Let timeout notification thread know we're done */
468 0 : if (timeout)
469 0 : unset_timeout();
470 :
471 0 : return (rv);
472 :
473 0 : error:
474 0 : if (timeout)
475 0 : unset_timeout();
476 0 : vl_msg_api_free((void *) msg);
477 : /* Client might forget to resume RX thread on failure */
478 0 : vac_rx_resume ();
479 0 : return VAC_TIMEOUT;
480 : }
481 :
482 : /*
483 : * XXX: Makes the assumption that client_index is the first member
484 : */
485 : typedef struct _vl_api_header
486 : {
487 : u16 _vl_msg_id;
488 : u32 client_index;
489 : } __attribute__ ((packed)) vl_api_header_t;
490 :
491 : static u32
492 0 : vac_client_index (void)
493 : {
494 0 : return (vlibapi_get_main()->my_client_index);
495 : }
496 :
497 : int
498 0 : vac_write (char *p, int l)
499 : {
500 0 : int rv = -1;
501 0 : api_main_t *am = vlibapi_get_main();
502 0 : vl_api_header_t *mp = vl_msg_api_alloc(l);
503 : svm_queue_t *q;
504 0 : vac_main_t *pm = &vac_main;
505 :
506 0 : if (!pm->connected_to_vlib)
507 0 : return VAC_NOT_CONNECTED;
508 0 : if (!mp) return (-1);
509 :
510 0 : memcpy(mp, p, l);
511 0 : mp->client_index = vac_client_index();
512 0 : q = am->shmem_hdr->vl_input_queue;
513 0 : rv = svm_queue_add(q, (u8 *)&mp, 0);
514 0 : if (rv != 0) {
515 0 : fprintf(stderr, "vpe_api_write fails: %d\n", rv);
516 : /* Clear message */
517 0 : vac_free(mp);
518 : }
519 0 : return (rv);
520 : }
521 :
522 : int
523 0 : vac_get_msg_index (char * name)
524 : {
525 0 : return vl_msg_api_get_msg_index ((u8 *)name);
526 : }
527 :
528 : int
529 0 : vac_msg_table_max_index(void)
530 : {
531 0 : int max = 0;
532 : hash_pair_t *hp;
533 0 : uword *h = vac_msg_table_get_hash();
534 0 : hash_foreach_pair (hp, h,
535 : ({
536 : if (hp->value[0] > max)
537 : max = hp->value[0];
538 : }));
539 :
540 0 : return max;
541 : }
542 :
543 : void
544 0 : vac_set_error_handler (vac_error_callback_t cb)
545 : {
546 0 : assert (clib_mem_get_heap ());
547 0 : if (cb) clib_error_register_handler (cb, 0);
548 0 : }
549 :
550 : /*
551 : * Required if application doesn't use a VPP heap.
552 : */
553 : void
554 0 : vac_mem_init (size_t size)
555 : {
556 0 : if (mem_initialized)
557 0 : return;
558 0 : if (size == 0)
559 0 : clib_mem_init (0, 1 << 30); // default
560 : else
561 0 : clib_mem_init (0, size);
562 0 : mem_initialized = true;
563 : }
|