Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions include/fluent-bit/flb_sds.h
Original file line number Diff line number Diff line change
Expand Up @@ -108,5 +108,6 @@ flb_sds_t flb_sds_increase(flb_sds_t s, size_t len);
flb_sds_t flb_sds_copy(flb_sds_t s, const char *str, int len);
void flb_sds_destroy(flb_sds_t s);
flb_sds_t flb_sds_printf(flb_sds_t *sds, const char *fmt, ...);
int flb_sds_snprintf(flb_sds_t *str, size_t size, const char *fmt, ...);

#endif
83 changes: 49 additions & 34 deletions plugins/out_es/es.c
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,7 @@ static int elasticsearch_format(struct flb_config *config,
msgpack_object root;
msgpack_object map;
msgpack_object *obj;
char j_index[ES_BULK_HEADER];
flb_sds_t j_index;
struct es_bulk *bulk;
struct tm tm;
struct flb_time tms;
Expand All @@ -275,13 +275,20 @@ static int elasticsearch_format(struct flb_config *config,
int es_index_custom_len;
struct flb_elasticsearch *ctx = plugin_context;

j_index = flb_sds_create_size(ES_BULK_HEADER);
if (j_index == NULL) {
flb_errno();
return -1;
}

/* Iterate the original buffer and perform adjustments */
msgpack_unpacked_init(&result);

/* Perform some format validation */
ret = msgpack_unpack_next(&result, data, bytes, &off);
if (ret != MSGPACK_UNPACK_SUCCESS) {
msgpack_unpacked_destroy(&result);
flb_sds_destroy(j_index);
return -1;
}

Expand All @@ -292,17 +299,22 @@ static int elasticsearch_format(struct flb_config *config,
* doing, we just duplicate the content in a new buffer and cleanup.
*/
msgpack_unpacked_destroy(&result);
flb_sds_destroy(j_index);
return -1;
}

root = result.data;
if (root.via.array.size == 0) {
msgpack_unpacked_destroy(&result);
flb_sds_destroy(j_index);
return -1;
}

/* Create the bulk composer */
bulk = es_bulk_create(bytes);
if (!bulk) {
msgpack_unpacked_destroy(&result);
flb_sds_destroy(j_index);
return -1;
}

Expand Down Expand Up @@ -331,16 +343,16 @@ static int elasticsearch_format(struct flb_config *config,
ctx->index, &tm);
es_index = index_formatted;
if (ctx->suppress_type_name) {
index_len = snprintf(j_index,
ES_BULK_HEADER,
ES_BULK_INDEX_FMT_WITHOUT_TYPE,
es_index);
index_len = flb_sds_snprintf(&j_index,
flb_sds_alloc(j_index),
ES_BULK_INDEX_FMT_WITHOUT_TYPE,
es_index);
}
else {
index_len = snprintf(j_index,
ES_BULK_HEADER,
ES_BULK_INDEX_FMT,
es_index, ctx->type);
index_len = flb_sds_snprintf(&j_index,
flb_sds_alloc(j_index),
ES_BULK_INDEX_FMT,
es_index, ctx->type);
}
}

Expand Down Expand Up @@ -443,16 +455,16 @@ static int elasticsearch_format(struct flb_config *config,
es_index = logstash_index;
if (ctx->generate_id == FLB_FALSE) {
if (ctx->suppress_type_name) {
index_len = snprintf(j_index,
ES_BULK_HEADER,
ES_BULK_INDEX_FMT_WITHOUT_TYPE,
es_index);
index_len = flb_sds_snprintf(&j_index,
flb_sds_alloc(j_index),
ES_BULK_INDEX_FMT_WITHOUT_TYPE,
es_index);
}
else {
index_len = snprintf(j_index,
ES_BULK_HEADER,
ES_BULK_INDEX_FMT,
es_index, ctx->type);
index_len = flb_sds_snprintf(&j_index,
flb_sds_alloc(j_index),
ES_BULK_INDEX_FMT,
es_index, ctx->type);
}
}
}
Expand Down Expand Up @@ -483,6 +495,7 @@ static int elasticsearch_format(struct flb_config *config,
msgpack_unpacked_destroy(&result);
msgpack_sbuffer_destroy(&tmp_sbuf);
es_bulk_destroy(bulk);
flb_sds_destroy(j_index);
return -1;
}

Expand All @@ -493,32 +506,32 @@ static int elasticsearch_format(struct flb_config *config,
hash[0], hash[1], hash[2], hash[3],
hash[4], hash[5], hash[6], hash[7]);
if (ctx->suppress_type_name) {
index_len = snprintf(j_index,
ES_BULK_HEADER,
ES_BULK_INDEX_FMT_ID_WITHOUT_TYPE,
es_index, es_uuid);
index_len = flb_sds_snprintf(&j_index,
flb_sds_alloc(j_index),
ES_BULK_INDEX_FMT_ID_WITHOUT_TYPE,
es_index, es_uuid);
}
else {
index_len = snprintf(j_index,
ES_BULK_HEADER,
ES_BULK_INDEX_FMT_ID,
es_index, ctx->type, es_uuid);
index_len = flb_sds_snprintf(&j_index,
flb_sds_alloc(j_index),
ES_BULK_INDEX_FMT_ID,
es_index, ctx->type, es_uuid);
}
}
if (ctx->ra_id_key) {
id_key_str = es_get_id_value(ctx ,&map);
if (id_key_str) {
if (ctx->suppress_type_name) {
index_len = snprintf(j_index,
ES_BULK_HEADER,
ES_BULK_INDEX_FMT_ID_WITHOUT_TYPE,
es_index, id_key_str);
index_len = flb_sds_snprintf(&j_index,
flb_sds_alloc(j_index),
ES_BULK_INDEX_FMT_ID_WITHOUT_TYPE,
es_index, id_key_str);
}
else {
index_len = snprintf(j_index,
ES_BULK_HEADER,
ES_BULK_INDEX_FMT_ID,
es_index, ctx->type, id_key_str);
index_len = flb_sds_snprintf(&j_index,
flb_sds_alloc(j_index),
ES_BULK_INDEX_FMT_ID,
es_index, ctx->type, id_key_str);
}
flb_sds_destroy(id_key_str);
id_key_str = NULL;
Expand All @@ -531,6 +544,7 @@ static int elasticsearch_format(struct flb_config *config,
if (!out_buf) {
msgpack_unpacked_destroy(&result);
es_bulk_destroy(bulk);
flb_sds_destroy(j_index);
return -1;
}

Expand All @@ -544,6 +558,7 @@ static int elasticsearch_format(struct flb_config *config,
msgpack_unpacked_destroy(&result);
*out_size = 0;
es_bulk_destroy(bulk);
flb_sds_destroy(j_index);
return -1;
}
}
Expand All @@ -563,7 +578,7 @@ static int elasticsearch_format(struct flb_config *config,
fwrite(*out_data, 1, *out_size, stdout);
fflush(stdout);
}

flb_sds_destroy(j_index);
return 0;
}

Expand Down
28 changes: 28 additions & 0 deletions src/flb_sds.c
Original file line number Diff line number Diff line change
Expand Up @@ -407,3 +407,31 @@ void flb_sds_destroy(flb_sds_t s)
head = FLB_SDS_HEADER(s);
flb_free(head);
}

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this function is not necessary and we can simply rely on flb_sds_printf() functionality.

flb_sds_t by default always reallocate if necessary, and calculating the final length can be done with flb_sds_len()

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Functionality is a little different.

flb_sds_printf appends offset flb_sds_len(s) and it means appends string.
https://github.com/fluent/fluent-bit/blob/v1.8.10/src/flb_sds.c#L367

I want to overwrite flb_sds_t.
So if we use the API to overwrite, we need to call flb_sds_len_set(sds, 0) before the API.

flb_sds_len_set(j_index, 0);
out_buf = flb_sds_printf(&j_index,
                        ES_BULK_INDEX_FMT_ID_WITHOUT_TYPE,
                        es_index,  id_key_str);

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah I understand. thanks for the clarification.

On that case so I would suggest renaming the new function from:

  • flb_sds_snprintf_realloc

to

  • flb_sds_snprintf

basically the "realloc" context is something that works behind the scenes, where the user don't need to be aware of it (similar to other sds...)

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for comment.
I renamed.

/*
* flb_sds_snprintf is a wrapper of snprintf.
* The difference is that this function can increase the buffer of flb_sds_t.
*/
int flb_sds_snprintf(flb_sds_t *str, size_t size, const char *fmt, ...)
{
va_list va;
flb_sds_t tmp;
int ret;

retry_snprintf:
va_start(va, fmt);
ret = vsnprintf(*str, size, fmt, va);
if (ret > size) {
tmp = flb_sds_increase(*str, ret-size);
if (tmp == NULL) {
return -1;
}
*str = tmp;
size = ret;
va_end(va);
goto retry_snprintf;
}
va_end(va);

return ret;
}
72 changes: 72 additions & 0 deletions tests/runtime/out_elasticsearch.c
Original file line number Diff line number Diff line change
Expand Up @@ -420,8 +420,80 @@ void flb_test_div0()
flb_destroy(ctx);
}


static void cb_check_long_index(void *ctx, int ffd,
int res_ret, void *res_data, size_t res_size,
void *data)
{
char *p;
char *out_js = res_data;
char long_index[256] = {0};
int i;

for (i=0; i<sizeof(long_index)-1; i++) {
long_index[i] = '0' + (i%10);
}

p = strstr(out_js, &long_index[0]);
TEST_CHECK(p != NULL);
flb_free(res_data);
}

/* https://github.com/fluent/fluent-bit/issues/4311 */
void flb_test_long_index()
{
int ret;
int size = sizeof(JSON_ES) -1;
char long_index[256] = {0};
flb_ctx_t *ctx;
int in_ffd;
int out_ffd;
int i;

for (i=0; i<sizeof(long_index)-1; i++) {
long_index[i] = '0' + (i%10);
}

/* Create context, flush every second (some checks omitted here) */
ctx = flb_create();
flb_service_set(ctx, "flush", "1", "grace", "1", NULL);

/* Lib input mode */
in_ffd = flb_input(ctx, (char *) "lib", NULL);
flb_input_set(ctx, in_ffd, "tag", "test", NULL);

/* Elasticsearch output */
out_ffd = flb_output(ctx, (char *) "es", NULL);
flb_output_set(ctx, out_ffd,
"match", "test",
"generate_id", "true",
"index", &long_index[0],
NULL);

/* Override defaults of index and type */
flb_output_set(ctx, out_ffd,
NULL);

/* Enable test mode */
ret = flb_output_set_test(ctx, out_ffd, "formatter",
cb_check_long_index,
NULL, NULL);

/* Start */
ret = flb_start(ctx);
TEST_CHECK(ret == 0);

/* Ingest data sample */
flb_lib_push(ctx, in_ffd, (char *)JSON_ES, size);

sleep(2);
flb_stop(ctx);
flb_destroy(ctx);
}

/* Test list */
TEST_LIST = {
{"long_index" , flb_test_long_index },
{"div0_error" , flb_test_div0 },
{"index_type" , flb_test_index_type },
{"logstash_format" , flb_test_logstash_format },
Expand Down