Skip to content

Commit 976938a

Browse files
ShelbyZedsiper
authored and committed
out_kinesis_streams: Add simple_aggregation operation
- Add simple_aggregation config parameter and implementation to the plugin. Signed-off-by: Shelby Hagman <shelbyzh@amazon.com>
1 parent 94c4b50 commit 976938a

3 files changed

Lines changed: 169 additions & 13 deletions

File tree

plugins/out_kinesis_streams/kinesis.c

Lines changed: 22 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -308,10 +308,10 @@ static int cb_kinesis_init(struct flb_output_instance *ins,
308308
return -1;
309309
}
310310

311-
static struct flush *new_flush_buffer(const char *tag, int tag_len)
311+
static struct flush *new_flush_buffer(struct flb_kinesis *ctx, const char *tag, int tag_len)
312312
{
313313
struct flush *buf;
314-
314+
int ret;
315315

316316
buf = flb_calloc(1, sizeof(struct flush));
317317
if (!buf) {
@@ -338,6 +338,18 @@ static struct flush *new_flush_buffer(const char *tag, int tag_len)
338338
buf->tag = tag;
339339
buf->tag_len = tag_len;
340340

341+
/* Initialize aggregation buffer if simple_aggregation is enabled */
342+
buf->agg_buf_initialized = FLB_FALSE;
343+
if (ctx->simple_aggregation) {
344+
ret = flb_aws_aggregation_init(&buf->agg_buf, MAX_EVENT_SIZE);
345+
if (ret < 0) {
346+
flb_plg_error(ctx->ins, "Failed to initialize aggregation buffer");
347+
kinesis_flush_destroy(buf);
348+
return NULL;
349+
}
350+
buf->agg_buf_initialized = FLB_TRUE;
351+
}
352+
341353
return buf;
342354
}
343355

@@ -353,7 +365,7 @@ static void cb_kinesis_flush(struct flb_event_chunk *event_chunk,
353365
(void) i_ins;
354366
(void) config;
355367

356-
buf = new_flush_buffer(event_chunk->tag, flb_sds_len(event_chunk->tag));
368+
buf = new_flush_buffer(ctx, event_chunk->tag, flb_sds_len(event_chunk->tag));
357369
if (!buf) {
358370
flb_plg_error(ctx->ins, "Failed to construct flush buffer");
359371
FLB_OUTPUT_RETURN(FLB_RETRY);
@@ -503,6 +515,13 @@ static struct flb_config_map config_map[] = {
503515
"$HOME/.aws/ directory."
504516
},
505517

518+
{
519+
FLB_CONFIG_MAP_BOOL, "simple_aggregation", "false",
520+
0, FLB_TRUE, offsetof(struct flb_kinesis, simple_aggregation),
521+
"Enable simple aggregation to combine multiple records into single API calls. "
522+
"This reduces the number of requests and can improve throughput."
523+
},
524+
506525
/* EOF */
507526
{0}
508527
};

plugins/out_kinesis_streams/kinesis.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
#include <fluent-bit/flb_http_client.h>
2727
#include <fluent-bit/flb_aws_util.h>
2828
#include <fluent-bit/flb_signv4.h>
29+
#include <fluent-bit/aws/flb_aws_aggregation.h>
2930

3031
#define DEFAULT_TIME_KEY_FORMAT "%Y-%m-%dT%H:%M:%S"
3132

@@ -56,6 +57,10 @@ struct flush {
5657
char *event_buf;
5758
size_t event_buf_size;
5859

60+
/* aggregation buffer for simple_aggregation mode */
61+
struct flb_aws_agg_buffer agg_buf;
62+
int agg_buf_initialized;
63+
5964
int records_sent;
6065
int records_processed;
6166

@@ -92,6 +97,7 @@ struct flb_kinesis {
9297
const char *log_key;
9398
const char *external_id;
9499
int retry_requests;
100+
int simple_aggregation;
95101
char *sts_endpoint;
96102
int custom_endpoint;
97103
uint16_t port;

plugins/out_kinesis_streams/kinesis_api.c

Lines changed: 141 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
#include <fluent-bit/flb_http_client.h>
3737
#include <fluent-bit/flb_utils.h>
3838
#include <fluent-bit/flb_base64.h>
39+
#include <fluent-bit/aws/flb_aws_aggregation.h>
3940

4041
#include <monkey/mk_core.h>
4142
#include <msgpack.h>
@@ -51,6 +52,9 @@
5152

5253
#define ERR_CODE_EXCEEDED_THROUGHPUT "ProvisionedThroughputExceededException"
5354

55+
/* Forward declarations */
56+
static int send_log_events(struct flb_kinesis *ctx, struct flush *buf);
57+
5458
static struct flb_aws_header put_records_target_header = {
5559
.key = "X-Amz-Target",
5660
.key_len = 12,
@@ -202,6 +206,29 @@ static int end_put_payload(struct flb_kinesis *ctx, struct flush *buf,
202206
}
203207

204208

209+
/*
210+
* Process event with simple aggregation (Kinesis Streams version)
211+
* Uses shared aggregation implementation
212+
*/
213+
static int process_event_simple_aggregation(struct flb_kinesis *ctx, struct flush *buf,
214+
const msgpack_object *obj, struct flb_time *tms,
215+
struct flb_config *config)
216+
{
217+
return flb_aws_aggregation_process_event(&buf->agg_buf,
218+
buf->tmp_buf,
219+
buf->tmp_buf_size,
220+
&buf->tmp_buf_offset,
221+
obj,
222+
tms,
223+
config,
224+
ctx->ins,
225+
ctx->stream_name,
226+
ctx->log_key,
227+
ctx->time_key,
228+
ctx->time_key_format,
229+
MAX_EVENT_SIZE);
230+
}
231+
205232
/*
206233
* Processes the msgpack object
207234
* -1 = failure, record not added
@@ -391,6 +418,70 @@ static void reset_flush_buf(struct flb_kinesis *ctx, struct flush *buf) {
391418
buf->data_size += strlen(ctx->stream_name);
392419
}
393420

421+
/* Finalize and send aggregated record (Kinesis Streams version - no final newline) */
422+
static int send_aggregated_record(struct flb_kinesis *ctx, struct flush *buf) {
423+
int ret;
424+
size_t agg_size;
425+
size_t b64_len;
426+
struct kinesis_event *event;
427+
428+
/* Finalize without final newline (Kinesis Streams doesn't need it) */
429+
ret = flb_aws_aggregation_finalize(&buf->agg_buf, 0, &agg_size);
430+
if (ret < 0) {
431+
return 0;
432+
}
433+
434+
/* Base64 encode the aggregated record */
435+
size_t size = (agg_size * 1.5) + 4;
436+
if (buf->event_buf == NULL || buf->event_buf_size < size) {
437+
flb_free(buf->event_buf);
438+
buf->event_buf = flb_malloc(size);
439+
buf->event_buf_size = size;
440+
if (buf->event_buf == NULL) {
441+
flb_errno();
442+
return -1;
443+
}
444+
}
445+
446+
ret = flb_base64_encode((unsigned char *) buf->event_buf, size, &b64_len,
447+
(unsigned char *) buf->agg_buf.agg_buf, agg_size);
448+
if (ret != 0) {
449+
flb_errno();
450+
return -1;
451+
}
452+
agg_size = b64_len;
453+
454+
/* Copy to tmp_buf */
455+
if (buf->tmp_buf_size < agg_size) {
456+
flb_plg_error(ctx->ins, "Aggregated record too large for buffer");
457+
flb_aws_aggregation_reset(&buf->agg_buf);
458+
return 0;
459+
}
460+
461+
memcpy(buf->tmp_buf, buf->event_buf, agg_size);
462+
463+
/* Create event record */
464+
event = &buf->events[0];
465+
event->json = buf->tmp_buf;
466+
event->len = agg_size;
467+
event->timestamp.tv_sec = 0;
468+
event->timestamp.tv_nsec = 0;
469+
buf->event_index = 1;
470+
471+
/* Calculate data_size for the payload */
472+
buf->data_size = PUT_RECORDS_HEADER_LEN + PUT_RECORDS_FOOTER_LEN;
473+
buf->data_size += strlen(ctx->stream_name);
474+
buf->data_size += agg_size + PUT_RECORDS_PER_RECORD_LEN;
475+
476+
/* Send the aggregated record */
477+
ret = send_log_events(ctx, buf);
478+
479+
/* Reset aggregation buffer */
480+
flb_aws_aggregation_reset(&buf->agg_buf);
481+
482+
return ret;
483+
}
484+
394485
/* constructs a put payload, and then sends */
395486
static int send_log_events(struct flb_kinesis *ctx, struct flush *buf) {
396487
int ret;
@@ -476,10 +567,48 @@ static int add_event(struct flb_kinesis *ctx, struct flush *buf,
476567
size_t event_bytes = 0;
477568

478569
if (buf->event_index == 0) {
479-
/* init */
480570
reset_flush_buf(ctx, buf);
481571
}
482572

573+
/* Use simple aggregation if enabled */
574+
if (ctx->simple_aggregation) {
575+
retry_add_event_agg:
576+
retry_add = FLB_FALSE;
577+
ret = process_event_simple_aggregation(ctx, buf, obj, tms, config);
578+
if (ret < 0) {
579+
return -1;
580+
}
581+
else if (ret == 1) {
582+
/* Buffer full - check if buffer was empty before sending (record too large) */
583+
if (buf->agg_buf.agg_buf_offset == 0) {
584+
flb_plg_warn(ctx->ins, "Discarding unprocessable record (too large for aggregation buffer), %s",
585+
ctx->stream_name);
586+
reset_flush_buf(ctx, buf);
587+
return 0;
588+
}
589+
590+
/* Send aggregated record and retry */
591+
ret = send_aggregated_record(ctx, buf);
592+
reset_flush_buf(ctx, buf);
593+
if (ret < 0) {
594+
return -1;
595+
}
596+
retry_add = FLB_TRUE;
597+
}
598+
else if (ret == 2) {
599+
flb_plg_warn(ctx->ins, "Discarding large or unprocessable record, %s",
600+
ctx->stream_name);
601+
return 0;
602+
}
603+
604+
if (retry_add == FLB_TRUE) {
605+
goto retry_add_event_agg;
606+
}
607+
608+
return 0;
609+
}
610+
611+
/* Normal processing without aggregation */
483612
retry_add_event:
484613
retry_add = FLB_FALSE;
485614
ret = process_event(ctx, buf, obj, tms, config);
@@ -488,16 +617,13 @@ static int add_event(struct flb_kinesis *ctx, struct flush *buf,
488617
}
489618
else if (ret == 1) {
490619
if (buf->event_index <= 0) {
491-
/* somehow the record was larger than our entire request buffer */
492620
flb_plg_warn(ctx->ins, "Discarding massive log record, %s",
493621
ctx->stream_name);
494-
return 0; /* discard this record and return to caller */
622+
return 0;
495623
}
496-
/* send logs and then retry the add */
497624
retry_add = FLB_TRUE;
498625
goto send;
499626
} else if (ret == 2) {
500-
/* discard this record and return to caller */
501627
flb_plg_warn(ctx->ins, "Discarding large or unprocessable record, %s",
502628
ctx->stream_name);
503629
return 0;
@@ -508,17 +634,14 @@ static int add_event(struct flb_kinesis *ctx, struct flush *buf,
508634

509635
if ((buf->data_size + event_bytes) > PUT_RECORDS_PAYLOAD_SIZE) {
510636
if (buf->event_index <= 0) {
511-
/* somehow the record was larger than our entire request buffer */
512637
flb_plg_warn(ctx->ins, "[size=%zu] Discarding massive log record, %s",
513638
event_bytes, ctx->stream_name);
514-
return 0; /* discard this record and return to caller */
639+
return 0;
515640
}
516-
/* do not send this event */
517641
retry_add = FLB_TRUE;
518642
goto send;
519643
}
520644

521-
/* send is not needed yet, return to caller */
522645
buf->data_size += event_bytes;
523646
buf->event_index++;
524647

@@ -633,7 +756,12 @@ int process_and_send_to_kinesis(struct flb_kinesis *ctx, struct flush *buf,
633756
flb_log_event_decoder_destroy(&log_decoder);
634757

635758
/* send any remaining events */
636-
ret = send_log_events(ctx, buf);
759+
if (ctx->simple_aggregation) {
760+
ret = send_aggregated_record(ctx, buf);
761+
}
762+
else {
763+
ret = send_log_events(ctx, buf);
764+
}
637765
reset_flush_buf(ctx, buf);
638766
if (ret < 0) {
639767
return -1;
@@ -981,6 +1109,9 @@ int put_records(struct flb_kinesis *ctx, struct flush *buf,
9811109
void kinesis_flush_destroy(struct flush *buf)
9821110
{
9831111
if (buf) {
1112+
if (buf->agg_buf_initialized) {
1113+
flb_aws_aggregation_destroy(&buf->agg_buf);
1114+
}
9841115
flb_free(buf->tmp_buf);
9851116
flb_free(buf->out_buf);
9861117
flb_free(buf->events);

0 commit comments

Comments
 (0)