Replace lists with arrays to store read and write threads

Ticket : CM-6300
Reviewed by : CCR-3049
Testing : Manual

With arrays, a thread corresponding to given fd is looked up in constant time
versus the linear time taken for list traversals.
This commit is contained in:
Denil Vira 2015-08-11 13:14:40 -07:00
parent 610f23cfff
commit 308d14aed9
2 changed files with 84 additions and 25 deletions

View file

@ -22,6 +22,7 @@
/* #define DEBUG */ /* #define DEBUG */
#include <zebra.h> #include <zebra.h>
#include <sys/resource.h>
#include "thread.h" #include "thread.h"
#include "memory.h" #include "memory.h"
@ -523,6 +524,9 @@ struct thread_master *
thread_master_create () thread_master_create ()
{ {
struct thread_master *rv; struct thread_master *rv;
struct rlimit limit;
getrlimit(RLIMIT_NOFILE, &limit);
if (cpu_record == NULL) if (cpu_record == NULL)
cpu_record cpu_record
@ -530,6 +534,26 @@ thread_master_create ()
(int (*) (const void *, const void *))cpu_record_hash_cmp); (int (*) (const void *, const void *))cpu_record_hash_cmp);
rv = XCALLOC (MTYPE_THREAD_MASTER, sizeof (struct thread_master)); rv = XCALLOC (MTYPE_THREAD_MASTER, sizeof (struct thread_master));
if (rv == NULL)
{
return NULL;
}
rv->fd_limit = (int)limit.rlim_cur;
rv->read = XCALLOC (MTYPE_THREAD, sizeof (struct thread *) * rv->fd_limit);
if (rv->read == NULL)
{
XFREE (MTYPE_THREAD_MASTER, rv);
return NULL;
}
rv->write = XCALLOC (MTYPE_THREAD, sizeof (struct thread *) * rv->fd_limit);
if (rv->write == NULL)
{
XFREE (MTYPE_THREAD, rv->read);
XFREE (MTYPE_THREAD_MASTER, rv);
return NULL;
}
/* Initialize the timer queues */ /* Initialize the timer queues */
rv->timer = pqueue_create(); rv->timer = pqueue_create();
@ -571,6 +595,18 @@ thread_list_delete (struct thread_list *list, struct thread *thread)
return thread; return thread;
} }
static void
thread_delete_fd (struct thread **thread_array, struct thread *thread)
{
thread_array[thread->u.fd] = NULL;
}
static void
thread_add_fd (struct thread **thread_array, struct thread *thread)
{
thread_array[thread->u.fd] = thread;
}
/* Move thread to unuse list. */ /* Move thread to unuse list. */
static void static void
thread_add_unuse (struct thread_master *m, struct thread *thread) thread_add_unuse (struct thread_master *m, struct thread *thread)
@ -599,6 +635,25 @@ thread_list_free (struct thread_master *m, struct thread_list *list)
} }
} }
static void
thread_array_free (struct thread_master *m, struct thread **thread_array)
{
struct thread *t;
int index;
for (index = 0; index < m->fd_limit; ++index)
{
t = thread_array[index];
if (t)
{
thread_array[index] = NULL;
XFREE (MTYPE_THREAD, t);
m->alloc--;
}
}
XFREE (MTYPE_THREAD, thread_array);
}
static void static void
thread_queue_free (struct thread_master *m, struct pqueue *queue) thread_queue_free (struct thread_master *m, struct pqueue *queue)
{ {
@ -615,8 +670,8 @@ thread_queue_free (struct thread_master *m, struct pqueue *queue)
void void
thread_master_free (struct thread_master *m) thread_master_free (struct thread_master *m)
{ {
thread_list_free (m, &m->read); thread_array_free (m, m->read);
thread_list_free (m, &m->write); thread_array_free (m, m->write);
thread_queue_free (m, m->timer); thread_queue_free (m, m->timer);
thread_list_free (m, &m->event); thread_list_free (m, &m->event);
thread_list_free (m, &m->ready); thread_list_free (m, &m->ready);
@ -729,7 +784,7 @@ funcname_thread_add_read (struct thread_master *m,
thread = thread_get (m, THREAD_READ, func, arg, funcname); thread = thread_get (m, THREAD_READ, func, arg, funcname);
FD_SET (fd, &m->readfd); FD_SET (fd, &m->readfd);
thread->u.fd = fd; thread->u.fd = fd;
thread_list_add (&m->read, thread); thread_add_fd (m->read, thread);
return thread; return thread;
} }
@ -752,7 +807,7 @@ funcname_thread_add_write (struct thread_master *m,
thread = thread_get (m, THREAD_WRITE, func, arg, funcname); thread = thread_get (m, THREAD_WRITE, func, arg, funcname);
FD_SET (fd, &m->writefd); FD_SET (fd, &m->writefd);
thread->u.fd = fd; thread->u.fd = fd;
thread_list_add (&m->write, thread); thread_add_fd (m->write, thread);
return thread; return thread;
} }
@ -870,18 +925,19 @@ thread_cancel (struct thread *thread)
{ {
struct thread_list *list = NULL; struct thread_list *list = NULL;
struct pqueue *queue = NULL; struct pqueue *queue = NULL;
struct thread **thread_array = NULL;
switch (thread->type) switch (thread->type)
{ {
case THREAD_READ: case THREAD_READ:
assert (FD_ISSET (thread->u.fd, &thread->master->readfd)); assert (FD_ISSET (thread->u.fd, &thread->master->readfd));
FD_CLR (thread->u.fd, &thread->master->readfd); FD_CLR (thread->u.fd, &thread->master->readfd);
list = &thread->master->read; thread_array = thread->master->read;
break; break;
case THREAD_WRITE: case THREAD_WRITE:
assert (FD_ISSET (thread->u.fd, &thread->master->writefd)); assert (FD_ISSET (thread->u.fd, &thread->master->writefd));
FD_CLR (thread->u.fd, &thread->master->writefd); FD_CLR (thread->u.fd, &thread->master->writefd);
list = &thread->master->write; thread_array = thread->master->write;
break; break;
case THREAD_TIMER: case THREAD_TIMER:
queue = thread->master->timer; queue = thread->master->timer;
@ -910,9 +966,13 @@ thread_cancel (struct thread *thread)
{ {
thread_list_delete (list, thread); thread_list_delete (list, thread);
} }
else if (thread_array)
{
thread_delete_fd (thread_array, thread);
}
else else
{ {
assert(!"Thread should be either in queue or list!"); assert(!"Thread should be either in queue or list or array!");
} }
thread->type = THREAD_UNUSED; thread->type = THREAD_UNUSED;
@ -986,29 +1046,27 @@ thread_run (struct thread_master *m, struct thread *thread,
} }
static int static int
thread_process_fd (struct thread_list *list, fd_set *fdset, fd_set *mfdset) thread_process_fd (struct thread **thread_array, fd_set *fdset, fd_set *mfdset, int num, int fd_limit)
{ {
struct thread *thread; struct thread *thread;
struct thread *next; int ready = 0, index;
int ready = 0;
assert (list);
for (thread = list->head; thread; thread = next)
{
next = thread->next;
if (FD_ISSET (THREAD_FD (thread), fdset)) assert (thread_array);
for (index = 0; index < fd_limit && ready < num; ++index)
{
thread = thread_array[index];
if (thread && FD_ISSET (THREAD_FD (thread), fdset))
{ {
assert (FD_ISSET (THREAD_FD (thread), mfdset)); assert (FD_ISSET (THREAD_FD (thread), mfdset));
FD_CLR(THREAD_FD (thread), mfdset); FD_CLR(THREAD_FD (thread), mfdset);
thread_list_delete (list, thread); thread_delete_fd (thread_array, thread);
thread_list_add (&thread->master->ready, thread); thread_list_add (&thread->master->ready, thread);
thread->type = THREAD_READY; thread->type = THREAD_READY;
ready++; ready++;
} }
} }
return ready; return num - ready;
} }
/* Add all timers that have popped to the ready list. */ /* Add all timers that have popped to the ready list. */
@ -1161,10 +1219,10 @@ thread_fetch (struct thread_master *m, struct thread *fetch)
/* Got IO, process it */ /* Got IO, process it */
if (num > 0) if (num > 0)
{ {
/* Normal priority read thead. */ /* Normal priority read thread. */
thread_process_fd (&m->read, &readfd, &m->readfd); num = thread_process_fd (m->read, &readfd, &m->readfd, num, m->fd_limit);
/* Write thead. */ /* Write thread. */
thread_process_fd (&m->write, &writefd, &m->writefd); num = thread_process_fd (m->write, &writefd, &m->writefd, num, m->fd_limit);
} }
#if 0 #if 0

View file

@ -49,13 +49,14 @@ struct pqueue;
/* Master of the theads. */ /* Master of the theads. */
struct thread_master struct thread_master
{ {
struct thread_list read; struct thread **read;
struct thread_list write; struct thread **write;
struct pqueue *timer; struct pqueue *timer;
struct thread_list event; struct thread_list event;
struct thread_list ready; struct thread_list ready;
struct thread_list unuse; struct thread_list unuse;
struct pqueue *background; struct pqueue *background;
int fd_limit;
fd_set readfd; fd_set readfd;
fd_set writefd; fd_set writefd;
fd_set exceptfd; fd_set exceptfd;