From 7557c635d09eeca0ccf722d35966b81e70ef1b51 Mon Sep 17 00:00:00 2001 From: Francois Dumontet Date: Mon, 30 Sep 2024 18:05:08 +0200 Subject: [PATCH] lib: Add threading and synchronization mechanisms for traps to AgentX - Introduce a dedicated AgentX thread using `frr_pthread`. - Add mutex locks (`ax_mtx`, `ax_io_mtx`) to manage thread synchronization for trap transfer and I/O operations. - Implement a ring buffer (`ibuf_ax`) for handling "master -> AgentX" communication, improving data handling between threads. - Update the SNMP read operations to use mutex locks to ensure thread-safe execution. - Integrate a new dedicated thread to send SNMP traps, ensuring separation of responsibilities between the main and AgentX threads. - Enhance trap handling to support multi-index traps, with excess traps being discarded if the buffer is full, preventing overflow. - Enhance trap handling to support multi-index traps. When more than "RINGBUF_NB_TRAP" traps are pending for transmission, subsequent traps are discarded to prevent overflow. This update significantly improves concurrency, synchronization, and trap management within the AgentX module, with added protection against overflow of the socket buffer caused by excessive traps. Such an overflow of the socket buffer leads to a process deadlock. Signed-off-by: Francois Dumontet --- lib/agentx.c | 116 +++++++++++++++++++++++++++++++++++++++++++++------ 1 file changed, 103 insertions(+), 13 deletions(-) diff --git a/lib/agentx.c b/lib/agentx.c index ff6da0c9cdff..06e08e8faa6d 100644 --- a/lib/agentx.c +++ b/lib/agentx.c @@ -23,17 +23,53 @@ #include "libfrr.h" #include "xref.h" #include "lib/libagentx.h" +#include "ringbuf.h" /* for ringbuf_remain, ringbuf_peek, ringbuf_.. 
*/ +#include "frr_pthread.h" /* for struct frr_pthread */ XREF_SETUP(); + +extern int agentx_stop(struct frr_pthread *fpt, void **result); + DEFINE_HOOK(agentx_enabled, (), ()); //bool agentx_enabled = false; static struct event_loop *agentx_tm; +static struct event_loop *main_tm; static struct event *timeout_thr = NULL; static struct list *events = NULL; +struct frr_pthread *agentx_pth; + +/* buffer dedicated to "master -> agentx" threads */ +static struct ringbuf *ibuf_ax; +#define RINGBUF_NB_TRAP 200 +/* mutex dedicated to trap transfer between threads */ +pthread_mutex_t ax_mtx; +/* mutex dedicated to send/read exclusion */ +pthread_mutex_t ax_io_mtx; + +static void agentx_pthreads_init(void) +{ + assert(!agentx_pth); + + struct frr_pthread_attr ax = { + .start = frr_pthread_attr_default.start, + .stop = frr_pthread_attr_default.stop, + }; + + agentx_pth = frr_pthread_new(&ax, "Agentx thread", "agentx_pth"); +} + +static void agentx_pthreads_run(void) +{ + frr_pthread_run(agentx_pth, NULL); + + /* Wait until threads are ready. 
*/ + frr_pthread_wait_running(agentx_pth); +} + static void agentx_events_update(struct event *t); static void agentx_timeout(struct event *t) @@ -70,10 +106,11 @@ static void agentx_read(struct event *t) if (new_flags == -1) flog_err(EC_LIB_SYSTEM_CALL, "Failed to set snmp fd non blocking: %s(%d)", strerror(errno), errno); - - netsnmp_large_fd_set_init(&lfds, FD_SETSIZE); - netsnmp_large_fd_setfd(t->u.fd, &lfds); - snmp_read2(&lfds); + frr_with_mutex (&ax_io_mtx) { + netsnmp_large_fd_set_init(&lfds, FD_SETSIZE); + netsnmp_large_fd_setfd(t->u.fd, &lfds); + snmp_read2(&lfds); + } /* Reset the flag */ if (!nonblock) { @@ -107,8 +144,7 @@ static void agentx_events_update(struct event *t) snmp_select_info2(&maxfd, &lfds, &timeout, &block); if (!block) { - event_add_timer_tv(agentx_tm, agentx_timeout, NULL, &timeout, - &timeout_thr); + event_add_timer_tv(main_tm, agentx_timeout, NULL, &timeout, &timeout_thr); } ln = listhead(events); @@ -137,7 +173,7 @@ static void agentx_events_update(struct event *t) thr = XCALLOC(MTYPE_TMP, sizeof(struct event *)); newln = listnode_add_before(events, ln, thr); - event_add_read(agentx_tm, agentx_read, newln, fd, thr); + event_add_read(main_tm, agentx_read, newln, fd, thr); } } @@ -196,9 +232,12 @@ static int agentx_log_callback(int major, int minor, void *serverarg, static int agentx_cli_on(void) { if (!agentx_enabled) { + agentx_pthreads_run(); + agentx_tm = agentx_pth->master; init_snmp(FRR_SMUX_NAME); events = list_new(); agentx_events_update(NULL); + ibuf_ax = ringbuf_new(RINGBUF_NB_TRAP * sizeof(void *)); agentx_enabled = true; hook_call(agentx_enabled); } @@ -215,8 +254,9 @@ static int agentx_cli_off(void) static int smux_disable(void) { - agentx_enabled = false; + agentx_enabled = false; + agentx_stop (agentx_pth, NULL); return 0; } @@ -227,11 +267,15 @@ bool smux_enabled(void) void smux_init(struct event_loop *tm) { - agentx_tm = tm; + main_tm = tm; + agentx_pthreads_init(); hook_register(agentx_cli_enabled, agentx_cli_on); 
hook_register(agentx_cli_disabled, agentx_cli_off); + pthread_mutex_init(&ax_mtx, NULL); + pthread_mutex_init(&ax_io_mtx, NULL); + netsnmp_enable_subagent(); snmp_disable_log(); snmp_enable_calllog(); @@ -245,9 +289,12 @@ void smux_init(struct event_loop *tm) void smux_agentx_enable(void) { if (!agentx_enabled) { + agentx_pthreads_run(); + agentx_tm = agentx_pth->master; init_snmp(FRR_SMUX_NAME); events = list_new(); agentx_events_update(NULL); + ibuf_ax = ringbuf_new(RINGBUF_NB_TRAP * sizeof(void *)); agentx_enabled = true; } } @@ -275,6 +322,27 @@ void smux_trap(struct variable *vp, size_t vp_len, const oid *ename, trapobjlen, sptrap); } +static void smux_trap_multi_index_thd(struct event *thread) +{ + netsnmp_variable_list **notification_vars = NULL; + + frr_with_mutex (&ax_mtx) { + if (ringbuf_remain(ibuf_ax) == 0) { + zlog_err("%s no data to read in ring buffers", __func__); + return; + } + ringbuf_get(ibuf_ax, &notification_vars, sizeof(notification_vars)); + } + + frr_with_mutex (&ax_io_mtx) { + send_v2trap(*notification_vars); + } + snmp_free_varbind(*notification_vars); + SNMP_FREE(notification_vars); + /* continue in main thread */ + event_add_event(main_tm, agentx_events_update, NULL, 0, NULL); +} + int smux_trap_multi_index(struct variable *vp, size_t vp_len, const oid *ename, size_t enamelen, const oid *name, size_t namelen, struct index_oid *iname, size_t index_len, @@ -286,8 +354,9 @@ int smux_trap_multi_index(struct variable *vp, size_t vp_len, const oid *ename, oid notification_oid[MAX_OID_LEN]; size_t notification_oid_len; unsigned int i; + netsnmp_variable_list *notification_vars; - netsnmp_variable_list *notification_vars = NULL; + notification_vars = SNMP_MALLOC_TYPEDEF(netsnmp_variable_list); if (!agentx_enabled) return 0; @@ -365,10 +434,18 @@ int smux_trap_multi_index(struct variable *vp, size_t vp_len, const oid *ename, } } + /* transmission to the agentx thread*/ + frr_with_mutex (&ax_mtx) { + if (ringbuf_space(ibuf_ax) < 
sizeof(notification_vars)) { + zlog_err("%s not enougth space in ibuf_ax needed : %lu free : %lu", + __func__, sizeof(notification_vars), ringbuf_space(ibuf_ax)); + return 0; + } + ringbuf_put(ibuf_ax, &notification_vars, sizeof(netsnmp_variable_list **)); + } + + event_add_event(agentx_tm, smux_trap_multi_index_thd, NULL, 0, NULL); - send_v2trap(notification_vars); - snmp_free_varbind(notification_vars); - agentx_events_update(NULL); return 1; } @@ -389,4 +466,17 @@ void smux_terminate(void) list_delete(&events); } } + +int agentx_stop(struct frr_pthread *fpt, void **result) +{ + assert(fpt->running); + pthread_mutex_destroy(&ax_mtx); + pthread_mutex_destroy(&ax_io_mtx); + atomic_store_explicit(&fpt->running, false, memory_order_relaxed); + + pthread_join(fpt->thread, result); + return 0; +} + + #endif /* SNMP_AGENTX */