lib: nb: notification add locking support for multi-threading

Signed-off-by: Christian Hopps <chopps@labn.net>
This commit is contained in:
Christian Hopps 2025-02-25 21:19:36 +00:00
parent 915dcd220d
commit bd68a01d9a
3 changed files with 65 additions and 12 deletions

View file

@ -708,15 +708,16 @@ struct frr_yang_module_info {
* this function should return that tree (locked if multi-threading).
* If this function is provided then the state callback functions
* (get_elem, get_keys, get_next, lookup_entry) need not be set for a
* module.
* module. The unlock_tree function if non-NULL will be called with
* the returned tree and the *user_lock value.
*/
const struct lyd_node *(*get_tree_locked)(const char *xpath);
const struct lyd_node *(*get_tree_locked)(const char *xpath, void **user_lock);
/*
* This function will be called following a call to get_tree_locked() in
* order to unlock the tree if locking was required.
*/
void (*unlock_tree)(const struct lyd_node *tree);
void (*unlock_tree)(const struct lyd_node *tree, void *user_lock);
/* Northbound callbacks. */
const struct {
@ -1853,6 +1854,18 @@ extern struct lyd_node *nb_op_updatef(struct lyd_node *tree, const char *path, c
extern struct lyd_node *nb_op_vupdatef(struct lyd_node *tree, const char *path, const char *val_fmt,
va_list ap);
/**
* nb_notif_add() - Notice that the value at `path` has changed.
* @path - Absolute path in the state tree that has changed (either added or
* updated).
*/
void nb_notif_add(const char *path);
/**
* nb_notif_delete() - Notice that the value at `path` has been deleted.
* @path - Absolute path in the state tree that has been deleted.
*/
void nb_notif_delete(const char *path);
/**
* nb_notif_set_filters() - add or replace notification filters
@ -1863,6 +1876,15 @@ extern struct lyd_node *nb_op_vupdatef(struct lyd_node *tree, const char *path,
*/
extern void nb_notif_set_filters(const char **selectors, bool replace);
/**
* nb_notif_enable_multi_thread() - enable use of multiple threads with nb_notif
*
* If the nb_notif_XXX calls will be made from multiple threads then locking is
* required. Call this function to enable that functionality, prior to using the
* nb_notif_XXX API.
*/
extern void nb_notif_enable_multi_thread(void);
extern void nb_notif_init(struct event_loop *loop);
extern void nb_notif_terminate(void);

View file

@ -58,6 +58,9 @@ RB_HEAD(op_changes, op_change);
RB_PROTOTYPE(op_changes, op_change, link, op_change_cmp)
RB_GENERATE(op_changes, op_change, link, op_change_cmp)
pthread_mutex_t _nb_notif_lock = PTHREAD_MUTEX_INITIALIZER;
pthread_mutex_t *nb_notif_lock;
struct op_changes nb_notif_adds = RB_INITIALIZER(&nb_notif_adds);
struct op_changes nb_notif_dels = RB_INITIALIZER(&nb_notif_dels);
struct event_loop *nb_notif_master;
@ -158,12 +161,14 @@ static void op_change_free(struct op_change *note)
}
/**
* op_changes_group_push() - Save the current set of changes on the queue.
* __op_changes_group_push() - Save the current set of changes on the queue.
*
* This function will save the current set of changes on the queue and
* initialize a new set of changes.
*
* The lock must be held during this call.
*/
static void op_changes_group_push(void)
static void __op_changes_group_push(void)
{
struct op_changes_group *changes;
@ -312,15 +317,27 @@ static void __op_change_add_del(const char *path, struct op_changes *this_head,
nb_notif_set_walk_timer();
}
static void nb_notif_add(const char *path)
void nb_notif_add(const char *path)
{
if (nb_notif_lock)
pthread_mutex_lock(nb_notif_lock);
__op_change_add_del(path, &nb_notif_adds, &nb_notif_dels);
if (nb_notif_lock)
pthread_mutex_unlock(nb_notif_lock);
}
static void nb_notif_delete(const char *path)
void nb_notif_delete(const char *path)
{
if (nb_notif_lock)
pthread_mutex_lock(nb_notif_lock);
__op_change_add_del(path, &nb_notif_dels, &nb_notif_adds);
if (nb_notif_lock)
pthread_mutex_unlock(nb_notif_lock);
}
@ -464,13 +481,21 @@ static struct op_changes_group *op_changes_group_next(void)
{
struct op_changes_group *group;
if (nb_notif_lock)
pthread_mutex_lock(nb_notif_lock);
group = op_changes_queue_pop(&op_changes_queue);
if (!group) {
op_changes_group_push();
__op_changes_group_push();
group = op_changes_queue_pop(&op_changes_queue);
}
if (nb_notif_lock)
pthread_mutex_unlock(nb_notif_lock);
if (!group)
return NULL;
group->cur_changes = &group->dels;
group->cur_change = RB_MIN(op_changes, group->cur_changes);
if (!group->cur_change) {
@ -679,6 +704,11 @@ void nb_notif_set_filters(const char **selectors, bool replace)
darr_free(selectors);
}
void nb_notif_enable_multi_thread(void)
{
nb_notif_lock = &_nb_notif_lock;
}
void nb_notif_init(struct event_loop *tm)
{
nb_notif_master = tm;

View file

@ -48,8 +48,8 @@ DEFINE_MTYPE_STATIC(LIB, NB_NODE_INFOS, "NB Node Infos");
/* ---------- */
PREDECL_LIST(nb_op_walks);
typedef const struct lyd_node *(*get_tree_locked_cb)(const char *xpath);
typedef void (*unlock_tree_cb)(const struct lyd_node *tree);
typedef const struct lyd_node *(*get_tree_locked_cb)(const char *xpath, void **user_tree_lock);
typedef void (*unlock_tree_cb)(const struct lyd_node *tree, void *user_tree_lock);
/*
* This is our information about a node on the branch we are looking at
@ -102,6 +102,7 @@ struct nb_op_yield_state {
/* For now we support a single use of this. */
const struct lyd_node *user_tree;
void *user_tree_lock;
unlock_tree_cb user_tree_unlock;
/* Yielding state */
@ -177,7 +178,7 @@ static inline void nb_op_free_yield_state(struct nb_op_yield_state *ys,
{
if (ys) {
if (ys->user_tree && ys->user_tree_unlock)
ys->user_tree_unlock(ys->user_tree);
ys->user_tree_unlock(ys->user_tree, ys->user_tree_lock);
EVENT_OFF(ys->walk_ev);
nb_op_walks_del(&nb_op_walks, ys);
/* if we have a branch then free up it's libyang tree */
@ -674,7 +675,7 @@ static const struct lyd_node *__get_tree(struct nb_op_yield_state *ys,
get_tree_cb = __get_get_tree_funcs(__module_name(nb_node), &ys->user_tree_unlock);
assert(get_tree_cb);
ys->user_tree = get_tree_cb(xpath);
ys->user_tree = get_tree_cb(xpath, &ys->user_tree_lock);
return ys->user_tree;
}