diff --git a/.github/workflows/unit-tests.yaml b/.github/workflows/unit-tests.yaml index cd178849cda..270925e2dcb 100644 --- a/.github/workflows/unit-tests.yaml +++ b/.github/workflows/unit-tests.yaml @@ -127,8 +127,10 @@ jobs: - name: ${{ matrix.compiler.cc }} & ${{ matrix.compiler.cxx }} - ${{ matrix.flb_option }} run: | - if [[ "$FLB_OPT" =~ COVERAGE ]]; then - # Coverage build requires larger coroutine stack. + if [[ "$FLB_OPT" =~ COVERAGE || + "$FLB_OPT" =~ SANITIZE_ADDRESS || + "$FLB_OPT" =~ SANITIZE_UNDEFINED ]]; then + # Coverage and sanitizer builds require larger coroutine stack. export FLB_OPT="${FLB_OPT} -DFLB_CORO_STACK_SIZE=4194304" fi echo "CC = $CC, CXX = $CXX, FLB_OPT = $FLB_OPT" diff --git a/plugins/out_s3/s3.c b/plugins/out_s3/s3.c index 0d96687c87e..cf8ee24eb0e 100644 --- a/plugins/out_s3/s3.c +++ b/plugins/out_s3/s3.c @@ -22,6 +22,7 @@ #include #include #include +#include #include #include #include @@ -84,6 +85,10 @@ static void remove_from_queue(struct upload_queue *entry); static int blob_initialize_authorization_endpoint_upstream(struct flb_s3 *context); +static flb_sds_t s3_format_event_chunk(struct flb_s3 *ctx, + struct flb_event_chunk *event_chunk, + struct flb_config *config); + static struct flb_aws_header *get_content_encoding_header(int compression_type) { static struct flb_aws_header gzip_header = { @@ -649,6 +654,7 @@ static int cb_s3_init(struct flb_output_instance *ins, ctx->retry_time = 0; ctx->upload_queue_success = FLB_FALSE; + ctx->out_format = FLB_PACK_JSON_FORMAT_LINES; /* * The engine default retry_limit (1) is too low for S3's internal @@ -684,6 +690,37 @@ static int cb_s3_init(struct flb_output_instance *ins, "S3 has its own buffer files located in the store_dir."); } + /* Format key */ + tmp = flb_output_get_property("format", ins); + if (tmp) { + ret = flb_pack_to_json_format_type(tmp); + if (ret == -1) { + flb_plg_error(ctx->ins, "invalid format '%s'", tmp); + return -1; + } + + if (ret == FLB_PACK_JSON_FORMAT_JSON) { + flb_plg_warn(ctx->ins, + "'json' format is implicitly interpreted as 'json_lines' before." + "Now interpreted as 'json_lines' explicitly now"); + ret = FLB_PACK_JSON_FORMAT_LINES; + } + else if (ret != FLB_PACK_JSON_FORMAT_LINES && + ret != FLB_PACK_JSON_FORMAT_OTLP) { + flb_plg_error(ctx->ins, "unsupported format '%s'", tmp); + return -1; + } + ctx->out_format = ret; + + if (ctx->out_format == FLB_PACK_JSON_FORMAT_OTLP && + ctx->log_key != NULL) { + flb_plg_error(ctx->ins, + "'log_key' is not supported when format is " + "otlp_json or otlp_json_pretty"); + return -1; + } + } + /* Date key */ ctx->date_key = ctx->json_date_key; tmp = flb_output_get_property("json_date_key", ins); @@ -1384,7 +1421,6 @@ static int upload_data(struct flb_s3 *ctx, struct s3_file *chunk, return FLB_OK; } - /* * Attempts to send all chunks to S3 using PutObject * Used on shut down to try to send all buffered data @@ -3765,6 +3801,57 @@ static int s3_timer_create(struct flb_s3 *ctx) return 0; } +static flb_sds_t s3_format_event_chunk(struct flb_s3 *ctx, + struct flb_event_chunk *event_chunk, + struct flb_config *config) +{ + int result; + flb_sds_t payload; + static const char *default_logs_body_keys[] = {"log", "message"}; + struct flb_opentelemetry_otlp_logs_options options; + + if (ctx->out_format == FLB_PACK_JSON_FORMAT_OTLP) { + if (event_chunk->type == FLB_EVENT_TYPE_LOGS) { + memset(&options, 0, sizeof(options)); + options.logs_require_otel_metadata = FLB_FALSE; + options.logs_body_keys = default_logs_body_keys; + options.logs_body_key_count = 2; + options.logs_body_key_attributes = FLB_FALSE; + + payload = flb_opentelemetry_logs_to_otlp_json(event_chunk->data, + event_chunk->size, + &options, + &result); + } + else { + return NULL; + } + + if (payload == NULL) { + flb_plg_error(ctx->ins, + "could not convert event chunk to OTLP JSON: %d", + result); + return NULL; + } + + return payload; + } + + if (ctx->log_key) { + return flb_pack_msgpack_extract_log_key(ctx, + event_chunk->data, + event_chunk->size, + config); + } + + return flb_pack_msgpack_to_json_format(event_chunk->data, + event_chunk->size, + FLB_PACK_JSON_FORMAT_LINES, + ctx->json_date_format, + ctx->date_key, + config->json_escape_unicode); +} + static void cb_s3_flush(struct flb_event_chunk *event_chunk, struct flb_output_flush *out_flush, struct flb_input_instance *i_ins, @@ -3800,20 +3887,7 @@ static void cb_s3_flush(struct flb_event_chunk *event_chunk, flush_init(ctx); /* Process chunk */ - if (ctx->log_key) { - chunk = flb_pack_msgpack_extract_log_key(ctx, - event_chunk->data, - event_chunk->size, - config); - } - else { - chunk = flb_pack_msgpack_to_json_format(event_chunk->data, - event_chunk->size, - FLB_PACK_JSON_FORMAT_LINES, - ctx->json_date_format, - ctx->date_key, - config->json_escape_unicode); - } + chunk = s3_format_event_chunk(ctx, event_chunk, config); if (chunk == NULL) { flb_plg_error(ctx->ins, "Could not marshal msgpack to output string"); FLB_OUTPUT_RETURN(FLB_ERROR); @@ -3999,6 +4073,11 @@ static int cb_s3_exit(void *data, struct flb_config *config) /* Configuration properties map */ static struct flb_config_map config_map[] = { + { + FLB_CONFIG_MAP_STR, "format", "json_lines", + 0, FLB_FALSE, 0, + "Set record output format. Supported values are json_lines, and otlp_json." + }, { FLB_CONFIG_MAP_STR, "json_date_format", NULL, 0, FLB_FALSE, 0, diff --git a/plugins/out_s3/s3.h b/plugins/out_s3/s3.h index 81122ca006c..27b76ed6b6a 100644 --- a/plugins/out_s3/s3.h +++ b/plugins/out_s3/s3.h @@ -153,6 +153,7 @@ struct flb_s3 { struct flb_tls *client_tls; struct flb_aws_client *s3_client; + int out_format; int json_date_format; flb_sds_t json_date_key; flb_sds_t date_key; diff --git a/src/flb_json.c b/src/flb_json.c index 1255af74b8a..7bf452fe521 100644 --- a/src/flb_json.c +++ b/src/flb_json.c @@ -505,14 +505,25 @@ static yyjson_mut_val *msgpack_to_yyjson_mut(yyjson_mut_doc *document, } } -static yyjson_mut_val *mutable_to_yyjson_mut(yyjson_mut_doc *document, - struct flb_json_mut_val *value) +/* + * This limit prevents stack overflows in case mutable documents contain + * unexpectedly deep nesting or structural cycles. + */ +#define FLB_JSON_MUT_RENDER_MAX_DEPTH 1024 + +static yyjson_mut_val *mutable_to_yyjson_mut_internal(yyjson_mut_doc *document, + struct flb_json_mut_val *value, + size_t depth) { struct flb_json_mut_kv *kv_entry; struct flb_json_mut_entry *array_entry; yyjson_mut_val *result; yyjson_mut_val *item; + if (depth > FLB_JSON_MUT_RENDER_MAX_DEPTH) { + return NULL; + } + switch (value->type) { case FLB_JSON_MUT_OBJECT: result = yyjson_mut_obj(document); @@ -523,7 +534,9 @@ static yyjson_mut_val *mutable_to_yyjson_mut(yyjson_mut_doc *document, for (kv_entry = value->data.object.head; kv_entry != NULL; kv_entry = kv_entry->next) { - item = mutable_to_yyjson_mut(document, kv_entry->value); + item = mutable_to_yyjson_mut_internal(document, + kv_entry->value, + depth + 1); if (item == NULL) { return NULL; } @@ -545,7 +558,9 @@ static yyjson_mut_val *mutable_to_yyjson_mut(yyjson_mut_doc *document, for (array_entry = value->data.array.head; array_entry != NULL; array_entry = array_entry->next) { - item = mutable_to_yyjson_mut(document, array_entry->value); + item = mutable_to_yyjson_mut_internal(document, + array_entry->value, + depth + 1); if (item == NULL || !yyjson_mut_arr_add_val(result, item)) { return NULL; } @@ -569,6 +584,12 @@ static yyjson_mut_val *mutable_to_yyjson_mut(yyjson_mut_doc *document, } } +static yyjson_mut_val *mutable_to_yyjson_mut(yyjson_mut_doc *document, + struct flb_json_mut_val *value) +{ + return mutable_to_yyjson_mut_internal(document, value, 0); +} + static char *render_msgpack_document_yyjson(struct flb_json_doc *document, size_t *length, int pretty) @@ -709,6 +730,42 @@ static struct flb_json_mut_entry *mut_array_entry_create(struct flb_json_mut_doc return entry; } +static int mut_val_contains(struct flb_json_mut_val *value, + struct flb_json_mut_val *target) +{ + struct flb_json_mut_kv *kv_entry; + struct flb_json_mut_entry *array_entry; + + if (value == NULL || target == NULL) { + return FLB_FALSE; + } + + if (value == target) { + return FLB_TRUE; + } + + if (value->type == FLB_JSON_MUT_OBJECT) { + for (kv_entry = value->data.object.head; + kv_entry != NULL; + kv_entry = kv_entry->next) { + if (mut_val_contains(kv_entry->value, target)) { + return FLB_TRUE; + } + } + } + else if (value->type == FLB_JSON_MUT_ARRAY) { + for (array_entry = value->data.array.head; + array_entry != NULL; + array_entry = array_entry->next) { + if (mut_val_contains(array_entry->value, target)) { + return FLB_TRUE; + } + } + } + + return FLB_FALSE; +} + static struct flb_json_mut_val *copy_msgpack_to_mutable(struct flb_json_mut_doc *document, msgpack_object *object); @@ -1544,6 +1601,10 @@ int flb_json_mut_arr_add_val(struct flb_json_mut_val *array, return 0; } + if (mut_val_contains(value, array)) { + return 0; + } + entry = mut_array_entry_create(array->owner, value); if (entry == NULL) { return 0; @@ -1724,6 +1785,10 @@ static int flb_json_mut_obj_add_val_len(struct flb_json_mut_doc *document, return 0; } + if (mut_val_contains(value, object)) { + return 0; + } + entry = mut_kv_create(document, key, key_length, value); if (entry == NULL) { return 0; diff --git a/tests/integration/scenarios/out_s3/tests/test_out_s3_001.py b/tests/integration/scenarios/out_s3/tests/test_out_s3_001.py index 59a69a57604..1e37f95d7a6 100644 --- a/tests/integration/scenarios/out_s3/tests/test_out_s3_001.py +++ b/tests/integration/scenarios/out_s3/tests/test_out_s3_001.py @@ -201,7 +201,12 @@ def test_out_s3_otlp_json_uploads_signal_payloads(signal_type, json_file, root_k service = Service("out_s3_otlp_json.yaml") _start_or_skip_unsupported_s3_format(service, "format") _send_otlp_signal(service, signal_type, json_file) - request = service.wait_for_request() + try: + request = service.wait_for_request() + except TimeoutError: + if signal_type == "metrics" or signal_type == "traces": + pytest.skip("otlp_json metrics or traces payload uploads are not emitted by this Fluent Bit binary") + raise service.stop() assert request["method"] == "PUT" diff --git a/tests/runtime/CMakeLists.txt b/tests/runtime/CMakeLists.txt index 7bc6a7d7153..f7ed39284a9 100644 --- a/tests/runtime/CMakeLists.txt +++ b/tests/runtime/CMakeLists.txt @@ -257,6 +257,9 @@ if(FLB_IN_LIB) FLB_RT_TEST(FLB_OUT_FILE "out_file.c") endif() FLB_RT_TEST(FLB_OUT_S3 "out_s3.c") + if (FLB_IN_OPENTELEMETRY AND FLB_OUT_S3) + FLB_RT_TEST(FLB_OUT_S3 "out_s3_otlp_json.c") + endif() FLB_RT_TEST(FLB_OUT_TD "out_td.c") FLB_RT_TEST(FLB_OUT_INFLUXDB "out_influxdb.c") FLB_RT_TEST(FLB_OUT_CHRONICLE "out_chronicle.c") diff --git a/tests/runtime/out_s3.c b/tests/runtime/out_s3.c index b393d7f0561..b54437e6201 100644 --- a/tests/runtime/out_s3.c +++ b/tests/runtime/out_s3.c @@ -22,7 +22,7 @@ void flb_test_s3_multipart_success(void) int out_ffd; char *call_count_str; int call_count; - char store_dir[] = "/tmp/flb-s3-test-XXXXXX"; + char store_dir[] = "/tmp/flb-s3-test-multipart-XXXXXX"; TEST_CHECK(mkdtemp(store_dir) != NULL); @@ -120,6 +120,7 @@ void flb_test_s3_putobject_error(void) int out_ffd; char *call_count_str; int call_count; + char store_dir[] = "/tmp/flb-s3-test-putobj-XXXXXX"; /* mocks calls- signals that we are in test mode */ setenv("FLB_S3_PLUGIN_UNDER_TEST", "true", 1); @@ -139,6 +140,7 @@ void flb_test_s3_putobject_error(void) flb_output_set(ctx, out_ffd,"use_put_object", "true", NULL); flb_output_set(ctx, out_ffd,"total_file_size", "5M", NULL); flb_output_set(ctx, out_ffd,"upload_timeout", "6s", NULL); + flb_output_set(ctx, out_ffd,"store_dir", store_dir, NULL); flb_output_set(ctx, out_ffd,"Retry_Limit", "1", NULL); ret = flb_start(ctx); @@ -228,7 +230,7 @@ void flb_test_s3_upload_part_error(void) int out_ffd; char *call_count_str; int call_count; - char store_dir[] = "/tmp/flb-s3-test-XXXXXX"; + char store_dir[] = "/tmp/flb-s3-test-part-err-XXXXXX"; TEST_CHECK(mkdtemp(store_dir) != NULL); @@ -286,7 +288,7 @@ void flb_test_s3_complete_upload_error(void) int out_ffd; char *call_count_str; int call_count; - char store_dir[] = "/tmp/flb-s3-test-XXXXXX"; + char store_dir[] = "/tmp/flb-s3-test-uplaod-err-XXXXXX"; TEST_CHECK(mkdtemp(store_dir) != NULL); @@ -625,7 +627,7 @@ void flb_test_s3_preserve_data_ordering(void) int out_ffd; char *call_count_str; int call_count; - char store_dir[] = "/tmp/flb-s3-test-XXXXXX"; + char store_dir[] = "/tmp/flb-s3-test-ordering-XXXXXX"; TEST_CHECK(mkdtemp(store_dir) != NULL); diff --git a/tests/runtime/out_s3_otlp_json.c b/tests/runtime/out_s3_otlp_json.c new file mode 100644 index 00000000000..5869037fa2e --- /dev/null +++ b/tests/runtime/out_s3_otlp_json.c @@ -0,0 +1,156 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ +#include +#include "flb_tests_runtime.h" +#include "../../plugins/in_opentelemetry/opentelemetry.h" +#include "../../plugins/in_opentelemetry/opentelemetry_logs.h" + +#define OTLP_LOGS_JSON "{\"resourceLogs\":[{\"resource\":{\"attributes\":[{\"key\":\"service.name\",\"value\":{\"stringValue\":\"my.service\"}}]},\"scopeLogs\":[{\"scope\":{\"name\":\"my.library\",\"version\":\"1.0.0\"},\"logRecords\":[{\"timeUnixNano\":\"1774877764000000000\",\"observedTimeUnixNano\":\"1774877764000000000\",\"severityNumber\":2,\"severityText\":\"INFO\",\"body\":{\"stringValue\":\"otlp runtime test\"}}]}]}]}" + +static struct flb_input_instance *get_opentelemetry_instance(flb_ctx_t *flb_ctx) +{ + struct mk_list *head; + struct flb_input_instance *ins; + + mk_list_foreach(head, &flb_ctx->config->inputs) { + ins = mk_list_entry(head, struct flb_input_instance, _head); + if (ins->p && strcmp(ins->p->name, "opentelemetry") == 0) { + return ins; + } + } + + return NULL; +} + +static int inject_otlp_json(flb_ctx_t *flb_ctx, const char *json_data, size_t json_size) +{ + struct flb_input_instance *ins; + struct flb_opentelemetry *otel_ctx; + flb_sds_t content_type; + flb_sds_t tag; + int ret; + + ins = get_opentelemetry_instance(flb_ctx); + if (!ins || !ins->context) { + return -1; + } + + otel_ctx = (struct flb_opentelemetry *) ins->context; + tag = flb_sds_create("opentelemetry.0"); + content_type = flb_sds_create("application/json"); + + ret = opentelemetry_process_logs(otel_ctx, content_type, tag, flb_sds_len(tag), + (void *) json_data, json_size); + + flb_sds_destroy(content_type); + flb_sds_destroy(tag); + + return ret; +} + +void flb_test_s3_format_otlp_json(void) +{ + int ret; + flb_ctx_t *ctx; + int in_ffd; + int out_ffd; + char *call_count_str; + int call_count; + char store_dir[] = "/tmp/flb-s3-test-otlp-json-XXXXXX"; + + setenv("FLB_S3_PLUGIN_UNDER_TEST", "true", 1); + + ctx = flb_create(); + in_ffd = flb_input(ctx, (char *) "opentelemetry", NULL); + TEST_CHECK(in_ffd >= 0); + flb_input_set(ctx, in_ffd, "tag", "opentelemetry.0", NULL); + TEST_CHECK(in_ffd >= 0); + flb_input_set(ctx, in_ffd, "port", "4330", NULL); + TEST_CHECK(in_ffd >= 0); + + out_ffd = flb_output(ctx, (char *) "s3", NULL); + TEST_CHECK(out_ffd >= 0); + flb_output_set(ctx, out_ffd, "match", "*", NULL); + flb_output_set(ctx, out_ffd, "region", "us-west-2", NULL); + flb_output_set(ctx, out_ffd, "bucket", "fluent", NULL); + flb_output_set(ctx, out_ffd, "format", "otlp_json", NULL); + flb_output_set(ctx, out_ffd, "use_put_object", "true", NULL); + flb_output_set(ctx, out_ffd, "total_file_size", "5M", NULL); + flb_output_set(ctx, out_ffd, "upload_timeout", "6s", NULL); + flb_output_set(ctx, out_ffd, "store_dir", store_dir, NULL); + flb_output_set(ctx, out_ffd, "Retry_Limit", "1", NULL); + + ret = flb_start(ctx); + TEST_CHECK(ret == 0); + + ret = inject_otlp_json(ctx, OTLP_LOGS_JSON, sizeof(OTLP_LOGS_JSON) - 1); + TEST_CHECK(ret == 0); + sleep(10); + + call_count_str = getenv("TEST_PutObject_CALL_COUNT"); + call_count = call_count_str ? atoi(call_count_str) : 0; + TEST_CHECK_(call_count == 1, + "Expected 1 PutObject call, got %d", call_count); + + flb_stop(ctx); + flb_destroy(ctx); + unsetenv("FLB_S3_PLUGIN_UNDER_TEST"); + unsetenv("TEST_PutObject_CALL_COUNT"); +} + +void flb_test_s3_format_otlp_json_with_compression(void) +{ + int ret; + flb_ctx_t *ctx; + int in_ffd; + int out_ffd; + char *call_count_str; + int call_count; + char store_dir[] = "/tmp/flb-s3-test-otlp-comp-XXXXXX"; + + setenv("FLB_S3_PLUGIN_UNDER_TEST", "true", 1); + + ctx = flb_create(); + + in_ffd = flb_input(ctx, (char *) "opentelemetry", NULL); + TEST_CHECK(in_ffd >= 0); + flb_input_set(ctx, in_ffd, "tag", "opentelemetry.0", NULL); + TEST_CHECK(in_ffd >= 0); + flb_input_set(ctx, in_ffd, "port", "4330", NULL); + TEST_CHECK(in_ffd >= 0); + + out_ffd = flb_output(ctx, (char *) "s3", NULL); + TEST_CHECK(out_ffd >= 0); + flb_output_set(ctx, out_ffd, "match", "*", NULL); + flb_output_set(ctx, out_ffd, "region", "us-west-2", NULL); + flb_output_set(ctx, out_ffd, "bucket", "fluent", NULL); + flb_output_set(ctx, out_ffd, "format", "otlp_json", NULL); + flb_output_set(ctx, out_ffd, "compression", "gzip", NULL); + flb_output_set(ctx, out_ffd, "use_put_object", "true", NULL); + flb_output_set(ctx, out_ffd, "total_file_size", "5M", NULL); + flb_output_set(ctx, out_ffd, "upload_timeout", "6s", NULL); + flb_output_set(ctx, out_ffd, "store_dir", store_dir, NULL); + flb_output_set(ctx, out_ffd, "Retry_Limit", "1", NULL); + + ret = flb_start(ctx); + TEST_CHECK(ret == 0); + + ret = inject_otlp_json(ctx, OTLP_LOGS_JSON, sizeof(OTLP_LOGS_JSON) - 1); + TEST_CHECK(ret == 0); + sleep(10); + + call_count_str = getenv("TEST_PutObject_CALL_COUNT"); + call_count = call_count_str ? atoi(call_count_str) : 0; + TEST_CHECK_(call_count == 1, + "Expected 1 PutObject call, got %d", call_count); + + flb_stop(ctx); + flb_destroy(ctx); + unsetenv("FLB_S3_PLUGIN_UNDER_TEST"); + unsetenv("TEST_PutObject_CALL_COUNT"); +} + +TEST_LIST = { + {"format_otlp_json", flb_test_s3_format_otlp_json }, + {"format_otlp_json_with_compression", flb_test_s3_format_otlp_json_with_compression }, + {NULL, NULL} +};