Zebra: ADD Protobuf Encoding & Decoding for MLAG Messages

1. add the Mlag ProtoBuf Lib to Zebra Compilation
2. Encode the messages with protobuf before writing to MLAG
3. Decode the MLAG Messages using protobuf and write to clients
   based on their subscrption.

Signed-off-by: Satheesh Kumar K <sathk@cumulusnetworks.com>
This commit is contained in:
Satheesh Kumar K 2019-11-12 01:41:04 -08:00
parent e05ab0b0c8
commit 67fa73f29a
7 changed files with 556 additions and 43 deletions

View file

@ -125,11 +125,11 @@ include doc/manpages/subdir.am
include doc/developer/subdir.am
include include/subdir.am
include lib/subdir.am
include mlag/subdir.am
include zebra/subdir.am
include watchfrr/subdir.am
include qpb/subdir.am
include fpm/subdir.am
include mlag/subdir.am
include grpc/subdir.am
include tools/subdir.am
include solaris/subdir.am

View file

@ -141,7 +141,7 @@ int pim_zebra_mlag_handle_msg(struct stream *s, int len)
} break;
case MLAG_MROUTE_ADD_BULK: {
struct mlag_mroute_add msg;
int i = 0;
int i;
for (i = 0; i < mlag_msg.msg_cnt; i++) {
@ -153,7 +153,7 @@ int pim_zebra_mlag_handle_msg(struct stream *s, int len)
} break;
case MLAG_MROUTE_DEL_BULK: {
struct mlag_mroute_del msg;
int i = 0;
int i;
for (i = 0; i < mlag_msg.msg_cnt; i++) {

View file

@ -38,6 +38,9 @@ man8 += $(MANBUILD)/zebra.8
endif
zebra_zebra_LDADD = lib/libfrr.la $(LIBCAP)
if HAVE_PROTOBUF
zebra_zebra_LDADD += mlag/libmlag_pb.la $(PROTOBUF_C_LIBS)
endif
zebra_zebra_SOURCES = \
zebra/connected.c \
zebra/debug.c \
@ -131,6 +134,7 @@ noinst_HEADERS += \
zebra/rtadv.h \
zebra/rule_netlink.h \
zebra/zebra_mlag.h \
zebra/zebra_mlag_private.h \
zebra/zebra_fpm_private.h \
zebra/zebra_l2.h \
zebra/zebra_dplane.h \

View file

@ -108,7 +108,7 @@ void zebra_mlag_process_mlag_data(uint8_t *data, uint32_t len)
int msg_type = 0;
s = stream_new(ZEBRA_MAX_PACKET_SIZ);
msg_type = zebra_mlag_protobuf_decode_message(&s, data, len);
msg_type = zebra_mlag_protobuf_decode_message(s, data, len);
if (msg_type <= 0) {
/* Something went wrong in decoding */
@ -148,6 +148,7 @@ static int zebra_mlag_client_msg_handler(struct thread *event)
struct stream *s;
uint32_t wr_count = 0;
uint32_t msg_type = 0;
uint32_t max_count = 0;
int len = 0;
wr_count = stream_fifo_count_safe(zrouter.mlag_info.mlag_fifo);
@ -155,12 +156,9 @@ static int zebra_mlag_client_msg_handler(struct thread *event)
zlog_debug(":%s: Processing MLAG write, %d messages in queue",
__func__, wr_count);
zrouter.mlag_info.t_write = NULL;
for (wr_count = 0; wr_count < ZEBRA_MLAG_POST_LIMIT; wr_count++) {
/* FIFO is empty,wait for teh message to be add */
if (stream_fifo_count_safe(zrouter.mlag_info.mlag_fifo) == 0)
break;
max_count = MIN(wr_count, ZEBRA_MLAG_POST_LIMIT);
for (wr_count = 0; wr_count < max_count; wr_count++) {
s = stream_fifo_pop_safe(zrouter.mlag_info.mlag_fifo);
if (!s) {
zlog_debug(":%s: Got a NULL Messages, some thing wrong",
@ -168,7 +166,6 @@ static int zebra_mlag_client_msg_handler(struct thread *event)
break;
}
zebra_mlag_reset_write_buffer();
/*
* Encode the data now
*/
@ -177,17 +174,19 @@ static int zebra_mlag_client_msg_handler(struct thread *event)
/*
* write to MCLAGD
*/
if (len > 0)
if (len > 0) {
zebra_mlag_private_write_data(mlag_wr_buffer, len);
/*
* If message type is De-register, send a signal to main thread,
* so that necessary cleanup will be done by main thread.
* If message type is De-register, send a signal to main
* thread, so that necessary cleanup will be done by
* main thread.
*/
if (msg_type == MLAG_DEREGISTER) {
thread_add_event(zrouter.master,
zebra_mlag_terminate_pthread, NULL, 0,
NULL);
zebra_mlag_terminate_pthread,
NULL, 0, NULL);
}
}
stream_free(s);
@ -241,13 +240,16 @@ void zebra_mlag_handle_process_state(enum zebra_mlag_state state)
*/
static int zebra_mlag_signal_write_thread(void)
{
frr_with_mutex (&zrouter.mlag_info.mlag_th_mtx) {
if (zrouter.mlag_info.zebra_pth_mlag) {
if (IS_ZEBRA_DEBUG_MLAG)
zlog_debug(":%s: Scheduling MLAG write", __func__);
zlog_debug(":%s: Scheduling MLAG write",
__func__);
thread_add_event(zrouter.mlag_info.th_master,
zebra_mlag_client_msg_handler, NULL, 0,
&zrouter.mlag_info.t_write);
}
}
return 0;
}
@ -375,7 +377,7 @@ static void zebra_mlag_spawn_pthread(void)
zrouter.mlag_info.th_master = zrouter.mlag_info.zebra_pth_mlag->master;
/* Enqueue an initial event for the dataplane pthread */
/* Enqueue an initial event to the Newly spawn MLAG pthread */
zebra_mlag_signal_write_thread();
frr_pthread_run(zrouter.mlag_info.zebra_pth_mlag, NULL);
@ -583,7 +585,7 @@ DEFUN_HIDDEN (show_mlag,
ZEBRA_STR
"The mlag role on this machine\n")
{
char buf[80];
char buf[MLAG_ROLE_STRSIZE];
vty_out(vty, "MLag is configured to: %s\n",
mlag_role2str(zrouter.mlag_info.role, buf, sizeof(buf)));
@ -600,7 +602,7 @@ DEFPY(test_mlag, test_mlag_cmd,
"Mlag is setup to be the secondary\n")
{
enum mlag_role orig = zrouter.mlag_info.role;
char buf1[80], buf2[80];
char buf1[MLAG_ROLE_STRSIZE], buf2[MLAG_ROLE_STRSIZE];
if (none)
zrouter.mlag_info.role = MLAG_ROLE_NONE;
@ -619,8 +621,12 @@ DEFPY(test_mlag, test_mlag_cmd,
if (zrouter.mlag_info.role != MLAG_ROLE_NONE) {
if (zrouter.mlag_info.clients_interested_cnt == 0
&& test_mlag_in_progress == false) {
if (zrouter.mlag_info.zebra_pth_mlag == NULL)
frr_with_mutex (
&zrouter.mlag_info.mlag_th_mtx) {
if (zrouter.mlag_info.zebra_pth_mlag
== NULL)
zebra_mlag_spawn_pthread();
}
zrouter.mlag_info.clients_interested_cnt++;
test_mlag_in_progress = true;
zebra_mlag_private_open_channel();
@ -655,8 +661,8 @@ void zebra_mlag_init(void)
zrouter.mlag_info.t_read = NULL;
zrouter.mlag_info.t_write = NULL;
test_mlag_in_progress = false;
zebra_mlag_reset_write_buffer();
zebra_mlag_reset_read_buffer();
pthread_mutex_init(&zrouter.mlag_info.mlag_th_mtx, NULL);
}
void zebra_mlag_terminate(void)
@ -669,13 +675,514 @@ void zebra_mlag_terminate(void)
* ProtoBuf Encoding APIs
*/
#ifdef HAVE_PROTOBUF
DEFINE_MTYPE_STATIC(ZEBRA, MLAG_PBUF, "ZEBRA MLAG PROTOBUF")
int zebra_mlag_protobuf_encode_client_data(struct stream *s, uint32_t *msg_type)
{
ZebraMlagHeader hdr = ZEBRA_MLAG__HEADER__INIT;
struct mlag_msg mlag_msg;
uint8_t tmp_buf[ZEBRA_MLAG_BUF_LIMIT];
int len = 0;
int n_len = 0;
int rc = 0;
char buf[ZLOG_FILTER_LENGTH_MAX];
if (IS_ZEBRA_DEBUG_MLAG)
zlog_debug("%s: Entering..", __func__);
rc = zebra_mlag_lib_decode_mlag_hdr(s, &mlag_msg);
if (rc)
return rc;
if (IS_ZEBRA_DEBUG_MLAG)
zlog_debug("%s: Decoded msg length:%d..", __func__,
mlag_msg.data_len);
if (IS_ZEBRA_DEBUG_MLAG)
zlog_debug("%s: Mlag ProtoBuf encoding of message:%s", __func__,
zebra_mlag_lib_msgid_to_str(mlag_msg.msg_type, buf,
sizeof(buf)));
*msg_type = mlag_msg.msg_type;
switch (mlag_msg.msg_type) {
case MLAG_MROUTE_ADD: {
struct mlag_mroute_add msg;
ZebraMlagMrouteAdd pay_load = ZEBRA_MLAG_MROUTE_ADD__INIT;
uint32_t vrf_name_len = 0;
rc = zebra_mlag_lib_decode_mroute_add(s, &msg);
if (rc)
return rc;
vrf_name_len = strlen(msg.vrf_name) + 1;
pay_load.vrf_name = XMALLOC(MTYPE_MLAG_PBUF, vrf_name_len);
strlcpy(pay_load.vrf_name, msg.vrf_name, vrf_name_len);
pay_load.source_ip = msg.source_ip;
pay_load.group_ip = msg.group_ip;
pay_load.cost_to_rp = msg.cost_to_rp;
pay_load.owner_id = msg.owner_id;
pay_load.am_i_dr = msg.am_i_dr;
pay_load.am_i_dual_active = msg.am_i_dual_active;
pay_load.vrf_id = msg.vrf_id;
if (msg.owner_id == MLAG_OWNER_INTERFACE) {
vrf_name_len = strlen(msg.intf_name) + 1;
pay_load.intf_name =
XMALLOC(MTYPE_MLAG_PBUF, vrf_name_len);
strlcpy(pay_load.intf_name, msg.intf_name,
vrf_name_len);
}
len = zebra_mlag_mroute_add__pack(&pay_load, tmp_buf);
XFREE(MTYPE_MLAG_PBUF, pay_load.vrf_name);
if (msg.owner_id == MLAG_OWNER_INTERFACE)
XFREE(MTYPE_MLAG_PBUF, pay_load.intf_name);
} break;
case MLAG_MROUTE_DEL: {
struct mlag_mroute_del msg;
ZebraMlagMrouteDel pay_load = ZEBRA_MLAG_MROUTE_DEL__INIT;
uint32_t vrf_name_len = 0;
rc = zebra_mlag_lib_decode_mroute_del(s, &msg);
if (rc)
return rc;
vrf_name_len = strlen(msg.vrf_name) + 1;
pay_load.vrf_name = XMALLOC(MTYPE_MLAG_PBUF, vrf_name_len);
strlcpy(pay_load.vrf_name, msg.vrf_name, vrf_name_len);
pay_load.source_ip = msg.source_ip;
pay_load.group_ip = msg.group_ip;
pay_load.owner_id = msg.owner_id;
pay_load.vrf_id = msg.vrf_id;
if (msg.owner_id == MLAG_OWNER_INTERFACE) {
vrf_name_len = strlen(msg.intf_name) + 1;
pay_load.intf_name =
XMALLOC(MTYPE_MLAG_PBUF, vrf_name_len);
strlcpy(pay_load.intf_name, msg.intf_name,
vrf_name_len);
}
len = zebra_mlag_mroute_del__pack(&pay_load, tmp_buf);
XFREE(MTYPE_MLAG_PBUF, pay_load.vrf_name);
if (msg.owner_id == MLAG_OWNER_INTERFACE)
XFREE(MTYPE_MLAG_PBUF, pay_load.intf_name);
} break;
case MLAG_MROUTE_ADD_BULK: {
struct mlag_mroute_add msg;
ZebraMlagMrouteAddBulk Bulk_msg =
ZEBRA_MLAG_MROUTE_ADD_BULK__INIT;
ZebraMlagMrouteAdd **pay_load = NULL;
int i;
bool cleanup = false;
Bulk_msg.n_mroute_add = mlag_msg.msg_cnt;
pay_load = XMALLOC(MTYPE_MLAG_PBUF, sizeof(ZebraMlagMrouteAdd *)
* mlag_msg.msg_cnt);
for (i = 0; i < mlag_msg.msg_cnt; i++) {
uint32_t vrf_name_len = 0;
rc = zebra_mlag_lib_decode_mroute_add(s, &msg);
if (rc) {
cleanup = true;
break;
}
pay_load[i] = XMALLOC(MTYPE_MLAG_PBUF,
sizeof(ZebraMlagMrouteAdd));
zebra_mlag_mroute_add__init(pay_load[i]);
vrf_name_len = strlen(msg.vrf_name) + 1;
pay_load[i]->vrf_name =
XMALLOC(MTYPE_MLAG_PBUF, vrf_name_len);
strlcpy(pay_load[i]->vrf_name, msg.vrf_name,
vrf_name_len);
pay_load[i]->source_ip = msg.source_ip;
pay_load[i]->group_ip = msg.group_ip;
pay_load[i]->cost_to_rp = msg.cost_to_rp;
pay_load[i]->owner_id = msg.owner_id;
pay_load[i]->am_i_dr = msg.am_i_dr;
pay_load[i]->am_i_dual_active = msg.am_i_dual_active;
pay_load[i]->vrf_id = msg.vrf_id;
if (msg.owner_id == MLAG_OWNER_INTERFACE) {
vrf_name_len = strlen(msg.intf_name) + 1;
pay_load[i]->intf_name =
XMALLOC(MTYPE_MLAG_PBUF, vrf_name_len);
strlcpy(pay_load[i]->intf_name, msg.intf_name,
vrf_name_len);
}
}
if (cleanup == false) {
Bulk_msg.mroute_add = pay_load;
len = zebra_mlag_mroute_add_bulk__pack(&Bulk_msg,
tmp_buf);
}
for (i = 0; i < mlag_msg.msg_cnt; i++) {
if (pay_load[i]->vrf_name)
XFREE(MTYPE_MLAG_PBUF, pay_load[i]->vrf_name);
if (pay_load[i]->owner_id == MLAG_OWNER_INTERFACE
&& pay_load[i]->intf_name)
XFREE(MTYPE_MLAG_PBUF, pay_load[i]->intf_name);
if (pay_load[i])
XFREE(MTYPE_MLAG_PBUF, pay_load[i]);
}
XFREE(MTYPE_MLAG_PBUF, pay_load);
if (cleanup == true)
return -1;
} break;
case MLAG_MROUTE_DEL_BULK: {
struct mlag_mroute_del msg;
ZebraMlagMrouteDelBulk Bulk_msg =
ZEBRA_MLAG_MROUTE_DEL_BULK__INIT;
ZebraMlagMrouteDel **pay_load = NULL;
int i;
bool cleanup = false;
Bulk_msg.n_mroute_del = mlag_msg.msg_cnt;
pay_load = XMALLOC(MTYPE_MLAG_PBUF, sizeof(ZebraMlagMrouteDel *)
* mlag_msg.msg_cnt);
for (i = 0; i < mlag_msg.msg_cnt; i++) {
uint32_t vrf_name_len = 0;
rc = zebra_mlag_lib_decode_mroute_del(s, &msg);
if (rc) {
cleanup = true;
break;
}
pay_load[i] = XMALLOC(MTYPE_MLAG_PBUF,
sizeof(ZebraMlagMrouteDel));
zebra_mlag_mroute_del__init(pay_load[i]);
vrf_name_len = strlen(msg.vrf_name) + 1;
pay_load[i]->vrf_name =
XMALLOC(MTYPE_MLAG_PBUF, vrf_name_len);
strlcpy(pay_load[i]->vrf_name, msg.vrf_name,
vrf_name_len);
pay_load[i]->source_ip = msg.source_ip;
pay_load[i]->group_ip = msg.group_ip;
pay_load[i]->owner_id = msg.owner_id;
pay_load[i]->vrf_id = msg.vrf_id;
if (msg.owner_id == MLAG_OWNER_INTERFACE) {
vrf_name_len = strlen(msg.intf_name) + 1;
pay_load[i]->intf_name =
XMALLOC(MTYPE_MLAG_PBUF, vrf_name_len);
strlcpy(pay_load[i]->intf_name, msg.intf_name,
vrf_name_len);
}
}
if (!cleanup) {
Bulk_msg.mroute_del = pay_load;
len = zebra_mlag_mroute_del_bulk__pack(&Bulk_msg,
tmp_buf);
}
for (i = 0; i < mlag_msg.msg_cnt; i++) {
if (pay_load[i]->vrf_name)
XFREE(MTYPE_MLAG_PBUF, pay_load[i]->vrf_name);
if (pay_load[i]->owner_id == MLAG_OWNER_INTERFACE
&& pay_load[i]->intf_name)
XFREE(MTYPE_MLAG_PBUF, pay_load[i]->intf_name);
if (pay_load[i])
XFREE(MTYPE_MLAG_PBUF, pay_load[i]);
}
XFREE(MTYPE_MLAG_PBUF, pay_load);
if (cleanup)
return -1;
} break;
default:
break;
}
if (IS_ZEBRA_DEBUG_MLAG)
zlog_debug("%s: length of Mlag ProtoBuf encoded message:%s, %d",
__func__,
zebra_mlag_lib_msgid_to_str(mlag_msg.msg_type, buf,
sizeof(buf)),
len);
hdr.type = (ZebraMlagHeader__MessageType)mlag_msg.msg_type;
if (len != 0) {
hdr.data.len = len;
hdr.data.data = XMALLOC(MTYPE_MLAG_PBUF, len);
memcpy(hdr.data.data, tmp_buf, len);
}
/*
* ProtoBuf Infra will not support to demarc the pointers whem multiple
* messages are posted inside a single Buffer.
* 2 -solutions exist to solve this
* 1. add Unenoced length at the beginning of every message, this will
* be used to point to next message in the buffer
* 2. another solution is defining all messages insides another message
* But this will permit only 32 messages. this can be extended with
* multiple levels.
* for simplicity we are going with solution-1.
*/
len = zebra_mlag__header__pack(&hdr,
(mlag_wr_buffer + ZEBRA_MLAG_LEN_SIZE));
n_len = htonl(len);
memcpy(mlag_wr_buffer, &n_len, ZEBRA_MLAG_LEN_SIZE);
len += ZEBRA_MLAG_LEN_SIZE;
if (IS_ZEBRA_DEBUG_MLAG)
zlog_debug(
"%s: length of Mlag ProtoBuf message:%s with Header %d",
__func__,
zebra_mlag_lib_msgid_to_str(mlag_msg.msg_type, buf,
sizeof(buf)),
len);
if (hdr.data.data)
XFREE(MTYPE_MLAG_PBUF, hdr.data.data);
return len;
}
int zebra_mlag_protobuf_decode_message(struct stream *s, uint8_t *data,
uint32_t len)
{
uint32_t msg_type;
ZebraMlagHeader *hdr;
char buf[80];
hdr = zebra_mlag__header__unpack(NULL, len, data);
if (hdr == NULL)
return -1;
/*
* ADD The MLAG Header
*/
zclient_create_header(s, ZEBRA_MLAG_FORWARD_MSG, VRF_DEFAULT);
msg_type = hdr->type;
if (IS_ZEBRA_DEBUG_MLAG)
zlog_debug("%s: Mlag ProtoBuf decoding of message:%s", __func__,
zebra_mlag_lib_msgid_to_str(msg_type, buf, 80));
/*
* Internal MLAG Message-types & MLAG.proto message types should
* always match, otherwise there can be decoding errors
* To avoid exposing clients with Protobuf flags, using internal
* message-types
*/
stream_putl(s, hdr->type);
if (hdr->data.len == 0) {
/* NULL Payload */
stream_putw(s, MLAG_MSG_NULL_PAYLOAD);
/* No Batching */
stream_putw(s, MLAG_MSG_NO_BATCH);
} else {
switch (msg_type) {
case ZEBRA_MLAG__HEADER__MESSAGE_TYPE__ZEBRA_MLAG_STATUS_UPDATE: {
ZebraMlagStatusUpdate *msg = NULL;
msg = zebra_mlag_status_update__unpack(
NULL, hdr->data.len, hdr->data.data);
if (msg == NULL) {
zebra_mlag__header__free_unpacked(hdr, NULL);
return -1;
}
/* Payload len */
stream_putw(s, sizeof(struct mlag_status));
/* No Batching */
stream_putw(s, MLAG_MSG_NO_BATCH);
/* Actual Data */
stream_put(s, msg->peerlink, INTERFACE_NAMSIZ);
stream_putl(s, msg->my_role);
stream_putl(s, msg->peer_state);
zebra_mlag_status_update__free_unpacked(msg, NULL);
} break;
case ZEBRA_MLAG__HEADER__MESSAGE_TYPE__ZEBRA_MLAG_VXLAN_UPDATE: {
ZebraMlagVxlanUpdate *msg = NULL;
msg = zebra_mlag_vxlan_update__unpack(
NULL, hdr->data.len, hdr->data.data);
if (msg == NULL) {
zebra_mlag__header__free_unpacked(hdr, NULL);
return -1;
}
/* Payload len */
stream_putw(s, sizeof(struct mlag_vxlan));
/* No Batching */
stream_putw(s, MLAG_MSG_NO_BATCH);
/* Actual Data */
stream_putl(s, msg->anycast_ip);
stream_putl(s, msg->local_ip);
zebra_mlag_vxlan_update__free_unpacked(msg, NULL);
} break;
case ZEBRA_MLAG__HEADER__MESSAGE_TYPE__ZEBRA_MLAG_MROUTE_ADD: {
ZebraMlagMrouteAdd *msg = NULL;
msg = zebra_mlag_mroute_add__unpack(NULL, hdr->data.len,
hdr->data.data);
if (msg == NULL) {
zebra_mlag__header__free_unpacked(hdr, NULL);
return -1;
}
/* Payload len */
stream_putw(s, sizeof(struct mlag_mroute_add));
/* No Batching */
stream_putw(s, MLAG_MSG_NO_BATCH);
/* Actual Data */
stream_put(s, msg->vrf_name, VRF_NAMSIZ);
stream_putl(s, msg->source_ip);
stream_putl(s, msg->group_ip);
stream_putl(s, msg->cost_to_rp);
stream_putl(s, msg->owner_id);
stream_putc(s, msg->am_i_dr);
stream_putc(s, msg->am_i_dual_active);
stream_putl(s, msg->vrf_id);
if (msg->owner_id == MLAG_OWNER_INTERFACE)
stream_put(s, msg->intf_name, INTERFACE_NAMSIZ);
else
stream_put(s, NULL, INTERFACE_NAMSIZ);
zebra_mlag_mroute_add__free_unpacked(msg, NULL);
} break;
case ZEBRA_MLAG__HEADER__MESSAGE_TYPE__ZEBRA_MLAG_MROUTE_DEL: {
ZebraMlagMrouteDel *msg = NULL;
msg = zebra_mlag_mroute_del__unpack(NULL, hdr->data.len,
hdr->data.data);
if (msg == NULL) {
zebra_mlag__header__free_unpacked(hdr, NULL);
return -1;
}
/* Payload len */
stream_putw(s, sizeof(struct mlag_mroute_del));
/* No Batching */
stream_putw(s, MLAG_MSG_NO_BATCH);
/* Actual Data */
stream_put(s, msg->vrf_name, VRF_NAMSIZ);
stream_putl(s, msg->source_ip);
stream_putl(s, msg->group_ip);
stream_putl(s, msg->group_ip);
stream_putl(s, msg->owner_id);
stream_putl(s, msg->vrf_id);
if (msg->owner_id == MLAG_OWNER_INTERFACE)
stream_put(s, msg->intf_name, INTERFACE_NAMSIZ);
else
stream_put(s, NULL, INTERFACE_NAMSIZ);
zebra_mlag_mroute_del__free_unpacked(msg, NULL);
} break;
case ZEBRA_MLAG__HEADER__MESSAGE_TYPE__ZEBRA_MLAG_MROUTE_ADD_BULK: {
ZebraMlagMrouteAddBulk *Bulk_msg = NULL;
ZebraMlagMrouteAdd *msg = NULL;
size_t i;
Bulk_msg = zebra_mlag_mroute_add_bulk__unpack(
NULL, hdr->data.len, hdr->data.data);
if (Bulk_msg == NULL) {
zebra_mlag__header__free_unpacked(hdr, NULL);
return -1;
}
/* Payload len */
stream_putw(s, (Bulk_msg->n_mroute_add
* sizeof(struct mlag_mroute_add)));
/* No. of msgs in Batch */
stream_putw(s, Bulk_msg->n_mroute_add);
/* Actual Data */
for (i = 0; i < Bulk_msg->n_mroute_add; i++) {
msg = Bulk_msg->mroute_add[i];
stream_put(s, msg->vrf_name, VRF_NAMSIZ);
stream_putl(s, msg->source_ip);
stream_putl(s, msg->group_ip);
stream_putl(s, msg->cost_to_rp);
stream_putl(s, msg->owner_id);
stream_putc(s, msg->am_i_dr);
stream_putc(s, msg->am_i_dual_active);
stream_putl(s, msg->vrf_id);
if (msg->owner_id == MLAG_OWNER_INTERFACE)
stream_put(s, msg->intf_name,
INTERFACE_NAMSIZ);
else
stream_put(s, NULL, INTERFACE_NAMSIZ);
}
zebra_mlag_mroute_add_bulk__free_unpacked(Bulk_msg,
NULL);
} break;
case ZEBRA_MLAG__HEADER__MESSAGE_TYPE__ZEBRA_MLAG_MROUTE_DEL_BULK: {
ZebraMlagMrouteDelBulk *Bulk_msg = NULL;
ZebraMlagMrouteDel *msg = NULL;
size_t i;
Bulk_msg = zebra_mlag_mroute_del_bulk__unpack(
NULL, hdr->data.len, hdr->data.data);
if (Bulk_msg == NULL) {
zebra_mlag__header__free_unpacked(hdr, NULL);
return -1;
}
/* Payload len */
stream_putw(s, (Bulk_msg->n_mroute_del
* sizeof(struct mlag_mroute_del)));
/* No. of msgs in Batch */
stream_putw(s, Bulk_msg->n_mroute_del);
/* Actual Data */
for (i = 0; i < Bulk_msg->n_mroute_del; i++) {
msg = Bulk_msg->mroute_del[i];
stream_put(s, msg->vrf_name, VRF_NAMSIZ);
stream_putl(s, msg->source_ip);
stream_putl(s, msg->group_ip);
stream_putl(s, msg->owner_id);
stream_putl(s, msg->vrf_id);
if (msg->owner_id == MLAG_OWNER_INTERFACE)
stream_put(s, msg->intf_name,
INTERFACE_NAMSIZ);
else
stream_put(s, NULL, INTERFACE_NAMSIZ);
}
zebra_mlag_mroute_del_bulk__free_unpacked(Bulk_msg,
NULL);
} break;
case ZEBRA_MLAG__HEADER__MESSAGE_TYPE__ZEBRA_MLAG_ZEBRA_STATUS_UPDATE: {
ZebraMlagZebraStatusUpdate *msg = NULL;
msg = zebra_mlag_zebra_status_update__unpack(
NULL, hdr->data.len, hdr->data.data);
if (msg == NULL) {
zebra_mlag__header__free_unpacked(hdr, NULL);
return -1;
}
/* Payload len */
stream_putw(s, sizeof(struct mlag_frr_status));
/* No Batching */
stream_putw(s, MLAG_MSG_NO_BATCH);
/* Actual Data */
stream_putl(s, msg->peer_frrstate);
zebra_mlag_zebra_status_update__free_unpacked(msg,
NULL);
} break;
default:
break;
}
}
zebra_mlag__header__free_unpacked(hdr, NULL);
return msg_type;
}
#else
int zebra_mlag_protobuf_encode_client_data(struct stream *s, uint32_t *msg_type)
{
return 0;
}
int zebra_mlag_protobuf_decode_message(struct stream **s, uint8_t *data,
int zebra_mlag_protobuf_decode_message(struct stream *s, uint8_t *data,
uint32_t len)
{
return 0;
}
#endif

View file

@ -26,6 +26,10 @@
#include "zclient.h"
#include "zebra/zserv.h"
#ifdef HAVE_PROTOBUF
#include "mlag/mlag.pb-c.h"
#endif
#define ZEBRA_MLAG_BUF_LIMIT 2048
#define ZEBRA_MLAG_LEN_SIZE 4
@ -33,14 +37,8 @@ extern uint8_t mlag_wr_buffer[ZEBRA_MLAG_BUF_LIMIT];
extern uint8_t mlag_rd_buffer[ZEBRA_MLAG_BUF_LIMIT];
extern uint32_t mlag_rd_buf_offset;
static inline void zebra_mlag_reset_write_buffer(void)
{
memset(mlag_wr_buffer, 0, ZEBRA_MLAG_BUF_LIMIT);
}
static inline void zebra_mlag_reset_read_buffer(void)
{
memset(mlag_rd_buffer, 0, ZEBRA_MLAG_BUF_LIMIT);
mlag_rd_buf_offset = 0;
}
@ -64,6 +62,6 @@ void zebra_mlag_process_mlag_data(uint8_t *data, uint32_t len);
*/
int zebra_mlag_protobuf_encode_client_data(struct stream *s,
uint32_t *msg_type);
int zebra_mlag_protobuf_decode_message(struct stream **s, uint8_t *data,
int zebra_mlag_protobuf_decode_message(struct stream *s, uint8_t *data,
uint32_t len);
#endif

View file

@ -26,6 +26,7 @@
#include "hook.h"
#include "module.h"
#include "thread.h"
#include "frr_pthread.h"
#include "libfrr.h"
#include "version.h"
#include "network.h"
@ -70,8 +71,10 @@ int zebra_mlag_private_write_data(uint8_t *data, uint32_t len)
static void zebra_mlag_sched_read(void)
{
thread_add_read(zmlag_master, zebra_mlag_read, NULL, mlag_socket,
&zrouter.mlag_info.t_read);
frr_with_mutex (&zrouter.mlag_info.mlag_th_mtx) {
thread_add_read(zmlag_master, zebra_mlag_read, NULL,
mlag_socket, &zrouter.mlag_info.t_read);
}
}
static int zebra_mlag_read(struct thread *thread)
@ -80,8 +83,6 @@ static int zebra_mlag_read(struct thread *thread)
uint32_t h_msglen;
uint32_t tot_len, curr_len = mlag_rd_buf_offset;
zrouter.mlag_info.t_read = NULL;
/*
* Received message in sock_stream looks like below
* | len-1 (4 Bytes) | payload-1 (len-1) |
@ -103,6 +104,7 @@ static int zebra_mlag_read(struct thread *thread)
zebra_mlag_handle_process_state(MLAG_DOWN);
return -1;
}
mlag_rd_buf_offset += data_len;
if (data_len != (ssize_t)ZEBRA_MLAG_LEN_SIZE - curr_len) {
/* Try again later */
zebra_mlag_sched_read();
@ -131,6 +133,7 @@ static int zebra_mlag_read(struct thread *thread)
zebra_mlag_handle_process_state(MLAG_DOWN);
return -1;
}
mlag_rd_buf_offset += data_len;
if (data_len != (ssize_t)tot_len - curr_len) {
/* Try again later */
zebra_mlag_sched_read();

View file

@ -100,6 +100,7 @@ struct zebra_mlag_info {
/* Threads for read/write. */
struct thread *t_read;
struct thread *t_write;
pthread_mutex_t mlag_th_mtx;
};
struct zebra_router {