diff --git a/include/fluent-bit/flb_sds.h b/include/fluent-bit/flb_sds.h index 793d46dba38..5665aff6612 100644 --- a/include/fluent-bit/flb_sds.h +++ b/include/fluent-bit/flb_sds.h @@ -108,5 +108,6 @@ flb_sds_t flb_sds_increase(flb_sds_t s, size_t len); flb_sds_t flb_sds_copy(flb_sds_t s, const char *str, int len); void flb_sds_destroy(flb_sds_t s); flb_sds_t flb_sds_printf(flb_sds_t *sds, const char *fmt, ...); +int flb_sds_snprintf(flb_sds_t *str, size_t size, const char *fmt, ...); #endif diff --git a/plugins/out_es/es.c b/plugins/out_es/es.c index 26606b94e37..50af298e5f8 100644 --- a/plugins/out_es/es.c +++ b/plugins/out_es/es.c @@ -265,7 +265,7 @@ static int elasticsearch_format(struct flb_config *config, msgpack_object root; msgpack_object map; msgpack_object *obj; - char j_index[ES_BULK_HEADER]; + flb_sds_t j_index; struct es_bulk *bulk; struct tm tm; struct flb_time tms; @@ -275,6 +275,12 @@ static int elasticsearch_format(struct flb_config *config, int es_index_custom_len; struct flb_elasticsearch *ctx = plugin_context; + j_index = flb_sds_create_size(ES_BULK_HEADER); + if (j_index == NULL) { + flb_errno(); + return -1; + } + /* Iterate the original buffer and perform adjustments */ msgpack_unpacked_init(&result); @@ -282,6 +288,7 @@ static int elasticsearch_format(struct flb_config *config, ret = msgpack_unpack_next(&result, data, bytes, &off); if (ret != MSGPACK_UNPACK_SUCCESS) { msgpack_unpacked_destroy(&result); + flb_sds_destroy(j_index); return -1; } @@ -292,17 +299,22 @@ static int elasticsearch_format(struct flb_config *config, * doing, we just duplicate the content in a new buffer and cleanup. */ msgpack_unpacked_destroy(&result); + flb_sds_destroy(j_index); return -1; } root = result.data; if (root.via.array.size == 0) { + msgpack_unpacked_destroy(&result); + flb_sds_destroy(j_index); return -1; } /* Create the bulk composer */ bulk = es_bulk_create(bytes); if (!bulk) { + msgpack_unpacked_destroy(&result); + flb_sds_destroy(j_index); return -1; } @@ -331,16 +343,16 @@ static int elasticsearch_format(struct flb_config *config, ctx->index, &tm); es_index = index_formatted; if (ctx->suppress_type_name) { - index_len = snprintf(j_index, - ES_BULK_HEADER, - ES_BULK_INDEX_FMT_WITHOUT_TYPE, - es_index); + index_len = flb_sds_snprintf(&j_index, + flb_sds_alloc(j_index), + ES_BULK_INDEX_FMT_WITHOUT_TYPE, + es_index); } else { - index_len = snprintf(j_index, - ES_BULK_HEADER, - ES_BULK_INDEX_FMT, - es_index, ctx->type); + index_len = flb_sds_snprintf(&j_index, + flb_sds_alloc(j_index), + ES_BULK_INDEX_FMT, + es_index, ctx->type); } } @@ -443,16 +455,16 @@ static int elasticsearch_format(struct flb_config *config, es_index = logstash_index; if (ctx->generate_id == FLB_FALSE) { if (ctx->suppress_type_name) { - index_len = snprintf(j_index, - ES_BULK_HEADER, - ES_BULK_INDEX_FMT_WITHOUT_TYPE, - es_index); + index_len = flb_sds_snprintf(&j_index, + flb_sds_alloc(j_index), + ES_BULK_INDEX_FMT_WITHOUT_TYPE, + es_index); } else { - index_len = snprintf(j_index, - ES_BULK_HEADER, - ES_BULK_INDEX_FMT, - es_index, ctx->type); + index_len = flb_sds_snprintf(&j_index, + flb_sds_alloc(j_index), + ES_BULK_INDEX_FMT, + es_index, ctx->type); } } } @@ -483,6 +495,7 @@ static int elasticsearch_format(struct flb_config *config, msgpack_unpacked_destroy(&result); msgpack_sbuffer_destroy(&tmp_sbuf); es_bulk_destroy(bulk); + flb_sds_destroy(j_index); return -1; } @@ -493,32 +506,32 @@ static int elasticsearch_format(struct flb_config *config, hash[0], hash[1], hash[2], hash[3], hash[4], hash[5], hash[6], hash[7]); if (ctx->suppress_type_name) { - index_len = snprintf(j_index, - ES_BULK_HEADER, - ES_BULK_INDEX_FMT_ID_WITHOUT_TYPE, - es_index, es_uuid); + index_len = flb_sds_snprintf(&j_index, + flb_sds_alloc(j_index), + ES_BULK_INDEX_FMT_ID_WITHOUT_TYPE, + es_index, es_uuid); } else { - index_len = snprintf(j_index, - ES_BULK_HEADER, - ES_BULK_INDEX_FMT_ID, - es_index, ctx->type, es_uuid); + index_len = flb_sds_snprintf(&j_index, + flb_sds_alloc(j_index), + ES_BULK_INDEX_FMT_ID, + es_index, ctx->type, es_uuid); } } if (ctx->ra_id_key) { id_key_str = es_get_id_value(ctx ,&map); if (id_key_str) { if (ctx->suppress_type_name) { - index_len = snprintf(j_index, - ES_BULK_HEADER, - ES_BULK_INDEX_FMT_ID_WITHOUT_TYPE, - es_index, id_key_str); + index_len = flb_sds_snprintf(&j_index, + flb_sds_alloc(j_index), + ES_BULK_INDEX_FMT_ID_WITHOUT_TYPE, + es_index, id_key_str); } else { - index_len = snprintf(j_index, - ES_BULK_HEADER, - ES_BULK_INDEX_FMT_ID, - es_index, ctx->type, id_key_str); + index_len = flb_sds_snprintf(&j_index, + flb_sds_alloc(j_index), + ES_BULK_INDEX_FMT_ID, + es_index, ctx->type, id_key_str); } flb_sds_destroy(id_key_str); id_key_str = NULL; @@ -531,6 +544,7 @@ static int elasticsearch_format(struct flb_config *config, if (!out_buf) { msgpack_unpacked_destroy(&result); es_bulk_destroy(bulk); + flb_sds_destroy(j_index); return -1; } @@ -544,6 +558,7 @@ static int elasticsearch_format(struct flb_config *config, msgpack_unpacked_destroy(&result); *out_size = 0; es_bulk_destroy(bulk); + flb_sds_destroy(j_index); return -1; } } @@ -563,7 +578,7 @@ static int elasticsearch_format(struct flb_config *config, fwrite(*out_data, 1, *out_size, stdout); fflush(stdout); } - + flb_sds_destroy(j_index); return 0; } diff --git a/src/flb_sds.c b/src/flb_sds.c index 5ca1e72b815..fb99a63a6ef 100644 --- a/src/flb_sds.c +++ b/src/flb_sds.c @@ -407,3 +407,31 @@ void flb_sds_destroy(flb_sds_t s) head = FLB_SDS_HEADER(s); flb_free(head); } + +/* + * flb_sds_snprintf is a wrapper of snprintf. + * The difference is that this function can increase the buffer of flb_sds_t. + */ +int flb_sds_snprintf(flb_sds_t *str, size_t size, const char *fmt, ...) +{ + va_list va; + flb_sds_t tmp; + int ret; + + retry_snprintf: + va_start(va, fmt); + ret = vsnprintf(*str, size, fmt, va); + if (ret > size) { + tmp = flb_sds_increase(*str, ret-size); + if (tmp == NULL) { + return -1; + } + *str = tmp; + size = ret; + va_end(va); + goto retry_snprintf; + } + va_end(va); + + return ret; +} diff --git a/tests/runtime/out_elasticsearch.c b/tests/runtime/out_elasticsearch.c index 3b0e0ad1ca6..9ba0c15bcd6 100644 --- a/tests/runtime/out_elasticsearch.c +++ b/tests/runtime/out_elasticsearch.c @@ -420,8 +420,80 @@ void flb_test_div0() flb_destroy(ctx); } + +static void cb_check_long_index(void *ctx, int ffd, + int res_ret, void *res_data, size_t res_size, + void *data) +{ + char *p; + char *out_js = res_data; + char long_index[256] = {0}; + int i; + + for (i=0; i