Skip to content

Commit 1519472

Browse files
cosmo0920edsiper
authored andcommitted
storage: Restore the original state of chunks
Signed-off-by: Hiroshi Hatake <hiroshi@chronosphere.io>
1 parent e183f81 commit 1519472

1 file changed

Lines changed: 39 additions & 9 deletions

File tree

src/flb_storage.c

Lines changed: 39 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -777,6 +777,24 @@ void flb_storage_chunk_count(struct flb_config *ctx, int *mem_chunks, int *fs_ch
777777
*fs_chunks = storage_st.chunks_fs;
778778
}
779779

780+
/* Replace '/', '\\' and ':' with '_' to make filename components safe */
781+
static inline void sanitize_name_component(const char *in, char *out, size_t out_sz)
782+
{
783+
size_t i;
784+
785+
if (out_sz == 0) {
786+
return;
787+
}
788+
789+
if (!in) {
790+
in = "no-tag";
791+
}
792+
793+
for (i = 0; i < out_sz - 1 && in[i] != '\0'; i++) {
794+
out[i] = (in[i] == '/' || in[i] == '\\' || in[i] == ':') ? '_' : in[i];
795+
}
796+
out[i] = '\0';
797+
}
780798

781799
static struct cio_stream *get_or_create_rejected_stream(struct flb_config *ctx)
782800
{
@@ -823,10 +841,13 @@ int flb_storage_quarantine_chunk(struct flb_config *ctx,
823841
#ifdef CIO_HAVE_BACKEND_FILESYSTEM
824842
struct cio_stream *dlq;
825843
void *buf = NULL;
844+
int was_up = 0;
826845
size_t size = 0;
827846
int err = 0;
828847
char name[256];
829848
struct cio_chunk *dst;
849+
char safe_tag[128];
850+
char safe_out[64];
830851

831852
if (!ctx || !src) {
832853
return -1;
@@ -836,26 +857,28 @@ int flb_storage_quarantine_chunk(struct flb_config *ctx,
836857
return -1;
837858
}
838859

839-
if (cio_chunk_is_up(src) != CIO_TRUE) {
860+
/* Remember original state and bring the chunk up if needed */
861+
was_up = (cio_chunk_is_up(src) == CIO_TRUE);
862+
if (!was_up) {
840863
if (cio_chunk_up_force(src) != CIO_OK) {
841864
flb_warn("[storage] cannot bring chunk up to copy into DLQ");
842865
return -1;
843866
}
844867
}
845868

869+
sanitize_name_component(tag, safe_tag, sizeof(safe_tag));
870+
sanitize_name_component(out_name ? out_name : "out", safe_out, sizeof(safe_out));
871+
872+
/* Compose a simple, unique-ish file name with sanitized pieces */
873+
snprintf(name, sizeof(name),
874+
"%s_%d_%s_%p.flb",
875+
safe_tag, status_code, safe_out, (void *) src);
876+
846877
if (cio_chunk_get_content_copy(src, &buf, &size) != CIO_OK || size == 0) {
847878
flb_warn("[storage] cannot read content for DLQ copy (size=%zu)", size);
848879
return -1;
849880
}
850881

851-
/* Compose a simple, unique-ish file name */
852-
snprintf(name, sizeof(name),
853-
"%s_%d_%s_%p.flb",
854-
tag ? tag : "no-tag",
855-
status_code,
856-
out_name ? out_name : "out",
857-
(void *) src);
858-
859882
/* Create + write the DLQ copy */
860883
dst = cio_chunk_open(ctx->cio, dlq, name, CIO_OPEN, size, &err);
861884
if (!dst) {
@@ -876,6 +899,13 @@ int flb_storage_quarantine_chunk(struct flb_config *ctx,
876899

877900
flb_info("[storage] quarantined rejected chunk into DLQ stream (bytes=%zu)", size);
878901

902+
/* Restore original state if we brought the chunk up */
903+
if (!was_up) {
904+
if (cio_chunk_down(src) != CIO_OK) {
905+
flb_debug("[storage] failed to bring chunk back down after DLQ copy");
906+
}
907+
}
908+
879909
return 0;
880910
#else
881911
FLB_UNUSED(ctx);

0 commit comments

Comments
 (0)