lib: Add support for stream buffer to expand

Issue:
 Currently, during encode time, if required memory is
 more than available space in stream buffer, stream buffer
 can't be expanded. This fix introduces new apis to support
 stream buffer expansion.

 Testing:
 Tested with zebra nexthop encoding with 512 nexthops, which triggers
 this new code changes, it works fine. Without fix, for same trigger
 it asserts.

 Signed-off-by: Soumya Roy <souroy@nvidia.com>
This commit is contained in:
Soumya Roy 2025-03-14 21:44:39 +00:00
parent 8f8d0923a7
commit c0c46bad15
2 changed files with 120 additions and 26 deletions

View file

@ -16,6 +16,11 @@
#include "frr_pthread.h" #include "frr_pthread.h"
#include "lib_errors.h" #include "lib_errors.h"
#define MIN_STREAM_EXPANSION_SZ 512
/* Extra size needed for a stream in bytes, given new write size */
#define STREAM_EXPAND_SIZE(S, WSZ) ((WSZ)-STREAM_WRITEABLE(S))
DEFINE_MTYPE_STATIC(LIB, STREAM, "Stream"); DEFINE_MTYPE_STATIC(LIB, STREAM, "Stream");
DEFINE_MTYPE_STATIC(LIB, STREAM_FIFO, "Stream FIFO"); DEFINE_MTYPE_STATIC(LIB, STREAM_FIFO, "Stream FIFO");
@ -92,11 +97,20 @@ struct stream *stream_new(size_t size)
assert(size > 0); assert(size > 0);
s = XMALLOC(MTYPE_STREAM, sizeof(struct stream) + size); s = XMALLOC(MTYPE_STREAM, sizeof(struct stream));
s->data = XMALLOC(MTYPE_STREAM, size);
s->getp = s->endp = 0; s->getp = s->endp = 0;
s->next = NULL; s->next = NULL;
s->size = size; s->size = size;
s->allow_expansion = false;
return s;
}
struct stream *stream_new_expandable(size_t size)
{
struct stream *s = stream_new(size);
s->allow_expansion = true;
return s; return s;
} }
@ -106,6 +120,7 @@ void stream_free(struct stream *s)
if (!s) if (!s)
return; return;
XFREE(MTYPE_STREAM, s->data);
XFREE(MTYPE_STREAM, s); XFREE(MTYPE_STREAM, s);
} }
@ -115,7 +130,7 @@ struct stream *stream_copy(struct stream *dest, const struct stream *src)
assert(dest != NULL); assert(dest != NULL);
assert(STREAM_SIZE(dest) >= src->endp); assert(STREAM_SIZE(dest) >= src->endp);
dest->allow_expansion = src->allow_expansion;
dest->endp = src->endp; dest->endp = src->endp;
dest->getp = src->getp; dest->getp = src->getp;
@ -131,7 +146,7 @@ struct stream *stream_dup(const struct stream *s)
STREAM_VERIFY_SANE(s); STREAM_VERIFY_SANE(s);
snew = stream_new(s->endp); snew = stream_new(s->endp);
snew->allow_expansion = s->allow_expansion;
return (stream_copy(snew, s)); return (stream_copy(snew, s));
} }
@ -143,9 +158,16 @@ struct stream *stream_dupcat(const struct stream *s1, const struct stream *s2,
STREAM_VERIFY_SANE(s1); STREAM_VERIFY_SANE(s1);
STREAM_VERIFY_SANE(s2); STREAM_VERIFY_SANE(s2);
if (offset > s1->endp) {
fprintf(stderr, "Error: Invalid offset %zu, exceeds s1->endp %zu\n", offset,
s1->endp);
return NULL;
}
if ((new = stream_new(s1->endp + s2->endp)) == NULL) if ((new = stream_new(s1->endp + s2->endp)) == NULL)
return NULL; return NULL;
new->allow_expansion = s1->allow_expansion || s2->allow_expansion;
memcpy(new->data, s1->data, offset); memcpy(new->data, s1->data, offset);
memcpy(new->data + offset, s2->data, s2->endp); memcpy(new->data + offset, s2->data, s2->endp);
memcpy(new->data + offset + s2->endp, s1->data + offset, memcpy(new->data + offset + s2->endp, s1->data + offset,
@ -160,7 +182,7 @@ size_t stream_resize_inplace(struct stream **sptr, size_t newsize)
STREAM_VERIFY_SANE(orig); STREAM_VERIFY_SANE(orig);
orig = XREALLOC(MTYPE_STREAM, orig, sizeof(struct stream) + newsize); orig->data = XREALLOC(MTYPE_STREAM, orig->data, newsize);
orig->size = newsize; orig->size = newsize;
@ -175,6 +197,29 @@ size_t stream_resize_inplace(struct stream **sptr, size_t newsize)
return orig->size; return orig->size;
} }
/* Helper function to expand stream if needed and allowed */
static void stream_expand(struct stream *s, size_t expand_size)
{
size_t new_size;
size_t actual_expand_size = expand_size;
/* Growth strategy:
* For small expansions (<= min expansion bytes): grow by min size
* otherwise grow by needed size
*/
if (actual_expand_size <= MIN_STREAM_EXPANSION_SZ) {
actual_expand_size = MIN_STREAM_EXPANSION_SZ;
}
/* Calculate new total size */
new_size = s->size + actual_expand_size;
/* Reallocate the data buffer */
s->data = XREALLOC(MTYPE_STREAM, s->data, new_size);
/* Update the stream's data size */
s->size = new_size;
}
size_t stream_get_getp(const struct stream *s) size_t stream_get_getp(const struct stream *s)
{ {
STREAM_VERIFY_SANE(s); STREAM_VERIFY_SANE(s);
@ -691,8 +736,12 @@ void stream_put(struct stream *s, const void *src, size_t size)
STREAM_VERIFY_SANE(s); STREAM_VERIFY_SANE(s);
if (STREAM_WRITEABLE(s) < size) { if (STREAM_WRITEABLE(s) < size) {
STREAM_BOUND_WARN(s, "put"); if (s->allow_expansion) {
return; stream_expand(s, STREAM_EXPAND_SIZE(s, size));
} else {
STREAM_BOUND_WARN(s, "put");
return;
}
} }
if (src) if (src)
@ -709,8 +758,12 @@ int stream_putc(struct stream *s, uint8_t c)
STREAM_VERIFY_SANE(s); STREAM_VERIFY_SANE(s);
if (STREAM_WRITEABLE(s) < sizeof(uint8_t)) { if (STREAM_WRITEABLE(s) < sizeof(uint8_t)) {
STREAM_BOUND_WARN(s, "put"); if (s->allow_expansion) {
return 0; stream_expand(s, STREAM_EXPAND_SIZE(s, sizeof(uint8_t)));
} else {
STREAM_BOUND_WARN(s, "put");
return 0;
}
} }
s->data[s->endp++] = c; s->data[s->endp++] = c;
@ -723,8 +776,12 @@ int stream_putw(struct stream *s, uint16_t w)
STREAM_VERIFY_SANE(s); STREAM_VERIFY_SANE(s);
if (STREAM_WRITEABLE(s) < sizeof(uint16_t)) { if (STREAM_WRITEABLE(s) < sizeof(uint16_t)) {
STREAM_BOUND_WARN(s, "put"); if (s->allow_expansion) {
return 0; stream_expand(s, STREAM_EXPAND_SIZE(s, sizeof(uint16_t)));
} else {
STREAM_BOUND_WARN(s, "put");
return 0;
}
} }
s->data[s->endp++] = (uint8_t)(w >> 8); s->data[s->endp++] = (uint8_t)(w >> 8);
@ -739,8 +796,12 @@ int stream_put3(struct stream *s, uint32_t l)
STREAM_VERIFY_SANE(s); STREAM_VERIFY_SANE(s);
if (STREAM_WRITEABLE(s) < 3) { if (STREAM_WRITEABLE(s) < 3) {
STREAM_BOUND_WARN(s, "put"); if (s->allow_expansion) {
return 0; stream_expand(s, STREAM_EXPAND_SIZE(s, 3));
} else {
STREAM_BOUND_WARN(s, "put");
return 0;
}
} }
s->data[s->endp++] = (uint8_t)(l >> 16); s->data[s->endp++] = (uint8_t)(l >> 16);
@ -756,8 +817,12 @@ int stream_putl(struct stream *s, uint32_t l)
STREAM_VERIFY_SANE(s); STREAM_VERIFY_SANE(s);
if (STREAM_WRITEABLE(s) < sizeof(uint32_t)) { if (STREAM_WRITEABLE(s) < sizeof(uint32_t)) {
STREAM_BOUND_WARN(s, "put"); if (s->allow_expansion) {
return 0; stream_expand(s, STREAM_EXPAND_SIZE(s, sizeof(uint32_t)));
} else {
STREAM_BOUND_WARN(s, "put");
return 0;
}
} }
s->data[s->endp++] = (uint8_t)(l >> 24); s->data[s->endp++] = (uint8_t)(l >> 24);
@ -774,8 +839,12 @@ int stream_putq(struct stream *s, uint64_t q)
STREAM_VERIFY_SANE(s); STREAM_VERIFY_SANE(s);
if (STREAM_WRITEABLE(s) < sizeof(uint64_t)) { if (STREAM_WRITEABLE(s) < sizeof(uint64_t)) {
STREAM_BOUND_WARN(s, "put quad"); if (s->allow_expansion) {
return 0; stream_expand(s, STREAM_EXPAND_SIZE(s, sizeof(uint64_t)));
} else {
STREAM_BOUND_WARN(s, "put");
return 0;
}
} }
s->data[s->endp++] = (uint8_t)(q >> 56); s->data[s->endp++] = (uint8_t)(q >> 56);
@ -896,7 +965,13 @@ int stream_put_ipv4(struct stream *s, uint32_t l)
STREAM_VERIFY_SANE(s); STREAM_VERIFY_SANE(s);
if (STREAM_WRITEABLE(s) < sizeof(uint32_t)) { if (STREAM_WRITEABLE(s) < sizeof(uint32_t)) {
STREAM_BOUND_WARN(s, "put"); if (s->allow_expansion) {
stream_expand(s, STREAM_EXPAND_SIZE(s, sizeof(uint32_t)));
} else {
STREAM_BOUND_WARN(s, "put");
return 0;
}
return 0; return 0;
} }
memcpy(s->data + s->endp, &l, sizeof(uint32_t)); memcpy(s->data + s->endp, &l, sizeof(uint32_t));
@ -911,8 +986,12 @@ int stream_put_in_addr(struct stream *s, const struct in_addr *addr)
STREAM_VERIFY_SANE(s); STREAM_VERIFY_SANE(s);
if (STREAM_WRITEABLE(s) < sizeof(uint32_t)) { if (STREAM_WRITEABLE(s) < sizeof(uint32_t)) {
STREAM_BOUND_WARN(s, "put"); if (s->allow_expansion) {
return 0; stream_expand(s, STREAM_EXPAND_SIZE(s, sizeof(uint32_t)));
} else {
STREAM_BOUND_WARN(s, "put");
return 0;
}
} }
memcpy(s->data + s->endp, addr, sizeof(uint32_t)); memcpy(s->data + s->endp, addr, sizeof(uint32_t));
@ -989,8 +1068,13 @@ int stream_put_prefix_addpath(struct stream *s, const struct prefix *p,
psize_with_addpath = psize; psize_with_addpath = psize;
if (STREAM_WRITEABLE(s) < (psize_with_addpath + sizeof(uint8_t))) { if (STREAM_WRITEABLE(s) < (psize_with_addpath + sizeof(uint8_t))) {
STREAM_BOUND_WARN(s, "put"); if (s->allow_expansion) {
return 0; stream_expand(s,
STREAM_EXPAND_SIZE(s, psize_with_addpath + sizeof(uint8_t)));
} else {
STREAM_BOUND_WARN(s, "put");
return 0;
}
} }
if (addpath_capable) { if (addpath_capable) {
@ -1027,8 +1111,12 @@ int stream_put_labeled_prefix(struct stream *s, const struct prefix *p,
psize_with_addpath = psize + (addpath_capable ? 4 : 0); psize_with_addpath = psize + (addpath_capable ? 4 : 0);
if (STREAM_WRITEABLE(s) < (psize_with_addpath + 3)) { if (STREAM_WRITEABLE(s) < (psize_with_addpath + 3)) {
STREAM_BOUND_WARN(s, "put"); if (s->allow_expansion) {
return 0; stream_expand(s, STREAM_EXPAND_SIZE(s, psize_with_addpath + 3));
} else {
STREAM_BOUND_WARN(s, "put");
return 0;
}
} }
if (addpath_capable) { if (addpath_capable) {
@ -1167,8 +1255,12 @@ size_t stream_write(struct stream *s, const void *ptr, size_t size)
STREAM_VERIFY_SANE(s); STREAM_VERIFY_SANE(s);
if (STREAM_WRITEABLE(s) < size) { if (STREAM_WRITEABLE(s) < size) {
STREAM_BOUND_WARN(s, "put"); if (s->allow_expansion) {
return 0; stream_expand(s, STREAM_EXPAND_SIZE(s, size));
} else {
STREAM_BOUND_WARN(s, "put");
return 0;
}
} }
memcpy(s->data + s->endp, ptr, size); memcpy(s->data + s->endp, ptr, size);

View file

@ -95,7 +95,8 @@ struct stream {
size_t getp; /* next get position */ size_t getp; /* next get position */
size_t endp; /* last valid data position */ size_t endp; /* last valid data position */
size_t size; /* size of data segment */ size_t size; /* size of data segment */
unsigned char data[]; /* data pointer */ bool allow_expansion; /* whether stream can be expanded */
unsigned char *data; /* data pointer */
}; };
/* First in first out queue structure. */ /* First in first out queue structure. */
@ -132,6 +133,7 @@ struct stream_fifo {
* q: quad (four words) * q: quad (four words)
*/ */
extern struct stream *stream_new(size_t); extern struct stream *stream_new(size_t);
extern struct stream *stream_new_expandable(size_t);
extern void stream_free(struct stream *); extern void stream_free(struct stream *);
/* Copy 'src' into 'dest', returns 'dest' */ /* Copy 'src' into 'dest', returns 'dest' */
extern struct stream *stream_copy(struct stream *dest, extern struct stream *stream_copy(struct stream *dest,