lib: Address ZMQ lib TODOs

Add write callback.
Add error callback.
Add frrzmq_check_events() function to check for edge triggered things
that may have happened after a zmq_send() call or so.
Update ZMQ tests.

Signed-off-by: ßingen <bingen@voltanet.io>
This commit is contained in:
ßingen 2017-11-09 16:02:18 +01:00
parent ccd2b0e6ce
commit afd0f10d63
4 changed files with 456 additions and 127 deletions

View file

@ -47,46 +47,43 @@ void frrzmq_finish(void)
} }
} }
/* read callback integration */
struct frrzmq_cb {
struct thread *thread;
void *zmqsock;
void *arg;
int fd;
bool cancelled;
void (*cb_msg)(void *arg, void *zmqsock);
void (*cb_part)(void *arg, void *zmqsock,
zmq_msg_t *msg, unsigned partnum);
};
static int frrzmq_read_msg(struct thread *t) static int frrzmq_read_msg(struct thread *t)
{ {
struct frrzmq_cb *cb = THREAD_ARG(t); struct frrzmq_cb **cbp = THREAD_ARG(t);
struct frrzmq_cb *cb;
zmq_msg_t msg; zmq_msg_t msg;
unsigned partno; unsigned partno;
unsigned char read = 0;
int ret, more; int ret, more;
size_t moresz; size_t moresz;
if (!cbp)
return 1;
cb = (*cbp);
if (!cb || !cb->zmqsock)
return 1;
while (1) { while (1) {
zmq_pollitem_t polli = { zmq_pollitem_t polli = {.socket = cb->zmqsock,
.socket = cb->zmqsock, .events = ZMQ_POLLIN};
.events = ZMQ_POLLIN
};
ret = zmq_poll(&polli, 1, 0); ret = zmq_poll(&polli, 1, 0);
if (ret < 0) if (ret < 0)
goto out_err; goto out_err;
if (!(polli.revents & ZMQ_POLLIN)) if (!(polli.revents & ZMQ_POLLIN))
break; break;
if (cb->cb_msg) { if (cb->read.cb_msg) {
cb->cb_msg(cb->arg, cb->zmqsock); cb->read.cb_msg(cb->read.arg, cb->zmqsock);
read = 1;
if (cb->cancelled) { if (cb->read.cancelled) {
XFREE(MTYPE_ZEROMQ_CB, cb); frrzmq_check_events(cbp, &cb->write,
ZMQ_POLLOUT);
cb->read.thread = NULL;
if (cb->write.cancelled && !cb->write.thread)
XFREE(MTYPE_ZEROMQ_CB, cb);
return 0; return 0;
} }
continue; continue;
@ -104,11 +101,17 @@ static int frrzmq_read_msg(struct thread *t)
zmq_msg_close(&msg); zmq_msg_close(&msg);
goto out_err; goto out_err;
} }
read = 1;
cb->cb_part(cb->arg, cb->zmqsock, &msg, partno); cb->read.cb_part(cb->read.arg, cb->zmqsock, &msg,
if (cb->cancelled) { partno);
if (cb->read.cancelled) {
zmq_msg_close(&msg); zmq_msg_close(&msg);
XFREE(MTYPE_ZEROMQ_CB, cb); frrzmq_check_events(cbp, &cb->write,
ZMQ_POLLOUT);
cb->read.thread = NULL;
if (cb->write.cancelled && !cb->write.thread)
XFREE(MTYPE_ZEROMQ_CB, cb);
return 0; return 0;
} }
@ -116,8 +119,8 @@ static int frrzmq_read_msg(struct thread *t)
* message; don't use zmq_msg_more here */ * message; don't use zmq_msg_more here */
moresz = sizeof(more); moresz = sizeof(more);
more = 0; more = 0;
ret = zmq_getsockopt(cb->zmqsock, ZMQ_RCVMORE, ret = zmq_getsockopt(cb->zmqsock, ZMQ_RCVMORE, &more,
&more, &moresz); &moresz);
if (ret < 0) { if (ret < 0) {
zmq_msg_close(&msg); zmq_msg_close(&msg);
goto out_err; goto out_err;
@ -128,64 +131,221 @@ static int frrzmq_read_msg(struct thread *t)
zmq_msg_close(&msg); zmq_msg_close(&msg);
} }
funcname_thread_add_read_write(THREAD_READ, t->master, frrzmq_read_msg, if (read)
cb, cb->fd, &cb->thread, t->funcname, t->schedfrom, frrzmq_check_events(cbp, &cb->write, ZMQ_POLLOUT);
t->schedfrom_line);
funcname_thread_add_read_write(
THREAD_READ, t->master, frrzmq_read_msg, cbp, cb->fd,
&cb->read.thread, t->funcname, t->schedfrom, t->schedfrom_line);
return 0; return 0;
out_err: out_err:
zlog_err("ZeroMQ error: %s(%d)", strerror (errno), errno); zlog_err("ZeroMQ read error: %s(%d)", strerror(errno), errno);
return 0; if (cb->read.cb_error)
cb->read.cb_error(cb->read.arg, cb->zmqsock);
return 1;
} }
struct frrzmq_cb *funcname_frrzmq_thread_add_read( int funcname_frrzmq_thread_add_read(struct thread_master *master,
struct thread_master *master, void (*msgfunc)(void *arg, void *zmqsock),
void (*msgfunc)(void *arg, void *zmqsock), void (*partfunc)(void *arg, void *zmqsock,
void (*partfunc)(void *arg, void *zmqsock, zmq_msg_t *msg,
zmq_msg_t *msg, unsigned partnum), unsigned partnum),
void *arg, void *zmqsock, debugargdef) void (*errfunc)(void *arg, void *zmqsock),
void *arg, void *zmqsock,
struct frrzmq_cb **cbp, debugargdef)
{ {
int fd, events; int fd, events;
size_t len; size_t len;
struct frrzmq_cb *cb; struct frrzmq_cb *cb;
if (!cbp)
return -1;
if (!(msgfunc || partfunc) || (msgfunc && partfunc)) if (!(msgfunc || partfunc) || (msgfunc && partfunc))
return NULL; return -1;
len = sizeof(fd); len = sizeof(fd);
if (zmq_getsockopt(zmqsock, ZMQ_FD, &fd, &len)) if (zmq_getsockopt(zmqsock, ZMQ_FD, &fd, &len))
return NULL; return -1;
len = sizeof(events); len = sizeof(events);
if (zmq_getsockopt(zmqsock, ZMQ_EVENTS, &events, &len)) if (zmq_getsockopt(zmqsock, ZMQ_EVENTS, &events, &len))
return NULL; return -1;
cb = XCALLOC(MTYPE_ZEROMQ_CB, sizeof(struct frrzmq_cb)); if (*cbp)
if (!cb) cb = *cbp;
return NULL; else {
cb = XCALLOC(MTYPE_ZEROMQ_CB, sizeof(struct frrzmq_cb));
cb->arg = arg; cb->write.cancelled = 1;
cb->zmqsock = zmqsock; if (!cb)
cb->cb_msg = msgfunc; return -1;
cb->cb_part = partfunc; *cbp = cb;
cb->fd = fd; }
if (events & ZMQ_POLLIN) cb->zmqsock = zmqsock;
funcname_thread_add_event(master, cb->fd = fd;
frrzmq_read_msg, cb, fd, &cb->thread, cb->read.arg = arg;
funcname, schedfrom, fromln); cb->read.cb_msg = msgfunc;
else cb->read.cb_part = partfunc;
funcname_thread_add_read_write(THREAD_READ, master, cb->read.cb_error = errfunc;
frrzmq_read_msg, cb, fd, &cb->thread, cb->read.cancelled = 0;
funcname, schedfrom, fromln);
return cb; if (events & ZMQ_POLLIN) {
} if (cb->read.thread) {
thread_cancel(cb->read.thread);
void frrzmq_thread_cancel(struct frrzmq_cb *cb) cb->read.thread = NULL;
{ }
if (!cb->thread) { funcname_thread_add_event(master, frrzmq_read_msg, cbp, fd,
/* canceling from within callback */ &cb->read.thread, funcname, schedfrom,
cb->cancelled = 1; fromln);
return; } else
funcname_thread_add_read_write(
THREAD_READ, master, frrzmq_read_msg, cbp, fd,
&cb->read.thread, funcname, schedfrom, fromln);
return 0;
}
static int frrzmq_write_msg(struct thread *t)
{
struct frrzmq_cb **cbp = THREAD_ARG(t);
struct frrzmq_cb *cb;
unsigned char written = 0;
int ret;
if (!cbp)
return 1;
cb = (*cbp);
if (!cb || !cb->zmqsock)
return 1;
while (1) {
zmq_pollitem_t polli = {.socket = cb->zmqsock,
.events = ZMQ_POLLOUT};
ret = zmq_poll(&polli, 1, 0);
if (ret < 0)
goto out_err;
if (!(polli.revents & ZMQ_POLLOUT))
break;
if (cb->write.cb_msg) {
cb->write.cb_msg(cb->write.arg, cb->zmqsock);
written = 1;
if (cb->write.cancelled) {
frrzmq_check_events(cbp, &cb->read, ZMQ_POLLIN);
cb->write.thread = NULL;
if (cb->read.cancelled && !cb->read.thread)
XFREE(MTYPE_ZEROMQ_CB, cb);
return 0;
}
continue;
}
}
if (written)
frrzmq_check_events(cbp, &cb->read, ZMQ_POLLIN);
funcname_thread_add_read_write(THREAD_WRITE, t->master,
frrzmq_write_msg, cbp, cb->fd,
&cb->write.thread, t->funcname,
t->schedfrom, t->schedfrom_line);
return 0;
out_err:
zlog_err("ZeroMQ write error: %s(%d)", strerror(errno), errno);
if (cb->write.cb_error)
cb->write.cb_error(cb->write.arg, cb->zmqsock);
return 1;
}
int funcname_frrzmq_thread_add_write(struct thread_master *master,
void (*msgfunc)(void *arg, void *zmqsock),
void (*errfunc)(void *arg, void *zmqsock),
void *arg, void *zmqsock,
struct frrzmq_cb **cbp, debugargdef)
{
int fd, events;
size_t len;
struct frrzmq_cb *cb;
if (!cbp)
return -1;
if (!msgfunc)
return -1;
len = sizeof(fd);
if (zmq_getsockopt(zmqsock, ZMQ_FD, &fd, &len))
return -1;
len = sizeof(events);
if (zmq_getsockopt(zmqsock, ZMQ_EVENTS, &events, &len))
return -1;
if (*cbp)
cb = *cbp;
else {
cb = XCALLOC(MTYPE_ZEROMQ_CB, sizeof(struct frrzmq_cb));
cb->read.cancelled = 1;
if (!cb)
return -1;
*cbp = cb;
}
cb->zmqsock = zmqsock;
cb->fd = fd;
cb->write.arg = arg;
cb->write.cb_msg = msgfunc;
cb->write.cb_part = NULL;
cb->write.cb_error = errfunc;
cb->write.cancelled = 0;
if (events & ZMQ_POLLOUT) {
if (cb->write.thread) {
thread_cancel(cb->write.thread);
cb->write.thread = NULL;
}
funcname_thread_add_event(master, frrzmq_write_msg, cbp, fd,
&cb->write.thread, funcname,
schedfrom, fromln);
} else
funcname_thread_add_read_write(
THREAD_WRITE, master, frrzmq_write_msg, cbp, fd,
&cb->write.thread, funcname, schedfrom, fromln);
return 0;
}
void frrzmq_thread_cancel(struct frrzmq_cb **cb, struct cb_core *core)
{
if (!cb || !*cb)
return;
core->cancelled = 1;
if (core->thread) {
thread_cancel(core->thread);
core->thread = NULL;
}
if ((*cb)->read.cancelled && !(*cb)->read.thread
&& (*cb)->write.cancelled && (*cb)->write.thread)
XFREE(MTYPE_ZEROMQ_CB, *cb);
}
void frrzmq_check_events(struct frrzmq_cb **cbp, struct cb_core *core,
int event)
{
struct frrzmq_cb *cb;
int events;
size_t len;
if (!cbp)
return;
cb = (*cbp);
if (!cb || !cb->zmqsock)
return;
if (zmq_getsockopt(cb->zmqsock, ZMQ_EVENTS, &events, &len))
return;
if (events & event && core->thread && !core->cancelled) {
struct thread_master *tm = core->thread->master;
thread_cancel(core->thread);
core->thread = NULL;
thread_add_event(tm, (event == ZMQ_POLLIN ? frrzmq_read_msg
: frrzmq_write_msg),
cbp, cb->fd, &core->thread);
} }
thread_cancel(cb->thread);
XFREE(MTYPE_ZEROMQ_CB, cb);
} }

View file

@ -33,6 +33,26 @@
* foo_LDFLAGS = libfrrzmq.la libfrr.la $(ZEROMQ_LIBS) * foo_LDFLAGS = libfrrzmq.la libfrr.la $(ZEROMQ_LIBS)
*/ */
/* callback integration */
struct cb_core {
struct thread *thread;
void *arg;
bool cancelled;
void (*cb_msg)(void *arg, void *zmqsock);
void (*cb_part)(void *arg, void *zmqsock, zmq_msg_t *msg,
unsigned partnum);
void (*cb_error)(void *arg, void *zmqsock);
};
struct frrzmq_cb {
void *zmqsock;
int fd;
struct cb_core read;
struct cb_core write;
};
/* libzmq's context /* libzmq's context
* *
* this is mostly here as a convenience, it has IPv6 enabled but nothing * this is mostly here as a convenience, it has IPv6 enabled but nothing
@ -40,21 +60,27 @@
*/ */
extern void *frrzmq_context; extern void *frrzmq_context;
extern void frrzmq_init (void); extern void frrzmq_init(void);
extern void frrzmq_finish (void); extern void frrzmq_finish(void);
#define debugargdef const char *funcname, const char *schedfrom, int fromln #define debugargdef const char *funcname, const char *schedfrom, int fromln
/* core event registration, one of these 2 macros should be used */ /* core event registration, one of these 2 macros should be used */
#define frrzmq_thread_add_read_msg(m,f,a,z) funcname_frrzmq_thread_add_read( \ #define frrzmq_thread_add_read_msg(m, f, e, a, z, d) \
m,f,NULL,a,z,#f,__FILE__,__LINE__) funcname_frrzmq_thread_add_read(m, f, NULL, e, a, z, d, #f, __FILE__, \
#define frrzmq_thread_add_read_part(m,f,a,z) funcname_frrzmq_thread_add_read( \ __LINE__)
m,NULL,f,a,z,#f,__FILE__,__LINE__) #define frrzmq_thread_add_read_part(m, f, e, a, z, d) \
funcname_frrzmq_thread_add_read(m, NULL, f, e, a, z, d, #f, __FILE__, \
__LINE__)
#define frrzmq_thread_add_write_msg(m, f, e, a, z, d) \
funcname_frrzmq_thread_add_write(m, f, e, a, z, d, #f, __FILE__, \
__LINE__)
struct cb_core;
struct frrzmq_cb; struct frrzmq_cb;
/* Set up a POLLIN notification to be called from the libfrr main loop. /* Set up a POLLIN or POLLOUT notification to be called from the libfrr main
* This has the following properties: * loop. This has the following properties:
* *
* - since ZeroMQ works with edge triggered notifications, it will loop and * - since ZeroMQ works with edge triggered notifications, it will loop and
* dispatch as many events as ZeroMQ has pending at the time libfrr calls * dispatch as many events as ZeroMQ has pending at the time libfrr calls
@ -67,22 +93,35 @@ struct frrzmq_cb;
* - if partfunc is specified, the message is read and partfunc is called * - if partfunc is specified, the message is read and partfunc is called
* for each ZeroMQ multi-part subpart. Note that you can't send replies * for each ZeroMQ multi-part subpart. Note that you can't send replies
* before all parts have been read because that violates the ZeroMQ FSM. * before all parts have been read because that violates the ZeroMQ FSM.
* - write version doesn't allow for partial callback, you must handle the
* whole message (all parts) in msgfunc callback
* - you can safely cancel the callback from within itself * - you can safely cancel the callback from within itself
* - installing a callback will check for pending events (ZMQ_EVENTS) and * - installing a callback will check for pending events (ZMQ_EVENTS) and
* may schedule the event to run as soon as libfrr is back in its main * may schedule the event to run as soon as libfrr is back in its main
* loop. * loop.
*
* TODO #1: add ZMQ_POLLERR / error callback
* TODO #2: add frrzmq_check_events() function to check for edge triggered
* things that may have happened after a zmq_send() call or so
*/ */
extern struct frrzmq_cb *funcname_frrzmq_thread_add_read( extern int funcname_frrzmq_thread_add_read(
struct thread_master *master, struct thread_master *master, void (*msgfunc)(void *arg, void *zmqsock),
void (*msgfunc)(void *arg, void *zmqsock), void (*partfunc)(void *arg, void *zmqsock, zmq_msg_t *msg,
void (*partfunc)(void *arg, void *zmqsock, unsigned partnum),
zmq_msg_t *msg, unsigned partnum), void (*errfunc)(void *arg, void *zmqsock), void *arg, void *zmqsock,
void *arg, void *zmqsock, debugargdef); struct frrzmq_cb **cb, debugargdef);
extern int funcname_frrzmq_thread_add_write(
struct thread_master *master, void (*msgfunc)(void *arg, void *zmqsock),
void (*errfunc)(void *arg, void *zmqsock), void *arg, void *zmqsock,
struct frrzmq_cb **cb, debugargdef);
extern void frrzmq_thread_cancel(struct frrzmq_cb *cb); extern void frrzmq_thread_cancel(struct frrzmq_cb **cb, struct cb_core *core);
/*
* http://api.zeromq.org/4-2:zmq-getsockopt#toc10
*
* As the descriptor is edge triggered, applications must update the state of
* ZMQ_EVENTS after each invocation of zmq_send or zmq_recv.To be more explicit:
* after calling zmq_send the socket may become readable (and vice versa)
* without triggering a read event on the file descriptor.
*/
extern void frrzmq_check_events(struct frrzmq_cb **cbp, struct cb_core *core,
int event);
#endif /* _FRRZMQ_H */ #endif /* _FRRZMQ_H */

View file

@ -23,6 +23,7 @@
#include "frr_zmq.h" #include "frr_zmq.h"
DEFINE_MTYPE_STATIC(LIB, TESTBUF, "zmq test buffer") DEFINE_MTYPE_STATIC(LIB, TESTBUF, "zmq test buffer")
DEFINE_MTYPE_STATIC(LIB, ZMQMSG, "zmq message")
static struct thread_master *master; static struct thread_master *master;
@ -31,6 +32,25 @@ static void msg_buf_free(void *data, void *hint)
XFREE(MTYPE_TESTBUF, data); XFREE(MTYPE_TESTBUF, data);
} }
static int recv_delim(void *zmqsock)
{
/* receive delim */
zmq_msg_t zdelim;
int more;
zmq_msg_init(&zdelim);
zmq_msg_recv(&zdelim, zmqsock, 0);
more = zmq_msg_more(&zdelim);
zmq_msg_close(&zdelim);
return more;
}
static void send_delim(void *zmqsock)
{
/* Send delim */
zmq_msg_t zdelim;
zmq_msg_init(&zdelim);
zmq_msg_send(&zdelim, zmqsock, ZMQ_SNDMORE);
zmq_msg_close(&zdelim);
}
static void run_client(int syncfd) static void run_client(int syncfd)
{ {
int i, j; int i, j;
@ -38,13 +58,14 @@ static void run_client(int syncfd)
char dummy; char dummy;
void *zmqctx = NULL; void *zmqctx = NULL;
void *zmqsock; void *zmqsock;
int more;
read(syncfd, &dummy, 1); read(syncfd, &dummy, 1);
zmqctx = zmq_ctx_new(); zmqctx = zmq_ctx_new();
zmq_ctx_set(zmqctx, ZMQ_IPV6, 1); zmq_ctx_set(zmqctx, ZMQ_IPV6, 1);
zmqsock = zmq_socket(zmqctx, ZMQ_REQ); zmqsock = zmq_socket(zmqctx, ZMQ_DEALER);
if (zmq_connect(zmqsock, "tcp://127.0.0.1:17171")) { if (zmq_connect(zmqsock, "tcp://127.0.0.1:17171")) {
perror("zmq_connect"); perror("zmq_connect");
exit(1); exit(1);
@ -52,22 +73,28 @@ static void run_client(int syncfd)
/* single-part */ /* single-part */
for (i = 0; i < 8; i++) { for (i = 0; i < 8; i++) {
snprintf(buf, sizeof(buf), "msg #%d %c%c%c", snprintf(buf, sizeof(buf), "msg #%d %c%c%c", i, 'a' + i,
i, 'a' + i, 'b' + i, 'c' + i); 'b' + i, 'c' + i);
printf("client send: %s\n", buf); printf("client send: %s\n", buf);
fflush(stdout); fflush(stdout);
zmq_send(zmqsock, buf, strlen(buf) + 1, 0); send_delim(zmqsock);
zmq_recv(zmqsock, buf, sizeof(buf), 0); zmq_send(zmqsock, buf, strlen(buf) + 1, 0);
printf("client recv: %s\n", buf); more = recv_delim(zmqsock);
while (more) {
zmq_recv(zmqsock, buf, sizeof(buf), 0);
printf("client recv: %s\n", buf);
size_t len = sizeof(more);
if (zmq_getsockopt(zmqsock, ZMQ_RCVMORE, &more, &len))
break;
}
} }
/* multipart */ /* multipart */
for (i = 2; i < 5; i++) { for (i = 2; i < 5; i++) {
int more;
printf("---\n"); printf("---\n");
send_delim(zmqsock);
zmq_msg_t part;
for (j = 1; j <= i; j++) { for (j = 1; j <= i; j++) {
zmq_msg_t part;
char *dyn = XMALLOC(MTYPE_TESTBUF, 32); char *dyn = XMALLOC(MTYPE_TESTBUF, 32);
snprintf(dyn, 32, "part %d/%d", j, i); snprintf(dyn, 32, "part %d/%d", j, i);
@ -79,7 +106,7 @@ static void run_client(int syncfd)
zmq_msg_send(&part, zmqsock, j < i ? ZMQ_SNDMORE : 0); zmq_msg_send(&part, zmqsock, j < i ? ZMQ_SNDMORE : 0);
} }
zmq_msg_t part; recv_delim(zmqsock);
do { do {
char *data; char *data;
@ -90,26 +117,85 @@ static void run_client(int syncfd)
} while (more); } while (more);
zmq_msg_close(&part); zmq_msg_close(&part);
} }
/* write callback */
printf("---\n");
snprintf(buf, 32, "Done receiving");
printf("client send: %s\n", buf);
fflush(stdout);
send_delim(zmqsock);
zmq_send(zmqsock, buf, strlen(buf) + 1, 0);
/* wait for message from server */
more = recv_delim(zmqsock);
while (more) {
zmq_recv(zmqsock, buf, sizeof(buf), 0);
printf("client recv: %s\n", buf);
size_t len = sizeof(more);
if (zmq_getsockopt(zmqsock, ZMQ_RCVMORE, &more, &len))
break;
}
zmq_close(zmqsock); zmq_close(zmqsock);
zmq_ctx_term(zmqctx); zmq_ctx_term(zmqctx);
} }
static struct frrzmq_cb *cb; static struct frrzmq_cb *cb;
static void serverpartfn(void *arg, void *zmqsock, zmq_msg_t *msg, static void recv_id_and_delim(void *zmqsock, zmq_msg_t *msg_id)
unsigned partnum)
{ {
/* receive id */
zmq_msg_init(msg_id);
zmq_msg_recv(msg_id, zmqsock, 0);
/* receive delim */
recv_delim(zmqsock);
}
static void send_id_and_delim(void *zmqsock, zmq_msg_t *msg_id)
{
/* Send Id */
zmq_msg_send(msg_id, zmqsock, ZMQ_SNDMORE);
send_delim(zmqsock);
}
static void serverwritefn(void *arg, void *zmqsock)
{
zmq_msg_t *msg_id = (zmq_msg_t *)arg;
char buf[32] = "Test write callback";
size_t i;
for (i = 0; i < strlen(buf); i++)
buf[i] = toupper(buf[i]);
printf("server send: %s\n", buf);
fflush(stdout);
send_id_and_delim(zmqsock, msg_id);
zmq_send(zmqsock, buf, strlen(buf) + 1, 0);
/* send just once */
frrzmq_thread_cancel(&cb, &cb->write);
zmq_msg_close(msg_id);
XFREE(MTYPE_ZMQMSG, msg_id);
}
static void serverpartfn(void *arg, void *zmqsock, zmq_msg_t *msg,
unsigned partnum)
{
static int num = 0;
int more = zmq_msg_more(msg); int more = zmq_msg_more(msg);
char *in = zmq_msg_data(msg); char *in = zmq_msg_data(msg);
size_t i; size_t i;
zmq_msg_t reply; zmq_msg_t reply;
char *out; char *out;
/* Id */
if (partnum == 0) {
send_id_and_delim(zmqsock, msg);
return;
}
/* Delim */
if (partnum == 1)
return;
printf("server recv part %u (more: %d): %s\n", partnum, more, in); printf("server recv part %u (more: %d): %s\n", partnum, more, in);
fflush(stdout); fflush(stdout);
/* REQ-REP doesn't allow sending a reply here */
if (more)
return;
out = XMALLOC(MTYPE_TESTBUF, strlen(in) + 1); out = XMALLOC(MTYPE_TESTBUF, strlen(in) + 1);
for (i = 0; i < strlen(in); i++) for (i = 0; i < strlen(in); i++)
@ -118,39 +204,66 @@ static void serverpartfn(void *arg, void *zmqsock, zmq_msg_t *msg,
zmq_msg_init_data(&reply, out, strlen(out) + 1, msg_buf_free, NULL); zmq_msg_init_data(&reply, out, strlen(out) + 1, msg_buf_free, NULL);
zmq_msg_send(&reply, zmqsock, ZMQ_SNDMORE); zmq_msg_send(&reply, zmqsock, ZMQ_SNDMORE);
if (more)
return;
out = XMALLOC(MTYPE_TESTBUF, 32); out = XMALLOC(MTYPE_TESTBUF, 32);
snprintf(out, 32, "msg# was %u", partnum); snprintf(out, 32, "msg# was %u", partnum);
zmq_msg_init_data(&reply, out, strlen(out) + 1, msg_buf_free, NULL); zmq_msg_init_data(&reply, out, strlen(out) + 1, msg_buf_free, NULL);
zmq_msg_send(&reply, zmqsock, 0); zmq_msg_send(&reply, zmqsock, 0);
zmq_msg_close(&reply);
if (++num < 7)
return;
/* write callback test */
char buf[32];
zmq_msg_t *msg_id = XMALLOC(MTYPE_ZMQMSG, sizeof(zmq_msg_t));
recv_id_and_delim(zmqsock, msg_id);
zmq_recv(zmqsock, buf, sizeof(buf), 0);
printf("server recv: %s\n", buf);
fflush(stdout);
frrzmq_thread_add_write_msg(master, serverwritefn, NULL, msg_id,
zmqsock, &cb);
} }
static void serverfn(void *arg, void *zmqsock) static void serverfn(void *arg, void *zmqsock)
{ {
static int num = 0; static int num = 0;
zmq_msg_t msg_id;
char buf[32]; char buf[32];
size_t i; size_t i;
recv_id_and_delim(zmqsock, &msg_id);
zmq_recv(zmqsock, buf, sizeof(buf), 0); zmq_recv(zmqsock, buf, sizeof(buf), 0);
printf("server recv: %s\n", buf); printf("server recv: %s\n", buf);
fflush(stdout); fflush(stdout);
for (i = 0; i < strlen(buf); i++) for (i = 0; i < strlen(buf); i++)
buf[i] = toupper(buf[i]); buf[i] = toupper(buf[i]);
send_id_and_delim(zmqsock, &msg_id);
zmq_msg_close(&msg_id);
zmq_send(zmqsock, buf, strlen(buf) + 1, 0); zmq_send(zmqsock, buf, strlen(buf) + 1, 0);
if (++num < 4) if (++num < 4)
return; return;
/* change to multipart callback */ /* change to multipart callback */
frrzmq_thread_cancel(cb); frrzmq_thread_cancel(&cb, &cb->read);
frrzmq_thread_cancel(&cb, &cb->write);
cb = frrzmq_thread_add_read_part(master, serverpartfn, NULL, zmqsock); frrzmq_thread_add_read_part(master, serverpartfn, NULL, NULL, zmqsock,
&cb);
} }
static void sigchld(void) static void sigchld(void)
{ {
printf("child exited.\n"); printf("child exited.\n");
frrzmq_thread_cancel(cb); frrzmq_thread_cancel(&cb, &cb->read);
frrzmq_thread_cancel(&cb, &cb->write);
} }
static struct quagga_signal_t sigs[] = { static struct quagga_signal_t sigs[] = {
@ -170,13 +283,13 @@ static void run_server(int syncfd)
signal_init(master, array_size(sigs), sigs); signal_init(master, array_size(sigs), sigs);
frrzmq_init(); frrzmq_init();
zmqsock = zmq_socket(frrzmq_context, ZMQ_REP); zmqsock = zmq_socket(frrzmq_context, ZMQ_ROUTER);
if (zmq_bind(zmqsock, "tcp://*:17171")) { if (zmq_bind(zmqsock, "tcp://*:17171")) {
perror("zmq_bind"); perror("zmq_bind");
exit(1); exit(1);
} }
cb = frrzmq_thread_add_read_msg(master, serverfn, NULL, zmqsock); frrzmq_thread_add_read_msg(master, serverfn, NULL, NULL, zmqsock, &cb);
write(syncfd, &dummy, sizeof(dummy)); write(syncfd, &dummy, sizeof(dummy));
while (thread_fetch(master, &t)) while (thread_fetch(master, &t))

View file

@ -11,40 +11,57 @@ client send: msg #3 def
server recv: msg #3 def server recv: msg #3 def
client recv: MSG #3 DEF client recv: MSG #3 DEF
client send: msg #4 efg client send: msg #4 efg
server recv part 0 (more: 0): msg #4 efg server recv part 2 (more: 0): msg #4 efg
client recv: MSG #4 EFG client recv: MSG #4 EFG
client recv: msg# was 2
client send: msg #5 fgh client send: msg #5 fgh
client recv: msg# was 0 server recv part 2 (more: 0): msg #5 fgh
client recv: MSG #5 FGH
client recv: msg# was 2
client send: msg #6 ghi client send: msg #6 ghi
server recv part 0 (more: 0): msg #6 ghi server recv part 2 (more: 0): msg #6 ghi
client recv: MSG #6 GHI client recv: MSG #6 GHI
client recv: msg# was 2
client send: msg #7 hij client send: msg #7 hij
client recv: msg# was 0 server recv part 2 (more: 0): msg #7 hij
client recv: MSG #7 HIJ
client recv: msg# was 2
--- ---
client send: part 1/2 client send: part 1/2
client send: part 2/2 client send: part 2/2
server recv part 0 (more: 1): part 1/2 server recv part 2 (more: 1): part 1/2
server recv part 1 (more: 0): part 2/2 server recv part 3 (more: 0): part 2/2
client recv (more: 1): PART 1/2
client recv (more: 1): PART 2/2 client recv (more: 1): PART 2/2
client recv (more: 0): msg# was 1 client recv (more: 0): msg# was 3
--- ---
client send: part 1/3 client send: part 1/3
client send: part 2/3 client send: part 2/3
client send: part 3/3 client send: part 3/3
server recv part 0 (more: 1): part 1/3 server recv part 2 (more: 1): part 1/3
server recv part 1 (more: 1): part 2/3 server recv part 3 (more: 1): part 2/3
server recv part 2 (more: 0): part 3/3 server recv part 4 (more: 0): part 3/3
client recv (more: 1): PART 1/3
client recv (more: 1): PART 2/3
client recv (more: 1): PART 3/3 client recv (more: 1): PART 3/3
client recv (more: 0): msg# was 2 client recv (more: 0): msg# was 4
--- ---
client send: part 1/4 client send: part 1/4
client send: part 2/4 client send: part 2/4
client send: part 3/4 client send: part 3/4
client send: part 4/4 client send: part 4/4
server recv part 0 (more: 1): part 1/4 server recv part 2 (more: 1): part 1/4
server recv part 1 (more: 1): part 2/4 server recv part 3 (more: 1): part 2/4
server recv part 2 (more: 1): part 3/4 server recv part 4 (more: 1): part 3/4
server recv part 3 (more: 0): part 4/4 server recv part 5 (more: 0): part 4/4
client recv (more: 1): PART 1/4
client recv (more: 1): PART 2/4
client recv (more: 1): PART 3/4
client recv (more: 1): PART 4/4 client recv (more: 1): PART 4/4
client recv (more: 0): msg# was 3 client recv (more: 0): msg# was 5
---
client send: Done receiving
server recv: Done receiving
server send: TEST WRITE CALLBACK
client recv: TEST WRITE CALLBACK
child exited. child exited.