Skip to content

Commit 3da0766

Browse files
ShelbyZ authored and edsiper committed
aws: Implement simple_aggregation operation
- Add helper methods to support aggregating records for aws outputs Signed-off-by: Shelby Hagman <shelbyzh@amazon.com>
1 parent c994082 commit 3da0766

3 files changed

Lines changed: 325 additions & 0 deletions

File tree

Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
2+
3+
/* Fluent Bit
4+
* ==========
5+
* Copyright (C) 2015-2025 The Fluent Bit Authors
6+
*
7+
* Licensed under the Apache License, Version 2.0 (the "License");
8+
* you may not use this file except in compliance with the License.
9+
* You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
20+
#ifndef FLB_AWS_AGGREGATION_H
21+
#define FLB_AWS_AGGREGATION_H
22+
23+
#include <fluent-bit/flb_output_plugin.h>
24+
#include <fluent-bit/flb_time.h>
25+
26+
/*
 * Aggregation buffer structure.
 *
 * Records are appended back to back into agg_buf; agg_buf_offset tracks how
 * many bytes are in use out of the agg_buf_size bytes available.
 */
struct flb_aws_agg_buffer {
    /* aggregated records buffer */
    char *agg_buf;

    /* total size of aggregation buffer */
    size_t agg_buf_size;

    /* current offset in aggregation buffer */
    size_t agg_buf_offset;
};
32+
33+
/*
 * Set up an aggregation buffer by allocating max_record_size bytes for it
 * and marking it empty.
 *
 * Returns:
 *   0 = success
 *  -1 = error (buf is NULL or the allocation failed)
 */
int flb_aws_aggregation_init(struct flb_aws_agg_buffer *buf,
                             size_t max_record_size);
39+
40+
/*
 * Free the storage attached to an aggregation buffer and clear its fields.
 * Nothing happens when buf is NULL or has no storage attached.
 */
void flb_aws_aggregation_destroy(struct flb_aws_agg_buffer *buf);
42+
43+
/*
 * Append data_len bytes of event data to the aggregation buffer.
 *
 * Returns:
 *   0 = success, event added to aggregation buffer
 *   1 = buffer full (the data does not fit within max_record_size),
 *       caller should finalize and retry
 *  -1 = invalid argument (NULL buf, NULL data, or data_len of 0)
 */
int flb_aws_aggregation_add(struct flb_aws_agg_buffer *buf,
                            const char *data,
                            size_t data_len,
                            size_t max_record_size);
51+
52+
/*
 * Close out the record currently held in the aggregation buffer.
 *
 * When add_final_newline is non-zero and the buffer still has a spare byte,
 * a trailing '\n' is appended first. The finished record stays in
 * buf->agg_buf and its length is stored in *out_size.
 *
 * Returns:
 *   0 = success
 *  -1 = error (NULL argument, or no data to finalize)
 */
int flb_aws_aggregation_finalize(struct flb_aws_agg_buffer *buf,
                                 int add_final_newline,
                                 size_t *out_size);
62+
63+
/*
 * Mark the aggregation buffer as empty so it can be reused.
 * The storage itself is kept; a NULL buf is ignored.
 */
void flb_aws_aggregation_reset(struct flb_aws_agg_buffer *buf);
65+
66+
/* Process event with simple aggregation
67+
* Converts msgpack to JSON, optionally adds log_key and time_key,
68+
* then adds to aggregation buffer
69+
*
70+
* Returns:
71+
* -1 = failure, record not added
72+
* 0 = success, record added
73+
* 1 = buffer full, caller should finalize and retry
74+
* 2 = record could not be processed, discard it
75+
*/
76+
int flb_aws_aggregation_process_event(struct flb_aws_agg_buffer *agg_buf,
77+
char *tmp_buf,
78+
size_t tmp_buf_size,
79+
size_t *tmp_buf_offset,
80+
const msgpack_object *obj,
81+
struct flb_time *tms,
82+
struct flb_config *config,
83+
struct flb_output_instance *ins,
84+
const char *stream_name,
85+
const char *log_key,
86+
const char *time_key,
87+
const char *time_key_format,
88+
size_t max_event_size);
89+
90+
#endif

src/aws/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ set(src
1515
"flb_aws_imds.c"
1616
"flb_aws_credentials_http.c"
1717
"flb_aws_credentials_profile.c"
18+
"flb_aws_aggregation.c"
1819
)
1920

2021
message(STATUS "=== AWS Credentials ===")

src/aws/flb_aws_aggregation.c

Lines changed: 234 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,234 @@
1+
/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
2+
3+
/* Fluent Bit
4+
* ==========
5+
* Copyright (C) 2015-2025 The Fluent Bit Authors
6+
*
7+
* Licensed under the Apache License, Version 2.0 (the "License");
8+
* you may not use this file except in compliance with the License.
9+
* You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
20+
#include <fluent-bit/flb_mem.h>
21+
#include <fluent-bit/flb_log.h>
22+
#include <fluent-bit/flb_pack.h>
23+
#include <fluent-bit/flb_time.h>
24+
#include <fluent-bit/flb_aws_util.h>
25+
#include <fluent-bit/aws/flb_aws_aggregation.h>
26+
27+
#include <string.h>
28+
#include <time.h>
29+
30+
int flb_aws_aggregation_init(struct flb_aws_agg_buffer *buf, size_t max_record_size)
31+
{
32+
if (!buf) {
33+
return -1;
34+
}
35+
36+
buf->agg_buf = flb_malloc(max_record_size);
37+
if (!buf->agg_buf) {
38+
flb_errno();
39+
return -1;
40+
}
41+
42+
buf->agg_buf_size = max_record_size;
43+
buf->agg_buf_offset = 0;
44+
45+
return 0;
46+
}
47+
48+
void flb_aws_aggregation_destroy(struct flb_aws_agg_buffer *buf)
49+
{
50+
if (buf && buf->agg_buf) {
51+
flb_free(buf->agg_buf);
52+
buf->agg_buf = NULL;
53+
buf->agg_buf_size = 0;
54+
buf->agg_buf_offset = 0;
55+
}
56+
}
57+
58+
int flb_aws_aggregation_add(struct flb_aws_agg_buffer *buf,
59+
const char *data, size_t data_len,
60+
size_t max_record_size)
61+
{
62+
if (!buf || !data || data_len == 0) {
63+
return -1;
64+
}
65+
66+
/* Check if adding this data would exceed the max record size */
67+
if (buf->agg_buf_offset + data_len > max_record_size) {
68+
/* Buffer full, caller should finalize and retry */
69+
return 1;
70+
}
71+
72+
/* Add data to aggregation buffer */
73+
memcpy(buf->agg_buf + buf->agg_buf_offset, data, data_len);
74+
buf->agg_buf_offset += data_len;
75+
76+
return 0;
77+
}
78+
79+
int flb_aws_aggregation_finalize(struct flb_aws_agg_buffer *buf,
80+
int add_final_newline,
81+
size_t *out_size)
82+
{
83+
if (!buf || !out_size) {
84+
return -1;
85+
}
86+
87+
/* Check if there's any data to finalize */
88+
if (buf->agg_buf_offset == 0) {
89+
return -1;
90+
}
91+
92+
/* Add final newline if requested (for Firehose) */
93+
if (add_final_newline && buf->agg_buf_offset < buf->agg_buf_size) {
94+
buf->agg_buf[buf->agg_buf_offset] = '\n';
95+
buf->agg_buf_offset++;
96+
}
97+
98+
*out_size = buf->agg_buf_offset;
99+
return 0;
100+
}
101+
102+
void flb_aws_aggregation_reset(struct flb_aws_agg_buffer *buf)
103+
{
104+
if (buf) {
105+
buf->agg_buf_offset = 0;
106+
}
107+
}
108+
109+
/*
110+
* Process event with simple aggregation
111+
* Shared implementation for Kinesis Streams and Firehose
112+
*/
113+
int flb_aws_aggregation_process_event(struct flb_aws_agg_buffer *agg_buf,
114+
char *tmp_buf,
115+
size_t tmp_buf_size,
116+
size_t *tmp_buf_offset,
117+
const msgpack_object *obj,
118+
struct flb_time *tms,
119+
struct flb_config *config,
120+
struct flb_output_instance *ins,
121+
const char *stream_name,
122+
const char *log_key,
123+
const char *time_key,
124+
const char *time_key_format,
125+
size_t max_event_size)
126+
{
127+
size_t written = 0;
128+
int ret;
129+
char *tmp_buf_ptr;
130+
char *time_key_ptr;
131+
struct tm time_stamp;
132+
struct tm *tmp;
133+
size_t len;
134+
size_t tmp_size;
135+
char *out_buf;
136+
137+
tmp_buf_ptr = tmp_buf + *tmp_buf_offset;
138+
ret = flb_msgpack_to_json(tmp_buf_ptr,
139+
tmp_buf_size - *tmp_buf_offset,
140+
obj, config->json_escape_unicode);
141+
if (ret <= 0) {
142+
return 1;
143+
}
144+
written = (size_t) ret;
145+
146+
/* Discard empty messages */
147+
if (written <= 2) {
148+
flb_plg_debug(ins, "Found empty log message, %s", stream_name);
149+
return 2;
150+
}
151+
152+
if (log_key) {
153+
written -= 2;
154+
tmp_buf_ptr++;
155+
(*tmp_buf_offset)++;
156+
}
157+
158+
if ((written + 1) >= max_event_size) {
159+
flb_plg_warn(ins, "[size=%zu] Discarding record which is larger than "
160+
"max size allowed, %s", written + 1, stream_name);
161+
return 2;
162+
}
163+
164+
if (time_key) {
165+
tmp = gmtime_r(&tms->tm.tv_sec, &time_stamp);
166+
if (!tmp) {
167+
flb_plg_error(ins, "Could not create time stamp for %lu unix "
168+
"seconds, discarding record, %s", tms->tm.tv_sec, stream_name);
169+
return 2;
170+
}
171+
172+
len = flb_aws_strftime_precision(&out_buf, time_key_format, tms);
173+
tmp_size = (tmp_buf_size - *tmp_buf_offset) - written;
174+
if (len > tmp_size) {
175+
flb_free(out_buf);
176+
return 1;
177+
}
178+
179+
if (len == 0) {
180+
flb_plg_error(ins, "Failed to add time_key %s to record, %s",
181+
time_key, stream_name);
182+
flb_free(out_buf);
183+
return 2;
184+
}
185+
else {
186+
time_key_ptr = tmp_buf_ptr + written - 1;
187+
memcpy(time_key_ptr, ",", 1);
188+
time_key_ptr++;
189+
memcpy(time_key_ptr, "\"", 1);
190+
time_key_ptr++;
191+
memcpy(time_key_ptr, time_key, strlen(time_key));
192+
time_key_ptr += strlen(time_key);
193+
memcpy(time_key_ptr, "\":\"", 3);
194+
time_key_ptr += 3;
195+
196+
memcpy(time_key_ptr, out_buf, len);
197+
flb_free(out_buf);
198+
time_key_ptr += len;
199+
memcpy(time_key_ptr, "\"}", 2);
200+
time_key_ptr += 2;
201+
written = (time_key_ptr - tmp_buf_ptr);
202+
}
203+
}
204+
205+
if ((written + 1) >= max_event_size) {
206+
flb_plg_warn(ins, "[size=%zu] Discarding record which is larger than "
207+
"max size allowed, %s", written + 1, stream_name);
208+
return 2;
209+
}
210+
211+
/* Append newline */
212+
tmp_size = (tmp_buf_size - *tmp_buf_offset) - written;
213+
if (tmp_size <= 1) {
214+
return 1;
215+
}
216+
217+
memcpy(tmp_buf_ptr + written, "\n", 1);
218+
written++;
219+
220+
/* Try to add to aggregation buffer */
221+
tmp_buf_ptr = tmp_buf + *tmp_buf_offset;
222+
ret = flb_aws_aggregation_add(agg_buf, tmp_buf_ptr, written, max_event_size);
223+
224+
if (ret == 1) {
225+
return 1;
226+
}
227+
else if (ret < 0) {
228+
flb_plg_error(ins, "Failed to add record to aggregation buffer");
229+
return -1;
230+
}
231+
232+
*tmp_buf_offset += written;
233+
return 0;
234+
}

0 commit comments

Comments
 (0)