Skip to content

Commit 74f0bdd

Browse files
siji-oncosmo0920
authored andcommitted
avro: out_kafka: fix a bug in avro schema_id
Signed-off-by: jh.park <jh.park@bucketplace.net>
1 parent 00e2f56 commit 74f0bdd

4 files changed

Lines changed: 19 additions & 27 deletions

File tree

include/fluent-bit/flb_avro.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@
3131
#define MEMORY_POOL_MINIMUM_SIZE sizeof(void *)
3232

3333
struct flb_avro_fields {
34-
flb_sds_t schema_id;
34+
int schema_id;
3535
flb_sds_t schema_str;
3636
};
3737

plugins/out_kafka/kafka.c

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -312,29 +312,29 @@ int produce_message(struct flb_time *tm, msgpack_object *map,
312312
#ifdef FLB_HAVE_AVRO_ENCODER
313313
else if (ctx->format == FLB_KAFKA_FMT_AVRO) {
314314

315-
flb_plg_debug(ctx->ins, "avro schema ID:%s:\n", ctx->avro_fields.schema_id);
315+
flb_plg_debug(ctx->ins, "avro schema ID:%d:\n", ctx->avro_fields.schema_id);
316316
flb_plg_debug(ctx->ins, "avro schema string:%s:\n", ctx->avro_fields.schema_str);
317317

318318
// if there's no data then log it and return
319319
if (mp_sbuf.size == 0) {
320-
flb_plg_error(ctx->ins, "got zero bytes decoding to avro AVRO:schemaID:%s:\n", ctx->avro_fields.schema_id);
320+
flb_plg_error(ctx->ins, "got zero bytes decoding to avro AVRO:schemaID:%d:\n", ctx->avro_fields.schema_id);
321321
msgpack_sbuffer_destroy(&mp_sbuf);
322322
return FLB_OK;
323323
}
324324

325325
// is the line is too long log it and return
326326
if (mp_sbuf.size > AVRO_LINE_MAX_LEN) {
327-
flb_plg_warn(ctx->ins, "skipping long line AVRO:len:%zu:limit:%zu:schemaID:%s:\n", (size_t)mp_sbuf.size, (size_t)AVRO_LINE_MAX_LEN, ctx->avro_fields.schema_id);
327+
flb_plg_warn(ctx->ins, "skipping long line AVRO:len:%zu:limit:%zu:schemaID:%d:\n", (size_t)mp_sbuf.size, (size_t)AVRO_LINE_MAX_LEN, ctx->avro_fields.schema_id);
328328
msgpack_sbuffer_destroy(&mp_sbuf);
329329
return FLB_OK;
330330
}
331331

332-
flb_plg_debug(ctx->ins, "using default buffer AVRO:len:%zu:limit:%zu:schemaID:%s:\n", (size_t)mp_sbuf.size, (size_t)AVRO_DEFAULT_BUFFER_SIZE, ctx->avro_fields.schema_id);
332+
flb_plg_debug(ctx->ins, "using default buffer AVRO:len:%zu:limit:%zu:schemaID:%d:\n", (size_t)mp_sbuf.size, (size_t)AVRO_DEFAULT_BUFFER_SIZE, ctx->avro_fields.schema_id);
333333
out_buf = avro_buff;
334334
out_size = AVRO_DEFAULT_BUFFER_SIZE;
335335

336336
if (mp_sbuf.size + AVRO_SCHEMA_OVERHEAD >= AVRO_DEFAULT_BUFFER_SIZE) {
337-
flb_plg_info(ctx->ins, "upsizing to dynamic buffer AVRO:len:%zu:schemaID:%s:\n", (size_t)mp_sbuf.size, ctx->avro_fields.schema_id);
337+
flb_plg_info(ctx->ins, "upsizing to dynamic buffer AVRO:len:%zu:schemaID:%d:\n", (size_t)mp_sbuf.size, ctx->avro_fields.schema_id);
338338
avro_fast_buffer = false;
339339
// avro will always be smaller than msgpack
340340
// it contains no meta-info aside from the schemaid
@@ -344,14 +344,14 @@ int produce_message(struct flb_time *tm, msgpack_object *map,
344344
out_size = mp_sbuf.size + AVRO_SCHEMA_OVERHEAD;
345345
out_buf = flb_malloc(out_size);
346346
if (!out_buf) {
347-
flb_plg_error(ctx->ins, "error allocating memory for decoding to AVRO:schema:%s:schemaID:%s:\n", ctx->avro_fields.schema_str, ctx->avro_fields.schema_id);
347+
flb_plg_error(ctx->ins, "error allocating memory for decoding to AVRO:schema:%s:schemaID:%d:\n", ctx->avro_fields.schema_str, ctx->avro_fields.schema_id);
348348
msgpack_sbuffer_destroy(&mp_sbuf);
349349
return FLB_ERROR;
350350
}
351351
}
352352

353353
if(!flb_msgpack_raw_to_avro_sds(mp_sbuf.data, mp_sbuf.size, &ctx->avro_fields, out_buf, &out_size)) {
354-
flb_plg_error(ctx->ins, "error encoding to AVRO:schema:%s:schemaID:%s:\n", ctx->avro_fields.schema_str, ctx->avro_fields.schema_id);
354+
flb_plg_error(ctx->ins, "error encoding to AVRO:schema:%s:schemaID:%d:\n", ctx->avro_fields.schema_str, ctx->avro_fields.schema_id);
355355
msgpack_sbuffer_destroy(&mp_sbuf);
356356
if (!avro_fast_buffer) {
357357
flb_free(out_buf);
@@ -639,8 +639,8 @@ static struct flb_config_map config_map[] = {
639639
"Set AVRO schema."
640640
},
641641
{
642-
FLB_CONFIG_MAP_STR, "schema_id", (char *)NULL,
643-
0, FLB_FALSE, 0,
642+
FLB_CONFIG_MAP_INT, "schema_id", (char *)NULL,
643+
0, FLB_TRUE, offsetof(struct flb_out_kafka, avro_fields) + offsetof(struct flb_avro_fields, schema_id),
644644
"Set AVRO schema ID."
645645
},
646646
#endif

plugins/out_kafka/kafka_config.c

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -249,10 +249,6 @@ struct flb_out_kafka *flb_out_kafka_create(struct flb_output_instance *ins,
249249
if (tmp) {
250250
ctx->avro_fields.schema_str = flb_sds_create(tmp);
251251
}
252-
tmp = flb_output_get_property("schema_id", ins);
253-
if (tmp) {
254-
ctx->avro_fields.schema_id = flb_sds_create(tmp);
255-
}
256252
#endif
257253

258254
/* Config: Topic */
@@ -282,7 +278,7 @@ struct flb_out_kafka *flb_out_kafka_create(struct flb_output_instance *ins,
282278

283279
flb_plg_info(ctx->ins, "brokers='%s' topics='%s'", ctx->kafka.brokers, tmp);
284280
#ifdef FLB_HAVE_AVRO_ENCODER
285-
flb_plg_info(ctx->ins, "schemaID='%s' schema='%s'", ctx->avro_fields.schema_id, ctx->avro_fields.schema_str);
281+
flb_plg_info(ctx->ins, "schemaID='%d' schema='%s'", ctx->avro_fields.schema_id, ctx->avro_fields.schema_str);
286282
#endif
287283

288284
return ctx;
@@ -324,7 +320,6 @@ int flb_out_kafka_destroy(struct flb_out_kafka *ctx)
324320

325321
#ifdef FLB_HAVE_AVRO_ENCODER
326322
// avro
327-
flb_sds_destroy(ctx->avro_fields.schema_id);
328323
flb_sds_destroy(ctx->avro_fields.schema_str);
329324
#endif
330325

src/flb_avro.c

Lines changed: 8 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -268,7 +268,7 @@ bool flb_msgpack_raw_to_avro_sds(const void *in_buf, size_t in_size, struct flb_
268268

269269
avro_writer_t awriter;
270270
flb_debug("in flb_msgpack_raw_to_avro_sds\n");
271-
flb_debug("schemaID:%s:\n", ctx->schema_id);
271+
flb_debug("schemaID:%d:\n", ctx->schema_id);
272272
flb_debug("schema string:%s:\n", ctx->schema_str);
273273

274274
size_t schema_json_len = flb_sds_len(ctx->schema_str);
@@ -341,18 +341,15 @@ bool flb_msgpack_raw_to_avro_sds(const void *in_buf, size_t in_size, struct flb_
341341
}
342342

343343
// write the schemaid
344-
// its md5hash of the avro schema
345-
// it looks like this c4b52aaf22429c7f9eb8c30270bc1795
346-
const char *pos = ctx->schema_id;
347-
unsigned char val[16];
348-
size_t count;
349-
for (count = 0; count < sizeof val/sizeof *val; count++) {
350-
sscanf(pos, "%2hhx", &val[count]);
351-
pos += 2;
352-
}
344+
unsigned int id = ctx->schema_id;
345+
unsigned char val[4];
346+
val[0] = (id >> 24) & 0xFF;
347+
val[1] = (id >> 16) & 0xFF;
348+
val[2] = (id >> 8) & 0xFF;
349+
val[3] = id & 0xFF;
353350

354351
// write it into a buffer which can be passed to librdkafka
355-
rval = avro_write(awriter, val, 16);
352+
rval = avro_write(awriter, val, 4);
356353
if (rval != 0) {
357354
flb_error("Unable to write schemaid\n");
358355
avro_writer_free(awriter);

0 commit comments

Comments
 (0)