mgmtd: add front-end notification selector support

Signed-off-by: Christian Hopps <chopps@labn.net>
This commit is contained in:
Christian Hopps 2024-06-04 10:29:46 -04:00
parent 33b73f8e3a
commit 657f1650e6
2 changed files with 112 additions and 7 deletions

View file

@ -176,6 +176,7 @@ DECLARE_MTYPE(MSG_NATIVE_RPC_REPLY);
#define MGMT_MSG_CODE_EDIT_REPLY 6 /* Public API */
#define MGMT_MSG_CODE_RPC 7 /* Public API */
#define MGMT_MSG_CODE_RPC_REPLY 8 /* Public API */
#define MGMT_MSG_CODE_NOTIFY_SELECT 9 /* Public API */
/*
* Datastores
@ -426,6 +427,27 @@ _Static_assert(sizeof(struct mgmt_msg_rpc_reply) ==
offsetof(struct mgmt_msg_rpc_reply, data),
"Size mismatch");
/**
* struct mgmt_msg_notify_select - Add notification selectors for FE client.
*
* Add xpath prefix notification selectors to limit the notifications sent
* to the front-end client.
*
* @selectors: the xpath prefixes to selectors notifications through.
* @repalce: if true replace existing selectors with `selectors`.
*/
struct mgmt_msg_notify_select {
struct mgmt_msg_header;
uint8_t replace;
uint8_t resv2[7];
alignas(8) char selectors[];
};
_Static_assert(sizeof(struct mgmt_msg_notify_select) ==
offsetof(struct mgmt_msg_notify_select, selectors),
"Size mismatch");
/*
* Validate that the message ends in a NUL terminating byte
*/

View file

@ -43,6 +43,7 @@ struct mgmt_fe_session_ctx {
uint64_t txn_id;
uint64_t cfg_txn_id;
uint8_t ds_locked[MGMTD_DS_MAX_ID];
const char **notify_xpaths;
struct event *proc_cfg_txn_clnp;
struct event *proc_show_txn_clnp;
@ -1401,10 +1402,45 @@ static void fe_adapter_handle_edit(struct mgmt_fe_session_ctx *session,
}
}
/**
* fe_adapter_handle_notify_select() - Handle an Notify Select message.
* @session: the client session.
* @__msg: the message data.
* @msg_len: the length of the message data.
*/
static void fe_adapter_handle_notify_select(struct mgmt_fe_session_ctx *session,
void *__msg, size_t msg_len)
{
struct mgmt_msg_notify_select *msg = __msg;
uint64_t req_id = msg->req_id;
const char **selectors = NULL;
const char **new;
/* An empty message clears the selectors */
if (msg_len >= sizeof(*msg)) {
selectors = mgmt_msg_native_strings_decode(msg, msg_len,
msg->selectors);
if (!selectors) {
fe_adapter_send_error(session, req_id, false, -EINVAL,
"Invalid message");
return;
}
}
if (msg->replace) {
darr_free_free(session->notify_xpaths);
session->notify_xpaths = selectors;
} else {
new = darr_append_nz(session->notify_xpaths,
darr_len(selectors));
memcpy(new, selectors, darr_len(selectors) * sizeof(*selectors));
darr_free(selectors);
}
}
/**
* fe_adapter_handle_rpc() - Handle an RPC message from an FE client.
* @session: the client session.
* @msg_raw: the message data.
* @__msg: the message data.
* @msg_len: the length of the message data.
*/
static void fe_adapter_handle_rpc(struct mgmt_fe_session_ctx *session,
@ -1503,13 +1539,26 @@ static void fe_adapter_handle_native_msg(struct mgmt_fe_client_adapter *adapter,
assert(session->adapter == adapter);
switch (msg->code) {
case MGMT_MSG_CODE_GET_DATA:
fe_adapter_handle_get_data(session, msg, msg_len);
break;
case MGMT_MSG_CODE_EDIT:
__dbg("adapter %s: session-id %" PRIu64 " received EDIT message",
adapter->name, msg->refer_id);
fe_adapter_handle_edit(session, msg, msg_len);
break;
case MGMT_MSG_CODE_NOTIFY_SELECT:
__dbg("adapter %s: session-id %" PRIu64
" received NOTIFY_SELECT message",
adapter->name, msg->refer_id);
fe_adapter_handle_notify_select(session, msg, msg_len);
break;
case MGMT_MSG_CODE_GET_DATA:
__dbg("adapter %s: session-id %" PRIu64
" received GET_DATA message",
adapter->name, msg->refer_id);
fe_adapter_handle_get_data(session, msg, msg_len);
break;
case MGMT_MSG_CODE_RPC:
__dbg("adapter %s: session-id %" PRIu64 " received RPC message",
adapter->name, msg->refer_id);
fe_adapter_handle_rpc(session, msg, msg_len);
break;
default:
@ -1554,14 +1603,48 @@ void mgmt_fe_adapter_send_notify(struct mgmt_msg_notify_data *msg, size_t msglen
{
struct mgmt_fe_client_adapter *adapter;
struct mgmt_fe_session_ctx *session;
struct nb_node *nb_node;
const char **xpath_prefix;
const char *notif;
bool sendit;
uint len;
assert(msg->refer_id == 0);
notif = mgmt_msg_native_xpath_decode(msg, msglen);
if (!notif) {
__log_err("Corrupt notify msg");
return;
}
/*
* We need the nb_node to obtain a path which does not include any
* specific list entry selectors
*/
nb_node = nb_node_find(notif);
if (!nb_node) {
__log_err("No schema found for notification: %s", notif);
return;
}
FOREACH_ADAPTER_IN_LIST (adapter) {
FOREACH_SESSION_IN_LIST (adapter, session) {
msg->refer_id = session->session_id;
(void)fe_adapter_send_native_msg(adapter, msg, msglen,
false);
/* If no selectors then always send */
sendit = !session->notify_xpaths;
darr_foreach_p (session->notify_xpaths, xpath_prefix) {
len = strlen(*xpath_prefix);
if (!strncmp(*xpath_prefix, notif, len) ||
!strncmp(*xpath_prefix, nb_node->xpath,
len)) {
sendit = true;
break;
}
}
if (sendit) {
msg->refer_id = session->session_id;
(void)fe_adapter_send_native_msg(adapter, msg,
msglen, false);
}
}
}
msg->refer_id = 0;