Merge pull request #13771 from mjstapp/opaque_notify

lib, zebra, sharpd: add notifications for ZAPI opaque message registration events
This commit is contained in:
Donatas Abraitis 2023-06-26 13:08:52 +03:00 committed by GitHub
commit 4262dc3bd8
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
9 changed files with 657 additions and 124 deletions

View file

@ -1138,31 +1138,13 @@ int ls_unregister(struct zclient *zclient, bool server)
int ls_request_sync(struct zclient *zclient)
{
struct stream *s;
uint16_t flags = 0;
/* Check buffer size */
if (STREAM_SIZE(zclient->obuf)
< (ZEBRA_HEADER_SIZE + 3 * sizeof(uint32_t)))
return -1;
s = zclient->obuf;
stream_reset(s);
zclient_create_header(s, ZEBRA_OPAQUE_MESSAGE, VRF_DEFAULT);
/* Set type and flags */
stream_putl(s, LINK_STATE_SYNC);
stream_putw(s, flags);
/* Send destination client info */
stream_putc(s, zclient->redist_default);
stream_putw(s, zclient->instance);
stream_putl(s, zclient->session_id);
/* Put length into the header at the start of the stream. */
stream_putw_at(s, 0, stream_get_endp(s));
return zclient_send_message(zclient);
/* No data with this message */
return zclient_send_opaque(zclient, LINK_STATE_SYNC, NULL, 0);
}
static struct ls_node *ls_parse_node(struct stream *s)
@ -1623,23 +1605,15 @@ int ls_send_msg(struct zclient *zclient, struct ls_message *msg,
(ZEBRA_HEADER_SIZE + sizeof(uint32_t) + sizeof(msg)))
return -1;
/* Init the message, then encode the data inline. */
if (dst == NULL)
zapi_opaque_init(zclient, LINK_STATE_UPDATE, flags);
else
zapi_opaque_unicast_init(zclient, LINK_STATE_UPDATE, flags,
dst->proto, dst->instance,
dst->session_id);
s = zclient->obuf;
stream_reset(s);
zclient_create_header(s, ZEBRA_OPAQUE_MESSAGE, VRF_DEFAULT);
/* Set sub-type, flags and destination for unicast message */
stream_putl(s, LINK_STATE_UPDATE);
if (dst != NULL) {
SET_FLAG(flags, ZAPI_OPAQUE_FLAG_UNICAST);
stream_putw(s, flags);
/* Send destination client info */
stream_putc(s, dst->proto);
stream_putw(s, dst->instance);
stream_putl(s, dst->session_id);
} else {
stream_putw(s, flags);
}
/* Format Link State message */
if (ls_format_msg(s, msg) < 0) {

View file

@ -457,7 +457,9 @@ static const struct zebra_desc_table command_types[] = {
DESC_ENTRY(ZEBRA_TC_CLASS_ADD),
DESC_ENTRY(ZEBRA_TC_CLASS_DELETE),
DESC_ENTRY(ZEBRA_TC_FILTER_ADD),
DESC_ENTRY(ZEBRA_TC_FILTER_DELETE)};
DESC_ENTRY(ZEBRA_TC_FILTER_DELETE),
DESC_ENTRY(ZEBRA_OPAQUE_NOTIFY)
};
#undef DESC_ENTRY
static const struct zebra_desc_table unknown = {0, "unknown", '?'};

View file

@ -3823,6 +3823,53 @@ enum zclient_send_status zclient_send_mlag_data(struct zclient *client,
return zclient_send_message(client);
}
/*
* Init/header setup for opaque zapi messages
*/
enum zclient_send_status zapi_opaque_init(struct zclient *zclient,
uint32_t type, uint16_t flags)
{
struct stream *s;
s = zclient->obuf;
stream_reset(s);
zclient_create_header(s, ZEBRA_OPAQUE_MESSAGE, VRF_DEFAULT);
/* Send sub-type and flags */
stream_putl(s, type);
stream_putw(s, flags);
/* Source daemon identifiers */
stream_putc(s, zclient->redist_default);
stream_putw(s, zclient->instance);
stream_putl(s, zclient->session_id);
return ZCLIENT_SEND_SUCCESS;
}
/*
* Init, header setup for opaque unicast messages.
*/
enum zclient_send_status
zapi_opaque_unicast_init(struct zclient *zclient, uint32_t type, uint16_t flags,
uint8_t proto, uint16_t instance, uint32_t session_id)
{
struct stream *s;
s = zclient->obuf;
/* Common init */
zapi_opaque_init(zclient, type, flags | ZAPI_OPAQUE_FLAG_UNICAST);
/* Send destination client info */
stream_putc(s, proto);
stream_putw(s, instance);
stream_putl(s, session_id);
return ZCLIENT_SEND_SUCCESS;
}
/*
* Send an OPAQUE message, contents opaque to zebra. The message header
* is a message subtype.
@ -3840,16 +3887,12 @@ enum zclient_send_status zclient_send_opaque(struct zclient *zclient,
return ZCLIENT_SEND_FAILURE;
s = zclient->obuf;
stream_reset(s);
zclient_create_header(s, ZEBRA_OPAQUE_MESSAGE, VRF_DEFAULT);
/* Send sub-type and flags */
stream_putl(s, type);
stream_putw(s, flags);
zapi_opaque_init(zclient, type, flags);
/* Send opaque data */
stream_write(s, data, datasize);
if (datasize > 0)
stream_write(s, data, datasize);
/* Put length into the header at the start of the stream. */
stream_putw_at(s, 0, stream_get_endp(s));
@ -3876,22 +3919,14 @@ zclient_send_opaque_unicast(struct zclient *zclient, uint32_t type,
return ZCLIENT_SEND_FAILURE;
s = zclient->obuf;
stream_reset(s);
zclient_create_header(s, ZEBRA_OPAQUE_MESSAGE, VRF_DEFAULT);
/* Send sub-type and flags */
SET_FLAG(flags, ZAPI_OPAQUE_FLAG_UNICAST);
stream_putl(s, type);
stream_putw(s, flags);
/* Send destination client info */
stream_putc(s, proto);
stream_putw(s, instance);
stream_putl(s, session_id);
/* Common init */
zapi_opaque_unicast_init(zclient, type, flags, proto, instance,
session_id);
/* Send opaque data */
stream_write(s, data, datasize);
if (datasize > 0)
stream_write(s, data, datasize);
/* Put length into the header at the start of the stream. */
stream_putw_at(s, 0, stream_get_endp(s));
@ -3910,11 +3945,16 @@ int zclient_opaque_decode(struct stream *s, struct zapi_opaque_msg *info)
STREAM_GETL(s, info->type);
STREAM_GETW(s, info->flags);
/* Decode unicast client info if present */
/* Decode sending daemon info */
STREAM_GETC(s, info->src_proto);
STREAM_GETW(s, info->src_instance);
STREAM_GETL(s, info->src_session_id);
/* Decode unicast destination info, if present */
if (CHECK_FLAG(info->flags, ZAPI_OPAQUE_FLAG_UNICAST)) {
STREAM_GETC(s, info->proto);
STREAM_GETW(s, info->instance);
STREAM_GETL(s, info->session_id);
STREAM_GETC(s, info->dest_proto);
STREAM_GETW(s, info->dest_instance);
STREAM_GETL(s, info->dest_session_id);
}
info->len = STREAM_READABLE(s);
@ -4472,3 +4512,125 @@ int zclient_send_zebra_gre_request(struct zclient *client,
zclient_send_message(client);
return 0;
}
/*
* Opaque notification features
*/
/*
* Common encode helper for opaque notifications, both registration
* and async notification messages.
*/
static int opaque_notif_encode_common(struct stream *s, uint32_t msg_type,
bool request, bool reg, uint8_t proto,
uint16_t instance, uint32_t session_id)
{
int ret = 0;
uint8_t val = 0;
stream_reset(s);
zclient_create_header(s, ZEBRA_OPAQUE_NOTIFY, VRF_DEFAULT);
/* Notification or request */
if (request)
val = 1;
stream_putc(s, val);
if (reg)
val = 1;
else
val = 0;
stream_putc(s, val);
stream_putl(s, msg_type);
stream_putc(s, proto);
stream_putw(s, instance);
stream_putl(s, session_id);
/* And capture message length */
stream_putw_at(s, 0, stream_get_endp(s));
return ret;
}
/*
* Encode a zapi opaque message type notification into buffer 's'
*/
int zclient_opaque_notif_encode(struct stream *s, uint32_t msg_type, bool reg,
uint8_t proto, uint16_t instance,
uint32_t session_id)
{
return opaque_notif_encode_common(s, msg_type, false /* !request */,
reg, proto, instance, session_id);
}
/*
* Decode an incoming zapi opaque message type notification
*/
int zclient_opaque_notif_decode(struct stream *s,
struct zapi_opaque_notif_info *info)
{
uint8_t val;
memset(info, 0, sizeof(*info));
STREAM_GETC(s, val); /* Registration or notification */
info->request = (val != 0);
STREAM_GETC(s, val);
info->reg = (val != 0);
STREAM_GETL(s, info->msg_type);
STREAM_GETC(s, info->proto);
STREAM_GETW(s, info->instance);
STREAM_GETL(s, info->session_id);
return 0;
stream_failure:
return -1;
}
/*
* Encode and send a zapi opaque message type notification request to zebra
*/
enum zclient_send_status zclient_opaque_request_notify(struct zclient *zclient,
uint32_t msgtype)
{
struct stream *s;
if (!zclient || zclient->sock < 0)
return ZCLIENT_SEND_FAILURE;
s = zclient->obuf;
opaque_notif_encode_common(s, msgtype, true /* request */,
true /* register */, zclient->redist_default,
zclient->instance, zclient->session_id);
return zclient_send_message(zclient);
}
/*
* Encode and send a request to drop notifications for an opaque message type.
*/
enum zclient_send_status zclient_opaque_drop_notify(struct zclient *zclient,
uint32_t msgtype)
{
struct stream *s;
if (!zclient || zclient->sock < 0)
return ZCLIENT_SEND_FAILURE;
s = zclient->obuf;
opaque_notif_encode_common(s, msgtype, true /* req */,
false /* unreg */, zclient->redist_default,
zclient->instance, zclient->session_id);
return zclient_send_message(zclient);
}

View file

@ -87,7 +87,9 @@ enum zserv_client_capabilities {
extern struct sockaddr_storage zclient_addr;
extern socklen_t zclient_addr_len;
/* Zebra message types. */
/* Zebra message types. Please update the corresponding
* command_types array with any changes!
*/
typedef enum {
ZEBRA_INTERFACE_ADD,
ZEBRA_INTERFACE_DELETE,
@ -232,7 +234,11 @@ typedef enum {
ZEBRA_TC_CLASS_DELETE,
ZEBRA_TC_FILTER_ADD,
ZEBRA_TC_FILTER_DELETE,
ZEBRA_OPAQUE_NOTIFY,
} zebra_message_types_t;
/* Zebra message types. Please update the corresponding
* command_types array with any changes!
*/
enum zebra_error_types {
ZEBRA_UNKNOWN_ERROR, /* Error of unknown type */
@ -1176,16 +1182,33 @@ zclient_send_opaque_unicast(struct zclient *zclient, uint32_t type,
uint32_t session_id, const uint8_t *data,
size_t datasize);
/* Init functions also provided for clients who want to encode their
* data inline into the zclient's stream buffer. Please use these instead
* of hand-encoding the header info, since that may change over time.
* Note that these will reset the zclient's outbound stream before encoding.
*/
enum zclient_send_status zapi_opaque_init(struct zclient *zclient,
uint32_t type, uint16_t flags);
enum zclient_send_status
zapi_opaque_unicast_init(struct zclient *zclient, uint32_t type, uint16_t flags,
uint8_t proto, uint16_t instance, uint32_t session_id);
/* Struct representing the decoded opaque header info */
struct zapi_opaque_msg {
uint32_t type; /* Subtype */
uint16_t len; /* len after zapi header and this info */
uint16_t flags;
/* Client-specific info - *if* UNICAST flag is set */
uint8_t proto;
uint16_t instance;
uint32_t session_id;
/* Sending client info */
uint8_t src_proto;
uint16_t src_instance;
uint32_t src_session_id;
/* Destination client info - *if* UNICAST flag is set */
uint8_t dest_proto;
uint16_t dest_instance;
uint32_t dest_session_id;
};
#define ZAPI_OPAQUE_FLAG_UNICAST 0x01
@ -1201,6 +1224,34 @@ struct zapi_opaque_reg_info {
uint32_t session_id;
};
/* Simple struct conveying information about opaque notifications.
* Daemons can request notifications about the status of registration for
* opaque message types. For example, a client daemon can request notification
* when a server registers to receive a certain message code. Or a server can
* request notification when a subscriber registers for its output.
*/
struct zapi_opaque_notif_info {
bool request; /* Request to register, or notification from zebra */
bool reg; /* Register or unregister */
uint32_t msg_type; /* Target message code */
/* For notif registration, zapi info for the client.
* For notifications, zapi info for the message's server/registrant.
* For notification that there is no server/registrant, not present.
*/
uint8_t proto;
uint16_t instance;
uint32_t session_id;
};
/* The same ZAPI message is used for daemon->zebra requests, and for
* zebra->daemon notifications.
* Daemons send 'request' true, and 'reg' true or false.
* Zebra sends 'request' false, 'reg' set if the notification is a
* server/receiver registration for the message type, and false if the event
* is the end of registrations.
*/
/* Decode incoming opaque */
int zclient_opaque_decode(struct stream *msg, struct zapi_opaque_msg *info);
@ -1211,6 +1262,19 @@ enum zclient_send_status zclient_unregister_opaque(struct zclient *zclient,
int zapi_opaque_reg_decode(struct stream *msg,
struct zapi_opaque_reg_info *info);
/* Opaque notification features */
enum zclient_send_status zclient_opaque_request_notify(struct zclient *zclient,
uint32_t msgtype);
enum zclient_send_status zclient_opaque_drop_notify(struct zclient *zclient,
uint32_t msgtype);
/* Encode, decode an incoming zapi opaque notification */
int zclient_opaque_notif_encode(struct stream *s, uint32_t msg_type,
bool reg /* register or unreg*/, uint8_t proto,
uint16_t instance, uint32_t session_id);
int zclient_opaque_notif_decode(struct stream *s,
struct zapi_opaque_notif_info *info);
/*
* Registry of opaque message types. Please do not reuse an in-use
* type code; some daemons are likely relying on it.

View file

@ -865,6 +865,24 @@ DEFPY (send_opaque_reg,
return CMD_SUCCESS;
}
/* Opaque notifications - register or unregister */
DEFPY (send_opaque_notif_reg,
send_opaque_notif_reg_cmd,
"sharp send opaque notify <reg$reg | unreg> type (1-1000)",
SHARP_STR
"Send messages for testing\n"
"Send opaque messages\n"
"Opaque notification messages\n"
"Send notify registration\n"
"Send notify unregistration\n"
"Opaque sub-type code\n"
"Opaque sub-type code\n")
{
sharp_zebra_opaque_notif_reg((reg != NULL), type);
return CMD_SUCCESS;
}
DEFPY (neigh_discover,
neigh_discover_cmd,
"sharp neigh discover [vrf NAME$vrf_name] <A.B.C.D$dst4|X:X::X:X$dst6> IFNAME$ifname",
@ -1406,6 +1424,7 @@ void sharp_vty_init(void)
install_element(ENABLE_NODE, &send_opaque_cmd);
install_element(ENABLE_NODE, &send_opaque_unicast_cmd);
install_element(ENABLE_NODE, &send_opaque_reg_cmd);
install_element(ENABLE_NODE, &send_opaque_notif_reg_cmd);
install_element(ENABLE_NODE, &neigh_discover_cmd);
install_element(ENABLE_NODE, &import_te_cmd);

View file

@ -805,6 +805,28 @@ static int sharp_opaque_handler(ZAPI_CALLBACK_ARGS)
return 0;
}
/* Handler for opaque notification messages */
static int sharp_opq_notify_handler(ZAPI_CALLBACK_ARGS)
{
struct stream *s;
struct zapi_opaque_notif_info info;
s = zclient->ibuf;
if (zclient_opaque_notif_decode(s, &info) != 0)
return -1;
if (info.reg)
zlog_debug("%s: received opaque notification REG, type %u => %d/%d/%d",
__func__, info.msg_type, info.proto, info.instance,
info.session_id);
else
zlog_debug("%s: received opaque notification UNREG, type %u",
__func__, info.msg_type);
return 0;
}
/*
* Send OPAQUE messages, using subtype 'type'.
*/
@ -840,6 +862,17 @@ void sharp_opaque_send(uint32_t type, uint32_t proto, uint32_t instance,
}
}
/*
* Register/unregister for opaque notifications from zebra about 'type'.
*/
void sharp_zebra_opaque_notif_reg(bool is_reg, uint32_t type)
{
if (is_reg)
zclient_opaque_request_notify(zclient, type);
else
zclient_opaque_drop_notify(zclient, type);
}
/*
* Send OPAQUE registration messages, using subtype 'type'.
*/
@ -1036,6 +1069,7 @@ static zclient_handler *const sharp_handlers[] = {
[ZEBRA_REDISTRIBUTE_ROUTE_ADD] = sharp_redistribute_route,
[ZEBRA_REDISTRIBUTE_ROUTE_DEL] = sharp_redistribute_route,
[ZEBRA_OPAQUE_MESSAGE] = sharp_opaque_handler,
[ZEBRA_OPAQUE_NOTIFY] = sharp_opq_notify_handler,
[ZEBRA_SRV6_MANAGER_GET_LOCATOR_CHUNK] =
sharp_zebra_process_srv6_locator_chunk,
};

View file

@ -39,10 +39,15 @@ int sharp_install_lsps_helper(bool install_p, bool update_p,
void sharp_opaque_send(uint32_t type, uint32_t proto, uint32_t instance,
uint32_t session_id, uint32_t count);
/* Send OPAQUE registration messages, using subtype 'type'. */
/* Send OPAQUE registration or notification registration messages,
* for opaque subtype 'type'.
*/
void sharp_opaque_reg_send(bool is_reg, uint32_t proto, uint32_t instance,
uint32_t session_id, uint32_t type);
/* Register/unregister for opaque notifications from zebra about 'type'. */
void sharp_zebra_opaque_notif_reg(bool is_reg, uint32_t type);
extern void sharp_zebra_send_arp(const struct interface *ifp,
const struct prefix *p);

View file

@ -3982,8 +3982,7 @@ void zserv_handle_commands(struct zserv *client, struct stream_fifo *fifo)
hdr.length -= ZEBRA_HEADER_SIZE;
/* Before checking for a handler function, check for
* special messages that are handled in another module;
* we'll treat these as opaque.
* special messages that are handled the 'opaque zapi' module.
*/
if (zebra_opaque_handles_msgid(hdr.command)) {
/* Reset message buffer */

View file

@ -26,10 +26,16 @@ struct opq_client_reg {
int instance;
uint32_t session_id;
int flags;
struct opq_client_reg *next;
struct opq_client_reg *prev;
};
/* Registration is for receiving or for notifications */
#define OPQ_CLIENT_FLAG_RECV 0x01
#define OPQ_CLIENT_FLAG_NOTIFY 0x02
/* Opaque message registration info */
struct opq_msg_reg {
struct opq_regh_item item;
@ -99,14 +105,18 @@ static int handle_opq_registration(const struct zmsghdr *hdr,
struct stream *msg);
static int handle_opq_unregistration(const struct zmsghdr *hdr,
struct stream *msg);
static int handle_opq_notif_req(const struct zmsghdr *hdr, struct stream *msg);
static int handle_opq_notif_unreg(const struct zapi_opaque_notif_info *info);
static int dispatch_opq_messages(struct stream_fifo *msg_fifo);
static struct opq_msg_reg *opq_reg_lookup(uint32_t type);
static bool opq_client_match(const struct opq_client_reg *client,
const struct zapi_opaque_reg_info *info);
static bool opq_client_notif_match(const struct opq_client_reg *client,
const struct zapi_opaque_notif_info *info);
static struct opq_msg_reg *opq_reg_alloc(uint32_t type);
static void opq_reg_free(struct opq_msg_reg **reg);
static struct opq_client_reg *opq_client_alloc(
const struct zapi_opaque_reg_info *info);
static struct opq_client_reg *opq_client_alloc(uint8_t proto, uint16_t instance,
uint32_t session_id);
static void opq_client_free(struct opq_client_reg **client);
static const char *opq_client2str(char *buf, size_t buflen,
const struct opq_client_reg *client);
@ -213,6 +223,7 @@ bool zebra_opaque_handles_msgid(uint16_t id)
case ZEBRA_OPAQUE_MESSAGE:
case ZEBRA_OPAQUE_REGISTER:
case ZEBRA_OPAQUE_UNREGISTER:
case ZEBRA_OPAQUE_NOTIFY:
ret = true;
break;
default:
@ -243,7 +254,7 @@ uint32_t zebra_opaque_enqueue_batch(struct stream_fifo *batch)
}
}
/* Schedule module pthread to process the batch */
/* Schedule module's pthread to process the batch */
if (counter > 0) {
if (IS_ZEBRA_DEBUG_RECV && IS_ZEBRA_DEBUG_DETAIL)
zlog_debug("%s: received %u messages",
@ -325,6 +336,38 @@ done:
stream_fifo_deinit(&fifo);
}
/*
* Helper to acquire/lock a client session and send the message in 's'.
* Note that 's' is enqueued for an io pthread, so don't free it
* or touch it if this returns 'true'.
*/
static bool opq_send_message(uint8_t proto, uint16_t instance,
uint32_t session_id, struct stream *s)
{
bool ret = false;
struct zserv *zclient;
/*
* TODO -- this isn't ideal: we're going through an
* acquire/release cycle for each client for each
* message. Replace this with a batching version.
*/
zclient = zserv_acquire_client(proto, instance, session_id);
if (zclient) {
/*
* Sending a message actually means enqueuing
* it for a zapi io pthread to send - so we
* don't touch the message after this call.
*/
zserv_send_message(zclient, s);
zserv_release_client(zclient);
ret = true;
}
return ret;
}
/*
* Process (dispatch) or drop opaque messages.
*/
@ -336,7 +379,6 @@ static int dispatch_opq_messages(struct stream_fifo *msg_fifo)
struct opq_msg_reg *reg;
int ret;
struct opq_client_reg *client;
struct zserv *zclient;
char buf[50];
while ((msg = stream_fifo_pop(msg_fifo)) != NULL) {
@ -350,6 +392,9 @@ static int dispatch_opq_messages(struct stream_fifo *msg_fifo)
} else if (hdr.command == ZEBRA_OPAQUE_UNREGISTER) {
handle_opq_unregistration(&hdr, msg);
continue;
} else if (hdr.command == ZEBRA_OPAQUE_NOTIFY) {
handle_opq_notif_req(&hdr, msg);
continue;
}
/* We only process OPAQUE messages - drop anything else */
@ -381,9 +426,9 @@ static int dispatch_opq_messages(struct stream_fifo *msg_fifo)
if (CHECK_FLAG(info.flags, ZAPI_OPAQUE_FLAG_UNICAST)) {
if (client->proto != info.proto ||
client->instance != info.instance ||
client->session_id != info.session_id)
if (client->proto != info.dest_proto ||
client->instance != info.dest_instance ||
client->session_id != info.dest_session_id)
continue;
if (IS_ZEBRA_DEBUG_RECV &&
@ -400,36 +445,25 @@ static int dispatch_opq_messages(struct stream_fifo *msg_fifo)
dup = stream_dup(msg);
}
if (IS_ZEBRA_DEBUG_SEND && IS_ZEBRA_DEBUG_DETAIL)
zlog_debug("%s: sending %s to client %s",
__func__, (dup ? "dup" : "msg"),
opq_client2str(buf, sizeof(buf),
client));
/*
* TODO -- this isn't ideal: we're going through an
* acquire/release cycle for each client for each
* message. Replace this with a batching version.
*/
zclient = zserv_acquire_client(client->proto,
client->instance,
client->session_id);
if (zclient) {
if (IS_ZEBRA_DEBUG_SEND &&
IS_ZEBRA_DEBUG_DETAIL)
zlog_debug("%s: sending %s to client %s",
__func__,
(dup ? "dup" : "msg"),
opq_client2str(buf,
sizeof(buf),
client));
/*
* Sending a message actually means enqueuing
* it for a zapi io pthread to send - so we
* don't touch the message after this call.
*/
zserv_send_message(zclient, dup ? dup : msg);
if (opq_send_message(client->proto, client->instance,
client->session_id,
(dup ? dup : msg))) {
/* Message is gone - don't touch it */
if (dup)
dup = NULL;
else
msg = NULL;
zserv_release_client(zclient);
} else {
if (IS_ZEBRA_DEBUG_RECV &&
IS_ZEBRA_DEBUG_DETAIL)
@ -457,6 +491,67 @@ drop_it:
return 0;
}
/* Enqueue registration client object */
static void opq_enqueue_client(struct opq_msg_reg *reg,
struct opq_client_reg *client)
{
client->next = reg->clients;
if (reg->clients)
reg->clients->prev = client;
reg->clients = client;
}
/* Dequeue registration client object */
static void opq_dequeue_client(struct opq_msg_reg *reg,
struct opq_client_reg *client)
{
if (client->prev)
client->prev->next = client->next;
if (client->next)
client->next->prev = client->prev;
if (reg->clients == client)
reg->clients = client->next;
}
/*
* Send notification messages to any interested clients in 'reg',
* about 'server'; the sense is 'registered' (or not).
* The 'server' is not required for un-registrations.
*/
static void opq_send_notifications(const struct opq_msg_reg *reg,
const struct opq_client_reg *server,
bool registered)
{
const struct opq_client_reg *client;
struct stream *msg = NULL;
/* If there are any notification clients, send them a message */
for (client = reg->clients; client; client = client->next) {
if (CHECK_FLAG(client->flags, OPQ_CLIENT_FLAG_NOTIFY)) {
msg = stream_new(ZEBRA_SMALL_PACKET_SIZE);
if (registered) {
zclient_opaque_notif_encode(msg, reg->type,
registered,
server->proto,
server->instance,
server->session_id);
} else {
zclient_opaque_notif_encode(msg, reg->type,
registered, 0, 0, 0);
}
/* Locate zebra client and enqueue message to it */
if (!opq_send_message(client->proto, client->instance,
client->session_id, msg)) {
/* Error - need to free the message */
stream_free(msg);
msg = NULL;
}
}
}
}
/*
* Process a register/unregister message
*/
@ -499,7 +594,9 @@ static int handle_opq_registration(const struct zmsghdr *hdr,
goto done;
}
client = opq_client_alloc(&info);
client = opq_client_alloc(info.proto, info.instance,
info.session_id);
SET_FLAG(client->flags, OPQ_CLIENT_FLAG_RECV);
if (IS_ZEBRA_DEBUG_RECV)
zlog_debug("%s: client %s registers for %u",
@ -508,17 +605,20 @@ static int handle_opq_registration(const struct zmsghdr *hdr,
info.type);
/* Link client into registration */
client->next = reg->clients;
if (reg->clients)
reg->clients->prev = client;
reg->clients = client;
opq_enqueue_client(reg, client);
/* Send notifications to any clients who want them */
opq_send_notifications(reg, client, true);
} else {
/*
* No existing registrations - create one, add the
* client, and add registration to hash.
*/
reg = opq_reg_alloc(info.type);
client = opq_client_alloc(&info);
client = opq_client_alloc(info.proto, info.instance,
info.session_id);
SET_FLAG(client->flags, OPQ_CLIENT_FLAG_RECV);
if (IS_ZEBRA_DEBUG_RECV)
zlog_debug("%s: client %s registers for new reg %u",
@ -545,8 +645,9 @@ static int handle_opq_unregistration(const struct zmsghdr *hdr,
{
int ret = 0;
struct zapi_opaque_reg_info info;
struct opq_client_reg *client;
struct opq_client_reg *client, *tclient;
struct opq_msg_reg key, *reg;
int scount;
char buf[50];
memset(&info, 0, sizeof(info));
@ -571,11 +672,16 @@ static int handle_opq_unregistration(const struct zmsghdr *hdr,
goto done;
}
/* Look for client */
for (client = reg->clients; client != NULL;
client = client->next) {
if (opq_client_match(client, &info))
break;
/* Look for client info, count servers and notif clients too */
client = NULL;
scount = 0;
for (tclient = reg->clients; tclient != NULL; tclient = tclient->next) {
if (opq_client_match(tclient, &info))
client = tclient;
if (CHECK_FLAG(tclient->flags, OPQ_CLIENT_FLAG_RECV))
scount++;
}
if (client == NULL) {
@ -592,12 +698,167 @@ static int handle_opq_unregistration(const struct zmsghdr *hdr,
__func__, opq_client2str(buf, sizeof(buf), client),
info.type);
if (client->prev)
client->prev->next = client->next;
if (client->next)
client->next->prev = client->prev;
if (reg->clients == client)
reg->clients = client->next;
opq_dequeue_client(reg, client);
opq_client_free(&client);
scount--;
/* Is registration empty now? */
if (reg->clients == NULL) {
opq_regh_del(&opq_reg_hash, reg);
opq_reg_free(&reg);
} else if (scount == 0) {
/* Send notifications if no more servers for the message. */
opq_send_notifications(reg, NULL, false);
}
done:
stream_free(msg);
return ret;
}
/*
* Handle requests about opaque notifications.
*/
static int handle_opq_notif_req(const struct zmsghdr *hdr, struct stream *msg)
{
int ret;
struct zapi_opaque_notif_info info = {};
struct opq_client_reg *client;
struct opq_msg_reg key, *reg;
char buf[50];
ret = zclient_opaque_notif_decode(msg, &info);
if (ret < 0)
goto done;
/* Handle deregistration */
if (!info.reg) {
ret = handle_opq_notif_unreg(&info);
goto done;
}
memset(&key, 0, sizeof(key));
key.type = info.msg_type;
reg = opq_regh_find(&opq_reg_hash, &key);
if (reg) {
/* Look for dup client */
for (client = reg->clients; client != NULL;
client = client->next) {
if (opq_client_notif_match(client, &info))
break;
}
if (client) {
/* Oops - duplicate ? */
if (IS_ZEBRA_DEBUG_RECV)
zlog_debug("%s: duplicate opq notif reg client %s",
__func__, opq_client2str(buf,
sizeof(buf),
client));
goto done;
}
client = opq_client_alloc(info.proto, info.instance,
info.session_id);
SET_FLAG(client->flags, OPQ_CLIENT_FLAG_NOTIFY);
if (IS_ZEBRA_DEBUG_RECV)
zlog_debug("%s: client %s registers for notif %u",
__func__,
opq_client2str(buf, sizeof(buf), client),
info.msg_type);
/* Link client into registration */
opq_enqueue_client(reg, client);
/* Send notification if any registered servers */
/* Look for a server */
for (client = reg->clients; client != NULL;
client = client->next) {
if (CHECK_FLAG(client->flags, OPQ_CLIENT_FLAG_RECV))
break;
}
if (client)
opq_send_notifications(reg, client, true);
} else if (info.reg) {
/*
* No existing registrations - create one, add the
* client, and add registration to hash.
*/
reg = opq_reg_alloc(info.msg_type);
client = opq_client_alloc(info.proto, info.instance,
info.session_id);
SET_FLAG(client->flags, OPQ_CLIENT_FLAG_NOTIFY);
if (IS_ZEBRA_DEBUG_RECV)
zlog_debug("%s: client %s registers for new notif %u",
__func__,
opq_client2str(buf, sizeof(buf), client),
info.msg_type);
reg->clients = client;
opq_regh_add(&opq_reg_hash, reg);
}
done:
stream_free(msg);
return ret;
}
/*
* Unregister notification
*/
static int handle_opq_notif_unreg(const struct zapi_opaque_notif_info *info)
{
int ret = 0;
struct opq_client_reg *client;
struct opq_msg_reg key, *reg;
char buf[50];
memset(&key, 0, sizeof(key));
key.type = info->msg_type;
reg = opq_regh_find(&opq_reg_hash, &key);
if (reg == NULL) {
/* Weird: unregister for unknown message? */
if (IS_ZEBRA_DEBUG_RECV)
zlog_debug("%s: unknown client %s/%u/%u unregisters notif for unknown type %u",
__func__, zebra_route_string(info->proto),
info->instance, info->session_id,
info->msg_type);
goto done;
}
/* Look for client */
for (client = reg->clients; client != NULL; client = client->next) {
if (opq_client_notif_match(client, info))
break;
}
if (client == NULL) {
/* Oops - unregister for unknown client? */
if (IS_ZEBRA_DEBUG_RECV)
zlog_debug("%s: unknown client %s/%u/%u unregisters notif for %u",
__func__, zebra_route_string(info->proto),
info->instance, info->session_id,
info->msg_type);
goto done;
}
if (IS_ZEBRA_DEBUG_RECV)
zlog_debug("%s: client %s unregisters notif for %u", __func__,
opq_client2str(buf, sizeof(buf), client),
info->msg_type);
/* Dequeue client object */
opq_dequeue_client(reg, client);
opq_client_free(&client);
@ -609,7 +870,6 @@ static int handle_opq_unregistration(const struct zmsghdr *hdr,
done:
stream_free(msg);
return ret;
}
@ -617,9 +877,23 @@ done:
static bool opq_client_match(const struct opq_client_reg *client,
const struct zapi_opaque_reg_info *info)
{
if (client->proto == info->proto &&
client->instance == info->instance &&
client->session_id == info->session_id)
/* look for matching client, skip notifications */
if (client->proto == info->proto && client->instance == info->instance &&
client->session_id == info->session_id &&
CHECK_FLAG(client->flags, OPQ_CLIENT_FLAG_RECV))
return true;
else
return false;
}
/* Compare helper for clients registered for notifications */
static bool opq_client_notif_match(const struct opq_client_reg *client,
const struct zapi_opaque_notif_info *info)
{
/* look for matching client, only for notifications */
if (client->proto == info->proto && client->instance == info->instance &&
client->session_id == info->session_id &&
CHECK_FLAG(client->flags, OPQ_CLIENT_FLAG_NOTIFY))
return true;
else
return false;
@ -655,16 +929,16 @@ static void opq_reg_free(struct opq_msg_reg **reg)
XFREE(MTYPE_OPQ, (*reg));
}
static struct opq_client_reg *opq_client_alloc(
const struct zapi_opaque_reg_info *info)
static struct opq_client_reg *opq_client_alloc(uint8_t proto, uint16_t instance,
uint32_t session_id)
{
struct opq_client_reg *client;
client = XCALLOC(MTYPE_OPQ, sizeof(struct opq_client_reg));
client->proto = info->proto;
client->instance = info->instance;
client->session_id = info->session_id;
client->proto = proto;
client->instance = instance;
client->session_id = session_id;
return client;
}