diff --git a/lib/link_state.c b/lib/link_state.c index 58727a568b53..6537f881ce15 100644 --- a/lib/link_state.c +++ b/lib/link_state.c @@ -1138,31 +1138,13 @@ int ls_unregister(struct zclient *zclient, bool server) int ls_request_sync(struct zclient *zclient) { - struct stream *s; - uint16_t flags = 0; - /* Check buffer size */ if (STREAM_SIZE(zclient->obuf) < (ZEBRA_HEADER_SIZE + 3 * sizeof(uint32_t))) return -1; - s = zclient->obuf; - stream_reset(s); - - zclient_create_header(s, ZEBRA_OPAQUE_MESSAGE, VRF_DEFAULT); - - /* Set type and flags */ - stream_putl(s, LINK_STATE_SYNC); - stream_putw(s, flags); - /* Send destination client info */ - stream_putc(s, zclient->redist_default); - stream_putw(s, zclient->instance); - stream_putl(s, zclient->session_id); - - /* Put length into the header at the start of the stream. */ - stream_putw_at(s, 0, stream_get_endp(s)); - - return zclient_send_message(zclient); + /* No data with this message */ + return zclient_send_opaque(zclient, LINK_STATE_SYNC, NULL, 0); } static struct ls_node *ls_parse_node(struct stream *s) @@ -1623,23 +1605,15 @@ int ls_send_msg(struct zclient *zclient, struct ls_message *msg, (ZEBRA_HEADER_SIZE + sizeof(uint32_t) + sizeof(msg))) return -1; + /* Init the message, then encode the data inline. */ + if (dst == NULL) + zapi_opaque_init(zclient, LINK_STATE_UPDATE, flags); + else + zapi_opaque_unicast_init(zclient, LINK_STATE_UPDATE, flags, + dst->proto, dst->instance, + dst->session_id); + s = zclient->obuf; - stream_reset(s); - - zclient_create_header(s, ZEBRA_OPAQUE_MESSAGE, VRF_DEFAULT); - - /* Set sub-type, flags and destination for unicast message */ - stream_putl(s, LINK_STATE_UPDATE); - if (dst != NULL) { - SET_FLAG(flags, ZAPI_OPAQUE_FLAG_UNICAST); - stream_putw(s, flags); - /* Send destination client info */ - stream_putc(s, dst->proto); - stream_putw(s, dst->instance); - stream_putl(s, dst->session_id); - } else { - stream_putw(s, flags); - } /* Format Link State message */ if (ls_format_msg(s, msg) < 0) { diff --git a/lib/log.c b/lib/log.c index 00b897dca1f2..df9b6c717633 100644 --- a/lib/log.c +++ b/lib/log.c @@ -457,7 +457,9 @@ static const struct zebra_desc_table command_types[] = { DESC_ENTRY(ZEBRA_TC_CLASS_ADD), DESC_ENTRY(ZEBRA_TC_CLASS_DELETE), DESC_ENTRY(ZEBRA_TC_FILTER_ADD), - DESC_ENTRY(ZEBRA_TC_FILTER_DELETE)}; + DESC_ENTRY(ZEBRA_TC_FILTER_DELETE), + DESC_ENTRY(ZEBRA_OPAQUE_NOTIFY) +}; #undef DESC_ENTRY static const struct zebra_desc_table unknown = {0, "unknown", '?'}; diff --git a/lib/zclient.c b/lib/zclient.c index 8526cbfaa1ce..1654c984bd32 100644 --- a/lib/zclient.c +++ b/lib/zclient.c @@ -3823,6 +3823,53 @@ enum zclient_send_status zclient_send_mlag_data(struct zclient *client, return zclient_send_message(client); } +/* + * Init/header setup for opaque zapi messages + */ +enum zclient_send_status zapi_opaque_init(struct zclient *zclient, + uint32_t type, uint16_t flags) +{ + struct stream *s; + + s = zclient->obuf; + stream_reset(s); + + zclient_create_header(s, ZEBRA_OPAQUE_MESSAGE, VRF_DEFAULT); + + /* Send sub-type and flags */ + stream_putl(s, type); + stream_putw(s, flags); + + /* Source daemon identifiers */ + stream_putc(s, zclient->redist_default); + stream_putw(s, zclient->instance); + stream_putl(s, zclient->session_id); + + return ZCLIENT_SEND_SUCCESS; +} + +/* + * Init, header setup for opaque unicast messages. + */ +enum zclient_send_status +zapi_opaque_unicast_init(struct zclient *zclient, uint32_t type, uint16_t flags, + uint8_t proto, uint16_t instance, uint32_t session_id) +{ + struct stream *s; + + s = zclient->obuf; + + /* Common init */ + zapi_opaque_init(zclient, type, flags | ZAPI_OPAQUE_FLAG_UNICAST); + + /* Send destination client info */ + stream_putc(s, proto); + stream_putw(s, instance); + stream_putl(s, session_id); + + return ZCLIENT_SEND_SUCCESS; +} + /* * Send an OPAQUE message, contents opaque to zebra. The message header * is a message subtype. @@ -3840,16 +3887,12 @@ enum zclient_send_status zclient_send_opaque(struct zclient *zclient, return ZCLIENT_SEND_FAILURE; s = zclient->obuf; - stream_reset(s); - zclient_create_header(s, ZEBRA_OPAQUE_MESSAGE, VRF_DEFAULT); - - /* Send sub-type and flags */ - stream_putl(s, type); - stream_putw(s, flags); + zapi_opaque_init(zclient, type, flags); /* Send opaque data */ - stream_write(s, data, datasize); + if (datasize > 0) + stream_write(s, data, datasize); /* Put length into the header at the start of the stream. */ stream_putw_at(s, 0, stream_get_endp(s)); @@ -3876,22 +3919,14 @@ zclient_send_opaque_unicast(struct zclient *zclient, uint32_t type, return ZCLIENT_SEND_FAILURE; s = zclient->obuf; - stream_reset(s); - - zclient_create_header(s, ZEBRA_OPAQUE_MESSAGE, VRF_DEFAULT); - /* Send sub-type and flags */ - SET_FLAG(flags, ZAPI_OPAQUE_FLAG_UNICAST); - stream_putl(s, type); - stream_putw(s, flags); - - /* Send destination client info */ - stream_putc(s, proto); - stream_putw(s, instance); - stream_putl(s, session_id); + /* Common init */ + zapi_opaque_unicast_init(zclient, type, flags, proto, instance, + session_id); /* Send opaque data */ - stream_write(s, data, datasize); + if (datasize > 0) + stream_write(s, data, datasize); /* Put length into the header at the start of the stream. */ stream_putw_at(s, 0, stream_get_endp(s)); @@ -3910,11 +3945,16 @@ int zclient_opaque_decode(struct stream *s, struct zapi_opaque_msg *info) STREAM_GETL(s, info->type); STREAM_GETW(s, info->flags); - /* Decode unicast client info if present */ + /* Decode sending daemon info */ + STREAM_GETC(s, info->src_proto); + STREAM_GETW(s, info->src_instance); + STREAM_GETL(s, info->src_session_id); + + /* Decode unicast destination info, if present */ if (CHECK_FLAG(info->flags, ZAPI_OPAQUE_FLAG_UNICAST)) { - STREAM_GETC(s, info->proto); - STREAM_GETW(s, info->instance); - STREAM_GETL(s, info->session_id); + STREAM_GETC(s, info->dest_proto); + STREAM_GETW(s, info->dest_instance); + STREAM_GETL(s, info->dest_session_id); } info->len = STREAM_READABLE(s); @@ -4472,3 +4512,125 @@ int zclient_send_zebra_gre_request(struct zclient *client, zclient_send_message(client); return 0; } + + +/* + * Opaque notification features + */ + +/* + * Common encode helper for opaque notifications, both registration + * and async notification messages. + */ +static int opaque_notif_encode_common(struct stream *s, uint32_t msg_type, + bool request, bool reg, uint8_t proto, + uint16_t instance, uint32_t session_id) +{ + int ret = 0; + uint8_t val = 0; + + stream_reset(s); + + zclient_create_header(s, ZEBRA_OPAQUE_NOTIFY, VRF_DEFAULT); + + /* Notification or request */ + if (request) + val = 1; + stream_putc(s, val); + + if (reg) + val = 1; + else + val = 0; + stream_putc(s, val); + + stream_putl(s, msg_type); + + stream_putc(s, proto); + stream_putw(s, instance); + stream_putl(s, session_id); + + /* And capture message length */ + stream_putw_at(s, 0, stream_get_endp(s)); + + return ret; +} + +/* + * Encode a zapi opaque message type notification into buffer 's' + */ +int zclient_opaque_notif_encode(struct stream *s, uint32_t msg_type, bool reg, + uint8_t proto, uint16_t instance, + uint32_t session_id) +{ + return opaque_notif_encode_common(s, msg_type, false /* !request */, + reg, proto, instance, session_id); +} + +/* + * Decode an incoming zapi opaque message type notification + */ +int zclient_opaque_notif_decode(struct stream *s, + struct zapi_opaque_notif_info *info) +{ + uint8_t val; + + memset(info, 0, sizeof(*info)); + + STREAM_GETC(s, val); /* Registration or notification */ + info->request = (val != 0); + + STREAM_GETC(s, val); + info->reg = (val != 0); + + STREAM_GETL(s, info->msg_type); + + STREAM_GETC(s, info->proto); + STREAM_GETW(s, info->instance); + STREAM_GETL(s, info->session_id); + + return 0; + +stream_failure: + return -1; +} + +/* + * Encode and send a zapi opaque message type notification request to zebra + */ +enum zclient_send_status zclient_opaque_request_notify(struct zclient *zclient, + uint32_t msgtype) +{ + struct stream *s; + + if (!zclient || zclient->sock < 0) + return ZCLIENT_SEND_FAILURE; + + s = zclient->obuf; + + opaque_notif_encode_common(s, msgtype, true /* request */, + true /* register */, zclient->redist_default, + zclient->instance, zclient->session_id); + + return zclient_send_message(zclient); +} + +/* + * Encode and send a request to drop notifications for an opaque message type. + */ +enum zclient_send_status zclient_opaque_drop_notify(struct zclient *zclient, + uint32_t msgtype) +{ + struct stream *s; + + if (!zclient || zclient->sock < 0) + return ZCLIENT_SEND_FAILURE; + + s = zclient->obuf; + + opaque_notif_encode_common(s, msgtype, true /* req */, + false /* unreg */, zclient->redist_default, + zclient->instance, zclient->session_id); + + return zclient_send_message(zclient); +} diff --git a/lib/zclient.h b/lib/zclient.h index e43393fd70e7..316dd4cd68b9 100644 --- a/lib/zclient.h +++ b/lib/zclient.h @@ -87,7 +87,9 @@ enum zserv_client_capabilities { extern struct sockaddr_storage zclient_addr; extern socklen_t zclient_addr_len; -/* Zebra message types. */ +/* Zebra message types. Please update the corresponding + * command_types array with any changes! + */ typedef enum { ZEBRA_INTERFACE_ADD, ZEBRA_INTERFACE_DELETE, @@ -232,7 +234,11 @@ typedef enum { ZEBRA_TC_CLASS_DELETE, ZEBRA_TC_FILTER_ADD, ZEBRA_TC_FILTER_DELETE, + ZEBRA_OPAQUE_NOTIFY, } zebra_message_types_t; +/* Zebra message types. Please update the corresponding + * command_types array with any changes! + */ enum zebra_error_types { ZEBRA_UNKNOWN_ERROR, /* Error of unknown type */ @@ -1176,16 +1182,33 @@ zclient_send_opaque_unicast(struct zclient *zclient, uint32_t type, uint32_t session_id, const uint8_t *data, size_t datasize); +/* Init functions also provided for clients who want to encode their + * data inline into the zclient's stream buffer. Please use these instead + * of hand-encoding the header info, since that may change over time. + * Note that these will reset the zclient's outbound stream before encoding. + */ +enum zclient_send_status zapi_opaque_init(struct zclient *zclient, + uint32_t type, uint16_t flags); + +enum zclient_send_status +zapi_opaque_unicast_init(struct zclient *zclient, uint32_t type, uint16_t flags, + uint8_t proto, uint16_t instance, uint32_t session_id); + /* Struct representing the decoded opaque header info */ struct zapi_opaque_msg { uint32_t type; /* Subtype */ uint16_t len; /* len after zapi header and this info */ uint16_t flags; - /* Client-specific info - *if* UNICAST flag is set */ - uint8_t proto; - uint16_t instance; - uint32_t session_id; + /* Sending client info */ + uint8_t src_proto; + uint16_t src_instance; + uint32_t src_session_id; + + /* Destination client info - *if* UNICAST flag is set */ + uint8_t dest_proto; + uint16_t dest_instance; + uint32_t dest_session_id; }; #define ZAPI_OPAQUE_FLAG_UNICAST 0x01 @@ -1201,6 +1224,34 @@ struct zapi_opaque_reg_info { uint32_t session_id; }; +/* Simple struct conveying information about opaque notifications. + * Daemons can request notifications about the status of registration for + * opaque message types. For example, a client daemon can request notification + * when a server registers to receive a certain message code. Or a server can + * request notification when a subscriber registers for its output. + */ +struct zapi_opaque_notif_info { + bool request; /* Request to register, or notification from zebra */ + bool reg; /* Register or unregister */ + uint32_t msg_type; /* Target message code */ + + /* For notif registration, zapi info for the client. + * For notifications, zapi info for the message's server/registrant. + * For notification that there is no server/registrant, not present. + */ + uint8_t proto; + uint16_t instance; + uint32_t session_id; +}; + +/* The same ZAPI message is used for daemon->zebra requests, and for + * zebra->daemon notifications. + * Daemons send 'request' true, and 'reg' true or false. + * Zebra sends 'request' false, 'reg' set if the notification is a + * server/receiver registration for the message type, and false if the event + * is the end of registrations. + */ + /* Decode incoming opaque */ int zclient_opaque_decode(struct stream *msg, struct zapi_opaque_msg *info); @@ -1211,6 +1262,19 @@ enum zclient_send_status zclient_unregister_opaque(struct zclient *zclient, int zapi_opaque_reg_decode(struct stream *msg, struct zapi_opaque_reg_info *info); +/* Opaque notification features */ +enum zclient_send_status zclient_opaque_request_notify(struct zclient *zclient, + uint32_t msgtype); +enum zclient_send_status zclient_opaque_drop_notify(struct zclient *zclient, + uint32_t msgtype); + +/* Encode, decode an incoming zapi opaque notification */ +int zclient_opaque_notif_encode(struct stream *s, uint32_t msg_type, + bool reg /* register or unreg*/, uint8_t proto, + uint16_t instance, uint32_t session_id); +int zclient_opaque_notif_decode(struct stream *s, + struct zapi_opaque_notif_info *info); + /* * Registry of opaque message types. Please do not reuse an in-use * type code; some daemons are likely relying on it. diff --git a/sharpd/sharp_vty.c b/sharpd/sharp_vty.c index ca2212cd8713..3a2a7aa407d2 100644 --- a/sharpd/sharp_vty.c +++ b/sharpd/sharp_vty.c @@ -865,6 +865,24 @@ DEFPY (send_opaque_reg, return CMD_SUCCESS; } +/* Opaque notifications - register or unregister */ +DEFPY (send_opaque_notif_reg, + send_opaque_notif_reg_cmd, + "sharp send opaque notify type (1-1000)", + SHARP_STR + "Send messages for testing\n" + "Send opaque messages\n" + "Opaque notification messages\n" + "Send notify registration\n" + "Send notify unregistration\n" + "Opaque sub-type code\n" + "Opaque sub-type code\n") +{ + sharp_zebra_opaque_notif_reg((reg != NULL), type); + + return CMD_SUCCESS; +} + DEFPY (neigh_discover, neigh_discover_cmd, "sharp neigh discover [vrf NAME$vrf_name] IFNAME$ifname", @@ -1406,6 +1424,7 @@ void sharp_vty_init(void) install_element(ENABLE_NODE, &send_opaque_cmd); install_element(ENABLE_NODE, &send_opaque_unicast_cmd); install_element(ENABLE_NODE, &send_opaque_reg_cmd); + install_element(ENABLE_NODE, &send_opaque_notif_reg_cmd); install_element(ENABLE_NODE, &neigh_discover_cmd); install_element(ENABLE_NODE, &import_te_cmd); diff --git a/sharpd/sharp_zebra.c b/sharpd/sharp_zebra.c index 91fb7f03b107..df18118b028d 100644 --- a/sharpd/sharp_zebra.c +++ b/sharpd/sharp_zebra.c @@ -805,6 +805,28 @@ static int sharp_opaque_handler(ZAPI_CALLBACK_ARGS) return 0; } +/* Handler for opaque notification messages */ +static int sharp_opq_notify_handler(ZAPI_CALLBACK_ARGS) +{ + struct stream *s; + struct zapi_opaque_notif_info info; + + s = zclient->ibuf; + + if (zclient_opaque_notif_decode(s, &info) != 0) + return -1; + + if (info.reg) + zlog_debug("%s: received opaque notification REG, type %u => %d/%d/%d", + __func__, info.msg_type, info.proto, info.instance, + info.session_id); + else + zlog_debug("%s: received opaque notification UNREG, type %u", + __func__, info.msg_type); + + return 0; +} + /* * Send OPAQUE messages, using subtype 'type'. */ @@ -840,6 +862,17 @@ void sharp_opaque_send(uint32_t type, uint32_t proto, uint32_t instance, } } +/* + * Register/unregister for opaque notifications from zebra about 'type'. + */ +void sharp_zebra_opaque_notif_reg(bool is_reg, uint32_t type) +{ + if (is_reg) + zclient_opaque_request_notify(zclient, type); + else + zclient_opaque_drop_notify(zclient, type); +} + /* * Send OPAQUE registration messages, using subtype 'type'. */ @@ -1036,6 +1069,7 @@ static zclient_handler *const sharp_handlers[] = { [ZEBRA_REDISTRIBUTE_ROUTE_ADD] = sharp_redistribute_route, [ZEBRA_REDISTRIBUTE_ROUTE_DEL] = sharp_redistribute_route, [ZEBRA_OPAQUE_MESSAGE] = sharp_opaque_handler, + [ZEBRA_OPAQUE_NOTIFY] = sharp_opq_notify_handler, [ZEBRA_SRV6_MANAGER_GET_LOCATOR_CHUNK] = sharp_zebra_process_srv6_locator_chunk, }; diff --git a/sharpd/sharp_zebra.h b/sharpd/sharp_zebra.h index cf8689799aa5..025b4d8f8257 100644 --- a/sharpd/sharp_zebra.h +++ b/sharpd/sharp_zebra.h @@ -39,10 +39,15 @@ int sharp_install_lsps_helper(bool install_p, bool update_p, void sharp_opaque_send(uint32_t type, uint32_t proto, uint32_t instance, uint32_t session_id, uint32_t count); -/* Send OPAQUE registration messages, using subtype 'type'. */ +/* Send OPAQUE registration or notification registration messages, + * for opaque subtype 'type'. + */ void sharp_opaque_reg_send(bool is_reg, uint32_t proto, uint32_t instance, uint32_t session_id, uint32_t type); +/* Register/unregister for opaque notifications from zebra about 'type'. */ +void sharp_zebra_opaque_notif_reg(bool is_reg, uint32_t type); + extern void sharp_zebra_send_arp(const struct interface *ifp, const struct prefix *p); diff --git a/zebra/zapi_msg.c b/zebra/zapi_msg.c index 4bc9f4acfa84..b91e2ec7407d 100644 --- a/zebra/zapi_msg.c +++ b/zebra/zapi_msg.c @@ -3982,8 +3982,7 @@ void zserv_handle_commands(struct zserv *client, struct stream_fifo *fifo) hdr.length -= ZEBRA_HEADER_SIZE; /* Before checking for a handler function, check for - * special messages that are handled in another module; - * we'll treat these as opaque. + * special messages that are handled the 'opaque zapi' module. */ if (zebra_opaque_handles_msgid(hdr.command)) { /* Reset message buffer */ diff --git a/zebra/zebra_opaque.c b/zebra/zebra_opaque.c index 8ceb1f8dc56f..9503c7469726 100644 --- a/zebra/zebra_opaque.c +++ b/zebra/zebra_opaque.c @@ -26,10 +26,16 @@ struct opq_client_reg { int instance; uint32_t session_id; + int flags; + struct opq_client_reg *next; struct opq_client_reg *prev; }; +/* Registration is for receiving or for notifications */ +#define OPQ_CLIENT_FLAG_RECV 0x01 +#define OPQ_CLIENT_FLAG_NOTIFY 0x02 + /* Opaque message registration info */ struct opq_msg_reg { struct opq_regh_item item; @@ -99,14 +105,18 @@ static int handle_opq_registration(const struct zmsghdr *hdr, struct stream *msg); static int handle_opq_unregistration(const struct zmsghdr *hdr, struct stream *msg); +static int handle_opq_notif_req(const struct zmsghdr *hdr, struct stream *msg); +static int handle_opq_notif_unreg(const struct zapi_opaque_notif_info *info); static int dispatch_opq_messages(struct stream_fifo *msg_fifo); static struct opq_msg_reg *opq_reg_lookup(uint32_t type); static bool opq_client_match(const struct opq_client_reg *client, const struct zapi_opaque_reg_info *info); +static bool opq_client_notif_match(const struct opq_client_reg *client, + const struct zapi_opaque_notif_info *info); static struct opq_msg_reg *opq_reg_alloc(uint32_t type); static void opq_reg_free(struct opq_msg_reg **reg); -static struct opq_client_reg *opq_client_alloc( - const struct zapi_opaque_reg_info *info); +static struct opq_client_reg *opq_client_alloc(uint8_t proto, uint16_t instance, + uint32_t session_id); static void opq_client_free(struct opq_client_reg **client); static const char *opq_client2str(char *buf, size_t buflen, const struct opq_client_reg *client); @@ -213,6 +223,7 @@ bool zebra_opaque_handles_msgid(uint16_t id) case ZEBRA_OPAQUE_MESSAGE: case ZEBRA_OPAQUE_REGISTER: case ZEBRA_OPAQUE_UNREGISTER: + case ZEBRA_OPAQUE_NOTIFY: ret = true; break; default: @@ -243,7 +254,7 @@ uint32_t zebra_opaque_enqueue_batch(struct stream_fifo *batch) } } - /* Schedule module pthread to process the batch */ + /* Schedule module's pthread to process the batch */ if (counter > 0) { if (IS_ZEBRA_DEBUG_RECV && IS_ZEBRA_DEBUG_DETAIL) zlog_debug("%s: received %u messages", @@ -325,6 +336,38 @@ static void process_messages(struct event *event) stream_fifo_deinit(&fifo); } +/* + * Helper to acquire/lock a client session and send the message in 's'. + * Note that 's' is enqueued for an io pthread, so don't free it + * or touch it if this returns 'true'. + */ +static bool opq_send_message(uint8_t proto, uint16_t instance, + uint32_t session_id, struct stream *s) +{ + bool ret = false; + struct zserv *zclient; + + /* + * TODO -- this isn't ideal: we're going through an + * acquire/release cycle for each client for each + * message. Replace this with a batching version. + */ + zclient = zserv_acquire_client(proto, instance, session_id); + if (zclient) { + /* + * Sending a message actually means enqueuing + * it for a zapi io pthread to send - so we + * don't touch the message after this call. + */ + zserv_send_message(zclient, s); + + zserv_release_client(zclient); + ret = true; + } + + return ret; +} + /* * Process (dispatch) or drop opaque messages. */ @@ -336,7 +379,6 @@ static int dispatch_opq_messages(struct stream_fifo *msg_fifo) struct opq_msg_reg *reg; int ret; struct opq_client_reg *client; - struct zserv *zclient; char buf[50]; while ((msg = stream_fifo_pop(msg_fifo)) != NULL) { @@ -350,6 +392,9 @@ static int dispatch_opq_messages(struct stream_fifo *msg_fifo) } else if (hdr.command == ZEBRA_OPAQUE_UNREGISTER) { handle_opq_unregistration(&hdr, msg); continue; + } else if (hdr.command == ZEBRA_OPAQUE_NOTIFY) { + handle_opq_notif_req(&hdr, msg); + continue; } /* We only process OPAQUE messages - drop anything else */ @@ -381,9 +426,9 @@ static int dispatch_opq_messages(struct stream_fifo *msg_fifo) if (CHECK_FLAG(info.flags, ZAPI_OPAQUE_FLAG_UNICAST)) { - if (client->proto != info.proto || - client->instance != info.instance || - client->session_id != info.session_id) + if (client->proto != info.dest_proto || + client->instance != info.dest_instance || + client->session_id != info.dest_session_id) continue; if (IS_ZEBRA_DEBUG_RECV && @@ -400,36 +445,25 @@ static int dispatch_opq_messages(struct stream_fifo *msg_fifo) dup = stream_dup(msg); } + if (IS_ZEBRA_DEBUG_SEND && IS_ZEBRA_DEBUG_DETAIL) + zlog_debug("%s: sending %s to client %s", + __func__, (dup ? "dup" : "msg"), + opq_client2str(buf, sizeof(buf), + client)); + /* * TODO -- this isn't ideal: we're going through an * acquire/release cycle for each client for each * message. Replace this with a batching version. */ - zclient = zserv_acquire_client(client->proto, - client->instance, - client->session_id); - if (zclient) { - if (IS_ZEBRA_DEBUG_SEND && - IS_ZEBRA_DEBUG_DETAIL) - zlog_debug("%s: sending %s to client %s", - __func__, - (dup ? "dup" : "msg"), - opq_client2str(buf, - sizeof(buf), - client)); - - /* - * Sending a message actually means enqueuing - * it for a zapi io pthread to send - so we - * don't touch the message after this call. - */ - zserv_send_message(zclient, dup ? dup : msg); + if (opq_send_message(client->proto, client->instance, + client->session_id, + (dup ? dup : msg))) { + /* Message is gone - don't touch it */ if (dup) dup = NULL; else msg = NULL; - - zserv_release_client(zclient); } else { if (IS_ZEBRA_DEBUG_RECV && IS_ZEBRA_DEBUG_DETAIL) @@ -457,6 +491,67 @@ static int dispatch_opq_messages(struct stream_fifo *msg_fifo) return 0; } +/* Enqueue registration client object */ +static void opq_enqueue_client(struct opq_msg_reg *reg, + struct opq_client_reg *client) +{ + client->next = reg->clients; + if (reg->clients) + reg->clients->prev = client; + reg->clients = client; +} + +/* Dequeue registration client object */ +static void opq_dequeue_client(struct opq_msg_reg *reg, + struct opq_client_reg *client) +{ + if (client->prev) + client->prev->next = client->next; + if (client->next) + client->next->prev = client->prev; + if (reg->clients == client) + reg->clients = client->next; +} + +/* + * Send notification messages to any interested clients in 'reg', + * about 'server'; the sense is 'registered' (or not). + * The 'server' is not required for un-registrations. + */ +static void opq_send_notifications(const struct opq_msg_reg *reg, + const struct opq_client_reg *server, + bool registered) +{ + const struct opq_client_reg *client; + struct stream *msg = NULL; + + /* If there are any notification clients, send them a message */ + for (client = reg->clients; client; client = client->next) { + if (CHECK_FLAG(client->flags, OPQ_CLIENT_FLAG_NOTIFY)) { + msg = stream_new(ZEBRA_SMALL_PACKET_SIZE); + + if (registered) { + zclient_opaque_notif_encode(msg, reg->type, + registered, + server->proto, + server->instance, + server->session_id); + } else { + zclient_opaque_notif_encode(msg, reg->type, + registered, 0, 0, 0); + } + + /* Locate zebra client and enqueue message to it */ + if (!opq_send_message(client->proto, client->instance, + client->session_id, msg)) { + /* Error - need to free the message */ + stream_free(msg); + msg = NULL; + } + } + } +} + /* * Process a register/unregister message */ @@ -499,7 +594,9 @@ static int handle_opq_registration(const struct zmsghdr *hdr, goto done; } - client = opq_client_alloc(&info); + client = opq_client_alloc(info.proto, info.instance, + info.session_id); + SET_FLAG(client->flags, OPQ_CLIENT_FLAG_RECV); if (IS_ZEBRA_DEBUG_RECV) zlog_debug("%s: client %s registers for %u", @@ -508,17 +605,20 @@ static int handle_opq_registration(const struct zmsghdr *hdr, info.type); /* Link client into registration */ - client->next = reg->clients; - if (reg->clients) - reg->clients->prev = client; - reg->clients = client; + opq_enqueue_client(reg, client); + + /* Send notifications to any clients who want them */ + opq_send_notifications(reg, client, true); + } else { /* * No existing registrations - create one, add the * client, and add registration to hash. */ reg = opq_reg_alloc(info.type); - client = opq_client_alloc(&info); + client = opq_client_alloc(info.proto, info.instance, + info.session_id); + SET_FLAG(client->flags, OPQ_CLIENT_FLAG_RECV); if (IS_ZEBRA_DEBUG_RECV) zlog_debug("%s: client %s registers for new reg %u", @@ -545,8 +645,9 @@ static int handle_opq_unregistration(const struct zmsghdr *hdr, { int ret = 0; struct zapi_opaque_reg_info info; - struct opq_client_reg *client; + struct opq_client_reg *client, *tclient; struct opq_msg_reg key, *reg; + int scount; char buf[50]; memset(&info, 0, sizeof(info)); @@ -571,11 +672,16 @@ static int handle_opq_unregistration(const struct zmsghdr *hdr, goto done; } - /* Look for client */ - for (client = reg->clients; client != NULL; - client = client->next) { - if (opq_client_match(client, &info)) - break; + /* Look for client info, count servers and notif clients too */ + client = NULL; + scount = 0; + + for (tclient = reg->clients; tclient != NULL; tclient = tclient->next) { + if (opq_client_match(tclient, &info)) + client = tclient; + + if (CHECK_FLAG(tclient->flags, OPQ_CLIENT_FLAG_RECV)) + scount++; } if (client == NULL) { @@ -592,19 +698,18 @@ static int handle_opq_unregistration(const struct zmsghdr *hdr, __func__, opq_client2str(buf, sizeof(buf), client), info.type); - if (client->prev) - client->prev->next = client->next; - if (client->next) - client->next->prev = client->prev; - if (reg->clients == client) - reg->clients = client->next; - + opq_dequeue_client(reg, client); opq_client_free(&client); + scount--; /* Is registration empty now? */ if (reg->clients == NULL) { + opq_regh_del(&opq_reg_hash, reg); opq_reg_free(®); + } else if (scount == 0) { + /* Send notifications if no more servers for the message. */ + opq_send_notifications(reg, NULL, false); } done: @@ -613,13 +718,182 @@ static int handle_opq_unregistration(const struct zmsghdr *hdr, return ret; } +/* + * Handle requests about opaque notifications. + */ +static int handle_opq_notif_req(const struct zmsghdr *hdr, struct stream *msg) +{ + int ret; + struct zapi_opaque_notif_info info = {}; + struct opq_client_reg *client; + struct opq_msg_reg key, *reg; + char buf[50]; + + ret = zclient_opaque_notif_decode(msg, &info); + if (ret < 0) + goto done; + + /* Handle deregistration */ + if (!info.reg) { + ret = handle_opq_notif_unreg(&info); + goto done; + } + + memset(&key, 0, sizeof(key)); + + key.type = info.msg_type; + + reg = opq_regh_find(&opq_reg_hash, &key); + if (reg) { + /* Look for dup client */ + for (client = reg->clients; client != NULL; + client = client->next) { + if (opq_client_notif_match(client, &info)) + break; + } + + if (client) { + /* Oops - duplicate ? */ + if (IS_ZEBRA_DEBUG_RECV) + zlog_debug("%s: duplicate opq notif reg client %s", + __func__, opq_client2str(buf, + sizeof(buf), + client)); + goto done; + } + + client = opq_client_alloc(info.proto, info.instance, + info.session_id); + SET_FLAG(client->flags, OPQ_CLIENT_FLAG_NOTIFY); + + if (IS_ZEBRA_DEBUG_RECV) + zlog_debug("%s: client %s registers for notif %u", + __func__, + opq_client2str(buf, sizeof(buf), client), + info.msg_type); + + /* Link client into registration */ + opq_enqueue_client(reg, client); + + /* Send notification if any registered servers */ + /* Look for a server */ + for (client = reg->clients; client != NULL; + client = client->next) { + if (CHECK_FLAG(client->flags, OPQ_CLIENT_FLAG_RECV)) + break; + } + if (client) + opq_send_notifications(reg, client, true); + + } else if (info.reg) { + /* + * No existing registrations - create one, add the + * client, and add registration to hash. + */ + reg = opq_reg_alloc(info.msg_type); + client = opq_client_alloc(info.proto, info.instance, + info.session_id); + SET_FLAG(client->flags, OPQ_CLIENT_FLAG_NOTIFY); + + if (IS_ZEBRA_DEBUG_RECV) + zlog_debug("%s: client %s registers for new notif %u", + __func__, + opq_client2str(buf, sizeof(buf), client), + info.msg_type); + + reg->clients = client; + + opq_regh_add(&opq_reg_hash, reg); + } + +done: + stream_free(msg); + return ret; +} + +/* + * Unregister notification + */ +static int handle_opq_notif_unreg(const struct zapi_opaque_notif_info *info) +{ + int ret = 0; + struct opq_client_reg *client; + struct opq_msg_reg key, *reg; + char buf[50]; + + memset(&key, 0, sizeof(key)); + + key.type = info->msg_type; + + reg = opq_regh_find(&opq_reg_hash, &key); + if (reg == NULL) { + /* Weird: unregister for unknown message? */ + if (IS_ZEBRA_DEBUG_RECV) + zlog_debug("%s: unknown client %s/%u/%u unregisters notif for unknown type %u", + __func__, zebra_route_string(info->proto), + info->instance, info->session_id, + info->msg_type); + goto done; + } + + /* Look for client */ + for (client = reg->clients; client != NULL; client = client->next) { + if (opq_client_notif_match(client, info)) + break; + } + + if (client == NULL) { + /* Oops - unregister for unknown client? */ + if (IS_ZEBRA_DEBUG_RECV) + zlog_debug("%s: unknown client %s/%u/%u unregisters notif for %u", + __func__, zebra_route_string(info->proto), + info->instance, info->session_id, + info->msg_type); + goto done; + } + + if (IS_ZEBRA_DEBUG_RECV) + zlog_debug("%s: client %s unregisters notif for %u", __func__, + opq_client2str(buf, sizeof(buf), client), + info->msg_type); + + /* Dequeue client object */ + opq_dequeue_client(reg, client); + + opq_client_free(&client); + + /* Is registration empty now? */ + if (reg->clients == NULL) { + opq_regh_del(&opq_reg_hash, reg); + opq_reg_free(®); + } + +done: + + return ret; +} + /* Compare utility for registered clients */ static bool opq_client_match(const struct opq_client_reg *client, const struct zapi_opaque_reg_info *info) { - if (client->proto == info->proto && - client->instance == info->instance && - client->session_id == info->session_id) + /* look for matching client, skip notifications */ + if (client->proto == info->proto && client->instance == info->instance && + client->session_id == info->session_id && + CHECK_FLAG(client->flags, OPQ_CLIENT_FLAG_RECV)) + return true; + else + return false; +} + +/* Compare helper for clients registered for notifications */ +static bool opq_client_notif_match(const struct opq_client_reg *client, + const struct zapi_opaque_notif_info *info) +{ + /* look for matching client, only for notifications */ + if (client->proto == info->proto && client->instance == info->instance && + client->session_id == info->session_id && + CHECK_FLAG(client->flags, OPQ_CLIENT_FLAG_NOTIFY)) return true; else return false; @@ -655,16 +929,16 @@ static void opq_reg_free(struct opq_msg_reg **reg) XFREE(MTYPE_OPQ, (*reg)); } -static struct opq_client_reg *opq_client_alloc( - const struct zapi_opaque_reg_info *info) +static struct opq_client_reg *opq_client_alloc(uint8_t proto, uint16_t instance, + uint32_t session_id) { struct opq_client_reg *client; client = XCALLOC(MTYPE_OPQ, sizeof(struct opq_client_reg)); - client->proto = info->proto; - client->instance = info->instance; - client->session_id = info->session_id; + client->proto = proto; + client->instance = instance; + client->session_id = session_id; return client; }