Merge pull request #17876 from LabNConsulting/chopps/backend-ds-notify

Handle datastore notifications correctly in backend clients (daemons)
This commit is contained in:
Donald Sharp 2025-01-18 14:42:18 -05:00 committed by GitHub
commit 3c3b559706
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
11 changed files with 564 additions and 197 deletions

View file

@ -1114,19 +1114,24 @@ static void be_client_handle_notify(struct mgmt_be_client *client, void *msgbuf,
size_t msg_len)
{
struct mgmt_msg_notify_data *notif_msg = msgbuf;
struct nb_node *nb_node;
struct lyd_node *dnode;
struct nb_node *nb_node, *nb_parent;
struct lyd_node *dnode = NULL;
const char *data = NULL;
const char *notif;
LY_ERR err;
bool is_yang_notify;
LY_ERR err = LY_SUCCESS;
debug_be_client("Received notification for client %s", client->name);
notif = mgmt_msg_native_xpath_data_decode(notif_msg, msg_len, data);
if (!notif || !data) {
if (!notif) {
log_err_be_client("Corrupt notify msg");
return;
}
if (!data && (notif_msg->op == NOTIFY_OP_DS_REPLACE || notif_msg->op == NOTIFY_OP_DS_PATCH)) {
log_err_be_client("Corrupt replace/patch notify msg: missing data");
return;
}
nb_node = nb_node_find(notif);
if (!nb_node) {
@ -1134,20 +1139,41 @@ static void be_client_handle_notify(struct mgmt_be_client *client, void *msgbuf,
return;
}
if (!nb_node->cbs.notify) {
is_yang_notify = !!CHECK_FLAG(nb_node->snode->nodetype, LYS_NOTIF);
if (is_yang_notify && !nb_node->cbs.notify) {
debug_be_client("No notification callback for: %s", notif);
return;
}
err = yang_parse_notification(notif, notif_msg->result_type, data,
if (!nb_node->cbs.notify) {
/*
* See if a parent has a callback, this is so backend's can
* listen for changes on an entire datastore sub-tree.
*/
for (nb_parent = nb_node->parent; nb_parent; nb_parent = nb_node->parent)
if (nb_parent->cbs.notify)
break;
if (!nb_parent) {
debug_be_client("Including parents, no DS notification callback for: %s",
notif);
return;
}
nb_node = nb_parent;
}
if (data && is_yang_notify) {
err = yang_parse_notification(notif, notif_msg->result_type, data, &dnode);
} else if (data) {
err = yang_parse_data(notif, notif_msg->result_type, false, true, false, data,
&dnode);
}
if (err) {
log_err_be_client("Can't parse notification data for: %s",
notif);
log_err_be_client("Can't parse notification data for: %s", notif);
return;
}
nb_callback_notify(nb_node, notif, dnode);
nb_callback_notify(nb_node, notif_msg->op, notif, dnode);
lyd_free_all(dnode);
}

View file

@ -1857,7 +1857,7 @@ int nb_callback_rpc(const struct nb_node *nb_node, const char *xpath,
return nb_node->cbs.rpc(&args);
}
void nb_callback_notify(const struct nb_node *nb_node, const char *xpath,
void nb_callback_notify(const struct nb_node *nb_node, uint8_t op, const char *xpath,
struct lyd_node *dnode)
{
struct nb_cb_notify_args args = {};
@ -1865,6 +1865,7 @@ void nb_callback_notify(const struct nb_node *nb_node, const char *xpath,
DEBUGD(&nb_dbg_cbs_notify, "northbound notify: %s", xpath);
args.xpath = xpath;
args.op = op;
args.dnode = dnode;
nb_node->cbs.notify(&args);
}

View file

@ -305,6 +305,7 @@ struct nb_cb_rpc_args {
struct nb_cb_notify_args {
/* XPath of the notification. */
const char *xpath;
uint8_t op;
/*
* libyang data node representing the notification. If the notification
@ -861,7 +862,7 @@ extern const void *nb_callback_lookup_next(const struct nb_node *nb_node,
extern int nb_callback_rpc(const struct nb_node *nb_node, const char *xpath,
const struct lyd_node *input, struct lyd_node *output,
char *errmsg, size_t errmsg_len);
extern void nb_callback_notify(const struct nb_node *nb_node, const char *xpath,
extern void nb_callback_notify(const struct nb_node *nb_node, uint8_t op, const char *xpath,
struct lyd_node *dnode);
/*

View file

@ -480,87 +480,96 @@ static struct op_changes_group *op_changes_group_next(void)
/* Query for changes and notify */
/* ---------------------------- */
static void timer_walk_abort(struct nb_notif_walk_args *args);
static void timer_walk_continue(struct event *event);
static void timer_walk_done(struct nb_notif_walk_args *args);
static struct op_change *__next_change(struct op_changes_group *group)
{
struct op_change *next = RB_NEXT(op_changes, group->cur_change);
/* Remove and free current so retry works */
RB_REMOVE(op_changes, group->cur_changes, group->cur_change);
op_change_free(group->cur_change);
return next;
}
static struct op_changes_group *__next_group(struct op_changes_group *group)
{
__dbg("done with oper-path collection for group");
op_changes_group_free(group);
return op_changes_group_next();
}
static enum nb_error oper_walk_done(const struct lyd_node *tree, void *arg, enum nb_error ret)
{
struct nb_notif_walk_args *args = arg;
struct op_changes_group *group = args->group;
const char *path = group->cur_change->path;
const char *op = group->cur_changes == &group->adds ? "add" : "delete";
/* we don't send batches when yielding as we need completed edit in any patch */
assert(ret != NB_YIELD);
nb_notif_walk = NULL;
if (ret == NB_ERR_NOT_FOUND) {
__dbg("Path not found while walking oper tree: %s", path);
XFREE(MTYPE_NB_NOTIF_WALK_ARGS, args);
return ret;
}
/* Something else went wrong with the walk */
if (ret != NB_OK) {
ret = NB_OK;
} else if (ret != NB_OK) {
error:
__log_err("Error notifying for datastore change on path: %s: %s", path,
nb_err_name(ret));
XFREE(MTYPE_NB_NOTIF_WALK_ARGS, args);
/* XXX Need to inform mgmtd/front-ends things are out-of-sync */
return ret;
}
__log_err("Error notifying for datastore path: %s: %s", path, nb_err_name(ret));
__dbg("done with oper-path collection for %s path: %s", op, path);
timer_walk_abort(args);
goto done;
} else {
__dbg("Done with oper-path collection for path: %s", path);
/* Do we need this? */
while (tree->parent)
tree = lyd_parent(tree);
/* Do we need this? */
while (tree->parent)
tree = lyd_parent(tree);
/* Send the add (replace) notification */
if (mgmt_be_send_ds_replace_notification(path, tree)) {
ret = NB_ERR;
goto error;
/* Send the add (replace) notification */
if (mgmt_be_send_ds_replace_notification(path, tree)) {
__log_err("Error sending notification message for path: %s", path);
ret = NB_ERR;
goto error;
}
}
/*
* Advance to next change (either dels or adds or both).
* Advance to next change.
*/
group->cur_change = RB_NEXT(op_changes, group->cur_change);
group->cur_change = __next_change(group);
if (!group->cur_change) {
__dbg("done with oper-path collection for group");
op_changes_group_free(group);
group = op_changes_group_next();
args->group = group;
if (!group) {
__dbg("done with ALL oper-path collection for notification");
XFREE(MTYPE_NB_NOTIF_WALK_ARGS, args);
args->group = __next_group(group);
if (!args->group) {
timer_walk_done(args);
goto done;
}
}
/* Run next walk after giving other events a shot to run */
event_add_timer_msec(nb_notif_master, timer_walk_continue, args, 0, &nb_notif_timer);
done:
/* Done with current walk and scheduled next one if there is more */
nb_notif_walk = NULL;
return NB_OK;
return ret;
}
static LY_ERR nb_notify_delete_changes(struct nb_notif_walk_args *args)
static int nb_notify_delete_changes(struct nb_notif_walk_args *args)
{
struct op_changes_group *group = args->group;
LY_ERR err;
group->cur_change = RB_MIN(op_changes, group->cur_changes);
while (group->cur_change) {
err = mgmt_be_send_ds_delete_notification(group->cur_change->path);
assert(err == LY_SUCCESS); /* XXX */
group->cur_change = RB_NEXT(op_changes, group->cur_change);
if (mgmt_be_send_ds_delete_notification(group->cur_change->path)) {
__log_err("Error sending delete notification message for path: %s",
group->cur_change->path);
return 1;
}
group->cur_change = __next_change(group);
}
return LY_SUCCESS;
return 0;
}
static void timer_walk_continue(struct event *event)
@ -568,15 +577,17 @@ static void timer_walk_continue(struct event *event)
struct nb_notif_walk_args *args = EVENT_ARG(event);
struct op_changes_group *group = args->group;
const char *path;
LY_ERR err;
int ret;
/*
* Notify about deletes until we have add changes to collect.
*/
while (group->cur_changes == &group->dels) {
err = nb_notify_delete_changes(args);
assert(err == LY_SUCCESS); /* XXX */
assert(!group->cur_change); /* we send all the deletes in one message */
ret = nb_notify_delete_changes(args);
if (ret) {
timer_walk_abort(args);
return;
}
/* after deletes advance to adds */
group->cur_changes = &group->adds;
@ -584,14 +595,9 @@ static void timer_walk_continue(struct event *event)
if (group->cur_change)
break;
__dbg("done with oper-path change group");
op_changes_group_free(group);
group = op_changes_group_next();
args->group = group;
if (!group) {
__dbg("done with ALL oper-path changes");
XFREE(MTYPE_NB_NOTIF_WALK_ARGS, args);
args->group = __next_group(group);
if (!args->group) {
timer_walk_done(args);
return;
}
}
@ -621,6 +627,22 @@ static void timer_walk_start(struct event *event)
timer_walk_continue(event);
}
static void timer_walk_abort(struct nb_notif_walk_args *args)
{
__dbg("Failed notifying datastore changes, will retry");
__dbg("oper-state notify setting retry timer to fire in: %d msec ", NB_NOTIF_TIMER_MSEC);
event_add_timer_msec(nb_notif_master, timer_walk_continue, args, NB_NOTIF_TIMER_MSEC,
&nb_notif_timer);
}
static void timer_walk_done(struct nb_notif_walk_args *args)
{
__dbg("Finished notifying for all datastore changes");
assert(!args->group);
XFREE(MTYPE_NB_NOTIF_WALK_ARGS, args);
}
static void nb_notif_set_walk_timer(void)
{
if (nb_notif_walk) {
@ -659,19 +681,23 @@ void nb_notif_init(struct event_loop *tm)
void nb_notif_terminate(void)
{
struct nb_notif_walk_args *args;
struct nb_notif_walk_args *args = nb_notif_timer ? EVENT_ARG(nb_notif_timer) : NULL;
struct op_changes_group *group;
__dbg("terminating: timer: %p timer arg: %p walk %p", nb_notif_timer, args, nb_notif_walk);
EVENT_OFF(nb_notif_timer);
if (nb_notif_walk) {
nb_oper_cancel_walk(nb_notif_walk);
/* need to free the group that's in the walk */
/* Grab walk args from walk if active. */
args = nb_oper_walk_finish_arg(nb_notif_walk);
if (args)
op_changes_group_free(args->group);
nb_oper_cancel_walk(nb_notif_walk);
nb_notif_walk = NULL;
}
if (args) {
op_changes_group_free(args->group);
XFREE(MTYPE_NB_NOTIF_WALK_ARGS, args);
}
while ((group = op_changes_group_next()))
op_changes_group_free(group);

View file

@ -874,6 +874,60 @@ static void ly_zlog_cb(LY_LOG_LEVEL level, const char *msg, const char *data_pat
zlog(priority, "libyang: %s", msg);
}
LY_ERR yang_parse_data(const char *xpath, LYD_FORMAT format, bool as_subtree, bool is_oper,
bool validate, const char *data, struct lyd_node **tree)
{
struct ly_in *in = NULL;
struct lyd_node *subtree = NULL;
uint32_t parse_options = LYD_PARSE_STRICT | LYD_PARSE_ONLY;
uint32_t validate_options = LYD_VALIDATE_PRESENT;
LY_ERR err;
err = ly_in_new_memory(data, &in);
if (err != LY_SUCCESS)
return err;
if (as_subtree) {
struct lyd_node *parent;
/*
* Create the subtree branch from root using the xpath. This
* will be used below to parse the data rooted at the subtree --
* a common YANG JSON technique (vs XML which starts all
* data trees from the root).
*/
err = lyd_new_path2(NULL, ly_native_ctx, xpath, NULL, 0, 0, 0, &parent, &subtree);
if (err != LY_SUCCESS)
goto done;
err = lyd_find_path(parent, xpath, false, &subtree);
if (err != LY_SUCCESS)
goto done;
}
if (is_oper)
validate_options |= LYD_VALIDATE_OPERATIONAL;
#ifdef LYD_VALIDATE_NOT_FINAL
if (!validate)
validate_options |= LYD_VALIDATE_NOT_FINAL;
#endif
err = lyd_parse_data(ly_native_ctx, subtree, in, format, parse_options, validate_options,
tree);
if (err == LY_SUCCESS && subtree)
*tree = subtree;
done:
ly_in_free(in, 0);
if (err != LY_SUCCESS) {
if (*tree)
lyd_free_all(*tree);
else if (subtree)
lyd_free_all(subtree);
*tree = NULL;
}
return err;
}
LY_ERR yang_parse_notification(const char *xpath, LYD_FORMAT format,
const char *data, struct lyd_node **notif)
{

View file

@ -681,6 +681,25 @@ extern struct ly_ctx *yang_ctx_new_setup(bool embedded_modules, bool explicit_co
*/
extern void yang_debugging_set(bool enable);
/*
* Parse YANG data.
*
* Args:
* xpath: xpath of the data.
* format: LYD_FORMAT of input data.
* as_subtree: parse the data as starting at the subtree identified by xpath.
* is_oper: parse as operational state allows for invalid (logs warning).
* validate: validate the data (otherwise treat as non-final).
* data: input data.
* notif: pointer to the libyang data tree to store the parsed notification.
* If the notification is not on the top level of the yang model,
* the pointer to the notification node is still returned, but it's
* part of the full data tree with all its parents.
*/
LY_ERR yang_parse_data(const char *xpath, LYD_FORMAT format, bool as_subtree, bool is_oper,
bool validate, const char *data, struct lyd_node **tree);
/*
* Parse a YANG notification.
*

View file

@ -84,6 +84,13 @@ static const char *const zebra_oper_xpaths[] = {
NULL,
};
#ifdef HAVE_MGMTD_TESTC
static const char *const mgmtd_testc_oper_xpaths[] = {
"/frr-backend:clients",
NULL,
};
#endif
#ifdef HAVE_RIPD
static const char *const ripd_config_xpaths[] = {
"/frr-filter:lib",
@ -154,6 +161,9 @@ static const char *const *be_client_config_xpaths[MGMTD_BE_CLIENT_ID_MAX] = {
};
static const char *const *be_client_oper_xpaths[MGMTD_BE_CLIENT_ID_MAX] = {
#ifdef HAVE_MGMTD_TESTC
[MGMTD_BE_CLIENT_ID_TESTC] = mgmtd_testc_oper_xpaths,
#endif
#ifdef HAVE_RIPD
[MGMTD_BE_CLIENT_ID_RIPD] = ripd_oper_xpaths,
#endif

View file

@ -107,32 +107,49 @@ static void mgmt_fe_free_ns_strings(struct ns_string_head *head)
ns_string_fini(head);
}
static void mgmt_fe_ns_string_remove_session(struct ns_string_head *head,
struct mgmt_fe_session_ctx *session)
static uint64_t mgmt_fe_ns_string_remove_session(struct ns_string_head *head,
struct mgmt_fe_session_ctx *session)
{
struct listnode *node;
struct ns_string *ns;
uint64_t clients = 0;
frr_each_safe (ns_string, head, ns) {
listnode_delete(ns->sessions, session);
node = listnode_lookup(ns->sessions, session);
if (!node)
continue;
list_delete_node(ns->sessions, node);
clients |= mgmt_be_interested_clients(ns->s, MGMT_BE_XPATH_SUBSCR_TYPE_OPER);
if (list_isempty(ns->sessions)) {
ns_string_del(head, ns);
mgmt_fe_free_ns_string(ns);
}
}
return clients;
}
static void mgmt_fe_add_ns_string(struct ns_string_head *head, const char *path, size_t plen,
struct mgmt_fe_session_ctx *session)
static uint64_t mgmt_fe_add_ns_string(struct ns_string_head *head, const char *path, size_t plen,
struct mgmt_fe_session_ctx *session)
{
struct ns_string *e, *ns;
uint64_t clients = 0;
ns = XCALLOC(MTYPE_MGMTD_XPATH, sizeof(*ns) + plen + 1);
strlcpy(ns->s, path, plen + 1);
e = ns_string_add(head, ns);
if (!e)
if (!e) {
ns->sessions = list_new();
if (!listnode_lookup(ns->sessions, session))
listnode_add(ns->sessions, session);
clients = mgmt_be_interested_clients(ns->s, MGMT_BE_XPATH_SUBSCR_TYPE_OPER);
} else {
XFREE(MTYPE_MGMTD_XPATH, ns);
if (!listnode_lookup(e->sessions, session))
listnode_add(e->sessions, session);
}
return clients;
}
char **mgmt_fe_get_all_selectors(void)
@ -1653,7 +1670,7 @@ static void fe_adapter_handle_notify_select(struct mgmt_fe_session_ctx *session,
}
if (msg->replace) {
mgmt_fe_ns_string_remove_session(&mgmt_fe_ns_strings, session);
clients = mgmt_fe_ns_string_remove_session(&mgmt_fe_ns_strings, session);
// [ ] Keep a local tree to optimize sending selectors to BE?
// [*] Or just KISS and fanout the original message to BEs?
// mgmt_remove_add_notify_selectors(session->notify_xpaths, selectors);
@ -1684,18 +1701,11 @@ static void fe_adapter_handle_notify_select(struct mgmt_fe_session_ctx *session,
/* Add the new selectors to the global tree */
darr_foreach_p (selectors, sp)
mgmt_fe_add_ns_string(&mgmt_fe_ns_strings, *sp, darr_strlen(*sp), session);
clients |= mgmt_fe_add_ns_string(&mgmt_fe_ns_strings, *sp, darr_strlen(*sp),
session);
/* Check if any backends are interested in the new selectors. */
if (msg->replace) {
/* If we are replacing we'll send all the selectors again with replace flag */
clients = mgmt_be_interested_clients("/", MGMT_BE_XPATH_SUBSCR_TYPE_OPER);
} else {
darr_foreach_p (selectors, sp)
clients |= mgmt_be_interested_clients(*sp, MGMT_BE_XPATH_SUBSCR_TYPE_OPER);
}
if (!clients) {
__dbg("No backends provide oper for notify selectors: '%s' txn-id %Lu session-id: %Lu",
__dbg("No backends to newly notify for selectors: '%s' txn-id %Lu session-id: %Lu",
selstr, session->txn_id, session->session_id);
goto done;
}

View file

@ -9,8 +9,10 @@
#include <zebra.h>
#include <lib/version.h>
#include "darr.h"
#include "debug.h"
#include "libfrr.h"
#include "mgmt_be_client.h"
#include "mgmt_msg_native.h"
#include "northbound.h"
/* ---------------- */
@ -43,15 +45,15 @@ struct zebra_privs_t __privs = {
.cap_num_i = 0,
};
#define OPTION_LISTEN 2000
#define OPTION_NOTIF_COUNT 2001
#define OPTION_TIMEOUT 2002
const struct option longopts[] = {
{ "listen", no_argument, NULL, OPTION_LISTEN },
{ "notif-count", required_argument, NULL, OPTION_NOTIF_COUNT },
{ "timeout", required_argument, NULL, OPTION_TIMEOUT },
{ 0 }
};
#define OPTION_DATASTORE 2000
#define OPTION_LISTEN 2001
#define OPTION_NOTIF_COUNT 2002
#define OPTION_TIMEOUT 2003
const struct option longopts[] = { { "datastore", no_argument, NULL, OPTION_DATASTORE },
{ "listen", no_argument, NULL, OPTION_LISTEN },
{ "notify-count", required_argument, NULL, OPTION_NOTIF_COUNT },
{ "timeout", required_argument, NULL, OPTION_TIMEOUT },
{ 0 } };
/* Master of threads. */
@ -79,6 +81,20 @@ struct frr_signal_t __signals[] = {
#define MGMTD_TESTC_VTY_PORT 2624
/* clang-format off */
static const struct frr_yang_module_info frr_if_info = {
.name = "frr-interface",
.ignore_cfg_cbs = true,
.nodes = {
{
.xpath = "/frr-interface:lib/interface",
.cbs.notify = async_notification,
},
{
.xpath = NULL,
}
}
};
static const struct frr_yang_module_info frr_ripd_info = {
.name = "frr-ripd",
.ignore_cfg_cbs = true,
@ -98,6 +114,8 @@ static const struct frr_yang_module_info frr_ripd_info = {
};
static const struct frr_yang_module_info *const mgmt_yang_modules[] = {
&frr_backend_info,
&frr_if_info,
&frr_ripd_info,
};
@ -123,6 +141,7 @@ const char **__rpc_xpaths;
struct mgmt_be_client_cbs __client_cbs = {};
struct event *event_timeout;
int f_datastore;
int o_notif_count = 1;
int o_timeout;
@ -165,10 +184,56 @@ static void success(struct event *event)
quit(0);
}
static void async_notification(struct nb_cb_notify_args *args)
static void __ds_notification(struct nb_cb_notify_args *args)
{
zlog_notice("Received YANG notification");
uint8_t *output = NULL;
zlog_notice("Received YANG datastore notification: op %u", args->op);
if (args->op == NOTIFY_OP_NOTIFICATION) {
zlog_warn("ignoring non-datastore op notification: %s", args->xpath);
return;
}
/* datastore notification */
switch (args->op) {
case NOTIFY_OP_DS_REPLACE:
printfrr("#OP=REPLACE: %s\n", args->xpath);
break;
case NOTIFY_OP_DS_DELETE:
printfrr("#OP=DELETE: %s\n", args->xpath);
break;
case NOTIFY_OP_DS_PATCH:
printfrr("#OP=PATCH: %s\n", args->xpath);
break;
default:
printfrr("#OP=%u: unknown notify op\n", args->op);
quit(1);
}
if (args->dnode && args->op != NOTIFY_OP_DS_DELETE) {
output = yang_print_tree(args->dnode, LYD_JSON, LYD_PRINT_SHRINK);
if (output) {
printfrr("%s\n", output);
darr_free(output);
}
}
fflush(stdout);
if (o_notif_count && !--o_notif_count)
quit(0);
}
static void __notification(struct nb_cb_notify_args *args)
{
zlog_notice("Received YANG notification: op: %u", args->op);
if (args->op != NOTIFY_OP_NOTIFICATION) {
zlog_warn("ignoring datastore notification: op: %u: path %s", args->op, args->xpath);
return;
}
/* bogus, we should print the actual data */
printf("{\"frr-ripd:authentication-failure\": {\"interface-name\": \"%s\"}}\n",
yang_dnode_get_string(args->dnode, "interface-name"));
@ -176,6 +241,14 @@ static void async_notification(struct nb_cb_notify_args *args)
quit(0);
}
static void async_notification(struct nb_cb_notify_args *args)
{
if (f_datastore)
__ds_notification(args);
else
__notification(args);
}
static int rpc_callback(struct nb_cb_rpc_args *args)
{
const char *vrf = NULL;
@ -210,6 +283,9 @@ int main(int argc, char **argv)
break;
switch (opt) {
case OPTION_DATASTORE:
f_datastore = 1;
break;
case OPTION_LISTEN:
f_listen = 1;
break;
@ -228,6 +304,9 @@ int main(int argc, char **argv)
master = frr_init();
mgmt_be_client_lib_vty_init();
mgmt_dbg_be_client.flags = DEBUG_MODE_ALL;
/*
* Setup notification listen
*/

View file

@ -0,0 +1,238 @@
# -*- coding: utf-8 eval: (blacken-mode 1) -*-
#
# January 14 2025, Christian Hopps <chopps@labn.net>
#
# Copyright (c) 2025, LabN Consulting, L.L.C.
#
"""
Test YANG Datastore Notifications
"""
import json
import logging
import os
import re
import time
import pytest
from lib.topogen import Topogen
from lib.topotest import json_cmp
from munet.testing.util import waitline
from oper import check_kernel_32
pytestmark = [pytest.mark.ripd, pytest.mark.staticd, pytest.mark.mgmtd]
CWD = os.path.dirname(os.path.realpath(__file__))
FE_CLIENT = CWD + "/../lib/fe_client.py"
@pytest.fixture(scope="module")
def tgen(request):
"Setup/Teardown the environment and provide tgen argument to tests"
topodef = {
"s1": ("r1", "r2"),
}
tgen = Topogen(topodef, request.module.__name__)
tgen.start_topology()
router_list = tgen.routers()
for _, router in router_list.items():
router.load_frr_config("frr.conf")
tgen.start_router()
yield tgen
tgen.stop_topology()
def get_op_and_json(output):
op = ""
path = ""
data = ""
for line in output.split("\n"):
if not line:
continue
if not op:
m = re.match("#OP=([A-Z]*): (.*)", line)
if m:
op = m.group(1)
path = m.group(2)
continue
data += line + "\n"
if not op:
assert False, f"No notifcation op present in:\n{output}"
return op, path, data
def test_frontend_datastore_notification(tgen):
if tgen.routers_have_failure():
pytest.skip(tgen.errors)
r1 = tgen.gears["r1"].net
check_kernel_32(r1, "11.11.11.11", 1, "")
rc, _, _ = r1.cmd_status(FE_CLIENT + " --help")
if rc:
pytest.skip("No protoc or present cannot run test")
# Start our FE client in the background
p = r1.popen(
[FE_CLIENT, "--datastore", "--listen=/frr-interface:lib/interface/state"]
)
assert waitline(p.stderr, "Connected", timeout=10)
r1.cmd_raises("ip link set r1-eth0 mtu 1200")
# {"frr-interface:lib":{"interface":[{"name":"r1-eth0","state":{"if-index":2,"mtu":1200,"mtu6":1200,"speed":10000,"metric":0,"phy-address":"ba:fd:de:b5:8b:90"}}]}}
try:
# Wait for FE client to exit
output, error = p.communicate(timeout=10)
op, path, data = get_op_and_json(output)
assert op == "REPLACE"
assert path.startswith("/frr-interface:lib/interface[name='r1-eth0']/state")
jsout = json.loads(data)
expected = json.loads(
'{"frr-interface:lib":{"interface":[{"name":"r1-eth0","state":{"mtu":1200}}]}}'
)
result = json_cmp(jsout, expected)
assert result is None
finally:
p.kill()
r1.cmd_raises("ip link set r1-eth0 mtu 1500")
def test_backend_datastore_update(tgen):
if tgen.routers_have_failure():
pytest.skip(tgen.errors)
r1 = tgen.gears["r1"].net
check_kernel_32(r1, "11.11.11.11", 1, "")
be_client_path = "/usr/lib/frr/mgmtd_testc"
rc, _, _ = r1.cmd_status(be_client_path + " --help")
if rc:
pytest.skip("No mgmtd_testc")
# Start our BE client in the background
p = r1.popen(
[
be_client_path,
"--timeout=20",
"--log=file:/dev/stderr",
"--datastore",
"--listen",
"/frr-interface:lib/interface",
]
)
assert waitline(p.stderr, "Got SUBSCR_REPLY success 1", timeout=10)
r1.cmd_raises("ip link set r1-eth0 mtu 1200")
try:
expected = json.loads(
'{"frr-interface:lib":{"interface":[{"name":"r1-eth0","state":{"mtu":1200}}]}}'
)
output, error = p.communicate(timeout=10)
op, path, data = get_op_and_json(output)
jsout = json.loads(data)
result = json_cmp(jsout, expected)
assert result is None
finally:
p.kill()
r1.cmd_raises("ip link set r1-eth0 mtu 1500")
def test_backend_datastore_add_delete(tgen):
if tgen.routers_have_failure():
pytest.skip(tgen.errors)
r1 = tgen.gears["r1"].net
check_kernel_32(r1, "11.11.11.11", 1, "")
be_client_path = "/usr/lib/frr/mgmtd_testc"
rc, _, _ = r1.cmd_status(be_client_path + " --help")
if rc:
pytest.skip("No mgmtd_testc")
# Start our BE client in the background
p = r1.popen(
[
be_client_path,
"--timeout=20",
"--log=file:/dev/stderr",
"--notify-count=2",
"--datastore",
"--listen",
"/frr-interface:lib/interface",
]
)
assert waitline(p.stderr, "Got SUBSCR_REPLY success 1", timeout=10)
r1.cmd_raises('vtysh -c "conf t" -c "int foobar"')
try:
assert waitline(
p.stdout,
re.escape('#OP=REPLACE: /frr-interface:lib/interface[name="foobar"]/state'),
timeout=2,
)
r1.cmd_raises('vtysh -c "conf t" -c "no int foobar"')
assert waitline(
p.stdout,
re.escape('#OP=DELETE: /frr-interface:lib/interface[name="foobar"]/state'),
timeout=2,
)
finally:
p.kill()
r1.cmd_raises('vtysh -c "conf t" -c "no int foobar"')
def test_datastore_backend_filters(tgen):
if tgen.routers_have_failure():
pytest.skip(tgen.errors)
r1 = tgen.gears["r1"].net
check_kernel_32(r1, "11.11.11.11", 1, "")
rc, _, _ = r1.cmd_status(FE_CLIENT + " --help")
if rc:
pytest.skip("No protoc or present cannot run test")
# Start our FE client in the background
p = r1.popen(
[FE_CLIENT, "--datastore", "--listen=/frr-interface:lib/interface/state"]
)
assert waitline(p.stderr, "Connected", timeout=10)
time.sleep(1)
try:
output = r1.cmd_raises(
'vtysh -c "show mgmt get-data /frr-backend:clients/client/state/notify-selectors"'
)
jsout = json.loads(output)
#
# Verify only zebra has the notify selector as it's the only provider currently
#
state = {"notify-selectors": ["/frr-interface:lib/interface/state"]}
expected = {
"frr-backend:clients": {"client": [{"name": "zebra", "state": state}]}
}
result = json_cmp(jsout, expected, exact=True)
assert result is None
except Exception as error:
logging.error("got exception: %s", error)
raise
finally:
p.kill()

View file

@ -5,17 +5,13 @@
#
# Copyright (c) 2024, LabN Consulting, L.L.C.
#
"""
Test YANG Notifications
Test Traditional YANG Notifications
"""
import json
import logging
import os
import re
import pytest
from lib.micronet import Timeout, comm_error
from lib.topogen import Topogen
from lib.topotest import json_cmp
from oper import check_kernel_32
@ -45,99 +41,6 @@ def tgen(request):
tgen.stop_topology()
def myreadline(f):
buf = ""
while True:
# logging.debug("READING 1 CHAR")
c = f.read(1)
if not c:
return buf if buf else None
buf += c
# logging.debug("READ CHAR: '%s'", c)
if c == "\n":
return buf
def _wait_output(f, regex, maxwait=120):
timeout = Timeout(maxwait)
while not timeout.is_expired():
# line = p.stdout.readline()
line = myreadline(f)
if not line:
assert None, "EOF waiting for '{}'".format(regex)
line = line.rstrip()
if line:
logging.debug("GOT LINE: '%s'", line)
m = re.search(regex, line)
if m:
return m
assert None, "Failed to get output matching '{}' withint {} actual {}s".format(
regex, maxwait, timeout.elapsed()
)
def get_op_and_json(output):
op = ""
path = ""
data = ""
for line in output.split("\n"):
if not line:
continue
if not op:
m = re.match("#OP=([A-Z]*): (.*)", line)
if m:
op = m.group(1)
path = m.group(2)
continue
data += line + "\n"
if not op:
assert False, f"No notifcation op present in:\n{output}"
return op, path, data
def test_frontend_datastore_notification(tgen):
if tgen.routers_have_failure():
pytest.skip(tgen.errors)
r1 = tgen.gears["r1"].net
check_kernel_32(r1, "11.11.11.11", 1, "")
fe_client_path = CWD + "/../lib/fe_client.py"
rc, _, _ = r1.cmd_status(fe_client_path + " --help")
if rc:
pytest.skip("No protoc or present cannot run test")
# Start our FE client in the background
p = r1.popen(
[fe_client_path, "--datastore", "--listen=/frr-interface:lib/interface"]
)
_wait_output(p.stderr, "Connected", maxwait=10)
r1.cmd_raises("ip link set r1-eth0 mtu 1200")
# {"frr-interface:lib":{"interface":[{"name":"r1-eth0","state":{"if-index":2,"mtu":1200,"mtu6":1200,"speed":10000,"metric":0,"phy-address":"ba:fd:de:b5:8b:90"}}]}}
try:
# Wait for FE client to exit
output, error = p.communicate(timeout=10)
op, path, data = get_op_and_json(output)
assert op == "REPLACE"
assert path.startswith("/frr-interface:lib/interface[name='r1-eth0']/state")
jsout = json.loads(data)
expected = json.loads(
'{"frr-interface:lib":{"interface":[{"name":"r1-eth0","state":{"mtu":1200}}]}}'
)
result = json_cmp(jsout, expected)
assert result is None
finally:
p.kill()
r1.cmd_raises("ip link set r1-eth0 mtu 1500")
def test_frontend_notification(tgen):
if tgen.routers_have_failure():
pytest.skip(tgen.errors)
@ -240,7 +143,7 @@ def test_frontend_all_notification(tgen):
r1.cmd_raises("vtysh", stdin=conf)
def test_backend_notification(tgen):
def test_backend_yang_notification(tgen):
if tgen.routers_have_failure():
pytest.skip(tgen.errors)