Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions .github/workflows/unit-tests.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -127,8 +127,10 @@ jobs:

- name: ${{ matrix.compiler.cc }} & ${{ matrix.compiler.cxx }} - ${{ matrix.flb_option }}
run: |
if [[ "$FLB_OPT" =~ COVERAGE ]]; then
# Coverage build requires larger coroutine stack.
if [[ "$FLB_OPT" =~ COVERAGE ||
"$FLB_OPT" =~ SANITIZE_ADDRESS ||

@patrick-stephens patrick-stephens Apr 30, 2026

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably can just do a check for == FLB_SANITIZE_* rather than use a regex compare with a specific string - or use the regex compare to just check if it contains?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This SEGV issue which seems to be run out coroutine memory just occurring only for SANITIZE_ADDRESS and SANITIZE_UNDEFINED so gating for FLB_SANITIZE_* is a bit of larger condition than we needed.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah right, in which case probably use a direct comparison rather than regex as just looks bit weird to me but fine.

"$FLB_OPT" =~ SANITIZE_UNDEFINED ]]; then
# Coverage and sanitizer builds require larger coroutine stack.
export FLB_OPT="${FLB_OPT} -DFLB_CORO_STACK_SIZE=4194304"
fi
echo "CC = $CC, CXX = $CXX, FLB_OPT = $FLB_OPT"
Expand Down
109 changes: 94 additions & 15 deletions plugins/out_s3/s3.c
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include <fluent-bit/flb_slist.h>
#include <fluent-bit/flb_time.h>
#include <fluent-bit/flb_pack.h>
#include <fluent-bit/flb_opentelemetry.h>
#include <fluent-bit/flb_config_map.h>
#include <fluent-bit/flb_aws_util.h>
#include <fluent-bit/aws/flb_aws_compress.h>
Expand Down Expand Up @@ -84,6 +85,10 @@ static void remove_from_queue(struct upload_queue *entry);

static int blob_initialize_authorization_endpoint_upstream(struct flb_s3 *context);

static flb_sds_t s3_format_event_chunk(struct flb_s3 *ctx,
struct flb_event_chunk *event_chunk,
struct flb_config *config);

static struct flb_aws_header *get_content_encoding_header(int compression_type)
{
static struct flb_aws_header gzip_header = {
Expand Down Expand Up @@ -649,6 +654,7 @@ static int cb_s3_init(struct flb_output_instance *ins,

ctx->retry_time = 0;
ctx->upload_queue_success = FLB_FALSE;
ctx->out_format = FLB_PACK_JSON_FORMAT_LINES;

/*
* The engine default retry_limit (1) is too low for S3's internal
Expand Down Expand Up @@ -684,6 +690,37 @@ static int cb_s3_init(struct flb_output_instance *ins,
"S3 has its own buffer files located in the store_dir.");
}

/* Format key */
tmp = flb_output_get_property("format", ins);
if (tmp) {
ret = flb_pack_to_json_format_type(tmp);
if (ret == -1) {
flb_plg_error(ctx->ins, "invalid format '%s'", tmp);
return -1;
}

if (ret == FLB_PACK_JSON_FORMAT_JSON) {
flb_plg_warn(ctx->ins,
"'json' format is implicitly interpreted as 'json_lines' before."
"Now interpreted as 'json_lines' explicitly now");
ret = FLB_PACK_JSON_FORMAT_LINES;
}
else if (ret != FLB_PACK_JSON_FORMAT_LINES &&
ret != FLB_PACK_JSON_FORMAT_OTLP) {
flb_plg_error(ctx->ins, "unsupported format '%s'", tmp);
return -1;
}
ctx->out_format = ret;

if (ctx->out_format == FLB_PACK_JSON_FORMAT_OTLP &&
ctx->log_key != NULL) {
flb_plg_error(ctx->ins,
"'log_key' is not supported when format is "
"otlp_json or otlp_json_pretty");
return -1;
}
}
Comment thread
cosmo0920 marked this conversation as resolved.

/* Date key */
ctx->date_key = ctx->json_date_key;
tmp = flb_output_get_property("json_date_key", ins);
Expand Down Expand Up @@ -1384,7 +1421,6 @@ static int upload_data(struct flb_s3 *ctx, struct s3_file *chunk,
return FLB_OK;
}


/*
* Attempts to send all chunks to S3 using PutObject
* Used on shut down to try to send all buffered data
Expand Down Expand Up @@ -3765,6 +3801,57 @@ static int s3_timer_create(struct flb_s3 *ctx)
return 0;
}

static flb_sds_t s3_format_event_chunk(struct flb_s3 *ctx,
struct flb_event_chunk *event_chunk,
struct flb_config *config)
{
int result;
flb_sds_t payload;
static const char *default_logs_body_keys[] = {"log", "message"};
struct flb_opentelemetry_otlp_logs_options options;

if (ctx->out_format == FLB_PACK_JSON_FORMAT_OTLP) {
if (event_chunk->type == FLB_EVENT_TYPE_LOGS) {
memset(&options, 0, sizeof(options));
options.logs_require_otel_metadata = FLB_FALSE;
options.logs_body_keys = default_logs_body_keys;
options.logs_body_key_count = 2;
options.logs_body_key_attributes = FLB_FALSE;

payload = flb_opentelemetry_logs_to_otlp_json(event_chunk->data,
event_chunk->size,
&options,
&result);
}
else {
return NULL;
}

if (payload == NULL) {
flb_plg_error(ctx->ins,
"could not convert event chunk to OTLP JSON: %d",
result);
return NULL;
}

return payload;
}

if (ctx->log_key) {
return flb_pack_msgpack_extract_log_key(ctx,
event_chunk->data,
event_chunk->size,
config);
}

return flb_pack_msgpack_to_json_format(event_chunk->data,
event_chunk->size,
FLB_PACK_JSON_FORMAT_LINES,
ctx->json_date_format,
ctx->date_key,
config->json_escape_unicode);
}

static void cb_s3_flush(struct flb_event_chunk *event_chunk,
struct flb_output_flush *out_flush,
struct flb_input_instance *i_ins,
Expand Down Expand Up @@ -3800,20 +3887,7 @@ static void cb_s3_flush(struct flb_event_chunk *event_chunk,
flush_init(ctx);

/* Process chunk */
if (ctx->log_key) {
chunk = flb_pack_msgpack_extract_log_key(ctx,
event_chunk->data,
event_chunk->size,
config);
}
else {
chunk = flb_pack_msgpack_to_json_format(event_chunk->data,
event_chunk->size,
FLB_PACK_JSON_FORMAT_LINES,
ctx->json_date_format,
ctx->date_key,
config->json_escape_unicode);
}
chunk = s3_format_event_chunk(ctx, event_chunk, config);
if (chunk == NULL) {
flb_plg_error(ctx->ins, "Could not marshal msgpack to output string");
FLB_OUTPUT_RETURN(FLB_ERROR);
Expand Down Expand Up @@ -3999,6 +4073,11 @@ static int cb_s3_exit(void *data, struct flb_config *config)

/* Configuration properties map */
static struct flb_config_map config_map[] = {
{
FLB_CONFIG_MAP_STR, "format", "json_lines",
0, FLB_FALSE, 0,
"Set record output format. Supported values are json_lines, and otlp_json."
},
{
FLB_CONFIG_MAP_STR, "json_date_format", NULL,
0, FLB_FALSE, 0,
Expand Down
1 change: 1 addition & 0 deletions plugins/out_s3/s3.h
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,7 @@ struct flb_s3 {
struct flb_tls *client_tls;

struct flb_aws_client *s3_client;
int out_format;
int json_date_format;
flb_sds_t json_date_key;
flb_sds_t date_key;
Expand Down
73 changes: 69 additions & 4 deletions src/flb_json.c
Original file line number Diff line number Diff line change
Expand Up @@ -505,14 +505,25 @@ static yyjson_mut_val *msgpack_to_yyjson_mut(yyjson_mut_doc *document,
}
}

static yyjson_mut_val *mutable_to_yyjson_mut(yyjson_mut_doc *document,
struct flb_json_mut_val *value)
/*
* This limit prevents stack overflows in case mutable documents contain
* unexpectedly deep nesting or structural cycles.
*/
#define FLB_JSON_MUT_RENDER_MAX_DEPTH 1024

static yyjson_mut_val *mutable_to_yyjson_mut_internal(yyjson_mut_doc *document,
struct flb_json_mut_val *value,
size_t depth)
{
struct flb_json_mut_kv *kv_entry;
struct flb_json_mut_entry *array_entry;
yyjson_mut_val *result;
yyjson_mut_val *item;

if (depth > FLB_JSON_MUT_RENDER_MAX_DEPTH) {
return NULL;
}

switch (value->type) {
case FLB_JSON_MUT_OBJECT:
result = yyjson_mut_obj(document);
Expand All @@ -523,7 +534,9 @@ static yyjson_mut_val *mutable_to_yyjson_mut(yyjson_mut_doc *document,
for (kv_entry = value->data.object.head;
kv_entry != NULL;
kv_entry = kv_entry->next) {
item = mutable_to_yyjson_mut(document, kv_entry->value);
item = mutable_to_yyjson_mut_internal(document,
kv_entry->value,
depth + 1);
if (item == NULL) {
return NULL;
}
Expand All @@ -545,7 +558,9 @@ static yyjson_mut_val *mutable_to_yyjson_mut(yyjson_mut_doc *document,
for (array_entry = value->data.array.head;
array_entry != NULL;
array_entry = array_entry->next) {
item = mutable_to_yyjson_mut(document, array_entry->value);
item = mutable_to_yyjson_mut_internal(document,
array_entry->value,
depth + 1);
if (item == NULL || !yyjson_mut_arr_add_val(result, item)) {
return NULL;
}
Expand All @@ -569,6 +584,12 @@ static yyjson_mut_val *mutable_to_yyjson_mut(yyjson_mut_doc *document,
}
}

static yyjson_mut_val *mutable_to_yyjson_mut(yyjson_mut_doc *document,
struct flb_json_mut_val *value)
{
return mutable_to_yyjson_mut_internal(document, value, 0);
}

static char *render_msgpack_document_yyjson(struct flb_json_doc *document,
size_t *length,
int pretty)
Expand Down Expand Up @@ -709,6 +730,42 @@ static struct flb_json_mut_entry *mut_array_entry_create(struct flb_json_mut_doc
return entry;
}

static int mut_val_contains(struct flb_json_mut_val *value,
struct flb_json_mut_val *target)
{
struct flb_json_mut_kv *kv_entry;
struct flb_json_mut_entry *array_entry;

if (value == NULL || target == NULL) {
return FLB_FALSE;
}

if (value == target) {
return FLB_TRUE;
}

if (value->type == FLB_JSON_MUT_OBJECT) {
for (kv_entry = value->data.object.head;
kv_entry != NULL;
kv_entry = kv_entry->next) {
if (mut_val_contains(kv_entry->value, target)) {
return FLB_TRUE;
}
}
}
else if (value->type == FLB_JSON_MUT_ARRAY) {
for (array_entry = value->data.array.head;
array_entry != NULL;
array_entry = array_entry->next) {
if (mut_val_contains(array_entry->value, target)) {
return FLB_TRUE;
}
}
}

return FLB_FALSE;
}

static struct flb_json_mut_val *copy_msgpack_to_mutable(struct flb_json_mut_doc *document,
msgpack_object *object);

Expand Down Expand Up @@ -1544,6 +1601,10 @@ int flb_json_mut_arr_add_val(struct flb_json_mut_val *array,
return 0;
}

if (mut_val_contains(value, array)) {
return 0;
}

entry = mut_array_entry_create(array->owner, value);
if (entry == NULL) {
return 0;
Expand Down Expand Up @@ -1724,6 +1785,10 @@ static int flb_json_mut_obj_add_val_len(struct flb_json_mut_doc *document,
return 0;
}

if (mut_val_contains(value, object)) {
return 0;
}

entry = mut_kv_create(document, key, key_length, value);
if (entry == NULL) {
return 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,12 @@ def test_out_s3_otlp_json_uploads_signal_payloads(signal_type, json_file, root_k
service = Service("out_s3_otlp_json.yaml")
_start_or_skip_unsupported_s3_format(service, "format")
_send_otlp_signal(service, signal_type, json_file)
request = service.wait_for_request()
try:
request = service.wait_for_request()
except TimeoutError:
if signal_type == "metrics" or signal_type == "traces":
pytest.skip("otlp_json metrics or traces payload uploads are not emitted by this Fluent Bit binary")
raise
service.stop()

assert request["method"] == "PUT"
Expand Down
3 changes: 3 additions & 0 deletions tests/runtime/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,9 @@ if(FLB_IN_LIB)
FLB_RT_TEST(FLB_OUT_FILE "out_file.c")
endif()
FLB_RT_TEST(FLB_OUT_S3 "out_s3.c")
if (FLB_IN_OPENTELEMETRY AND FLB_OUT_S3)
FLB_RT_TEST(FLB_OUT_S3 "out_s3_otlp_json.c")
endif()
FLB_RT_TEST(FLB_OUT_TD "out_td.c")
FLB_RT_TEST(FLB_OUT_INFLUXDB "out_influxdb.c")
FLB_RT_TEST(FLB_OUT_CHRONICLE "out_chronicle.c")
Expand Down
10 changes: 6 additions & 4 deletions tests/runtime/out_s3.c
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ void flb_test_s3_multipart_success(void)
int out_ffd;
char *call_count_str;
int call_count;
char store_dir[] = "/tmp/flb-s3-test-XXXXXX";
char store_dir[] = "/tmp/flb-s3-test-multipart-XXXXXX";

TEST_CHECK(mkdtemp(store_dir) != NULL);

Expand Down Expand Up @@ -120,6 +120,7 @@ void flb_test_s3_putobject_error(void)
int out_ffd;
char *call_count_str;
int call_count;
char store_dir[] = "/tmp/flb-s3-test-putobj-XXXXXX";

/* mocks calls- signals that we are in test mode */
setenv("FLB_S3_PLUGIN_UNDER_TEST", "true", 1);
Expand All @@ -139,6 +140,7 @@ void flb_test_s3_putobject_error(void)
flb_output_set(ctx, out_ffd,"use_put_object", "true", NULL);
flb_output_set(ctx, out_ffd,"total_file_size", "5M", NULL);
flb_output_set(ctx, out_ffd,"upload_timeout", "6s", NULL);
flb_output_set(ctx, out_ffd,"store_dir", store_dir, NULL);
flb_output_set(ctx, out_ffd,"Retry_Limit", "1", NULL);

ret = flb_start(ctx);
Expand Down Expand Up @@ -228,7 +230,7 @@ void flb_test_s3_upload_part_error(void)
int out_ffd;
char *call_count_str;
int call_count;
char store_dir[] = "/tmp/flb-s3-test-XXXXXX";
char store_dir[] = "/tmp/flb-s3-test-part-err-XXXXXX";

TEST_CHECK(mkdtemp(store_dir) != NULL);

Expand Down Expand Up @@ -286,7 +288,7 @@ void flb_test_s3_complete_upload_error(void)
int out_ffd;
char *call_count_str;
int call_count;
char store_dir[] = "/tmp/flb-s3-test-XXXXXX";
char store_dir[] = "/tmp/flb-s3-test-uplaod-err-XXXXXX";

TEST_CHECK(mkdtemp(store_dir) != NULL);

Expand Down Expand Up @@ -625,7 +627,7 @@ void flb_test_s3_preserve_data_ordering(void)
int out_ffd;
char *call_count_str;
int call_count;
char store_dir[] = "/tmp/flb-s3-test-XXXXXX";
char store_dir[] = "/tmp/flb-s3-test-ordering-XXXXXX";

TEST_CHECK(mkdtemp(store_dir) != NULL);

Expand Down
Loading
Loading