From c0c46bad15a5f3f69032678177ac7d00b7cd31be Mon Sep 17 00:00:00 2001 From: Soumya Roy Date: Fri, 14 Mar 2025 21:44:39 +0000 Subject: [PATCH 1/4] lib: Add support for stream buffer to expand Issue: Currently, during encode time, if required memory is more than available space in stream buffer, stream buffer can't be expanded. This fix introduces new apis to support stream buffer expansion. Testing: Tested with zebra nexthop encoding with 512 nexthops, which triggers this new code changes, it works fine. Without fix, for same trigger it asserts. Signed-off-by: Soumya Roy --- lib/stream.c | 142 ++++++++++++++++++++++++++++++++++++++++++--------- lib/stream.h | 4 +- 2 files changed, 120 insertions(+), 26 deletions(-) diff --git a/lib/stream.c b/lib/stream.c index bb90f3b944..6ac51859d3 100644 --- a/lib/stream.c +++ b/lib/stream.c @@ -16,6 +16,11 @@ #include "frr_pthread.h" #include "lib_errors.h" +#define MIN_STREAM_EXPANSION_SZ 512 + +/* Extra size needed for a stream in bytes, given new write size */ +#define STREAM_EXPAND_SIZE(S, WSZ) ((WSZ)-STREAM_WRITEABLE(S)) + DEFINE_MTYPE_STATIC(LIB, STREAM, "Stream"); DEFINE_MTYPE_STATIC(LIB, STREAM_FIFO, "Stream FIFO"); @@ -92,11 +97,20 @@ struct stream *stream_new(size_t size) assert(size > 0); - s = XMALLOC(MTYPE_STREAM, sizeof(struct stream) + size); + s = XMALLOC(MTYPE_STREAM, sizeof(struct stream)); + s->data = XMALLOC(MTYPE_STREAM, size); s->getp = s->endp = 0; s->next = NULL; s->size = size; + s->allow_expansion = false; + return s; +} + +struct stream *stream_new_expandable(size_t size) +{ + struct stream *s = stream_new(size); + s->allow_expansion = true; return s; } @@ -106,6 +120,7 @@ void stream_free(struct stream *s) if (!s) return; + XFREE(MTYPE_STREAM, s->data); XFREE(MTYPE_STREAM, s); } @@ -115,7 +130,7 @@ struct stream *stream_copy(struct stream *dest, const struct stream *src) assert(dest != NULL); assert(STREAM_SIZE(dest) >= src->endp); - + dest->allow_expansion = src->allow_expansion; dest->endp = 
src->endp; dest->getp = src->getp; @@ -131,7 +146,7 @@ struct stream *stream_dup(const struct stream *s) STREAM_VERIFY_SANE(s); snew = stream_new(s->endp); - + snew->allow_expansion = s->allow_expansion; return (stream_copy(snew, s)); } @@ -143,9 +158,16 @@ struct stream *stream_dupcat(const struct stream *s1, const struct stream *s2, STREAM_VERIFY_SANE(s1); STREAM_VERIFY_SANE(s2); + if (offset > s1->endp) { + fprintf(stderr, "Error: Invalid offset %zu, exceeds s1->endp %zu\n", offset, + s1->endp); + return NULL; + } + if ((new = stream_new(s1->endp + s2->endp)) == NULL) return NULL; + new->allow_expansion = s1->allow_expansion || s2->allow_expansion; memcpy(new->data, s1->data, offset); memcpy(new->data + offset, s2->data, s2->endp); memcpy(new->data + offset + s2->endp, s1->data + offset, @@ -160,7 +182,7 @@ size_t stream_resize_inplace(struct stream **sptr, size_t newsize) STREAM_VERIFY_SANE(orig); - orig = XREALLOC(MTYPE_STREAM, orig, sizeof(struct stream) + newsize); + orig->data = XREALLOC(MTYPE_STREAM, orig->data, newsize); orig->size = newsize; @@ -175,6 +197,29 @@ size_t stream_resize_inplace(struct stream **sptr, size_t newsize) return orig->size; } +/* Helper function to expand stream if needed and allowed */ +static void stream_expand(struct stream *s, size_t expand_size) +{ + size_t new_size; + size_t actual_expand_size = expand_size; + + /* Growth strategy: + * For small expansions (<= min expansion bytes): grow by min size + * otherwise grow by needed size + */ + if (actual_expand_size <= MIN_STREAM_EXPANSION_SZ) { + actual_expand_size = MIN_STREAM_EXPANSION_SZ; + } + + /* Calculate new total size */ + new_size = s->size + actual_expand_size; + /* Reallocate the data buffer */ + s->data = XREALLOC(MTYPE_STREAM, s->data, new_size); + + /* Update the stream's data size */ + s->size = new_size; +} + size_t stream_get_getp(const struct stream *s) { STREAM_VERIFY_SANE(s); @@ -691,8 +736,12 @@ void stream_put(struct stream *s, const void *src, size_t size) 
STREAM_VERIFY_SANE(s); if (STREAM_WRITEABLE(s) < size) { - STREAM_BOUND_WARN(s, "put"); - return; + if (s->allow_expansion) { + stream_expand(s, STREAM_EXPAND_SIZE(s, size)); + } else { + STREAM_BOUND_WARN(s, "put"); + return; + } } if (src) @@ -709,8 +758,12 @@ int stream_putc(struct stream *s, uint8_t c) STREAM_VERIFY_SANE(s); if (STREAM_WRITEABLE(s) < sizeof(uint8_t)) { - STREAM_BOUND_WARN(s, "put"); - return 0; + if (s->allow_expansion) { + stream_expand(s, STREAM_EXPAND_SIZE(s, sizeof(uint8_t))); + } else { + STREAM_BOUND_WARN(s, "put"); + return 0; + } } s->data[s->endp++] = c; @@ -723,8 +776,12 @@ int stream_putw(struct stream *s, uint16_t w) STREAM_VERIFY_SANE(s); if (STREAM_WRITEABLE(s) < sizeof(uint16_t)) { - STREAM_BOUND_WARN(s, "put"); - return 0; + if (s->allow_expansion) { + stream_expand(s, STREAM_EXPAND_SIZE(s, sizeof(uint16_t))); + } else { + STREAM_BOUND_WARN(s, "put"); + return 0; + } } s->data[s->endp++] = (uint8_t)(w >> 8); @@ -739,8 +796,12 @@ int stream_put3(struct stream *s, uint32_t l) STREAM_VERIFY_SANE(s); if (STREAM_WRITEABLE(s) < 3) { - STREAM_BOUND_WARN(s, "put"); - return 0; + if (s->allow_expansion) { + stream_expand(s, STREAM_EXPAND_SIZE(s, 3)); + } else { + STREAM_BOUND_WARN(s, "put"); + return 0; + } } s->data[s->endp++] = (uint8_t)(l >> 16); @@ -756,8 +817,12 @@ int stream_putl(struct stream *s, uint32_t l) STREAM_VERIFY_SANE(s); if (STREAM_WRITEABLE(s) < sizeof(uint32_t)) { - STREAM_BOUND_WARN(s, "put"); - return 0; + if (s->allow_expansion) { + stream_expand(s, STREAM_EXPAND_SIZE(s, sizeof(uint32_t))); + } else { + STREAM_BOUND_WARN(s, "put"); + return 0; + } } s->data[s->endp++] = (uint8_t)(l >> 24); @@ -774,8 +839,12 @@ int stream_putq(struct stream *s, uint64_t q) STREAM_VERIFY_SANE(s); if (STREAM_WRITEABLE(s) < sizeof(uint64_t)) { - STREAM_BOUND_WARN(s, "put quad"); - return 0; + if (s->allow_expansion) { + stream_expand(s, STREAM_EXPAND_SIZE(s, sizeof(uint64_t))); + } else { + STREAM_BOUND_WARN(s, "put quad"); + return 0; + } } 
s->data[s->endp++] = (uint8_t)(q >> 56); @@ -896,7 +965,11 @@ int stream_put_ipv4(struct stream *s, uint32_t l) STREAM_VERIFY_SANE(s); if (STREAM_WRITEABLE(s) < sizeof(uint32_t)) { - STREAM_BOUND_WARN(s, "put"); - return 0; + if (s->allow_expansion) { + stream_expand(s, STREAM_EXPAND_SIZE(s, sizeof(uint32_t))); + } else { + STREAM_BOUND_WARN(s, "put"); + return 0; + } } memcpy(s->data + s->endp, &l, sizeof(uint32_t)); @@ -911,8 +984,12 @@ int stream_put_in_addr(struct stream *s, const struct in_addr *addr) STREAM_VERIFY_SANE(s); if (STREAM_WRITEABLE(s) < sizeof(uint32_t)) { - STREAM_BOUND_WARN(s, "put"); - return 0; + if (s->allow_expansion) { + stream_expand(s, STREAM_EXPAND_SIZE(s, sizeof(uint32_t))); + } else { + STREAM_BOUND_WARN(s, "put"); + return 0; + } } memcpy(s->data + s->endp, addr, sizeof(uint32_t)); @@ -989,8 +1066,13 @@ int stream_put_prefix_addpath(struct stream *s, const struct prefix *p, psize_with_addpath = psize; if (STREAM_WRITEABLE(s) < (psize_with_addpath + sizeof(uint8_t))) { - STREAM_BOUND_WARN(s, "put"); - return 0; + if (s->allow_expansion) { + stream_expand(s, + STREAM_EXPAND_SIZE(s, psize_with_addpath + sizeof(uint8_t))); + } else { + STREAM_BOUND_WARN(s, "put"); + return 0; + } } if (addpath_capable) { @@ -1027,8 +1109,12 @@ int stream_put_labeled_prefix(struct stream *s, const struct prefix *p, psize_with_addpath = psize + (addpath_capable ? 
4 : 0); if (STREAM_WRITEABLE(s) < (psize_with_addpath + 3)) { - STREAM_BOUND_WARN(s, "put"); - return 0; + if (s->allow_expansion) { + stream_expand(s, STREAM_EXPAND_SIZE(s, psize_with_addpath + 3)); + } else { + STREAM_BOUND_WARN(s, "put"); + return 0; + } } if (addpath_capable) { @@ -1167,8 +1255,12 @@ size_t stream_write(struct stream *s, const void *ptr, size_t size) STREAM_VERIFY_SANE(s); if (STREAM_WRITEABLE(s) < size) { - STREAM_BOUND_WARN(s, "put"); - return 0; + if (s->allow_expansion) { + stream_expand(s, STREAM_EXPAND_SIZE(s, size)); + } else { + STREAM_BOUND_WARN(s, "put"); + return 0; + } } memcpy(s->data + s->endp, ptr, size); diff --git a/lib/stream.h b/lib/stream.h index e48cedc613..437ba5aa74 100644 --- a/lib/stream.h +++ b/lib/stream.h @@ -95,7 +95,8 @@ struct stream { size_t getp; /* next get position */ size_t endp; /* last valid data position */ size_t size; /* size of data segment */ - unsigned char data[]; /* data pointer */ + bool allow_expansion; /* whether stream can be expanded */ + unsigned char *data; /* data pointer */ }; /* First in first out queue structure. 
*/ @@ -132,6 +133,7 @@ struct stream_fifo { * q: quad (four words) */ extern struct stream *stream_new(size_t); +extern struct stream *stream_new_expandable(size_t); extern void stream_free(struct stream *); /* Copy 'src' into 'dest', returns 'dest' */ extern struct stream *stream_copy(struct stream *dest, From 4de0f16a89da069e8fa9920ac888ff8aed4cc6e8 Mon Sep 17 00:00:00 2001 From: Soumya Roy Date: Fri, 14 Mar 2025 21:48:20 +0000 Subject: [PATCH 2/4] tests: Add staticd/ospfd/ospf6d/pimd for high ecmp Signed-off-by: Soumya Roy --- .../topotests/high_ecmp/test_high_ecmp_unnumbered.py | 11 ++++++++--- tests/topotests/high_ecmp/test_high_ecmp_v4.py | 11 ++++++++--- tests/topotests/high_ecmp/test_high_ecmp_v6.py | 11 ++++++++--- 3 files changed, 24 insertions(+), 9 deletions(-) diff --git a/tests/topotests/high_ecmp/test_high_ecmp_unnumbered.py b/tests/topotests/high_ecmp/test_high_ecmp_unnumbered.py index 1a65a44a55..0947272386 100644 --- a/tests/topotests/high_ecmp/test_high_ecmp_unnumbered.py +++ b/tests/topotests/high_ecmp/test_high_ecmp_unnumbered.py @@ -78,13 +78,18 @@ def setup_module(module): (TopoRouter.RD_ZEBRA, "-s 180000000"), (TopoRouter.RD_BGP, None), (TopoRouter.RD_SHARP, None), + (TopoRouter.RD_STATIC, None), + (TopoRouter.RD_OSPF, None), + (TopoRouter.RD_OSPF6, None), + (TopoRouter.RD_PIM, None), ], ) - + tgen.start_router() - + for rname, router in router_list.items(): - router.cmd("vtysh -f {}/{}/frr_unnumbered_bgp.conf".format(CWD, rname)) + router.cmd("vtysh -f {}/{}/frr_unnumbered_bgp.conf".format(CWD, rname)) + def teardown_module(_mod): "Teardown the pytest environment" diff --git a/tests/topotests/high_ecmp/test_high_ecmp_v4.py b/tests/topotests/high_ecmp/test_high_ecmp_v4.py index c1fbb9e878..909c9d6528 100644 --- a/tests/topotests/high_ecmp/test_high_ecmp_v4.py +++ b/tests/topotests/high_ecmp/test_high_ecmp_v4.py @@ -78,13 +78,18 @@ def setup_module(module): (TopoRouter.RD_ZEBRA, "-s 180000000"), (TopoRouter.RD_BGP, None), 
(TopoRouter.RD_SHARP, None), + (TopoRouter.RD_STATIC, None), + (TopoRouter.RD_OSPF, None), + (TopoRouter.RD_OSPF6, None), + (TopoRouter.RD_PIM, None), ], ) - + tgen.start_router() - + for rname, router in router_list.items(): - router.cmd("vtysh -f {}/{}/frr_ipv4_bgp.conf".format(CWD, rname)) + router.cmd("vtysh -f {}/{}/frr_ipv4_bgp.conf".format(CWD, rname)) + def teardown_module(_mod): "Teardown the pytest environment" diff --git a/tests/topotests/high_ecmp/test_high_ecmp_v6.py b/tests/topotests/high_ecmp/test_high_ecmp_v6.py index a63444f0f0..cb4c93f590 100644 --- a/tests/topotests/high_ecmp/test_high_ecmp_v6.py +++ b/tests/topotests/high_ecmp/test_high_ecmp_v6.py @@ -78,13 +78,18 @@ def setup_module(module): (TopoRouter.RD_ZEBRA, "-s 180000000"), (TopoRouter.RD_BGP, None), (TopoRouter.RD_SHARP, None), + (TopoRouter.RD_STATIC, None), + (TopoRouter.RD_OSPF, None), + (TopoRouter.RD_OSPF6, None), + (TopoRouter.RD_PIM, None), ], ) - + tgen.start_router() - + for rname, router in router_list.items(): - router.cmd("vtysh -f {}/{}/frr_ipv6_bgp.conf".format(CWD, rname)) + router.cmd("vtysh -f {}/{}/frr_ipv6_bgp.conf".format(CWD, rname)) + def teardown_module(_mod): "Teardown the pytest environment" From 6fe9092eb312e196260ee8deefb73b3f864b1432 Mon Sep 17 00:00:00 2001 From: Soumya Roy Date: Fri, 14 Mar 2025 21:56:48 +0000 Subject: [PATCH 3/4] zebra: zebra crash for zapi stream Issue: If static route is created with a BGP route as nexthop, which recursively resolves over 512 ECMP v6 nexthops, zapi nexthop encode fails, as there is not enough memory allocated for stream. This causes assert/core dump in zebra. Right now we allocate fixed memory of ZEBRA_MAX_PACKET_SIZ size. 
Fix: 1) Dynamically calculate required memory size for the stream 2) Try to optimize memory usage Testing: No crash happens anymore with the fix r1# r1# sharp install routes 2100:cafe:: nexthop 2001:db8::1 1000 r1# r2# conf r2(config)# ipv6 route 2503:feca::100/128 2100:cafe::1 r2(config)# exit r2# Signed-off-by: Soumya Roy --- zebra/zebra_rnh.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/zebra/zebra_rnh.c b/zebra/zebra_rnh.c index 640e6551a7..6b6be59c47 100644 --- a/zebra/zebra_rnh.c +++ b/zebra/zebra_rnh.c @@ -1150,7 +1150,7 @@ int zebra_send_rnh_update(struct rnh *rnh, struct zserv *client, re = rnh->state; /* Get output stream. 
*/ - s = stream_new(ZEBRA_MAX_PACKET_SIZ); + s = stream_new_expandable(ZEBRA_MAX_PACKET_SIZ); zclient_create_header(s, ZEBRA_NEXTHOP_UPDATE, vrf_id); From 860c1e445043c6e380a9fb08cc0c21c6339625ad Mon Sep 17 00:00:00 2001 From: Soumya Roy Date: Fri, 14 Mar 2025 22:01:51 +0000 Subject: [PATCH 4/4] zebra: reduce memory usage by streams when redistributing routes This commit undoes 8c9b007a0c7efb2e9afc2eac936ba9dd971c6707. The stream library has been modified to expand the stream if needed, so zapi route encoding now uses an expandable stream. Signed-off-by: Soumya Roy --- lib/zclient.c | 18 ------------------ lib/zclient.h | 1 - zebra/zapi_msg.c | 4 +--- 3 files changed, 1 insertion(+), 22 deletions(-) diff --git a/lib/zclient.c b/lib/zclient.c index f0476867be..532771cb93 100644 --- a/lib/zclient.c +++ b/lib/zclient.c @@ -1326,24 +1326,6 @@ enum zclient_send_status zclient_nhg_send(struct zclient *zclient, int cmd, return zclient_send_message(zclient); } -/* size needed by a stream for redistributing a route */ -int zapi_redistribute_stream_size(struct zapi_route *api) -{ - size_t msg_size = 0; - size_t nh_size = sizeof(struct zapi_nexthop); - - msg_size = sizeof(struct zapi_route); - /* remove unused nexthop structures */ - msg_size -= (MULTIPATH_NUM - api->nexthop_num) * nh_size; - /* remove unused backup nexthop structures */ - msg_size -= (MULTIPATH_NUM - api->backup_nexthop_num) * nh_size; - /* remove unused opaque values */ - msg_size -= ZAPI_MESSAGE_OPAQUE_LENGTH - api->opaque.length; - - return msg_size; -} - - int zapi_route_encode(uint8_t cmd, struct stream *s, struct zapi_route *api) { struct zapi_nexthop *api_nh; diff --git a/lib/zclient.h b/lib/zclient.h index 43521d6e2e..afd84acce2 100644 --- a/lib/zclient.h +++ b/lib/zclient.h @@ -1158,7 +1158,6 @@ zclient_send_rnh(struct zclient *zclient, int command, const struct prefix *p, vrf_id_t vrf_id); int zapi_nexthop_encode(struct stream *s, const struct zapi_nexthop *api_nh, uint32_t api_flags, uint32_t api_message); 
-extern int zapi_redistribute_stream_size(struct zapi_route *api); extern int zapi_route_encode(uint8_t, struct stream *, struct zapi_route *); extern int zapi_route_decode(struct stream *s, struct zapi_route *api); extern int zapi_nexthop_decode(struct stream *s, struct zapi_nexthop *api_nh, diff --git a/zebra/zapi_msg.c b/zebra/zapi_msg.c index 066859d380..9c8ef19a9d 100644 --- a/zebra/zapi_msg.c +++ b/zebra/zapi_msg.c @@ -518,7 +518,6 @@ int zsend_redistribute_route(int cmd, struct zserv *client, const struct route_n const struct prefix *p, *src_p; uint16_t count = 0; afi_t afi; - size_t stream_size = 0; srcdest_rnode_prefixes(rn, &p, &src_p); memset(&api, 0, sizeof(api)); @@ -610,8 +609,7 @@ int zsend_redistribute_route(int cmd, struct zserv *client, const struct route_n SET_FLAG(api.message, ZAPI_MESSAGE_MTU); api.mtu = re->mtu; - stream_size = zapi_redistribute_stream_size(&api); - struct stream *s = stream_new(stream_size); + struct stream *s = stream_new_expandable(ZEBRA_MAX_PACKET_SIZ); /* Encode route and send. */ if (zapi_route_encode(cmd, s, &api) < 0) {