forked from Mirror/frr
2005-05-23 Paul Jakma <paul@dishone.st>
* workqueue.h: Add a WQ_QUEUE_BLOCKED item_status return code, to allow a queue function to indicate the queue is not ready/blocked - rather than any problem with the item at hand. Add a notion of being able to 'plug' and 'unplug' a queue. Add helpers to plug/unplug a queue. Add a completion callback, to be called when a queue is emptied. * workqueue.c: (work_queue_new) remove useless list_free. (work_queue_schedule) new internal helper function to schedule queue, if appropriate. (work_queue_add) use work_queue_schedule (show_work_queues) Print 'P' if queue is plugged. (work_queue_plug) new API function, plug a queue - ie prevent it from 'drained' / processed / scheduled. (work_queue_unplug) unplug a queue, allowing it to be drained / scheduled / processed again. (work_queue_run) Add support for WQ_QUEUE_BLOCKED. Add comment for RETRY_NOW case. Make hysteris more aggresive in ramping up granularity, improves performance significantly. Add support for calling completion callback when queue is emptied, possibly useful for knowing when to unplug a queue.
This commit is contained in:
parent
a94feb380d
commit
269d74fdc3
|
@ -9,6 +9,27 @@
|
||||||
from VTY_GET_INTEGER_RANGE
|
from VTY_GET_INTEGER_RANGE
|
||||||
* vty.h: fix the VTY_GET macros, do {..} while(0) so they have
|
* vty.h: fix the VTY_GET macros, do {..} while(0) so they have
|
||||||
correct function like syntax in usage.
|
correct function like syntax in usage.
|
||||||
|
* workqueue.h: Add a WQ_QUEUE_BLOCKED item_status return code,
|
||||||
|
to allow a queue function to indicate the queue is not
|
||||||
|
ready/blocked - rather than any problem with the item at hand.
|
||||||
|
Add a notion of being able to 'plug' and 'unplug' a queue.
|
||||||
|
Add helpers to plug/unplug a queue.
|
||||||
|
Add a completion callback, to be called when a queue is emptied.
|
||||||
|
* workqueue.c: (work_queue_new) remove useless list_free.
|
||||||
|
(work_queue_schedule) new internal helper function to schedule
|
||||||
|
queue, if appropriate.
|
||||||
|
(work_queue_add) use work_queue_schedule
|
||||||
|
(show_work_queues) Print 'P' if queue is plugged.
|
||||||
|
(work_queue_plug) new API function, plug a queue - ie prevent it
|
||||||
|
from 'drained' / processed / scheduled.
|
||||||
|
(work_queue_unplug) unplug a queue, allowing it to be drained
|
||||||
|
/ scheduled / processed again.
|
||||||
|
(work_queue_run) Add support for WQ_QUEUE_BLOCKED.
|
||||||
|
Add comment for RETRY_NOW case.
|
||||||
|
Make hysteris more aggresive in ramping up granularity, improves
|
||||||
|
performance significantly.
|
||||||
|
Add support for calling completion callback when queue is emptied,
|
||||||
|
possibly useful for knowing when to unplug a queue.
|
||||||
|
|
||||||
2005-05-19 Paul Jakma <paul@dishone.st>
|
2005-05-19 Paul Jakma <paul@dishone.st>
|
||||||
|
|
||||||
|
|
|
@ -69,9 +69,6 @@ work_queue_new (struct thread_master *m, const char *queue_name)
|
||||||
|
|
||||||
if ( (new->items = list_new ()) == NULL)
|
if ( (new->items = list_new ()) == NULL)
|
||||||
{
|
{
|
||||||
if (new->items)
|
|
||||||
list_free (new->items);
|
|
||||||
|
|
||||||
XFREE (MTYPE_WORK_QUEUE_NAME, new->name);
|
XFREE (MTYPE_WORK_QUEUE_NAME, new->name);
|
||||||
XFREE (MTYPE_WORK_QUEUE, new);
|
XFREE (MTYPE_WORK_QUEUE, new);
|
||||||
|
|
||||||
|
@ -99,6 +96,22 @@ work_queue_free (struct work_queue *wq)
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static inline int
|
||||||
|
work_queue_schedule (struct work_queue *wq, unsigned int delay)
|
||||||
|
{
|
||||||
|
/* if appropriate, schedule work queue thread */
|
||||||
|
if ( (wq->flags == WQ_UNPLUGGED)
|
||||||
|
&& (wq->thread == NULL)
|
||||||
|
&& (listcount (wq->items) > 0) )
|
||||||
|
{
|
||||||
|
wq->thread = thread_add_background (wq->master, work_queue_run,
|
||||||
|
wq, delay);
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
else
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
void
|
void
|
||||||
work_queue_add (struct work_queue *wq, void *data)
|
work_queue_add (struct work_queue *wq, void *data)
|
||||||
{
|
{
|
||||||
|
@ -115,12 +128,7 @@ work_queue_add (struct work_queue *wq, void *data)
|
||||||
item->data = data;
|
item->data = data;
|
||||||
listnode_add (wq->items, item);
|
listnode_add (wq->items, item);
|
||||||
|
|
||||||
/* if thread isnt already waiting, add one */
|
work_queue_schedule (wq, wq->spec.hold);
|
||||||
if (wq->thread == NULL)
|
|
||||||
wq->thread = thread_add_background (wq->master, work_queue_run,
|
|
||||||
wq, wq->spec.hold);
|
|
||||||
|
|
||||||
/* XXX: what if we didnt get a thread? try again? */
|
|
||||||
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
@ -159,11 +167,12 @@ DEFUN(show_work_queues,
|
||||||
struct work_queue *wq;
|
struct work_queue *wq;
|
||||||
|
|
||||||
vty_out (vty,
|
vty_out (vty,
|
||||||
"%8s %11s %8s %21s%s",
|
"%c %8s %11s %8s %21s%s",
|
||||||
"List","(ms) ","Q. Runs","Cycle Counts ",
|
' ', "List","(ms) ","Q. Runs","Cycle Counts ",
|
||||||
VTY_NEWLINE);
|
VTY_NEWLINE);
|
||||||
vty_out (vty,
|
vty_out (vty,
|
||||||
"%8s %5s %5s %8s %7s %6s %6s %s%s",
|
"%c %8s %5s %5s %8s %7s %6s %6s %s%s",
|
||||||
|
' ',
|
||||||
"Items",
|
"Items",
|
||||||
"Delay","Hold",
|
"Delay","Hold",
|
||||||
"Total",
|
"Total",
|
||||||
|
@ -173,7 +182,8 @@ DEFUN(show_work_queues,
|
||||||
|
|
||||||
for (ALL_LIST_ELEMENTS_RO ((&work_queues), node, wq))
|
for (ALL_LIST_ELEMENTS_RO ((&work_queues), node, wq))
|
||||||
{
|
{
|
||||||
vty_out (vty,"%8d %5d %5d %8ld %7d %6d %6u %s%s",
|
vty_out (vty,"%c %8d %5d %5d %8ld %7d %6d %6u %s%s",
|
||||||
|
(wq->flags == WQ_PLUGGED ? 'P' : ' '),
|
||||||
listcount (wq->items),
|
listcount (wq->items),
|
||||||
wq->spec.delay, wq->spec.hold,
|
wq->spec.delay, wq->spec.hold,
|
||||||
wq->runs,
|
wq->runs,
|
||||||
|
@ -187,6 +197,32 @@ DEFUN(show_work_queues,
|
||||||
return CMD_SUCCESS;
|
return CMD_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* 'plug' a queue: Stop it from being scheduled,
|
||||||
|
* ie: prevent the queue from draining.
|
||||||
|
*/
|
||||||
|
void
|
||||||
|
work_queue_plug (struct work_queue *wq)
|
||||||
|
{
|
||||||
|
if (wq->thread)
|
||||||
|
thread_cancel (wq->thread);
|
||||||
|
|
||||||
|
wq->thread = NULL;
|
||||||
|
|
||||||
|
wq->flags = WQ_PLUGGED;
|
||||||
|
}
|
||||||
|
|
||||||
|
/* unplug queue, schedule it again, if appropriate
|
||||||
|
* Ie: Allow the queue to be drained again
|
||||||
|
*/
|
||||||
|
void
|
||||||
|
work_queue_unplug (struct work_queue *wq)
|
||||||
|
{
|
||||||
|
wq->flags = WQ_UNPLUGGED;
|
||||||
|
|
||||||
|
/* if thread isnt already waiting, add one */
|
||||||
|
work_queue_schedule (wq, wq->spec.hold);
|
||||||
|
}
|
||||||
|
|
||||||
/* timer thread to process a work queue
|
/* timer thread to process a work queue
|
||||||
* will reschedule itself if required,
|
* will reschedule itself if required,
|
||||||
* otherwise work_queue_item_add
|
* otherwise work_queue_item_add
|
||||||
|
@ -250,6 +286,13 @@ work_queue_run (struct thread *thread)
|
||||||
|
|
||||||
switch (ret)
|
switch (ret)
|
||||||
{
|
{
|
||||||
|
case WQ_QUEUE_BLOCKED:
|
||||||
|
{
|
||||||
|
/* decrement item->ran again, cause this isn't an item
|
||||||
|
* specific error, and fall through to WQ_RETRY_LATER
|
||||||
|
*/
|
||||||
|
item->ran--;
|
||||||
|
}
|
||||||
case WQ_RETRY_LATER:
|
case WQ_RETRY_LATER:
|
||||||
{
|
{
|
||||||
goto stats;
|
goto stats;
|
||||||
|
@ -260,6 +303,7 @@ work_queue_run (struct thread *thread)
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
case WQ_RETRY_NOW:
|
case WQ_RETRY_NOW:
|
||||||
|
/* a RETRY_NOW that gets here has exceeded max_tries, same as ERROR */
|
||||||
case WQ_ERROR:
|
case WQ_ERROR:
|
||||||
{
|
{
|
||||||
if (wq->spec.errorfunc)
|
if (wq->spec.errorfunc)
|
||||||
|
@ -303,7 +347,9 @@ stats:
|
||||||
wq->cycles.best = cycles;
|
wq->cycles.best = cycles;
|
||||||
|
|
||||||
/* along with yielded check, provides hysteris for granularity */
|
/* along with yielded check, provides hysteris for granularity */
|
||||||
if (cycles > (wq->cycles.granularity * WQ_HYSTERIS_FACTOR))
|
if (cycles > (wq->cycles.granularity * WQ_HYSTERIS_FACTOR * 2))
|
||||||
|
wq->cycles.granularity *= WQ_HYSTERIS_FACTOR; /* quick ramp-up */
|
||||||
|
else if (cycles > (wq->cycles.granularity * WQ_HYSTERIS_FACTOR))
|
||||||
wq->cycles.granularity += WQ_HYSTERIS_FACTOR;
|
wq->cycles.granularity += WQ_HYSTERIS_FACTOR;
|
||||||
}
|
}
|
||||||
#undef WQ_HYSTERIS_FACTOR
|
#undef WQ_HYSTERIS_FACTOR
|
||||||
|
@ -316,10 +362,11 @@ stats:
|
||||||
__func__, cycles, wq->cycles.best, wq->cycles.granularity);
|
__func__, cycles, wq->cycles.best, wq->cycles.granularity);
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
/* Is the queue done yet? */
|
/* Is the queue done yet? If it is, call the completion callback. */
|
||||||
if (listcount (wq->items) > 0)
|
if (listcount (wq->items) > 0)
|
||||||
wq->thread = thread_add_background (wq->master, work_queue_run, wq,
|
work_queue_schedule (wq, wq->spec.delay);
|
||||||
wq->spec.delay);
|
else if (wq->spec.completion_func)
|
||||||
|
wq->spec.completion_func (wq);
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
|
@ -35,7 +35,10 @@ typedef enum
|
||||||
WQ_ERROR, /* Error, run error handler if provided */
|
WQ_ERROR, /* Error, run error handler if provided */
|
||||||
WQ_RETRY_NOW, /* retry immediately */
|
WQ_RETRY_NOW, /* retry immediately */
|
||||||
WQ_RETRY_LATER, /* retry later, cease processing work queue */
|
WQ_RETRY_LATER, /* retry later, cease processing work queue */
|
||||||
WQ_REQUEUE /* requeue item, continue processing work queue */
|
WQ_REQUEUE, /* requeue item, continue processing work queue */
|
||||||
|
WQ_QUEUE_BLOCKED, /* Queue cant be processed at this time.
|
||||||
|
* Similar to WQ_RETRY_LATER, but doesn't penalise
|
||||||
|
* the particular item.. */
|
||||||
} wq_item_status;
|
} wq_item_status;
|
||||||
|
|
||||||
/* A single work queue item, unsurprisingly */
|
/* A single work queue item, unsurprisingly */
|
||||||
|
@ -45,11 +48,18 @@ struct work_queue_item
|
||||||
unsigned short ran; /* # of times item has been run */
|
unsigned short ran; /* # of times item has been run */
|
||||||
};
|
};
|
||||||
|
|
||||||
|
enum work_queue_flags
|
||||||
|
{
|
||||||
|
WQ_UNPLUGGED = 0,
|
||||||
|
WQ_PLUGGED = 1,
|
||||||
|
};
|
||||||
|
|
||||||
struct work_queue
|
struct work_queue
|
||||||
{
|
{
|
||||||
struct thread_master *master; /* thread master */
|
struct thread_master *master; /* thread master */
|
||||||
struct thread *thread; /* thread, if one is active */
|
struct thread *thread; /* thread, if one is active */
|
||||||
char *name; /* work queue name */
|
char *name; /* work queue name */
|
||||||
|
enum work_queue_flags flags; /* flags */
|
||||||
|
|
||||||
/* specification for this work queue */
|
/* specification for this work queue */
|
||||||
struct {
|
struct {
|
||||||
|
@ -62,6 +72,9 @@ struct work_queue
|
||||||
/* callback to delete user specific item data */
|
/* callback to delete user specific item data */
|
||||||
void (*del_item_data) (void *);
|
void (*del_item_data) (void *);
|
||||||
|
|
||||||
|
/* completion callback, called when queue is emptied, optional */
|
||||||
|
void (*completion_func) (struct work_queue *);
|
||||||
|
|
||||||
/* max number of retries to make for item that errors */
|
/* max number of retries to make for item that errors */
|
||||||
unsigned int max_retries;
|
unsigned int max_retries;
|
||||||
|
|
||||||
|
@ -71,7 +84,7 @@ struct work_queue
|
||||||
|
|
||||||
/* remaining fields should be opaque to users */
|
/* remaining fields should be opaque to users */
|
||||||
struct list *items; /* queue item list */
|
struct list *items; /* queue item list */
|
||||||
unsigned long runs; /* runs count */
|
unsigned long runs; /* runs count */
|
||||||
|
|
||||||
struct {
|
struct {
|
||||||
unsigned int best;
|
unsigned int best;
|
||||||
|
@ -81,11 +94,24 @@ struct work_queue
|
||||||
};
|
};
|
||||||
|
|
||||||
/* User API */
|
/* User API */
|
||||||
|
|
||||||
|
/* create a new work queue, of given name.
|
||||||
|
* user must fill in the spec of the returned work queue before adding
|
||||||
|
* anything to it
|
||||||
|
*/
|
||||||
extern struct work_queue *work_queue_new (struct thread_master *,
|
extern struct work_queue *work_queue_new (struct thread_master *,
|
||||||
const char *);
|
const char *);
|
||||||
|
/* destroy work queue */
|
||||||
extern void work_queue_free (struct work_queue *);
|
extern void work_queue_free (struct work_queue *);
|
||||||
|
|
||||||
|
/* Add the supplied data as an item onto the workqueue */
|
||||||
extern void work_queue_add (struct work_queue *, void *);
|
extern void work_queue_add (struct work_queue *, void *);
|
||||||
|
|
||||||
|
/* plug the queue, ie prevent it from being drained / processed */
|
||||||
|
extern void work_queue_plug (struct work_queue *wq);
|
||||||
|
/* unplug the queue, allow it to be drained again */
|
||||||
|
extern void work_queue_unplug (struct work_queue *wq);
|
||||||
|
|
||||||
/* Helpers, exported for thread.c and command.c */
|
/* Helpers, exported for thread.c and command.c */
|
||||||
extern int work_queue_run (struct thread *);
|
extern int work_queue_run (struct thread *);
|
||||||
extern struct cmd_element show_work_queues_cmd;
|
extern struct cmd_element show_work_queues_cmd;
|
||||||
|
|
Loading…
Reference in a new issue