Please refer to doc/developer/rcu.rst for documentation.

Signed-off-by: David Lamparter <equinox@diac24.net>
David Lamparter 2017-04-03 00:51:20 +02:00 committed by David Lamparter
parent 30ef834ab3
commit 3e41733f1b
10 changed files with 996 additions and 2 deletions

@ -8,6 +8,7 @@ Library Facilities (libfrr)
:maxdepth: 2
memtypes
rcu
lists
logging
hooks

doc/developer/rcu.rst Normal file
@ -0,0 +1,269 @@
.. highlight:: c
RCU
===
Introduction
------------
RCU (Read-Copy-Update) is, fundamentally, a paradigm of multithreaded
operation (and not a set of APIs). The core ideas are:
* longer, complicated updates to structures are made only on private,
"invisible" copies. Other threads, when they access the structure, see an
older (but consistent) copy.
* once done, the updated copy is swapped in in a single operation so that
other threads see either the old or the new data, but no inconsistent state
in between.
* the old instance is only released after making sure that no other thread
can possibly still be reading it.
For more information, please search for general or Linux kernel RCU
documentation; there is no way this doc can be comprehensive in explaining the
interactions:
* https://en.wikipedia.org/wiki/Read-copy-update
* https://www.kernel.org/doc/html/latest/kernel-hacking/locking.html#avoiding-locks-read-copy-update
* https://lwn.net/Articles/262464/
* http://www.rdrop.com/users/paulmck/RCU/rclock_OLS.2001.05.01c.pdf
* http://lse.sourceforge.net/locking/rcupdate.html
RCU, the TL;DR
^^^^^^^^^^^^^^
#. data structures are always consistent for reading. That's the "R" part.
#. reading never blocks / takes a lock.
#. rcu_read_lock is not a lock in the traditional sense. Think of it as a
"reservation": it records the *oldest* state the thread might still be
seeing, which therefore can't be deleted yet.
#. you create some object, finish it up, and then publish it.
#. publishing is an ``atomic_*`` call with ``memory_order_release``, which
tells the compiler to make sure prior memory writes have completed before
doing the atomic op.
#. ``ATOMLIST_*`` ``add`` operations do the ``memory_order_release`` for you.
#. you can't touch the object after it is published, except with atomic ops.
#. because you can't touch it, if you want to change it you make a new copy,
work on that, and then publish the new copy. That's the "CU" part.
#. deleting the object is also an atomic op.
#. other threads that started working before you published / deleted an object
might not see the new object / still see the deleted object.
#. because other threads may still see deleted objects, the ``free()`` needs
to be delayed. That's what :c:func:`rcu_free()` is for. The full
create/publish/free cycle is sketched below.
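A minimal sketch of that cycle (``struct config``, ``global_config`` and
``config_update`` are hypothetical names for this example; it also assumes the
caller already holds the RCU read-side lock, as event loop callbacks do)::

   /* hypothetical RCU-managed object */
   struct config {
           struct rcu_head rcu;
           int holdtime;
   };

   static struct config *_Atomic global_config;

   /* assumes updates are serialized, e.g. only done from one thread */
   void config_update(int holdtime)
   {
           struct config *newcfg, *oldcfg;

           /* work on a private, invisible copy first */
           newcfg = XCALLOC(MTYPE_TMP, sizeof(*newcfg));
           newcfg->holdtime = holdtime;

           /* publish: release ordering makes the writes above visible
            * before other threads can observe the new pointer
            */
           oldcfg = atomic_load_explicit(&global_config,
                                         memory_order_relaxed);
           atomic_store_explicit(&global_config, newcfg,
                                 memory_order_release);

           /* other threads may still be reading oldcfg - delay the free */
           if (oldcfg)
                   rcu_free(MTYPE_TMP, oldcfg, rcu);
   }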
When (not) to use RCU
^^^^^^^^^^^^^^^^^^^^^
RCU is designed for read-heavy workloads where objects are updated relatively
rarely, but frequently accessed. Do *not* indiscriminately replace locking by
RCU patterns.
The "copy" part of RCU implies that, while updating, several copies of a given
object exist in parallel. Even after the updated copy is swapped in, the old
object remains queued for freeing until all other threads are guaranteed to
not be accessing it anymore, due to passing a sequence point. In addition to
the increased memory usage, there may be bursts of malloc contention (due to
batching) when the RCU cleanup thread runs and frees memory.
Other useful patterns
^^^^^^^^^^^^^^^^^^^^^
In addition to the full "copy object, apply changes, atomically update"
approach, there are two "reduced" usage patterns:
* atomically updating single pieces of a particular object, e.g. some flags
or configuration piece
* straight up read-only / immutable objects
Both of these cases can be considered RCU "subsets". For example, when
maintaining an atomic list of items where each item only has a single
integer value that needs to be updated, that value can be updated atomically
without copying the entire object. However, the object still needs to be
free'd through :c:func:`rcu_free()` since reading/updating and deleting might
be happening concurrently. The same applies for immutable objects; deletion
might still race with reading so they need to be free'd through RCU.
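A sketch of the single-field case, assuming a hypothetical ``struct peer_info``
whose only in-place mutable field is an atomic flags word::

   struct peer_info {
           struct rcu_head rcu;
           _Atomic uint32_t flags;
           /* all other fields are immutable after publication */
   };

   void peer_set_flag(struct peer_info *pi, uint32_t flag)
   {
           /* single-field update: no copy of the whole object needed */
           atomic_fetch_or_explicit(&pi->flags, flag, memory_order_relaxed);
   }

   void peer_delete(struct peer_info *pi)
   {
           /* readers may still be looking at pi, so deletion goes via RCU */
           rcu_free(MTYPE_TMP, pi, rcu);
   }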
FRR API
-------
Before diving into detail on the provided functions, it is important to note
that the FRR RCU API covers the **cleanup part of RCU, not the read-copy-update
paradigm itself**. The latter is handled by standard C11 atomic operations,
and by extension through the atomic data structures (ATOMLIST, ATOMSORT & co.).
The ``rcu_*`` functions only make sense in conjunction with these RCU access
patterns. If you're calling the RCU API but not using these, something is
wrong. The other way around is not necessarily true; it is possible to use
atomic ops & data structures with other types of locking, e.g. rwlocks.
.. c:function:: void rcu_read_lock()
.. c:function:: void rcu_read_unlock()
These functions acquire / release the RCU read-side lock. All access to
RCU-guarded data must be inside a block guarded by these. Any number of
threads may hold the RCU read-side lock at a given point in time, including
both no threads at all and all threads.
The functions implement a depth counter, i.e. can be nested. The nested
calls are cheap, since they only increment/decrement the counter.
Therefore, any place that uses RCU data and doesn't have a guarantee that
the caller holds RCU (e.g. ``lib/`` code) should just have its own
rcu_read_lock/rcu_read_unlock pair.
At the "root" level (i.e. un-nested), these calls can incur the cost of one
syscall (to ``futex()``). That puts them at about the same cost as a
mutex lock/unlock.
The ``thread_master`` code currently always holds RCU everywhere, except
while doing the actual ``poll()`` syscall. This is both an optimization and
an "easement" into getting RCU going. The current implementation
contract is that any ``struct thread *`` callback is called with an RCU
holding depth of 1, and that this is owned by the thread so it may (should)
drop and reacquire it when doing some longer-running work.
.. warning::
The RCU read-side lock must be held **continuously** for the entire time
any piece of RCU data is used. This includes any access to RCU data
after the initial ``atomic_load``. If the RCU read-side lock is
released, any RCU-protected pointers as well as the data they refer to
become invalid, as another thread may have called :c:func:`rcu_free` on
them.
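A typical read-side sequence, reusing the hypothetical ``global_config``
pointer from the earlier sketch::

   void config_show(struct vty *vty)
   {
           struct config *cfg;

           rcu_read_lock();
           cfg = atomic_load_explicit(&global_config, memory_order_acquire);
           /* cfg (and anything it points to) is only valid until unlock */
           if (cfg)
                   vty_out(vty, "holdtime %d\n", cfg->holdtime);
           rcu_read_unlock();
   }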
.. c:type:: struct rcu_head
.. c:type:: struct rcu_head_close
.. c:type:: struct rcu_action
The ``rcu_head`` structures are small (16 bytes) and contain the
queueing machinery for the RCU sweeper/cleanup mechanisms.
Any piece of data that is cleaned up by RCU needs to have a matching
``rcu_head`` embedded in it. If there is more than one cleanup operation
to be done (e.g. closing a file descriptor), more than one ``rcu_head`` may
be embedded.
.. warning::
It is not possible to reuse a ``rcu_head``. It is owned by the RCU code
as soon as ``rcu_*`` is called on it.
The ``_close`` variant carries an extra ``int fd`` field to store the fd to
be closed.
To minimize the amount of memory used for ``rcu_head``, details about the
RCU operation to be performed are moved into the ``rcu_action`` structure.
It contains e.g. the MTYPE for :c:func:`rcu_free` calls. The pointer to be
freed is stored as an offset relative to the ``rcu_head``, which means it
must be embedded as a struct field so the offset is constant.
The ``rcu_action`` structure is an implementation detail. Using
``rcu_free`` or ``rcu_close`` will set it up correctly without further
code needed.
The ``rcu_head`` may be put in a union with other data if the other data
is only used during the "life" of the data, since the ``rcu_head`` is used only
for the "death" of data. But note that other threads may still be reading
a piece of data while a thread is working to free it.
.. c:function:: void rcu_free(struct memtype *mtype, struct X *ptr, field)
Free a block of memory after RCU has ensured no other thread can be
accessing it anymore. The pointer remains valid for any other thread that
has called :c:func:`rcu_read_lock` before the ``rcu_free`` call.
.. warning::
In some other RCU implementations, the pointer remains valid to the
*calling* thread if it is holding the RCU read-side lock. This is not
the case in FRR, particularly when running single-threaded. Enforcing
this rule also allows static analysis to find use-after-free issues.
``mtype`` is the libfrr ``MTYPE_FOO`` allocation type to pass to
:c:func:`XFREE`.
``field`` must be the name of a ``struct rcu_head`` member field in ``ptr``.
The offset of this field (which must be constant) is used to reduce the
memory size of ``struct rcu_head``.
.. note::
``rcu_free`` (and ``rcu_close``) calls are more efficient if they are
put close to each other. When freeing several RCU'd resources, try to
move the calls next to each other (even if the data structures do not
directly point to each other).
Having the calls bundled reduces the cost of adding the ``rcu_head`` to
the RCU queue; the RCU queue is an atomic data structure whose usage
will require the CPU to acquire an exclusive hold on relevant cache
lines.
.. c:function:: void rcu_close(struct rcu_head_close *head, int fd)
Close a file descriptor after ensuring no other thread might be using it
anymore. Same as :c:func:`rcu_free`, except it calls ``close`` instead of
``free``.
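For illustration, a hypothetical object that owns both memory and a file
descriptor could bundle its teardown like this::

   struct conn {
           int fd;
           struct rcu_head_close rcu_close; /* delayed close() of conn->fd */
           struct rcu_head rcu_free;        /* delayed free() of the struct */
   };

   void conn_delete(struct conn *c)
   {
           /* keep the rcu_* calls adjacent so both heads land on the RCU
            * queue together (cf. the note on bundling above)
            */
           rcu_close(&c->rcu_close, c->fd);
           rcu_free(MTYPE_TMP, c, rcu_free);
   }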
Internals
^^^^^^^^^
.. c:type:: struct rcu_thread
Per-thread state maintained by the RCU code, set up by the following
functions. A pointer to a thread's own ``rcu_thread`` is saved in
thread-local storage.
.. c:function:: struct rcu_thread *rcu_thread_prepare(void)
.. c:function:: void rcu_thread_unprepare(struct rcu_thread *rcu_thread)
.. c:function:: void rcu_thread_start(struct rcu_thread *rcu_thread)
Since the RCU code needs to have a list of all active threads, these
functions are used by the ``frr_pthread`` code to set up threads. Teardown
is automatic. It should not be necessary to call these functions.
Any thread that accesses RCU-protected data needs to be registered with
these functions. Threads that do not access RCU-protected data may call
these functions but do not need to.
Note that passing a pointer to RCU-protected data to some library which
accesses that pointer makes the library "access RCU-protected data". In
that case, either all of the library's threads must be registered for RCU,
or the code must instead pass a (non-RCU) copy of the data to the library.
.. c:function:: void rcu_shutdown(void)
Stop the RCU sweeper thread and make sure all cleanup has finished.
This function is called on daemon exit by the libfrr code to ensure pending
RCU operations are completed. This is mostly to get a clean exit without
memory leaks from queued RCU operations. It should not be necessary to
call this function as libfrr handles this.
FRR specifics and implementation details
----------------------------------------
The FRR RCU infrastructure has the following characteristics:
* it is Epoch-based with a 32-bit wrapping counter. (This is somewhat
different from other Epoch-based approaches which may be designed to only
use 3 counter values, but works out to a simple implementation.)
* instead of tracking CPUs as the Linux kernel does, threads are tracked. This
has exactly zero semantic impact; RCU just cares about "threads of
execution", which the kernel can optimize to CPUs but we can't. But it
really boils down to the same thing.
* there are no ``rcu_dereference`` and ``rcu_assign_pointer`` - use
``atomic_load`` and ``atomic_store`` instead. (These didn't exist when the
Linux RCU code was created.)
* there is no ``synchronize_rcu``; this is a design choice but may be revisited
at a later point. ``synchronize_rcu`` blocks a thread until it is guaranteed
that no other threads might still be accessing data structures that they may
have access to at the beginning of the function call. This is a blocking
design and probably not appropriate for FRR. Instead, ``rcu_call`` can be
used to have the RCU sweeper thread make a callback after the same constraint
is fulfilled in an asynchronous way. Most needs should be covered by
``rcu_free`` and ``rcu_close``; an example of ``rcu_call`` follows below.
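A minimal sketch, assuming a hypothetical ``struct session`` with a separately
allocated scratch buffer::

   struct session {
           struct rcu_head rcu;
           char *scratch;
   };

   static void session_rcu_cleanup(struct session *s)
   {
           /* runs on (and blocks) the RCU sweeper thread - keep it short */
           XFREE(MTYPE_TMP, s->scratch);
           XFREE(MTYPE_TMP, s);
   }

   void session_delete(struct session *s)
   {
           /* session_rcu_cleanup() runs once no thread can still read s */
           rcu_call(session_rcu_cleanup, s, rcu);
   }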

@ -42,6 +42,7 @@ dev_RSTFILES = \
doc/developer/packaging-debian.rst \
doc/developer/packaging-redhat.rst \
doc/developer/packaging.rst \
doc/developer/rcu.rst \
doc/developer/testing.rst \
doc/developer/topotests-snippets.rst \
doc/developer/topotests.rst \

@ -133,18 +133,29 @@ int frr_pthread_set_name(struct frr_pthread *fpt)
return ret;
}
static void *frr_pthread_inner(void *arg)
{
struct frr_pthread *fpt = arg;
rcu_thread_start(fpt->rcu_thread);
return fpt->attr.start(fpt);
}
int frr_pthread_run(struct frr_pthread *fpt, const pthread_attr_t *attr)
{
int ret;
ret = pthread_create(&fpt->thread, attr, fpt->attr.start, fpt);
fpt->rcu_thread = rcu_thread_prepare();
ret = pthread_create(&fpt->thread, attr, frr_pthread_inner, fpt);
/*
* Per pthread_create(3), the contents of fpt->thread are undefined if
* pthread_create() did not succeed. Reset this value to zero.
*/
if (ret < 0)
if (ret < 0) {
rcu_thread_unprepare(fpt->rcu_thread);
memset(&fpt->thread, 0x00, sizeof(fpt->thread));
}
return ret;
}

@ -23,6 +23,7 @@
#include <pthread.h>
#include "frratomic.h"
#include "memory.h"
#include "frrcu.h"
#include "thread.h"
#ifdef __cplusplus
@ -50,6 +51,8 @@ struct frr_pthread {
/* pthread id */
pthread_t thread;
struct rcu_thread *rcu_thread;
/* thread master for this pthread's thread.c event loop */
struct thread_master *master;

lib/frrcu.c Normal file
@ -0,0 +1,527 @@
/*
* Copyright (c) 2017-19 David Lamparter, for NetDEF, Inc.
*
* Permission to use, copy, modify, and distribute this software for any
* purpose with or without fee is hereby granted, provided that the above
* copyright notice and this permission notice appear in all copies.
*
* THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
* WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
* MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
* ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
* WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
* ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
* OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
/* implementation notes: this is an epoch-based RCU implementation. rcu_seq
* (global variable) counts the current epoch. Threads hold a specific epoch
* in rcu_read_lock(). This is the oldest epoch a thread might be accessing
* data from.
*
* The rcu_seq global is only pushed forward on rcu_read_lock() and
* rcu_read_unlock() calls. This makes things a tad more efficient since
* those are the only places it matters:
* - on rcu_read_lock, we don't want to hold an old epoch pointlessly
* - on rcu_read_unlock, we want to make sure we're not stuck on an old epoch
* when heading into a long idle period where no thread holds RCU
*
* rcu_thread structures themselves are RCU-free'd.
*
* rcu_head structures are the most iffy; normally for an ATOMLIST we would
* need to make sure we use rcu_free or pthread_rwlock to deallocate old items
* to prevent ABA or use-after-free problems. However, our ATOMLIST code
* guarantees that if the list remains non-empty in all cases, we only need
* the "last" pointer to do an "add_tail()", i.e. we can't run into ABA/UAF
* issues - but we do need to keep at least 1 item on the list.
*
* (Search the atomlist code for all uses of "last")
*/
#ifdef HAVE_CONFIG_H
#include "config.h"
#endif
#include <pthread.h>
#ifdef HAVE_PTHREAD_NP_H
#include <pthread_np.h>
#endif
#include <string.h>
#include <unistd.h>
#include <signal.h>
#include "frrcu.h"
#include "seqlock.h"
#include "atomlist.h"
DEFINE_MTYPE_STATIC(LIB, RCU_THREAD, "RCU thread")
DEFINE_MTYPE_STATIC(LIB, RCU_NEXT, "RCU sequence barrier")
DECLARE_ATOMLIST(rcu_heads, struct rcu_head, head)
PREDECL_ATOMLIST(rcu_threads)
struct rcu_thread {
struct rcu_threads_item head;
struct rcu_head rcu_head;
struct seqlock rcu;
/* only accessed by thread itself, not atomic */
unsigned depth;
};
DECLARE_ATOMLIST(rcu_threads, struct rcu_thread, head)
static const struct rcu_action rcua_next = { .type = RCUA_NEXT };
static const struct rcu_action rcua_end = { .type = RCUA_END };
static const struct rcu_action rcua_close = { .type = RCUA_CLOSE };
struct rcu_next {
struct rcu_head head_free;
struct rcu_head head_next;
};
#define rcu_free_internal(mtype, ptr, field) \
do { \
typeof(ptr) _ptr = (ptr); \
struct rcu_head *_rcu_head = &_ptr->field; \
static const struct rcu_action _rcu_action = { \
.type = RCUA_FREE, \
.u.free = { \
.mt = mtype, \
.offset = offsetof(typeof(*_ptr), field), \
}, \
}; \
_rcu_head->action = &_rcu_action; \
rcu_heads_add_tail(&rcu_heads, _rcu_head); \
} while (0)
/* primary global RCU position */
static struct seqlock rcu_seq;
/* this is set to rcu_seq whenever something is added on the RCU queue.
* rcu_read_lock() and rcu_read_unlock() will then bump rcu_seq up one step.
*/
static _Atomic seqlock_val_t rcu_dirty;
static struct rcu_threads_head rcu_threads;
static struct rcu_heads_head rcu_heads;
/* main thread & RCU sweeper have pre-setup rcu_thread structures. The
* reasons are different:
*
* - rcu_thread_main is there because the main thread isn't started like
* other threads, it's implicitly created when the program is started. So
* rcu_thread_main matches up implicitly.
*
* - rcu_thread_rcu isn't actually put on the rcu_threads list (makes no
* sense really), it only exists so we can call RCU-using functions from
* the RCU thread without special handling in rcu_read_lock/unlock.
*/
static struct rcu_thread rcu_thread_main;
static struct rcu_thread rcu_thread_rcu;
static pthread_t rcu_pthread;
static pthread_key_t rcu_thread_key;
static bool rcu_active;
static void rcu_start(void);
static void rcu_bump(void);
/*
* preinitialization for main thread
*/
static void rcu_thread_end(void *rcu_thread);
static void rcu_preinit(void) __attribute__((constructor));
static void rcu_preinit(void)
{
struct rcu_thread *rt;
rt = &rcu_thread_main;
rt->depth = 1;
seqlock_init(&rt->rcu);
seqlock_acquire_val(&rt->rcu, SEQLOCK_STARTVAL);
pthread_key_create(&rcu_thread_key, rcu_thread_end);
pthread_setspecific(rcu_thread_key, rt);
rcu_threads_add_tail(&rcu_threads, rt);
/* RCU sweeper's rcu_thread is a dummy, NOT added to rcu_threads */
rt = &rcu_thread_rcu;
rt->depth = 1;
seqlock_init(&rcu_seq);
seqlock_acquire_val(&rcu_seq, SEQLOCK_STARTVAL);
}
static struct rcu_thread *rcu_self(void)
{
return (struct rcu_thread *)pthread_getspecific(rcu_thread_key);
}
/*
* thread management (for the non-main thread)
*/
struct rcu_thread *rcu_thread_prepare(void)
{
struct rcu_thread *rt, *cur;
rcu_assert_read_locked();
if (!rcu_active)
rcu_start();
cur = rcu_self();
assert(cur->depth);
/* new thread always starts with rcu_read_lock held at depth 1, and
* holding the same epoch as the parent (this makes it possible to
* use RCU for things passed into the thread through its arg)
*/
rt = XCALLOC(MTYPE_RCU_THREAD, sizeof(*rt));
rt->depth = 1;
seqlock_init(&rt->rcu);
seqlock_acquire(&rt->rcu, &cur->rcu);
rcu_threads_add_tail(&rcu_threads, rt);
return rt;
}
void rcu_thread_start(struct rcu_thread *rt)
{
pthread_setspecific(rcu_thread_key, rt);
}
void rcu_thread_unprepare(struct rcu_thread *rt)
{
if (rt == &rcu_thread_rcu)
return;
rt->depth = 1;
seqlock_acquire(&rt->rcu, &rcu_seq);
rcu_bump();
if (rt != &rcu_thread_main)
/* this free() happens after seqlock_release() below */
rcu_free_internal(MTYPE_RCU_THREAD, rt, rcu_head);
rcu_threads_del(&rcu_threads, rt);
seqlock_release(&rt->rcu);
}
static void rcu_thread_end(void *rtvoid)
{
struct rcu_thread *rt = rtvoid;
rcu_thread_unprepare(rt);
}
/*
* main RCU control aspects
*/
static void rcu_bump(void)
{
struct rcu_next *rn;
rn = XMALLOC(MTYPE_RCU_NEXT, sizeof(*rn));
/* note: each RCUA_NEXT item corresponds to exactly one seqno bump.
* This means we don't need to communicate which seqno is which
* RCUA_NEXT, since we really don't care.
*/
/*
* Important race condition: while rcu_heads_add_tail is executing,
* there is an intermediate point where the rcu_heads "last" pointer
* already points to rn->head_next, but rn->head_next isn't added to
* the list yet. That means any other "add_tail" calls append to this
* item, which isn't fully on the list yet. Freeze this thread at
* that point and look at another thread doing a rcu_bump. It adds
* these two items and then does a seqlock_bump. But the rcu_heads
* list is still "interrupted" and there's no RCUA_NEXT on the list
* yet (from either the frozen thread or the second thread). So
* rcu_main() might actually hit the end of the list at the
* "interrupt".
*
* This situation is prevented by requiring that rcu_read_lock is held
* for any calls to rcu_bump, since if we're holding the current RCU
* epoch, that means rcu_main can't be chewing on rcu_heads and hit
* that interruption point. Only by the time the thread has continued
* to rcu_read_unlock() - and therefore completed the add_tail - the
* RCU sweeper gobbles up the epoch and can be sure to find at least
* the RCUA_NEXT and RCUA_FREE items on rcu_heads.
*/
rn->head_next.action = &rcua_next;
rcu_heads_add_tail(&rcu_heads, &rn->head_next);
/* free rn that we allocated above.
*
* This is INTENTIONALLY not built into the RCUA_NEXT action. This
* ensures that after the action above is popped off the queue, there
* is still at least 1 item on the RCU queue. This means we never
* delete the last item, which is extremely important since it keeps
* the atomlist ->last pointer alive and well.
*
* If we were to "run dry" on the RCU queue, add_tail may run into the
* "last item is being deleted - start over" case, and then we may end
* up accessing old RCU queue items that are already free'd.
*/
rcu_free_internal(MTYPE_RCU_NEXT, rn, head_free);
/* Only allow the RCU sweeper to run after these 2 items are queued.
*
* If another thread enqueues some RCU action in the intermediate
* window here, nothing bad happens - the queued action is associated
* with a larger seq# than strictly necessary. Thus, it might get
* executed a bit later, but that's not a problem.
*
* If another thread acquires the read lock in this window, it holds
* the previous epoch, but its RCU queue actions will be in the next
* epoch. This isn't a problem either, just a tad inefficient.
*/
seqlock_bump(&rcu_seq);
}
static void rcu_bump_maybe(void)
{
seqlock_val_t dirty;
dirty = atomic_load_explicit(&rcu_dirty, memory_order_relaxed);
/* no problem if we race here and multiple threads bump rcu_seq;
* bumping too much causes no issues while not bumping enough will
* result in delayed cleanup
*/
if (dirty == seqlock_cur(&rcu_seq))
rcu_bump();
}
void rcu_read_lock(void)
{
struct rcu_thread *rt = rcu_self();
assert(rt);
if (rt->depth++ > 0)
return;
seqlock_acquire(&rt->rcu, &rcu_seq);
/* need to hold RCU for bump ... */
rcu_bump_maybe();
/* ... but no point in holding the old epoch if we just bumped */
seqlock_acquire(&rt->rcu, &rcu_seq);
}
void rcu_read_unlock(void)
{
struct rcu_thread *rt = rcu_self();
assert(rt && rt->depth);
if (--rt->depth > 0)
return;
rcu_bump_maybe();
seqlock_release(&rt->rcu);
}
void rcu_assert_read_locked(void)
{
struct rcu_thread *rt = rcu_self();
assert(rt && rt->depth && seqlock_held(&rt->rcu));
}
void rcu_assert_read_unlocked(void)
{
struct rcu_thread *rt = rcu_self();
assert(rt && !rt->depth && !seqlock_held(&rt->rcu));
}
/*
* RCU resource-release thread
*/
static void *rcu_main(void *arg);
static void rcu_start(void)
{
/* ensure we never handle signals on the RCU thread by blocking
* everything here (new thread inherits signal mask)
*/
sigset_t oldsigs, blocksigs;
sigfillset(&blocksigs);
pthread_sigmask(SIG_BLOCK, &blocksigs, &oldsigs);
rcu_active = true;
assert(!pthread_create(&rcu_pthread, NULL, rcu_main, NULL));
pthread_sigmask(SIG_SETMASK, &oldsigs, NULL);
#ifdef HAVE_PTHREAD_SETNAME_NP
# ifdef GNU_LINUX
pthread_setname_np(rcu_pthread, "RCU sweeper");
# elif defined(__NetBSD__)
pthread_setname_np(rcu_pthread, "RCU sweeper", NULL);
# endif
#elif defined(HAVE_PTHREAD_SET_NAME_NP)
pthread_set_name_np(rcu_pthread, "RCU sweeper");
#endif
}
static void rcu_do(struct rcu_head *rh)
{
struct rcu_head_close *rhc;
void *p;
switch (rh->action->type) {
case RCUA_FREE:
p = (char *)rh - rh->action->u.free.offset;
if (rh->action->u.free.mt)
qfree(rh->action->u.free.mt, p);
else
free(p);
break;
case RCUA_CLOSE:
rhc = container_of(rh, struct rcu_head_close,
rcu_head);
close(rhc->fd);
break;
case RCUA_CALL:
p = (char *)rh - rh->action->u.call.offset;
rh->action->u.call.fptr(p);
break;
case RCUA_INVALID:
case RCUA_NEXT:
case RCUA_END:
default:
assert(0);
}
}
static void rcu_watchdog(struct rcu_thread *rt)
{
#if 0
/* future work: print a backtrace for the thread that's holding up
* RCU. The only (good) way of doing that is to send a signal to the
* other thread, save away the backtrace in the signal handler, and
* block here until the signal is done processing.
*
* Just haven't implemented that yet.
*/
fprintf(stderr, "RCU watchdog %p\n", rt);
#endif
}
static void *rcu_main(void *arg)
{
struct rcu_thread *rt;
struct rcu_head *rh = NULL;
bool end = false;
struct timespec maxwait;
seqlock_val_t rcuval = SEQLOCK_STARTVAL;
pthread_setspecific(rcu_thread_key, &rcu_thread_rcu);
while (!end) {
seqlock_wait(&rcu_seq, rcuval);
/* RCU watchdog timeout, TODO: configurable value */
clock_gettime(CLOCK_MONOTONIC, &maxwait);
maxwait.tv_nsec += 100 * 1000 * 1000;
if (maxwait.tv_nsec >= 1000000000) {
maxwait.tv_sec++;
maxwait.tv_nsec -= 1000000000;
}
frr_each (rcu_threads, &rcu_threads, rt)
if (!seqlock_timedwait(&rt->rcu, rcuval, &maxwait)) {
rcu_watchdog(rt);
seqlock_wait(&rt->rcu, rcuval);
}
while ((rh = rcu_heads_pop(&rcu_heads))) {
if (rh->action->type == RCUA_NEXT)
break;
else if (rh->action->type == RCUA_END)
end = true;
else
rcu_do(rh);
}
rcuval += SEQLOCK_INCR;
}
/* rcu_shutdown can only be called singlethreaded, and it does a
* pthread_join, so it should be impossible that anything ended up
* on the queue after RCUA_END
*/
#if 1
assert(!rcu_heads_first(&rcu_heads));
#else
while ((rh = rcu_heads_pop(&rcu_heads)))
if (rh->action->type >= RCUA_FREE)
rcu_do(rh);
#endif
return NULL;
}
void rcu_shutdown(void)
{
static struct rcu_head rcu_head_end;
struct rcu_thread *rt = rcu_self();
void *retval;
if (!rcu_active)
return;
rcu_assert_read_locked();
assert(rcu_threads_count(&rcu_threads) == 1);
rcu_enqueue(&rcu_head_end, &rcua_end);
rt->depth = 0;
seqlock_release(&rt->rcu);
seqlock_release(&rcu_seq);
rcu_active = false;
/* clearing rcu_active is before pthread_join in case we hang in
* pthread_join & get a SIGTERM or something - in that case, just
* ignore the maybe-still-running RCU thread
*/
if (pthread_join(rcu_pthread, &retval) == 0) {
seqlock_acquire_val(&rcu_seq, SEQLOCK_STARTVAL);
seqlock_acquire_val(&rt->rcu, SEQLOCK_STARTVAL);
rt->depth = 1;
}
}
/*
* RCU'd free functions
*/
void rcu_enqueue(struct rcu_head *rh, const struct rcu_action *action)
{
/* refer to rcu_bump() for why we need to hold RCU when adding items
* to rcu_heads
*/
rcu_assert_read_locked();
rh->action = action;
if (!rcu_active) {
rcu_do(rh);
return;
}
rcu_heads_add_tail(&rcu_heads, rh);
atomic_store_explicit(&rcu_dirty, seqlock_cur(&rcu_seq),
memory_order_relaxed);
}
void rcu_close(struct rcu_head_close *rhc, int fd)
{
rhc->fd = fd;
rcu_enqueue(&rhc->rcu_head, &rcua_close);
}

lib/frrcu.h Normal file
@ -0,0 +1,172 @@
/*
* Copyright (c) 2017-19 David Lamparter, for NetDEF, Inc.
*
* Permission to use, copy, modify, and distribute this software for any
* purpose with or without fee is hereby granted, provided that the above
* copyright notice and this permission notice appear in all copies.
*
* THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
* WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
* MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
* ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
* WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
* ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
* OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
#ifndef _FRRCU_H
#define _FRRCU_H
#include "memory.h"
#include "atomlist.h"
#include "seqlock.h"
/* quick RCU primer:
* There's a global sequence counter. Whenever a thread does a
* rcu_read_lock(), it is marked as holding the current sequence counter.
* When something is cleaned with RCU, the global sequence counter is
* increased and the item is queued for cleanup - *after* all threads are
* at a more recent sequence counter (or no sequence counter / unheld).
*
* So, by delaying resource cleanup, RCU ensures that things don't go away
* while another thread may hold a (stale) reference.
*
* Note that even if a thread is in rcu_read_lock(), it is invalid for that
* thread to access bits after rcu_free() & co on them. This is a design
* choice to allow no-op'ing out the entire RCU mechanism if we're running
* singlethreaded. (Also allows some optimization on the counter bumping.)
*
* differences from Linux Kernel RCU:
* - there's no rcu_synchronize(), if you really need to defer something
* use rcu_call() (and double check it's really necessary)
* - rcu_dereference() and rcu_assign_pointer() don't exist, use atomic_*
* instead (ATOM* list structures do the right thing)
*/
/* opaque */
struct rcu_thread;
/* called before new thread creation, sets up rcu thread info for new thread
* before it actually exists. This ensures possible RCU references are held
* for thread startup.
*
* return value must be passed into the new thread's call to rcu_thread_start()
*/
extern struct rcu_thread *rcu_thread_prepare(void);
/* cleanup in case pthread_create() fails */
extern void rcu_thread_unprepare(struct rcu_thread *rcu_thread);
/* called early in the new thread, with the return value from the above.
* NB: new thread is initially in RCU-held state! (at depth 1)
*
* TBD: maybe inherit RCU state from rcu_thread_prepare()?
*/
extern void rcu_thread_start(struct rcu_thread *rcu_thread);
/* thread exit is handled through pthread_key_create's destructor function */
/* global RCU shutdown - must be called with only 1 active thread left. waits
* until remaining RCU actions are done & RCU thread has exited.
*
* This is mostly here to get a clean exit without memleaks.
*/
extern void rcu_shutdown(void);
/* enter / exit RCU-held state. counter-based, so can be called nested. */
extern void rcu_read_lock(void);
extern void rcu_read_unlock(void);
/* for debugging / safety checks */
extern void rcu_assert_read_locked(void);
extern void rcu_assert_read_unlocked(void);
enum rcu_action_type {
RCUA_INVALID = 0,
/* used internally by the RCU code, shouldn't ever show up outside */
RCUA_NEXT,
RCUA_END,
/* normal RCU actions, for outside use */
RCUA_FREE,
RCUA_CLOSE,
RCUA_CALL,
};
/* since rcu_head is intended to be embedded into structs which may exist
* with lots of copies, rcu_head is shrunk down to its absolute minimum -
* the atomlist pointer + a pointer to this action struct.
*/
struct rcu_action {
enum rcu_action_type type;
union {
struct {
struct memtype *mt;
ptrdiff_t offset;
} free;
struct {
void (*fptr)(void *arg);
ptrdiff_t offset;
} call;
} u;
};
/* RCU cleanup function queue item */
PREDECL_ATOMLIST(rcu_heads)
struct rcu_head {
struct rcu_heads_item head;
const struct rcu_action *action;
};
/* special RCU head for delayed fd-close */
struct rcu_head_close {
struct rcu_head rcu_head;
int fd;
};
/* enqueue RCU action - use the macros below to get the rcu_action set up */
extern void rcu_enqueue(struct rcu_head *head, const struct rcu_action *action);
/* RCU free() and file close() operations.
*
* freed memory / closed fds become _immediately_ unavailable to the calling
* thread, but will remain available for other threads until they have passed
* into RCU-released state.
*/
/* may be called with NULL mt to do non-MTYPE free() */
#define rcu_free(mtype, ptr, field) \
do { \
typeof(ptr) _ptr = (ptr); \
struct rcu_head *_rcu_head = &_ptr->field; \
static const struct rcu_action _rcu_action = { \
.type = RCUA_FREE, \
.u.free = { \
.mt = mtype, \
.offset = offsetof(typeof(*_ptr), field), \
}, \
}; \
rcu_enqueue(_rcu_head, &_rcu_action); \
} while (0)
/* use this sparingly, it runs on (and blocks) the RCU thread */
#define rcu_call(func, ptr, field) \
do { \
typeof(ptr) _ptr = (ptr); \
void (*_fptype)(typeof(ptr)); \
struct rcu_head *_rcu_head = &_ptr->field; \
static const struct rcu_action _rcu_action = { \
.type = RCUA_CALL, \
.u.call = { \
.fptr = (void *)func, \
.offset = offsetof(typeof(*_ptr), field), \
}, \
}; \
(void)(_fptype = func); \
rcu_enqueue(_rcu_head, &_rcu_action); \
} while (0)
extern void rcu_close(struct rcu_head_close *head, int fd);
#endif /* _FRRCU_H */

@ -41,6 +41,7 @@
#include "northbound_cli.h"
#include "northbound_db.h"
#include "debug.h"
#include "frrcu.h"
DEFINE_HOOK(frr_late_init, (struct thread_master * tm), (tm))
DEFINE_KOOH(frr_early_fini, (), ())
@ -1081,6 +1082,7 @@ void frr_fini(void)
master = NULL;
closezlog();
/* frrmod_init -> nothing needed / hooks */
rcu_shutdown();
if (!debug_memstats_at_exit)
return;

@ -21,6 +21,7 @@ lib_libfrr_la_SOURCES = \
lib/distribute.c \
lib/ferr.c \
lib/filter.c \
lib/frrcu.c \
lib/frrlua.c \
lib/frr_pthread.c \
lib/frrstr.c \
@ -163,6 +164,7 @@ pkginclude_HEADERS += \
lib/frrlua.h \
lib/frr_pthread.h \
lib/frratomic.h \
lib/frrcu.h \
lib/frrstr.h \
lib/getopt.h \
lib/graph.h \

@ -25,6 +25,7 @@
#include "thread.h"
#include "memory.h"
#include "frrcu.h"
#include "log.h"
#include "hash.h"
#include "pqueue.h"
@ -753,6 +754,9 @@ static int fd_poll(struct thread_master *m, struct pollfd *pfds, nfds_t pfdsize,
< 0) // effect a poll (return immediately)
timeout = 0;
rcu_read_unlock();
rcu_assert_read_unlocked();
/* add poll pipe poker */
assert(count + 1 < pfdsize);
pfds[count].fd = m->io_pipe[0];
@ -766,6 +770,8 @@ static int fd_poll(struct thread_master *m, struct pollfd *pfds, nfds_t pfdsize,
while (read(m->io_pipe[0], &trash, sizeof(trash)) > 0)
;
rcu_read_lock();
return num;
}