Skip to content

Commit 81ee67c

Browse files
cosmo0920edsiper
authored andcommitted
engine: Add a capability to handle dead letter queue for preserving invalid chunks for later verifications
Signed-off-by: Hiroshi Hatake <hiroshi@chronosphere.io>
1 parent 6b74d58 commit 81ee67c

1 file changed

Lines changed: 51 additions & 0 deletions

File tree

src/flb_engine.c

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -232,6 +232,50 @@ static inline double calculate_chunk_capacity_percent(struct flb_output_instance
232232
((double)ins->total_limit_size));
233233
}
234234

235+
static void handle_dlq_if_available(struct flb_config *config,
236+
struct flb_task *task,
237+
struct flb_output_instance *ins,
238+
int status_code /* pass 0 if unknown */)
239+
{
240+
const char *tag_buf = NULL;
241+
int tag_len = 0;
242+
flb_sds_t tag_sds = NULL;
243+
const char *tag = NULL;
244+
const char *out = NULL;
245+
struct flb_input_chunk *ic;
246+
struct cio_chunk *cio_ch;
247+
248+
if (!config || !config->storage_keep_rejected || !task || !task->ic || !ins) {
249+
return;
250+
}
251+
252+
ic = (struct flb_input_chunk *) task->ic;
253+
254+
if (!ic || !ic->chunk) {
255+
return;
256+
}
257+
258+
/* Obtain tag from the input chunk API (no direct field available) */
259+
if (flb_input_chunk_get_tag(ic, &tag_buf, &tag_len) == 0 && tag_buf && tag_len > 0) {
260+
tag_sds = flb_sds_create_len(tag_buf, tag_len); /* make it NUL-terminated */
261+
tag = tag_sds;
262+
}
263+
else {
264+
/* Fallback: use input instance name */
265+
tag = flb_input_name(task->i_ins);
266+
}
267+
268+
out = flb_output_name(ins);
269+
cio_ch = (struct cio_chunk *) ic->chunk; /* ic->chunk is a cio_chunk* under the hood */
270+
271+
/* Copy bytes into DLQ stream (filesystem) */
272+
(void) flb_storage_quarantine_chunk(config, cio_ch, tag, status_code, out);
273+
274+
if (tag_sds) {
275+
flb_sds_destroy(tag_sds);
276+
}
277+
}
278+
235279
static inline int handle_output_event(uint64_t ts,
236280
struct flb_config *config,
237281
uint64_t val)
@@ -366,6 +410,8 @@ static inline int handle_output_event(uint64_t ts,
366410
}
367411
else if (ret == FLB_RETRY) {
368412
if (ins->retry_limit == FLB_OUT_RETRY_NONE) {
413+
handle_dlq_if_available(config, task, ins, 0);
414+
369415
/* cmetrics: output_dropped_records_total */
370416
cmt_counter_add(ins->cmt_dropped_records, ts, task->records,
371417
1, (char *[]) {out_name});
@@ -412,6 +458,8 @@ static inline int handle_output_event(uint64_t ts,
412458
* - It reached the maximum number of re-tries
413459
*/
414460

461+
handle_dlq_if_available(config, task, ins, 0);
462+
415463
/* cmetrics */
416464
cmt_counter_inc(ins->cmt_retries_failed, ts, 1, (char *[]) {out_name});
417465
cmt_counter_add(ins->cmt_dropped_records, ts, task->records,
@@ -464,6 +512,8 @@ static inline int handle_output_event(uint64_t ts,
464512
* memory available or we ran out of file descriptors.
465513
*/
466514
if (retry_seconds == -1) {
515+
handle_dlq_if_available(config, task, ins, 0);
516+
467517
flb_warn("[engine] retry for chunk '%s' could not be scheduled: "
468518
"input=%s > output=%s",
469519
flb_input_chunk_get_name(task->ic),
@@ -500,6 +550,7 @@ static inline int handle_output_event(uint64_t ts,
500550
}
501551
}
502552
else if (ret == FLB_ERROR) {
553+
handle_dlq_if_available(config, task, ins, 0);
503554
/* cmetrics */
504555
cmt_counter_inc(ins->cmt_errors, ts, 1, (char *[]) {out_name});
505556
cmt_counter_add(ins->cmt_dropped_records, ts, task->records,

0 commit comments

Comments
 (0)