diff --git a/include/fluent-bit/flb_input_chunk.h b/include/fluent-bit/flb_input_chunk.h index 552bcfdc1c0..9f37c0a0bd5 100644 --- a/include/fluent-bit/flb_input_chunk.h +++ b/include/fluent-bit/flb_input_chunk.h @@ -124,11 +124,21 @@ int flb_input_chunk_append_raw(struct flb_input_instance *in, size_t records, const char *tag, size_t tag_len, const void *buf, size_t buf_size); +int flb_input_chunk_append_raw_local(struct flb_input_instance *in, + int event_type, + size_t records, + const char *tag, size_t tag_len, + const void *buf, size_t buf_size); int flb_input_chunk_ring_buffer_enqueue(struct flb_input_instance *in, int event_type, size_t records, const char *tag, size_t tag_len, const void *buf, size_t buf_size); +int flb_input_chunk_ring_buffer_enqueue_log_routing(struct flb_input_instance *in, + int event_type, + size_t records, + const char *tag, size_t tag_len, + const void *buf, size_t buf_size); const void *flb_input_chunk_flush(struct flb_input_chunk *ic, size_t *size); int flb_input_chunk_release_lock(struct flb_input_chunk *ic); diff --git a/include/fluent-bit/flb_input_log.h b/include/fluent-bit/flb_input_log.h index c6b873c80fe..fa88551bc7b 100644 --- a/include/fluent-bit/flb_input_log.h +++ b/include/fluent-bit/flb_input_log.h @@ -38,4 +38,11 @@ int flb_input_log_append_skip_processor_stages(struct flb_input_instance *ins, size_t tag_len, const void *buf, size_t buf_size); + +int flb_input_log_append_processed(struct flb_input_instance *ins, + size_t records, + const char *tag, + size_t tag_len, + const void *buf, + size_t buf_size); #endif diff --git a/plugins/in_opentelemetry/opentelemetry_logs.c b/plugins/in_opentelemetry/opentelemetry_logs.c index d8327422491..9324dd1f55a 100644 --- a/plugins/in_opentelemetry/opentelemetry_logs.c +++ b/plugins/in_opentelemetry/opentelemetry_logs.c @@ -320,12 +320,13 @@ static int binary_payload_to_msgpack(struct flb_opentelemetry *ctx, uint8_t *in_buf, size_t in_size) { - int ret; + int ret = 0; int len; int resource_logs_index; int scope_log_index; int log_record_index; char *logs_body_key; + int scope_has_schema_url; struct flb_mp_map_header mh; struct flb_mp_map_header mh_tmp; struct flb_time tm; @@ -356,6 +357,11 @@ static int binary_payload_to_msgpack(struct flb_opentelemetry *ctx, } resource_logs = input_logs->resource_logs; + if (input_logs->n_resource_logs == 0) { + ret = 0; + goto binary_payload_to_msgpack_end; + } + if (resource_logs == NULL) { flb_plg_warn(ctx->ins, "no resource logs found"); ret = -1; @@ -364,6 +370,11 @@ static int binary_payload_to_msgpack(struct flb_opentelemetry *ctx, for (resource_logs_index = 0; resource_logs_index < input_logs->n_resource_logs; resource_logs_index++) { resource_log = resource_logs[resource_logs_index]; + if (resource_log == NULL) { + flb_plg_warn(ctx->ins, "null resource logs entry found"); + ret = -1; + goto binary_payload_to_msgpack_end; + } resource = resource_log->resource; scope_logs = resource_log->scope_logs; @@ -376,8 +387,18 @@ static int binary_payload_to_msgpack(struct flb_opentelemetry *ctx, for (scope_log_index = 0; scope_log_index < resource_log->n_scope_logs; scope_log_index++) { scope_log = scope_logs[scope_log_index]; + if (scope_log == NULL) { + flb_plg_warn(ctx->ins, "null scope logs entry found"); + ret = -1; + goto binary_payload_to_msgpack_end; + } + log_records = scope_log->log_records; + if (scope_log->n_log_records == 0) { + continue; + } + if (log_records == NULL) { flb_plg_warn(ctx->ins, "no log records found"); ret = -1; @@ -452,11 +473,18 @@ static int binary_payload_to_msgpack(struct flb_opentelemetry *ctx, /* Scope */ scope = scope_log->scope; + scope_has_schema_url = FLB_FALSE; - if (scope && (scope->name || scope->version || scope->n_attributes > 0)) { + if (scope_log->schema_url && strlen(scope_log->schema_url) > 0) { + scope_has_schema_url = FLB_TRUE; + } + + if (scope && (scope->name || scope->version || + scope->n_attributes > 0 || scope->dropped_attributes_count > 0 || + scope_has_schema_url == FLB_TRUE)) { flb_mp_map_header_init(&mh_tmp, mp_pck); - if (scope_log->schema_url && strlen(scope_log->schema_url) > 0) { + if (scope_has_schema_url == FLB_TRUE) { flb_mp_map_header_append(&mh_tmp); msgpack_pack_str(mp_pck, 10); msgpack_pack_str_body(mp_pck, "schema_url", 10); @@ -508,8 +536,19 @@ static int binary_payload_to_msgpack(struct flb_opentelemetry *ctx, flb_mp_map_header_end(&mh_tmp); } else { - /* set an empty scope */ - msgpack_pack_map(mp_pck, 0); + flb_mp_map_header_init(&mh_tmp, mp_pck); + + if (scope_has_schema_url == FLB_TRUE) { + flb_mp_map_header_append(&mh_tmp); + msgpack_pack_str(mp_pck, 10); + msgpack_pack_str_body(mp_pck, "schema_url", 10); + + len = strlen(scope_log->schema_url); + msgpack_pack_str(mp_pck, len); + msgpack_pack_str_body(mp_pck, scope_log->schema_url, len); + } + + flb_mp_map_header_end(&mh_tmp); } flb_mp_map_header_end(&mh); diff --git a/plugins/out_opentelemetry/opentelemetry_conf.c b/plugins/out_opentelemetry/opentelemetry_conf.c index 8535110557e..c09cbf9eb2a 100644 --- a/plugins/out_opentelemetry/opentelemetry_conf.c +++ b/plugins/out_opentelemetry/opentelemetry_conf.c @@ -906,7 +906,11 @@ void flb_opentelemetry_context_destroy(struct opentelemetry_context *ctx) if (ctx->oauth2_ctx) { flb_oauth2_destroy(ctx->oauth2_ctx); } - flb_oauth2_config_destroy(&ctx->oauth2_config); + /* + * ctx->oauth2_config strings are populated through the output config map, + * so flb_config_map_destroy() owns their release. The OAuth2 runtime + * context clones the strings it needs. + */ flb_free(ctx->proxy_host); flb_free(ctx); diff --git a/plugins/out_opentelemetry/opentelemetry_logs.c b/plugins/out_opentelemetry/opentelemetry_logs.c index 4c174ea49d4..f8af8577af8 100644 --- a/plugins/out_opentelemetry/opentelemetry_logs.c +++ b/plugins/out_opentelemetry/opentelemetry_logs.c @@ -24,9 +24,13 @@ #include #include #include +#include #include #include +#include + +#include #include #include "opentelemetry.h" @@ -35,6 +39,7 @@ #define RESOURCE_LOGS_INITIAL_CAPACITY 256 #define SCOPE_LOGS_INITIAL_CAPACITY 100 +#define OTLP_GROUP_HASH_ERROR UINT64_MAX static int hex_to_int(char ch) { @@ -157,6 +162,134 @@ static int get_otlp_group_metadata(struct opentelemetry_context *ctx, struct flb return 0; } +static msgpack_object *msgpack_map_get_object(msgpack_object_map *map, + const char *key) +{ + int index; + + index = flb_otel_utils_find_map_entry_by_key(map, (char *) key, 0, FLB_TRUE); + if (index < 0) { + return NULL; + } + + return &map->ptr[index].val; +} + +static uint64_t msgpack_object_hash(msgpack_object *object) +{ + uint64_t hash; + msgpack_sbuffer buffer; + msgpack_packer packer; + + if (object == NULL) { + return cfl_hash_64bits("null", 4); + } + + msgpack_sbuffer_init(&buffer); + msgpack_packer_init(&packer, &buffer, msgpack_sbuffer_write); + + if (msgpack_pack_object(&packer, *object) != 0) { + msgpack_sbuffer_destroy(&buffer); + return OTLP_GROUP_HASH_ERROR; + } + + hash = cfl_hash_64bits(buffer.data, buffer.size); + msgpack_sbuffer_destroy(&buffer); + + return hash; +} + +static uint64_t msgpack_object_pair_hash(msgpack_object *left, + msgpack_object *right) +{ + uint64_t hash; + msgpack_sbuffer buffer; + msgpack_packer packer; + + msgpack_sbuffer_init(&buffer); + msgpack_packer_init(&packer, &buffer, msgpack_sbuffer_write); + + if (msgpack_pack_array(&packer, 2) != 0) { + msgpack_sbuffer_destroy(&buffer); + return OTLP_GROUP_HASH_ERROR; + } + + if (left == NULL) { + if (msgpack_pack_nil(&packer) != 0) { + msgpack_sbuffer_destroy(&buffer); + return OTLP_GROUP_HASH_ERROR; + } + } + else if (msgpack_pack_object(&packer, *left) != 0) { + msgpack_sbuffer_destroy(&buffer); + return OTLP_GROUP_HASH_ERROR; + } + + if (right == NULL) { + if (msgpack_pack_nil(&packer) != 0) { + msgpack_sbuffer_destroy(&buffer); + return OTLP_GROUP_HASH_ERROR; + } + } + else if (msgpack_pack_object(&packer, *right) != 0) { + msgpack_sbuffer_destroy(&buffer); + return OTLP_GROUP_HASH_ERROR; + } + + hash = cfl_hash_64bits(buffer.data, buffer.size); + msgpack_sbuffer_destroy(&buffer); + + return hash; +} + +static msgpack_object *resource_schema_url_object(msgpack_object *resource_object, + msgpack_object *resource_body) +{ + msgpack_object *schema_url; + + if (resource_body != NULL && resource_body->type == MSGPACK_OBJECT_MAP) { + schema_url = msgpack_map_get_object(&resource_body->via.map, "schema_url"); + if (schema_url != NULL) { + return schema_url; + } + } + + if (resource_object != NULL && resource_object->type == MSGPACK_OBJECT_MAP) { + schema_url = msgpack_map_get_object(&resource_object->via.map, "schema_url"); + if (schema_url != NULL) { + return schema_url; + } + } + + return NULL; +} + +static uint64_t resource_identity_hash(msgpack_object *resource_object, + msgpack_object *resource_body) +{ + msgpack_object *schema_url; + + schema_url = resource_schema_url_object(resource_object, resource_body); + + return msgpack_object_pair_hash(resource_object, schema_url); +} + +static void get_otlp_group_identity_hashes(msgpack_object *group_body, + uint64_t *resource_hash, + uint64_t *scope_hash) +{ + msgpack_object *resource_object = NULL; + msgpack_object *scope_object = NULL; + + if (group_body != NULL && group_body->type == MSGPACK_OBJECT_MAP) { + resource_object = msgpack_map_get_object(&group_body->via.map, "resource"); + scope_object = msgpack_map_get_object(&group_body->via.map, "scope"); + } + + *resource_hash = resource_identity_hash(resource_object, group_body); + *scope_hash = msgpack_object_hash(scope_object); +} + static inline int log_record_set_body(struct opentelemetry_context *ctx, Opentelemetry__Proto__Logs__V1__LogRecord *log_records, struct flb_log_event *event, struct flb_record_accessor **out_ra_match) @@ -314,6 +447,11 @@ static int pack_trace_id(struct opentelemetry_context *ctx, int ret; if (ra_val->o.type == MSGPACK_OBJECT_BIN) { + if (ra_val->o.via.bin.size != 16) { + flb_plg_warn(ctx->ins, "invalid trace_id size"); + return -1; + } + log_record->trace_id.data = flb_calloc(1, ra_val->o.via.bin.size); if (!log_record->trace_id.data) { return -1; @@ -322,7 +460,8 @@ static int pack_trace_id(struct opentelemetry_context *ctx, log_record->trace_id.len = ra_val->o.via.bin.size; } else if (ra_val->o.type == MSGPACK_OBJECT_STR) { - if (ra_val->o.via.str.size > 32) { + if (ra_val->o.via.str.size != 32) { + flb_plg_warn(ctx->ins, "invalid trace_id size"); return -1; } @@ -355,7 +494,14 @@ static int pack_span_id(struct opentelemetry_context *ctx, Opentelemetry__Proto__Logs__V1__LogRecord *log_record, struct flb_ra_value *ra_val) { + int ret; + if (ra_val->o.type == MSGPACK_OBJECT_BIN) { + if (ra_val->o.via.bin.size != 8) { + flb_plg_warn(ctx->ins, "invalid span_id size"); + return -1; + } + log_record->span_id.data = flb_calloc(1, ra_val->o.via.bin.size); if (!log_record->span_id.data) { return -1; @@ -364,7 +510,8 @@ static int pack_span_id(struct opentelemetry_context *ctx, log_record->span_id.len = ra_val->o.via.bin.size; } else if (ra_val->o.type == MSGPACK_OBJECT_STR) { - if (ra_val->o.via.str.size > 16) { + if (ra_val->o.via.str.size != 16) { + flb_plg_warn(ctx->ins, "invalid span_id size"); return -1; } @@ -374,12 +521,21 @@ static int pack_span_id(struct opentelemetry_context *ctx, return -1; } - hex_to_id((char *) ra_val->o.via.str.ptr, ra_val->o.via.str.size, - log_record->span_id.data, 8); + ret = hex_to_id((char *) ra_val->o.via.str.ptr, ra_val->o.via.str.size, + log_record->span_id.data, 8); + if (ret != 0) { + flb_plg_warn(ctx->ins, "invalid span_id format"); + flb_free(log_record->span_id.data); + log_record->span_id.data = NULL; + log_record->span_id.len = 0; + return -1; + } + log_record->span_id.len = 8; } else { flb_plg_warn(ctx->ins, "invalid span_id type"); + return -1; } return 0; @@ -631,7 +787,7 @@ static int append_v1_logs_metadata_and_fields(struct opentelemetry_context *ctx, } /* TraceFlags */ - ra_val = flb_ra_get_value_object(ctx->ra_trace_flags_metadata, *event->metadata); + ra_val = flb_ra_get_value_object(ctx->ra_log_meta_otlp_trace_flags, *event->metadata); if (ra_val != NULL) { if (ra_val->o.type == MSGPACK_OBJECT_POSITIVE_INTEGER) { log_record->flags = (uint32_t) ra_val->o.via.u64; @@ -640,7 +796,7 @@ static int append_v1_logs_metadata_and_fields(struct opentelemetry_context *ctx, flb_ra_key_value_destroy(ra_val); } - if (!trace_flags_set) { + if (!trace_flags_set && ctx->ra_trace_flags_metadata) { ra_val = flb_ra_get_value_object(ctx->ra_trace_flags_metadata, *event->metadata); if (ra_val != NULL) { if (ra_val->o.type == MSGPACK_OBJECT_POSITIVE_INTEGER) { @@ -702,15 +858,21 @@ static void free_resource_logs(Opentelemetry__Proto__Logs__V1__ResourceLogs **re for (i = 0 ; i < resource_count ; i++) { resource_log = resource_logs[i]; + if (resource_log == NULL) { + continue; + } if (resource_log->schema_url != NULL && resource_log->schema_url != protobuf_c_empty_string) { flb_sds_destroy(resource_log->schema_url); } - if (resource_log->resource->attributes != NULL) { - otlp_kvarray_destroy(resource_log->resource->attributes, resource_log->resource->n_attributes); + if (resource_log->resource != NULL) { + if (resource_log->resource->attributes != NULL) { + otlp_kvarray_destroy(resource_log->resource->attributes, + resource_log->resource->n_attributes); + } + flb_free(resource_log->resource); } - flb_free(resource_log->resource); /* iterate scoipe logs */ if (resource_log->n_scope_logs > 0) { @@ -737,6 +899,11 @@ static void free_resource_logs(Opentelemetry__Proto__Logs__V1__ResourceLogs **re free_log_records(scope_log->log_records, scope_log->n_log_records); } + if (scope_log->schema_url != NULL && + scope_log->schema_url != protobuf_c_empty_string) { + flb_sds_destroy(scope_log->schema_url); + } + flb_free(scope_log->log_records); flb_free(scope_log); } @@ -951,6 +1118,9 @@ int otel_process_logs(struct flb_event_chunk *event_chunk, int max_scopes_limit; int max_resources; int native_otel = FLB_FALSE; + int inside_native_otel_group = FLB_FALSE; + int standalone_context_active = FLB_FALSE; + int force_new_resource = FLB_FALSE; size_t resource_logs_capacity; size_t i; size_t new_capacity; @@ -961,6 +1131,10 @@ int otel_process_logs(struct flb_event_chunk *event_chunk, int64_t scope_id = -1; int64_t tmp_resource_id = -1; int64_t tmp_scope_id = -1; + uint64_t resource_hash = 0; + uint64_t scope_hash = 0; + uint64_t tmp_resource_hash = 0; + uint64_t tmp_scope_hash = 0; struct flb_log_event_decoder *decoder; struct flb_log_event event; struct opentelemetry_context *ctx; @@ -1040,15 +1214,50 @@ int otel_process_logs(struct flb_event_chunk *event_chunk, ret = get_otlp_group_metadata(ctx, &event, &tmp_resource_id, &tmp_scope_id); if (ret == -1) { /* skip unknown group info */ + inside_native_otel_group = FLB_FALSE; + standalone_context_active = FLB_FALSE; continue; } + get_otlp_group_identity_hashes(event.body, + &tmp_resource_hash, + &tmp_scope_hash); + if (tmp_resource_hash == OTLP_GROUP_HASH_ERROR || + tmp_scope_hash == OTLP_GROUP_HASH_ERROR) { + flb_plg_error(ctx->ins, "could not compute OTLP group identity hash"); + ret = FLB_RETRY; + break; + } + /* flag this as a native otel schema */ - native_otel = FLB_TRUE; + if (standalone_context_active == FLB_TRUE) { + resource_id = -1; + scope_id = -1; + resource_hash = 0; + scope_hash = 0; + resource_log = NULL; + scope_log = NULL; + standalone_context_active = FLB_FALSE; + } + native_otel = FLB_TRUE; + inside_native_otel_group = FLB_TRUE; + + force_new_resource = FLB_FALSE; + if (resource_id == tmp_resource_id && + resource_hash == tmp_resource_hash && + (scope_id != tmp_scope_id || scope_hash != tmp_scope_hash) && + max_scopes_limit > 0 && + resource_log != NULL && + resource_log->n_scope_logs >= max_scopes_limit) { + force_new_resource = FLB_TRUE; + } /* if we have a new resource_id, start a new resource context */ - if (resource_id != tmp_resource_id) { +start_resource: + if (force_new_resource == FLB_TRUE || + resource_id != tmp_resource_id || + resource_hash != tmp_resource_hash) { if (max_resources > 0) { if (export_logs.n_resource_logs >= max_resources) { /* respect the configured resource batching limit */ @@ -1097,7 +1306,6 @@ int otel_process_logs(struct flb_event_chunk *event_chunk, resource_logs_capacity = new_capacity; } -start_resource: /* * On every group start, check if we are following the previous resource_id or not, so we can pack scopes * under the right resource. @@ -1119,7 +1327,10 @@ int otel_process_logs(struct flb_event_chunk *event_chunk, resource_log->resource = flb_calloc(1, sizeof(Opentelemetry__Proto__Resource__V1__Resource)); if (!resource_log->resource) { flb_errno(); + resource_logs[export_logs.n_resource_logs - 1] = NULL; + export_logs.n_resource_logs--; flb_free(resource_log); + resource_log = NULL; ret = FLB_RETRY; break; } @@ -1157,10 +1368,13 @@ int otel_process_logs(struct flb_event_chunk *event_chunk, /* update the current resource_id and reset scope_id */ resource_id = tmp_resource_id; + resource_hash = tmp_resource_hash; scope_id = -1; + scope_hash = 0; + force_new_resource = FLB_FALSE; } - if (scope_id != tmp_scope_id) { + if (scope_id != tmp_scope_id || scope_hash != tmp_scope_hash) { resource_index = export_logs.n_resource_logs - 1; /* check limits */ @@ -1221,6 +1435,7 @@ int otel_process_logs(struct flb_event_chunk *event_chunk, } opentelemetry__proto__common__v1__instrumentation_scope__init(scope_log->scope); scope_id = tmp_scope_id; + scope_hash = tmp_scope_hash; log_records = flb_calloc(ctx->batch_size, sizeof(Opentelemetry__Proto__Logs__V1__LogRecord *)); if (!log_records) { @@ -1263,15 +1478,16 @@ int otel_process_logs(struct flb_event_chunk *event_chunk, else if (record_type == FLB_LOG_EVENT_GROUP_END) { /* do nothing */ ret = FLB_OK; - resource_id = -1; - scope_id = -1; native_otel = FLB_FALSE; + inside_native_otel_group = FLB_FALSE; continue; } /* if we have a real OTLP context package using log_records */ - if (resource_id >= 0 && scope_id >= 0) { + if ((inside_native_otel_group == FLB_TRUE || + standalone_context_active == FLB_TRUE) && + resource_id >= 0 && scope_id >= 0) { } else { @@ -1279,8 +1495,28 @@ int otel_process_logs(struct flb_event_chunk *event_chunk, * standalone packaging: the record is not part of an original OTLP structure, so there is no group * information. We create a temporary resource for the incoming records unless a group is defined. */ + if (standalone_context_active == FLB_FALSE) { + resource_id = -1; + scope_id = -1; + resource_hash = 0; + scope_hash = 0; + resource_log = NULL; + scope_log = NULL; + standalone_context_active = FLB_TRUE; + } + tmp_resource_id = 0; tmp_scope_id = 0; + get_otlp_group_identity_hashes(NULL, + &tmp_resource_hash, + &tmp_scope_hash); + if (tmp_resource_hash == OTLP_GROUP_HASH_ERROR || + tmp_scope_hash == OTLP_GROUP_HASH_ERROR) { + flb_plg_error(ctx->ins, "could not compute default OTLP group identity hash"); + ret = FLB_RETRY; + break; + } + force_new_resource = FLB_FALSE; goto start_resource; } diff --git a/src/flb_input_chunk.c b/src/flb_input_chunk.c index 3d82fe1c381..0a5f83ea7dc 100644 --- a/src/flb_input_chunk.c +++ b/src/flb_input_chunk.c @@ -27,6 +27,7 @@ #include #include #include +#include #include #include #include @@ -53,9 +54,12 @@ #define FLB_INPUT_CHUNK_RELEASE_SCOPE_LOCAL 0 #define FLB_INPUT_CHUNK_RELEASE_SCOPE_GLOBAL 1 +#define FLB_INPUT_CHUNK_RAW_LOG_ROUTING (1 << 0) + struct input_chunk_raw { struct flb_input_instance *ins; int event_type; + int flags; size_t records; flb_sds_t tag; void *buf_data; @@ -2977,6 +2981,7 @@ static void destroy_chunk_raw(struct input_chunk_raw *cr) static int append_to_ring_buffer(struct flb_input_instance *ins, int event_type, + int flags, size_t records, const char *tag, size_t tag_len, @@ -3001,6 +3006,7 @@ static int append_to_ring_buffer(struct flb_input_instance *ins, } cr->ins = ins; cr->event_type = event_type; + cr->flags = flags; if (tag && tag_len > 0) { cr->tag = flb_sds_create_len(tag, tag_len); @@ -3123,9 +3129,17 @@ void flb_input_chunk_ring_buffer_collector(struct flb_config *ctx, void *data) tag_len = 0; } - input_chunk_append_raw(cr->ins, cr->event_type, cr->records, - cr->tag, tag_len, - cr->buf_data, cr->buf_size); + if (cr->event_type == FLB_INPUT_LOGS && + (cr->flags & FLB_INPUT_CHUNK_RAW_LOG_ROUTING) != 0) { + flb_input_log_append_processed(cr->ins, cr->records, + cr->tag, tag_len, + cr->buf_data, cr->buf_size); + } + else { + input_chunk_append_raw(cr->ins, cr->event_type, cr->records, + cr->tag, tag_len, + cr->buf_data, cr->buf_size); + } destroy_chunk_raw(cr); } cr = NULL; @@ -3148,7 +3162,7 @@ int flb_input_chunk_append_raw(struct flb_input_instance *in, * add the data reference to the ring buffer. */ if (flb_input_is_threaded(in)) { - ret = append_to_ring_buffer(in, event_type, records, + ret = append_to_ring_buffer(in, event_type, 0, records, tag, tag_len, buf, buf_size); } @@ -3160,17 +3174,37 @@ int flb_input_chunk_append_raw(struct flb_input_instance *in, return ret; } +int flb_input_chunk_append_raw_local(struct flb_input_instance *in, + int event_type, + size_t records, + const char *tag, size_t tag_len, + const void *buf, size_t buf_size) +{ + return input_chunk_append_raw(in, event_type, records, + tag, tag_len, buf, buf_size); +} + int flb_input_chunk_ring_buffer_enqueue(struct flb_input_instance *in, int event_type, size_t records, const char *tag, size_t tag_len, const void *buf, size_t buf_size) { - return append_to_ring_buffer(in, event_type, records, + return append_to_ring_buffer(in, event_type, 0, records, tag, tag_len, buf, buf_size); } +int flb_input_chunk_ring_buffer_enqueue_log_routing(struct flb_input_instance *in, + int event_type, + size_t records, + const char *tag, size_t tag_len, + const void *buf, size_t buf_size) +{ + return append_to_ring_buffer(in, event_type, FLB_INPUT_CHUNK_RAW_LOG_ROUTING, + records, tag, tag_len, buf, buf_size); +} + /* Retrieve a raw buffer from a dyntag node */ const void *flb_input_chunk_flush(struct flb_input_chunk *ic, size_t *size) { diff --git a/src/flb_input_log.c b/src/flb_input_log.c index fcf4b594a06..0919fca3cd4 100644 --- a/src/flb_input_log.c +++ b/src/flb_input_log.c @@ -897,28 +897,64 @@ static void route_payload_list_destroy(struct cfl_list *payloads) } } -static void input_chunk_remove_conditional_routes(struct flb_input_instance *ins, - struct flb_input_chunk *chunk) +static int input_chunk_write_direct_route_metadata(struct flb_input_instance *ins, + struct flb_input_chunk *chunk) { - ssize_t chunk_size; - size_t chunk_size_sz; + int ret; + int direct_count; + int direct_index; + int write_ret; + int label_is_alias; + int tag_len; + size_t label_length; + size_t plugin_length; + const char *tag; + const char *label_source; + const char *plugin_name; struct cfl_list *head; struct flb_router_path *route_path; + struct flb_chunk_direct_route *direct_routes; if (!ins || !chunk || !chunk->routes_mask || !ins->config) { - return; + return -1; } - chunk_size = -1; + direct_count = 0; cfl_list_foreach(head, &ins->routes_direct) { route_path = cfl_list_entry(head, struct flb_router_path, _head); + if (!route_path->ins) { + continue; + } - if (!route_path->route || !route_path->ins) { + if (flb_routes_mask_get_bit(chunk->routes_mask, + route_path->ins->id, + ins->config->router) == 0) { continue; } - if (!route_path->route->condition && - !route_path->route->per_record_routing) { + direct_count++; + } + + if (direct_count == 0) { + return 0; + } + + ret = flb_input_chunk_get_tag(chunk, &tag, &tag_len); + if (ret != 0 || !tag || tag_len <= 0) { + return -1; + } + + direct_routes = flb_calloc((size_t) direct_count, + sizeof(struct flb_chunk_direct_route)); + if (!direct_routes) { + flb_errno(); + return -1; + } + + direct_index = 0; + cfl_list_foreach(head, &ins->routes_direct) { + route_path = cfl_list_entry(head, struct flb_router_path, _head); + if (!route_path->ins) { continue; } @@ -928,32 +964,150 @@ static void input_chunk_remove_conditional_routes(struct flb_input_instance *ins continue; } - flb_routes_mask_clear_bit(chunk->routes_mask, - route_path->ins->id, - ins->config->router); - - if (route_path->ins->total_limit_size == -1 || - chunk->fs_counted == FLB_FALSE) { + if (direct_index >= direct_count) { continue; } - if (chunk_size == -1) { - chunk_size = flb_input_chunk_get_real_size(chunk); - if (chunk_size <= 0) { - chunk_size = 0; + label_source = route_path->ins->alias; + label_length = 0; + plugin_name = NULL; + plugin_length = 0; + label_is_alias = FLB_FALSE; + if (!label_source || label_source[0] == '\0') { + label_source = route_path->ins->name; + } + else { + label_is_alias = FLB_TRUE; + } + if (label_source) { + label_length = strlen(label_source); + if (label_length > UINT16_MAX) { + label_length = UINT16_MAX; + } + } + if (route_path->ins->p && route_path->ins->p->name) { + plugin_name = route_path->ins->p->name; + plugin_length = strlen(plugin_name); + if (plugin_length > UINT16_MAX) { + plugin_length = UINT16_MAX; } } + direct_routes[direct_index].id = (uint32_t) route_path->ins->id; + direct_routes[direct_index].label = label_source; + direct_routes[direct_index].label_length = (uint16_t) label_length; + direct_routes[direct_index].label_is_alias = (uint8_t) label_is_alias; + direct_routes[direct_index].plugin_name = plugin_name; + direct_routes[direct_index].plugin_name_length = (uint16_t) plugin_length; + direct_index++; + } + + if (direct_index == direct_count) { + write_ret = flb_input_chunk_write_header_v2(chunk->chunk, + chunk->event_type, + (char *) tag, + tag_len, + direct_routes, + direct_count); + if (write_ret != 0) { + flb_plg_warn(ins, + "failed to persist direct routes for chunk %s", + flb_input_chunk_get_name(chunk)); + } + } + + flb_free(direct_routes); + + return 0; +} + +static int input_chunk_apply_base_direct_routes(struct flb_input_instance *ins, + struct flb_input_chunk *chunk) +{ + int has_routes; + ssize_t chunk_size; + size_t chunk_size_sz; + size_t mask_size; + struct cfl_list *head; + struct mk_list *o_head; + struct flb_router_path *route_path; + struct flb_output_instance *o_ins; + + if (!ins || !chunk || !chunk->routes_mask || !ins->config) { + return -1; + } + + chunk_size = -1; + chunk_size_sz = 0; + + if (chunk->fs_counted == FLB_TRUE) { + chunk_size = flb_input_chunk_get_real_size(chunk); if (chunk_size > 0) { chunk_size_sz = (size_t) chunk_size; - if (route_path->ins->fs_chunks_size > chunk_size_sz) { - route_path->ins->fs_chunks_size -= chunk_size_sz; + } + + mk_list_foreach(o_head, &ins->config->outputs) { + o_ins = mk_list_entry(o_head, struct flb_output_instance, _head); + + if (o_ins->total_limit_size == -1) { + continue; + } + + if (flb_routes_mask_get_bit(chunk->routes_mask, + o_ins->id, + ins->config->router) == 0) { + continue; + } + + if (o_ins->fs_chunks_size > chunk_size_sz) { + o_ins->fs_chunks_size -= chunk_size_sz; } else { - route_path->ins->fs_chunks_size = 0; + o_ins->fs_chunks_size = 0; } } + + chunk->fs_counted = FLB_FALSE; } + + mask_size = flb_routes_mask_get_size(ins->config->router); + memset(chunk->routes_mask, 0, sizeof(flb_route_mask_element) * mask_size); + + has_routes = FLB_FALSE; + cfl_list_foreach(head, &ins->routes_direct) { + route_path = cfl_list_entry(head, struct flb_router_path, _head); + + if (!route_path->ins) { + continue; + } + + if (route_path->route && + (route_path->route->condition || route_path->route->per_record_routing)) { + continue; + } + + flb_routes_mask_set_bit(chunk->routes_mask, + route_path->ins->id, + ins->config->router); + has_routes = FLB_TRUE; + } + + if (has_routes == FLB_FALSE) { + return 0; + } + + if (chunk_size == -1) { + chunk_size = flb_input_chunk_get_real_size(chunk); + if (chunk_size > 0) { + chunk_size_sz = (size_t) chunk_size; + } + } + + if (chunk_size_sz > 0) { + flb_input_chunk_update_output_instances(chunk, chunk_size_sz); + } + + return input_chunk_write_direct_route_metadata(ins, chunk); } static int input_has_conditional_routes(struct flb_input_instance *ins) @@ -980,7 +1134,8 @@ static int split_and_append_route_payloads(struct flb_input_instance *ins, const char *tag, size_t tag_len, const void *buf, - size_t buf_size) + size_t buf_size, + int local_append) { int ret; int appended; @@ -1018,13 +1173,6 @@ static int split_and_append_route_payloads(struct flb_input_instance *ins, return 0; } - /* Conditional routing not supported for threaded inputs */ - if (flb_input_is_threaded(ins)) { - flb_plg_warn(ins, "conditional routing not supported for threaded inputs, " - "falling back to normal routing"); - return 0; - } - cfl_list_init(&payloads); cfl_list_foreach(head, &ins->routes_direct) { route_path = cfl_list_entry(head, struct flb_router_path, _head); @@ -1249,13 +1397,24 @@ static int split_and_append_route_payloads(struct flb_input_instance *ins, continue; } - ret = flb_input_chunk_append_raw(ins, - FLB_INPUT_LOGS, - payload->total_records, - payload->tag, - flb_sds_len(payload->tag), - payload->data, - payload->size); + if (local_append == FLB_TRUE) { + ret = flb_input_chunk_append_raw_local(ins, + FLB_INPUT_LOGS, + payload->total_records, + payload->tag, + flb_sds_len(payload->tag), + payload->data, + payload->size); + } + else { + ret = flb_input_chunk_append_raw(ins, + FLB_INPUT_LOGS, + payload->total_records, + payload->tag, + flb_sds_len(payload->tag), + payload->data, + payload->size); + } if (ret != 0) { flb_router_chunk_context_destroy(&context); route_payload_list_destroy(&payloads); @@ -1305,56 +1464,20 @@ static int split_and_append_route_payloads(struct flb_input_instance *ins, return 0; } -static int input_log_append(struct flb_input_instance *ins, - size_t processor_starting_stage, - size_t records, - const char *tag, size_t tag_len, - const void *buf, size_t buf_size) +static int input_log_append_processed_internal(struct flb_input_instance *ins, + size_t records, + const char *tag, size_t tag_len, + const void *buf, size_t buf_size, + int local_append) { int ret; int conditional_result; int conditional_handled = FLB_FALSE; - int processor_is_active; - void *out_buf = (void *) buf; size_t dummy = 0; - size_t out_size = buf_size; const char *base_tag = tag; size_t base_tag_len = tag_len; struct flb_input_chunk *chunk = NULL; - processor_is_active = flb_processor_is_active(ins->processor); - if (processor_is_active) { - if (!tag) { - if (ins->tag && ins->tag_len > 0) { - tag = ins->tag; - tag_len = ins->tag_len; - } - else { - tag = ins->name; - tag_len = strlen(ins->name); - } - } - - ret = flb_processor_run(ins->processor, - processor_starting_stage, - FLB_PROCESSOR_LOGS, - tag, tag_len, - (char *) buf, buf_size, - &out_buf, &out_size); - if (ret == -1) { - return -1; - } - - if (out_size == 0) { - return 0; - } - - if (buf != out_buf) { - /* a new buffer was created, re-count the number of records */ - records = flb_mp_count_log_records(out_buf, out_size); - } - } - if (!base_tag) { if (ins->tag && ins->tag_len > 0) { base_tag = ins->tag; @@ -1370,11 +1493,9 @@ static int input_log_append(struct flb_input_instance *ins, } conditional_result = split_and_append_route_payloads(ins, records, tag, tag_len, - out_buf, out_size); + buf, buf_size, + local_append); if (conditional_result < 0) { - if (processor_is_active && buf != out_buf) { - flb_free(out_buf); - } return -1; } @@ -1387,8 +1508,14 @@ static int input_log_append(struct flb_input_instance *ins, * receive data even when conditional routes exist. The conditional routing * should be additive, not exclusive. */ - ret = flb_input_chunk_append_raw(ins, FLB_INPUT_LOGS, records, - tag, tag_len, out_buf, out_size); + if (local_append == FLB_TRUE) { + ret = flb_input_chunk_append_raw_local(ins, FLB_INPUT_LOGS, records, + tag, tag_len, buf, buf_size); + } + else { + ret = flb_input_chunk_append_raw(ins, FLB_INPUT_LOGS, records, + tag, tag_len, buf, buf_size); + } if (ret == 0 && conditional_handled == FLB_TRUE && base_tag) { chunk = NULL; @@ -1399,13 +1526,81 @@ static int input_log_append(struct flb_input_instance *ins, base_tag_len, (void **) &chunk, &dummy) >= 0 && chunk) { - input_chunk_remove_conditional_routes(ins, chunk); + if (input_chunk_apply_base_direct_routes(ins, chunk) != 0) { + return -1; + } } } + return ret; +} + +static int input_log_append(struct flb_input_instance *ins, + size_t processor_starting_stage, + size_t records, + const char *tag, size_t tag_len, + const void *buf, size_t buf_size) +{ + int ret; + int processor_is_active; + void *out_buf = (void *) buf; + size_t out_size = buf_size; + + processor_is_active = flb_processor_is_active(ins->processor); + if (processor_is_active) { + if (!tag) { + if (ins->tag && ins->tag_len > 0) { + tag = ins->tag; + tag_len = ins->tag_len; + } + else { + tag = ins->name; + tag_len = strlen(ins->name); + } + } + + ret = flb_processor_run(ins->processor, + processor_starting_stage, + FLB_PROCESSOR_LOGS, + tag, tag_len, + (char *) buf, buf_size, + &out_buf, &out_size); + if (ret == -1) { + return -1; + } + + if (out_size == 0) { + if (buf != out_buf) { + flb_free(out_buf); + } + return 0; + } + + if (buf != out_buf) { + /* a new buffer was created, re-count the number of records */ + records = flb_mp_count_log_records(out_buf, out_size); + } + } + + if (flb_input_is_threaded(ins) == FLB_TRUE && + input_has_conditional_routes(ins) == FLB_TRUE) { + ret = flb_input_chunk_ring_buffer_enqueue_log_routing(ins, + FLB_INPUT_LOGS, + records, + tag, tag_len, + out_buf, out_size); + } + else { + ret = input_log_append_processed_internal(ins, records, + tag, tag_len, + out_buf, out_size, + FLB_FALSE); + } + if (processor_is_active && buf != out_buf) { flb_free(out_buf); } + return ret; } @@ -1422,6 +1617,23 @@ int flb_input_log_append(struct flb_input_instance *ins, return ret; } +int flb_input_log_append_processed(struct flb_input_instance *ins, + size_t records, + const char *tag, + size_t tag_len, + const void *buf, + size_t buf_size) +{ + if (records == 0) { + records = flb_mp_count_log_records(buf, buf_size); + } + + return input_log_append_processed_internal(ins, records, + tag, tag_len, + buf, buf_size, + FLB_TRUE); +} + /* Take a msgpack serialized record and enqueue it as a chunk */ int flb_input_log_append_skip_processor_stages(struct flb_input_instance *ins, size_t processor_starting_stage, @@ -1450,4 +1662,3 @@ int flb_input_log_append_records(struct flb_input_instance *ins, ret = input_log_append(ins, 0, records, tag, tag_len, buf, buf_size); return ret; } - diff --git a/src/opentelemetry/flb_opentelemetry_logs.c b/src/opentelemetry/flb_opentelemetry_logs.c index fe5c8ef2de4..d2a876d534a 100644 --- a/src/opentelemetry/flb_opentelemetry_logs.c +++ b/src/opentelemetry/flb_opentelemetry_logs.c @@ -605,12 +605,24 @@ static int process_json_payload_resource_logs_entry (struct flb_log_event_encode } for (index = 0 ; index < scope_logs->size ; index++) { + if (scope_logs->ptr[index].type != MSGPACK_OBJECT_MAP) { + if (error_status) { + *error_status = FLB_OTEL_LOGS_ERR_UNEXPECTED_SCOPELOGS_TYPE; + } + return -FLB_OTEL_LOGS_ERR_UNEXPECTED_SCOPELOGS_TYPE; + } /* * we use a temporary encoder to hold the group information, if no record entries are added * we will discard it. **/ tmp_encoder = flb_log_event_encoder_create(encoder->format); + if (tmp_encoder == NULL) { + if (error_status) { + *error_status = FLB_OTEL_LOGS_ERR_ENCODER_FAILURE; + } + return -FLB_OTEL_LOGS_ERR_ENCODER_FAILURE; + } flb_log_event_encoder_group_init(tmp_encoder); /* pack internal schema */ @@ -671,6 +683,7 @@ static int process_json_payload_resource_logs_entry (struct flb_log_event_encode flb_log_event_encoder_body_commit_map(tmp_encoder); /* scope schemaUrl */ + scope_schema_url = NULL; result = flb_otel_utils_find_map_entry_by_key(&scope_logs->ptr[index].via.map, "schemaUrl", 0, FLB_TRUE); if (result >= 0) { obj = &scope_logs->ptr[index].via.map.ptr[result].val; @@ -691,7 +704,7 @@ static int process_json_payload_resource_logs_entry (struct flb_log_event_encode } } - if (scope) { + if (scope || scope_schema_url) { /* * if the scope is found, process every expected key one by one to avoid * wrongly ingested items. @@ -703,38 +716,48 @@ static int process_json_payload_resource_logs_entry (struct flb_log_event_encode /* scope map value */ flb_log_event_encoder_body_begin_map(tmp_encoder); - /* scope name */ - result = flb_otel_utils_find_map_entry_by_key(&scope->via.map, "name", 0, FLB_TRUE); - if (result >= 0) { - obj = &scope->via.map.ptr[result].val; - if (obj->type == MSGPACK_OBJECT_STR) { - flb_log_event_encoder_append_body_values(tmp_encoder, - FLB_LOG_EVENT_CSTRING_VALUE("name"), - FLB_LOG_EVENT_MSGPACK_OBJECT_VALUE(obj)); + if (scope) { + /* scope name */ + result = flb_otel_utils_find_map_entry_by_key(&scope->via.map, "name", 0, FLB_TRUE); + if (result >= 0) { + obj = &scope->via.map.ptr[result].val; + if (obj->type == MSGPACK_OBJECT_STR) { + flb_log_event_encoder_append_body_values(tmp_encoder, + FLB_LOG_EVENT_CSTRING_VALUE("name"), + FLB_LOG_EVENT_MSGPACK_OBJECT_VALUE(obj)); + } } - } - /* scope version */ - result = flb_otel_utils_find_map_entry_by_key(&scope->via.map, "version", 0, FLB_TRUE); - if (result >= 0) { - obj = &scope->via.map.ptr[result].val; - if (obj->type == MSGPACK_OBJECT_STR) { - flb_log_event_encoder_append_body_values(tmp_encoder, - FLB_LOG_EVENT_CSTRING_VALUE("version"), - FLB_LOG_EVENT_MSGPACK_OBJECT_VALUE(obj)); + /* scope version */ + result = flb_otel_utils_find_map_entry_by_key(&scope->via.map, "version", 0, FLB_TRUE); + if (result >= 0) { + obj = &scope->via.map.ptr[result].val; + if (obj->type == MSGPACK_OBJECT_STR) { + flb_log_event_encoder_append_body_values(tmp_encoder, + FLB_LOG_EVENT_CSTRING_VALUE("version"), + FLB_LOG_EVENT_MSGPACK_OBJECT_VALUE(obj)); + } } - } - /* scope attributes */ - result = flb_otel_utils_find_map_entry_by_key(&scope->via.map, "attributes", 0, FLB_TRUE); - if (result >= 0) { - obj = &scope->via.map.ptr[result].val; - if (obj->type == MSGPACK_OBJECT_ARRAY) { - flb_log_event_encoder_append_body_string(tmp_encoder, "attributes", 10); - result = flb_otel_utils_json_payload_append_converted_kvlist(tmp_encoder, - FLB_LOG_EVENT_BODY, - obj); - if (result != 0) { + /* scope attributes */ + result = flb_otel_utils_find_map_entry_by_key(&scope->via.map, "attributes", 0, FLB_TRUE); + if (result >= 0) { + obj = &scope->via.map.ptr[result].val; + if (obj->type == MSGPACK_OBJECT_ARRAY) { + flb_log_event_encoder_append_body_string(tmp_encoder, "attributes", 10); + result = flb_otel_utils_json_payload_append_converted_kvlist(tmp_encoder, + FLB_LOG_EVENT_BODY, + obj); + if (result != 0) { + if (error_status) { + *error_status = FLB_OTEL_LOGS_ERR_SCOPE_KVLIST; + } + flb_log_event_encoder_destroy(tmp_encoder); + return -FLB_OTEL_LOGS_ERR_SCOPE_KVLIST; + } + } + else { + /* scope attributes must be an array per OTLP spec; return error if not */ if (error_status) { *error_status = FLB_OTEL_LOGS_ERR_SCOPE_KVLIST; } @@ -742,14 +765,6 @@ static int process_json_payload_resource_logs_entry (struct flb_log_event_encode return -FLB_OTEL_LOGS_ERR_SCOPE_KVLIST; } } - else { - /* scope attributes must be an array per OTLP spec; return error if not */ - if (error_status) { - *error_status = FLB_OTEL_LOGS_ERR_SCOPE_KVLIST; - } - flb_log_event_encoder_destroy(tmp_encoder); - return -FLB_OTEL_LOGS_ERR_SCOPE_KVLIST; - } } /* scope schemaUrl */ diff --git a/tests/integration/scenarios/in_opentelemetry/tests/test_in_opentelemetry_001.py b/tests/integration/scenarios/in_opentelemetry/tests/test_in_opentelemetry_001.py index 71936ffc201..df365e01aad 100644 --- a/tests/integration/scenarios/in_opentelemetry/tests/test_in_opentelemetry_001.py +++ b/tests/integration/scenarios/in_opentelemetry/tests/test_in_opentelemetry_001.py @@ -326,6 +326,94 @@ def build_resource_collision_logs_json_payload(user_id, body, schema_url=None): return json.dumps(payload).encode("utf-8") +def build_scope_schema_logs_payload(): + payload = { + "resourceLogs": [ + { + "resource": { + "attributes": [ + { + "key": "service.name", + "value": { + "stringValue": "scope-schema-service", + }, + } + ], + }, + "scopeLogs": [ + { + "schemaUrl": "scope-schema-a", + "logRecords": [ + { + "timeUnixNano": "1640995200000000000", + "body": { + "stringValue": "event-a", + }, + } + ], + }, + { + "logRecords": [ + { + "timeUnixNano": "1640995200000000000", + "body": { + "stringValue": "event-b", + }, + } + ], + }, + ], + } + ], + } + + return json_format.Parse(json.dumps(payload), ExportLogsServiceRequest()).SerializeToString() + + +def build_scope_schema_logs_json_payload(): + payload = { + "resourceLogs": [ + { + "resource": { + "attributes": [ + { + "key": "service.name", + "value": { + "stringValue": "scope-schema-service", + }, + } + ], + }, + "scopeLogs": [ + { + "schemaUrl": "scope-schema-a", + "logRecords": [ + { + "timeUnixNano": "1640995200000000000", + "body": { + "stringValue": "event-a", + }, + } + ], + }, + { + "logRecords": [ + { + "timeUnixNano": "1640995200000000000", + "body": { + "stringValue": "event-b", + }, + } + ], + }, + ], + } + ], + } + + return json.dumps(payload).encode("utf-8") + + class Service: def __init__(self, config_file, *, use_auth_server=False): # Compose the absolute path for the Fluent Bit configuration file @@ -792,6 +880,43 @@ def test_in_opentelemetry_stdout_otlp_json_logs_preserve_resource_schema_urls( assert len(output["resourceLogs"]) == 2 +@pytest.mark.parametrize( + "content_type,payload_builder", + [ + ("application/x-protobuf", build_scope_schema_logs_payload), + ("application/json", build_scope_schema_logs_json_payload), + ], +) +def test_in_opentelemetry_stdout_otlp_json_logs_preserve_scope_schema_urls( + content_type, + payload_builder, +): + service = Service("stdout-otlp-json-slow-flush.yaml") + service.start() + try: + response = service.send_raw_request( + "/v1/logs", + payload_builder(), + content_type=content_type, + ) + assert 200 <= response.status_code < 300 + + output = read_stdout_otlp_json(service, "resourceLogs", timeout=10) + finally: + service.stop() + + assert len(output["resourceLogs"]) == 1 + scope_by_body = { + record["body"]["stringValue"]: scope_log + for resource_log in output["resourceLogs"] + for scope_log in resource_log["scopeLogs"] + for record in scope_log["logRecords"] + } + + assert scope_by_body["event-a"]["schemaUrl"] == "scope-schema-a" + assert "schemaUrl" not in scope_by_body["event-b"] + + def test_in_opentelemetry_stdout_otlp_json_metrics(): service = Service("003-stdout-otlp-json.yaml") service.start() diff --git a/tests/integration/scenarios/out_opentelemetry/config/out_otel_http_conditional_grouped_logs_non_threaded.yaml b/tests/integration/scenarios/out_opentelemetry/config/out_otel_http_conditional_grouped_logs_non_threaded.yaml new file mode 100644 index 00000000000..f7e8e28fb9c --- /dev/null +++ b/tests/integration/scenarios/out_opentelemetry/config/out_otel_http_conditional_grouped_logs_non_threaded.yaml @@ -0,0 +1,63 @@ +service: + flush: 1 + grace: 1 + log_level: trace + http_server: on + http_port: ${FLUENT_BIT_HTTP_MONITORING_PORT} + +pipeline: + inputs: + - name: opentelemetry + alias: grouped_otel_input + port: ${FLUENT_BIT_TEST_LISTENER_PORT} + routes: + logs: + - name: grouped_alpha + condition: + op: and + rules: + - context: otel_resource_attributes + field: $route_group + op: eq + value: alpha + to: + outputs: + - otel_group_alpha + + - name: grouped_beta + condition: + op: and + rules: + - context: otel_resource_attributes + field: $route_group + op: eq + value: beta + to: + outputs: + - otel_group_beta + + - name: grouped_default + condition: + default: true + to: + outputs: + - otel_group_default + + outputs: + - name: opentelemetry + alias: otel_group_alpha + host: 127.0.0.1 + port: ${TEST_SUITE_HTTP_PORT} + logs_uri: /conditional/group/alpha + + - name: opentelemetry + alias: otel_group_beta + host: 127.0.0.1 + port: ${TEST_SUITE_HTTP_PORT} + logs_uri: /conditional/group/beta + + - name: opentelemetry + alias: otel_group_default + host: 127.0.0.1 + port: ${TEST_SUITE_HTTP_PORT} + logs_uri: /conditional/group/default diff --git a/tests/integration/scenarios/out_opentelemetry/config/out_otel_http_conditional_grouped_logs_threaded.yaml b/tests/integration/scenarios/out_opentelemetry/config/out_otel_http_conditional_grouped_logs_threaded.yaml new file mode 100644 index 00000000000..5a4dd431423 --- /dev/null +++ b/tests/integration/scenarios/out_opentelemetry/config/out_otel_http_conditional_grouped_logs_threaded.yaml @@ -0,0 +1,64 @@ +service: + flush: 1 + grace: 1 + log_level: trace + http_server: on + http_port: ${FLUENT_BIT_HTTP_MONITORING_PORT} + +pipeline: + inputs: + - name: opentelemetry + alias: threaded_grouped_otel_input + threaded: true + port: ${FLUENT_BIT_TEST_LISTENER_PORT} + routes: + logs: + - name: threaded_grouped_alpha + condition: + op: and + rules: + - context: otel_resource_attributes + field: $route_group + op: eq + value: alpha + to: + outputs: + - otel_group_alpha + + - name: threaded_grouped_beta + condition: + op: and + rules: + - context: otel_resource_attributes + field: $route_group + op: eq + value: beta + to: + outputs: + - otel_group_beta + + - name: threaded_grouped_default + condition: + default: true + to: + outputs: + - otel_group_default + + outputs: + - name: opentelemetry + alias: otel_group_alpha + host: 127.0.0.1 + port: ${TEST_SUITE_HTTP_PORT} + logs_uri: /conditional/group/alpha + + - name: opentelemetry + alias: otel_group_beta + host: 127.0.0.1 + port: ${TEST_SUITE_HTTP_PORT} + logs_uri: /conditional/group/beta + + - name: opentelemetry + alias: otel_group_default + host: 127.0.0.1 + port: ${TEST_SUITE_HTTP_PORT} + logs_uri: /conditional/group/default diff --git a/tests/integration/scenarios/out_opentelemetry/tests/test_out_opentelemetry_001.py b/tests/integration/scenarios/out_opentelemetry/tests/test_out_opentelemetry_001.py index 832c85e59bf..ffd666533bf 100644 --- a/tests/integration/scenarios/out_opentelemetry/tests/test_out_opentelemetry_001.py +++ b/tests/integration/scenarios/out_opentelemetry/tests/test_out_opentelemetry_001.py @@ -32,18 +32,21 @@ def _repo_relative(*parts): return os.path.abspath(os.path.join(os.path.dirname(__file__), *parts)) +def _attributes_to_dict(attributes): + return { + item["key"]: next(iter(item["value"].values())) + for item in attributes + } + + def iter_log_records(output): for resource_log in output.get("resourceLogs", []): - resource_attributes = { - item["key"]: next(iter(item["value"].values())) - for item in resource_log.get("resource", {}).get("attributes", []) - } + resource_attributes = _attributes_to_dict( + resource_log.get("resource", {}).get("attributes", []) + ) for scope_log in resource_log.get("scopeLogs", []): for record in scope_log.get("logRecords", []): - attributes = { - item["key"]: next(iter(item["value"].values())) - for item in record.get("attributes", []) - } + attributes = _attributes_to_dict(record.get("attributes", [])) yield record, attributes, resource_attributes @@ -287,6 +290,89 @@ def _build_resource_collision_payload(user_id, body): } +def _build_conditional_grouped_logs_payload(): + def resource(route_group, group_id, scopes): + return { + "schema_url": f"https://schemas.example/{route_group}", + "resource": { + "attributes": [ + { + "key": "route_group", + "value": { + "string_value": route_group, + }, + }, + { + "key": "group_id", + "value": { + "string_value": group_id, + }, + }, + { + "key": "service_name", + "value": { + "string_value": f"service-{route_group}", + }, + }, + ], + }, + "scope_logs": scopes, + } + + def scope(scope_name, scope_version, body, flags): + return { + "schema_url": f"https://schemas.example/{scope_name}", + "scope": { + "name": scope_name, + "version": scope_version, + "attributes": [ + { + "key": "scope_marker", + "value": { + "string_value": f"{scope_name}-marker", + }, + } + ], + }, + "log_records": [ + { + "time_unix_nano": "1640995200000000000", + "body": { + "string_value": body, + }, + "flags": flags, + "attributes": [ + { + "key": "record_marker", + "value": { + "string_value": f"{body}-marker", + }, + } + ], + } + ], + } + + resource_logs = [ + resource( + "alpha", + "group-alpha", + [ + scope("scope-alpha-a", "1.0.0", "event-alpha-a", 1), + scope("scope-alpha-b", "1.1.0", "event-alpha-b", 2), + ], + ), + resource("beta", "group-beta", [scope("scope-beta", "2.0.0", "event-beta", 3)]), + resource( + "fallback", + "group-default", + [scope("scope-default", "3.0.0", "event-default", 4)], + ), + ] + + return {"resource_logs": resource_logs} + + def _assert_log_resource_attribution(logs_seen): output = json.loads(json_format.MessageToJson(logs_seen[0])) records = list(iter_log_records(output)) @@ -300,6 +386,70 @@ def _assert_log_resource_attribution(logs_seen): assert len(output["resourceLogs"]) == 2 +def _log_payloads_by_request_path(logs_seen, requests_seen): + assert len(logs_seen) >= len(requests_seen) + + decoded_by_path = {} + for log_seen in logs_seen: + output = json.loads(json_format.MessageToJson(log_seen)) + resource_logs = output.get("resourceLogs", []) + assert len(resource_logs) == 1 + + resource_attributes = _attributes_to_dict( + resource_logs[0].get("resource", {}).get("attributes", []) + ) + group_id = resource_attributes.get("group_id") + assert group_id is not None + assert group_id.startswith("group-") + + path = f"/conditional/group/{group_id[6:]}" + assert path not in decoded_by_path + decoded_by_path[path] = output + + payloads_by_path = {} + for request_seen in requests_seen: + path = request_seen["path"] + assert path in decoded_by_path + payloads_by_path[path] = decoded_by_path[path] + + return payloads_by_path + + +def _assert_grouped_resource(output, *, route_group, group_id, scopes): + resource_logs = output.get("resourceLogs", []) + assert len(resource_logs) == 1 + + resource_log = resource_logs[0] + assert resource_log["schemaUrl"] == f"https://schemas.example/{route_group}" + + resource_attributes = _attributes_to_dict( + resource_log.get("resource", {}).get("attributes", []) + ) + assert resource_attributes["route_group"] == route_group + assert resource_attributes["group_id"] == group_id + assert resource_attributes["service_name"] == f"service-{route_group}" + + scope_logs = resource_log.get("scopeLogs", []) + assert len(scope_logs) == len(scopes) + + for scope_log, expected in zip(scope_logs, scopes): + scope = scope_log["scope"] + assert scope_log["schemaUrl"] == f"https://schemas.example/{expected['name']}" + assert scope["name"] == expected["name"] + assert scope["version"] == expected["version"] + assert _attributes_to_dict(scope.get("attributes", []))["scope_marker"] == ( + f"{expected['name']}-marker" + ) + + records = scope_log.get("logRecords", []) + assert len(records) == 1 + assert records[0]["body"]["stringValue"] == expected["body"] + assert records[0]["flags"] == expected["flags"] + assert _attributes_to_dict(records[0].get("attributes", []))["record_marker"] == ( + f"{expected['body']}-marker" + ) + + def test_out_opentelemetry_http_logs_uri_headers_and_basic_auth(): service = Service("out_otel_http_logs.yaml") service.start() @@ -612,6 +762,78 @@ def test_out_opentelemetry_custom_metadata_key_accessors(): assert attributes["custom_attr"] == "custom_value" +@pytest.mark.parametrize( + "config_file", + [ + "out_otel_http_conditional_grouped_logs_non_threaded.yaml", + "out_otel_http_conditional_grouped_logs_threaded.yaml", + ], + ids=["non_threaded", "threaded"], +) +def test_out_opentelemetry_conditional_routing_preserves_group_metadata(config_file): + service = Service(config_file) + service.start() + try: + service.send_payload_dict(_build_conditional_grouped_logs_payload(), "logs") + logs_seen = list(service.wait_for_signal("logs", minimum_count=3, timeout=15)) + requests_seen = list(service.wait_for_requests(3, timeout=15)) + finally: + service.stop() + + payloads_by_path = _log_payloads_by_request_path(logs_seen, requests_seen) + assert set(payloads_by_path) == { + "/conditional/group/alpha", + "/conditional/group/beta", + "/conditional/group/default", + } + + _assert_grouped_resource( + payloads_by_path["/conditional/group/alpha"], + route_group="alpha", + group_id="group-alpha", + scopes=[ + { + "name": "scope-alpha-a", + "version": "1.0.0", + "body": "event-alpha-a", + "flags": 1, + }, + { + "name": "scope-alpha-b", + "version": "1.1.0", + "body": "event-alpha-b", + "flags": 2, + }, + ], + ) + _assert_grouped_resource( + payloads_by_path["/conditional/group/beta"], + route_group="beta", + group_id="group-beta", + scopes=[ + { + "name": "scope-beta", + "version": "2.0.0", + "body": "event-beta", + "flags": 3, + }, + ], + ) + _assert_grouped_resource( + payloads_by_path["/conditional/group/default"], + route_group="fallback", + group_id="group-default", + scopes=[ + { + "name": "scope-default", + "version": "3.0.0", + "body": "event-default", + "flags": 4, + }, + ], + ) + + def _wait_for_log_message(service, message, timeout=15): def _contains_message(): if not os.path.exists(service.flb.log_file): diff --git a/tests/integration/scenarios/out_stdout/config/out_stdout_conditional_corner_non_threaded.yaml b/tests/integration/scenarios/out_stdout/config/out_stdout_conditional_corner_non_threaded.yaml new file mode 100644 index 00000000000..f18a421568b --- /dev/null +++ b/tests/integration/scenarios/out_stdout/config/out_stdout_conditional_corner_non_threaded.yaml @@ -0,0 +1,100 @@ +service: + flush: 1 + grace: 1 + log_level: trace + http_server: on + http_port: ${FLUENT_BIT_HTTP_MONITORING_PORT} + +pipeline: + inputs: + - name: dummy + alias: non_threaded_processor_dummy + tag: non_threaded.processor.base + dummy: '{"message":"non threaded processor match"}' + samples: 1 + processors: + logs: + - name: content_modifier + action: insert + key: topic + value: topic1 + routes: + logs: + - name: non_threaded_processor_all + to: + outputs: + - stdout_all + + - name: non_threaded_topic1 + condition: + op: and + rules: + - field: $topic + op: eq + value: topic1 + to: + outputs: + - stdout_topic1 + + - name: dummy + alias: non_threaded_default_dummy + tag: non_threaded.default.base + dummy: '{"message":"non threaded default fallback","topic":"topic2"}' + samples: 1 + routes: + logs: + - name: non_threaded_default_all + to: + outputs: + - stdout_all + + - name: non_threaded_default_topic1 + condition: + op: and + rules: + - field: $topic + op: eq + value: topic1 + to: + outputs: + - stdout_topic1 + + - name: non_threaded_default + condition: + default: true + to: + outputs: + - stdout_default + + - name: dummy + alias: non_threaded_miss_dummy + tag: non_threaded.miss.base + dummy: '{"message":"non threaded unmatched without default","topic":"topic2"}' + samples: 1 + routes: + logs: + - name: non_threaded_miss_all + to: + outputs: + - stdout_all + + - name: non_threaded_miss_topic1 + condition: + op: and + rules: + - field: $topic + op: eq + value: topic1 + to: + outputs: + - stdout_topic1 + + outputs: + - name: stdout + alias: stdout_all + + - name: stdout + alias: stdout_topic1 + + - name: stdout + alias: stdout_default diff --git a/tests/integration/scenarios/out_stdout/config/out_stdout_conditional_corner_threaded.yaml b/tests/integration/scenarios/out_stdout/config/out_stdout_conditional_corner_threaded.yaml new file mode 100644 index 00000000000..26e1b337c37 --- /dev/null +++ b/tests/integration/scenarios/out_stdout/config/out_stdout_conditional_corner_threaded.yaml @@ -0,0 +1,103 @@ +service: + flush: 1 + grace: 1 + log_level: trace + http_server: on + http_port: ${FLUENT_BIT_HTTP_MONITORING_PORT} + +pipeline: + inputs: + - name: dummy + alias: threaded_processor_dummy + threaded: true + tag: threaded.processor.base + dummy: '{"message":"threaded processor match"}' + samples: 1 + processors: + logs: + - name: content_modifier + action: insert + key: topic + value: topic1 + routes: + logs: + - name: threaded_processor_all + to: + outputs: + - stdout_all + + - name: threaded_topic1 + condition: + op: and + rules: + - field: $topic + op: eq + value: topic1 + to: + outputs: + - stdout_topic1 + + - name: dummy + alias: threaded_default_dummy + threaded: true + tag: threaded.default.base + dummy: '{"message":"threaded default fallback","topic":"topic2"}' + samples: 1 + routes: + logs: + - name: threaded_default_all + to: + outputs: + - stdout_all + + - name: threaded_default_topic1 + condition: + op: and + rules: + - field: $topic + op: eq + value: topic1 + to: + outputs: + - stdout_topic1 + + - name: threaded_default + condition: + default: true + to: + outputs: + - stdout_default + + - name: dummy + alias: threaded_miss_dummy + threaded: true + tag: threaded.miss.base + dummy: '{"message":"threaded unmatched without default","topic":"topic2"}' + samples: 1 + routes: + logs: + - name: threaded_miss_all + to: + outputs: + - stdout_all + + - name: threaded_miss_topic1 + condition: + op: and + rules: + - field: $topic + op: eq + value: topic1 + to: + outputs: + - stdout_topic1 + + outputs: + - name: stdout + alias: stdout_all + + - name: stdout + alias: stdout_topic1 + + - name: stdout + alias: stdout_default diff --git a/tests/integration/scenarios/out_stdout/config/out_stdout_threaded_conditional_default_route.yaml b/tests/integration/scenarios/out_stdout/config/out_stdout_threaded_conditional_default_route.yaml new file mode 100644 index 00000000000..5dd686ecf48 --- /dev/null +++ b/tests/integration/scenarios/out_stdout/config/out_stdout_threaded_conditional_default_route.yaml @@ -0,0 +1,47 @@ +service: + flush: 1 + grace: 1 + log_level: trace + http_server: on + http_port: ${FLUENT_BIT_HTTP_MONITORING_PORT} + +pipeline: + inputs: + - name: dummy + alias: filler_dummy + tag: filler + dummy: '{"message":"filler"}' + samples: 1 + + - name: dummy + alias: threaded_default_dummy + threaded: true + tag: threaded.default.base + dummy: '{"message":"threaded default route","topic":"topic2"}' + samples: 1 + routes: + logs: + - name: threaded_topic1 + condition: + op: and + rules: + - field: $topic + op: eq + value: topic1 + to: + outputs: + - stdout1 + + - name: threaded_default + condition: + default: true + to: + outputs: + - stdout2 + + outputs: + - name: stdout + alias: stdout1 + + - name: stdout + alias: stdout2 diff --git a/tests/integration/scenarios/out_stdout/config/out_stdout_threaded_conditional_routing.yaml b/tests/integration/scenarios/out_stdout/config/out_stdout_threaded_conditional_routing.yaml new file mode 100644 index 00000000000..e3e29856d71 --- /dev/null +++ b/tests/integration/scenarios/out_stdout/config/out_stdout_threaded_conditional_routing.yaml @@ -0,0 +1,43 @@ +service: + flush: 1 + grace: 1 + log_level: trace + http_server: on + http_port: ${FLUENT_BIT_HTTP_MONITORING_PORT} + +pipeline: + inputs: + - name: dummy + alias: filler_dummy + tag: filler + dummy: '{"message":"filler"}' + samples: 1 + + - name: dummy + alias: threaded_routed_dummy + threaded: true + tag: threaded.base + dummy: '{"message":"threaded conditional match"}' + samples: 1 + processors: + logs: + - name: content_modifier + action: insert + key: topic + value: topic1 + routes: + logs: + - name: threaded_topic1 + condition: + op: and + rules: + - field: $topic + op: eq + value: topic1 + to: + outputs: + - stdout1 + + outputs: + - name: stdout + alias: stdout1 diff --git a/tests/integration/scenarios/out_stdout/tests/test_out_stdout_001.py b/tests/integration/scenarios/out_stdout/tests/test_out_stdout_001.py index 1750173c0bc..9639573ea00 100644 --- a/tests/integration/scenarios/out_stdout/tests/test_out_stdout_001.py +++ b/tests/integration/scenarios/out_stdout/tests/test_out_stdout_001.py @@ -2,6 +2,7 @@ import os from pathlib import Path +import pytest import requests from google.protobuf import json_format from opentelemetry.proto.collector.logs.v1.logs_service_pb2 import ExportLogsServiceRequest @@ -126,6 +127,20 @@ def _find_json_line(log_text, needle): raise AssertionError(f"Could not find JSON line containing {needle!r}") +def _stdout_lines_for_tag(log_text, tag): + return [ + line + for line in log_text.splitlines() + if f"] {tag}:" in line + ] + + +def _assert_stdout_tag_contains(log_text, tag, *needles): + assert f"] {tag}:" in log_text, f"Could not find stdout tag {tag!r}" + for needle in needles: + assert needle in log_text + + def test_out_stdout_default_format_emits_tagged_log_line(): service = Service("out_stdout_basic.yaml") service.start() @@ -212,16 +227,15 @@ def test_out_stdout_routes_without_alias_bind_to_third_input(): service = Service("out_stdout_routing_no_alias.yaml") service.start() service.wait_for_log_contains("[0] topic1:", timeout=10) - service.wait_for_log_contains("no matching route for input chunk", timeout=10) - log_text = service.wait_for_log_contains("tag 'firstdummy'", timeout=10) + log_text = service.read_log() service.stop() assert "connected input 'dummy.2' route 'topic1' to output 'stdout1'" in log_text assert "[0] topic1:" in log_text assert "\"topic\"=>\"topic1\"" in log_text assert "\"message\"=>\"custom dummy\"" in log_text - assert "no matching route for input chunk" in log_text - assert "tag 'firstdummy'" in log_text + assert "\"message\"=>\"custom dummy one\"" not in log_text + assert "\"message\"=>\"custom dummy two\"" not in log_text def test_out_stdout_routes_with_alias_bind_to_third_input(): @@ -234,3 +248,92 @@ def test_out_stdout_routes_with_alias_bind_to_third_input(): assert "[0] topic1:" in log_text assert "\"topic\"=>\"topic1\"" in log_text assert "\"message\"=>\"custom dummy\"" in log_text + + +def test_out_stdout_threaded_input_conditional_route_matches(): + service = Service("out_stdout_threaded_conditional_routing.yaml") + service.start() + log_text = service.wait_for_log_contains("[0] threaded_topic1:", timeout=10) + service.stop() + + assert "connected input 'threaded_routed_dummy' route 'threaded_topic1'" in log_text + assert "[0] threaded_topic1:" in log_text + assert "\"topic\"=>\"topic1\"" in log_text + assert "\"message\"=>\"threaded conditional match\"" in log_text + assert "conditional routing not supported for threaded inputs" not in log_text + + +def test_out_stdout_threaded_input_conditional_default_route(): + service = Service("out_stdout_threaded_conditional_default_route.yaml") + service.start() + log_text = service.wait_for_log_contains("[0] threaded_default:", timeout=10) + service.stop() + + assert "connected input 'threaded_default_dummy' route 'threaded_topic1'" in log_text + assert "connected input 'threaded_default_dummy' route 'threaded_default'" in log_text + assert "[0] threaded_default:" in log_text + assert "\"message\"=>\"threaded default route\"" in log_text + assert "[0] threaded_topic1:" not in log_text + assert "conditional routing not supported for threaded inputs" not in log_text + + +@pytest.mark.parametrize( + ("mode", "label", "config_file"), + [ + ( + "non_threaded", + "non threaded", + "out_stdout_conditional_corner_non_threaded.yaml", + ), + ( + "threaded", + "threaded", + "out_stdout_conditional_corner_threaded.yaml", + ), + ], +) +def test_out_stdout_conditional_routing_corner_cases(mode, label, config_file): + service = Service(config_file) + service.start() + service.wait_for_log_contains(f"[0] {mode}_topic1:", timeout=10) + service.wait_for_log_contains(f"[0] {mode}_default:", timeout=10) + service.wait_for_log_contains(f"{label} unmatched without default", timeout=10) + log_text = service.read_log() + service.stop() + + assert "conditional routing not supported for threaded inputs" not in log_text + + _assert_stdout_tag_contains( + log_text, + f"{mode}_topic1", + f"\"message\"=>\"{label} processor match\"", + "\"topic\"=>\"topic1\"", + ) + _assert_stdout_tag_contains( + log_text, + f"{mode}.processor.base", + f"\"message\"=>\"{label} processor match\"", + "\"topic\"=>\"topic1\"", + ) + + _assert_stdout_tag_contains( + log_text, + f"{mode}_default", + f"\"message\"=>\"{label} default fallback\"", + "\"topic\"=>\"topic2\"", + ) + _assert_stdout_tag_contains( + log_text, + f"{mode}.default.base", + f"\"message\"=>\"{label} default fallback\"", + "\"topic\"=>\"topic2\"", + ) + + _assert_stdout_tag_contains( + log_text, + f"{mode}.miss.base", + f"\"message\"=>\"{label} unmatched without default\"", + "\"topic\"=>\"topic2\"", + ) + assert _stdout_lines_for_tag(log_text, f"{mode}_default_topic1") == [] + assert _stdout_lines_for_tag(log_text, f"{mode}_miss_topic1") == [] diff --git a/tests/integration/src/utils/test_service.py b/tests/integration/src/utils/test_service.py index 17a6d4591d6..aab3965b5bb 100644 --- a/tests/integration/src/utils/test_service.py +++ b/tests/integration/src/utils/test_service.py @@ -26,6 +26,7 @@ def __init__( self.post_stop = post_stop self.flb = None self._previous_env = {} + self._allocated_ports = set() def _reset_storage(self): if not self.data_storage: @@ -38,10 +39,17 @@ def _set_env(self, key, value): os.environ[key] = value def allocate_port_env(self, key, *, starting_port=0): - port = find_available_port(starting_port) + port = self._allocate_port(starting_port) self._set_env(key, str(port)) return port + def _allocate_port(self, starting_port=0): + port = find_available_port(starting_port) + while port in self._allocated_ports: + port = find_available_port(port + 1) + self._allocated_ports.add(port) + return port + def _restore_env(self): for key, value in self._previous_env.items(): if value is None: @@ -53,8 +61,8 @@ def _restore_env(self): def start(self): self._reset_storage() self.flb = FluentBitManager(self.config_path) - self.flb_listener_port = find_available_port() - self.test_suite_http_port = find_available_port() + self.flb_listener_port = self._allocate_port() + self.test_suite_http_port = self._allocate_port() self._set_env("FLUENT_BIT_TEST_LISTENER_PORT", str(self.flb_listener_port)) self._set_env("TEST_SUITE_HTTP_PORT", str(self.test_suite_http_port)) @@ -74,6 +82,7 @@ def stop(self): if self.post_stop: self.post_stop(self) self._restore_env() + self._allocated_ports.clear() def wait_for_http_endpoint(self, url, *, timeout=10, interval=0.5): deadline = time.time() + timeout