Skip to content

in_forward: Implement decompression method handlers for gzip and zstd#10710

Merged
edsiper merged 7 commits into
masterfrom
cosmo0920-implement-decompression-method-handlers-for-gzip-and-zstd-in_forward
Sep 18, 2025
Merged

in_forward: Implement decompression method handlers for gzip and zstd#10710
edsiper merged 7 commits into
masterfrom
cosmo0920-implement-decompression-method-handlers-for-gzip-and-zstd-in_forward

Conversation

@cosmo0920

@cosmo0920 cosmo0920 commented Aug 8, 2025

Copy link
Copy Markdown
Contributor

This PR is a subsequent PR for #10697.
Still in PoC but this PR could be more robust and generalized approach with using flb_compression function that is following the size of chunk settings.


Enter [N/A] in the box, if an item is not applicable to your change.

Testing
Before we can approve your change; please submit the following in a comment:

  • Example configuration file for the change
  • Debug log output from testing the change
  • Attached Valgrind output that shows no leaks or memory corruption was found

If this is a change to packaging of containers or native binaries then please confirm it works for all targets.

  • Run local packaging test showing all targets (including any new ones) build.
  • Set ok-package-test label to test for all targets (requires maintainer to do).

Documentation

  • Documentation required for this feature

Backporting

  • Backport to latest stable release.

Fluent Bit is licensed under Apache 2.0, by submitting this pull request I understand that this code will be released under the terms of that license.

Summary by CodeRabbit

  • New Features

    • Forward input now supports gzip and zstd compressed payloads.
    • Automatic detection of compression type with streaming decompression for large payloads.
  • Refactor

    • Unified handling of compressed and uncompressed data paths for the Forward protocol.
    • Enhanced robustness with clearer error handling and resource cleanup during decompression.
  • Tests

    • Added runtime tests covering gzip and zstd Forward payloads to ensure end-to-end compatibility.

@coderabbitai

coderabbitai Bot commented Aug 8, 2025

Copy link
Copy Markdown

Walkthrough

Adds decompression support to in_forward: introduces compression_type and d_ctx to fw_conn, initializes/destroys decompression context, refactors protocol processing to detect gzip/zstd via options and magic bytes, switches to streaming decompression, updates error handling, and adds runtime tests for gzip and zstd payloads.

Changes

Cohort / File(s) Summary
Forward connection lifecycle
plugins/in_forward/fw_conn.c, plugins/in_forward/fw_conn.h
Add fields compression_type (default NONE) and d_ctx to struct fw_conn; include compression header; initialize on create; destroy decompression context on delete.
Forward protocol handling
plugins/in_forward/fw_prot.c
Replace gzip-only path with streaming decompression; add option-based get_compression_type and magic sniff_magic; reconcile option vs magic; initialize/use/destroy conn->d_ctx; unify error cleanup; support gzip and zstd; preserve public API.
Runtime tests for compressed input
tests/runtime/in_forward.c
Add helpers to build gzip/zstd forward payloads; add tests flb_test_forward_gzip and flb_test_forward_zstd; include gzip/zstd headers; register tests in suite.

Sequence Diagram(s)

sequenceDiagram
  autonumber
  participant C as Client
  participant IF as in_forward
  participant FP as fw_prot_process
  participant DC as DecompressionCtx
  participant OUT as append_log / pipeline

  C->>IF: Send Forward payload (tag, entries, options)
  IF->>FP: fw_prot_process(conn, buf)
  FP->>FP: Parse options (event type, "compressed")
  FP->>FP: Sniff magic bytes
  Note over FP: Determine compression type (option vs magic)

  alt Compressed (gzip/zstd)
    FP->>DC: Init conn.d_ctx (type)
    loop Decompress stream
      FP->>DC: flb_decompress(chunk)
      DC-->>FP: decompressed entries
      FP->>OUT: append_log(event_type, entries)
    end
    FP->>DC: Destroy conn.d_ctx
  else Uncompressed
    FP->>OUT: append_log(event_type, entries)
  end

  FP-->>IF: ACK status
  IF-->>C: Response
Loading

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~60 minutes

Possibly related PRs

Suggested reviewers

  • koleini
  • fujimotos
  • edsiper
  • leonardo-albertovich

Pre-merge checks (2 passed, 1 warning)

❌ Failed checks (1 warning)
Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 23.08% which is insufficient. The required threshold is 80.00%. You can run @coderabbitai generate docstrings to improve docstring coverage.
✅ Passed checks (2 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title Check ✅ Passed The title accurately and concisely describes the primary change: adding decompression handlers for gzip and zstd in the in_forward plugin. It names the affected plugin and the implemented feature without extraneous detail, so a reviewer scanning history will understand the main intent. The phrasing is clear and focused on the main change implemented in the PR.

Poem

A packet hops with zipped-up cheer,
I sniff the bytes—what’s hiding here?
Gzip? Zstd? I toggle the stream,
Unravel logs like carrot-cream.
Context set, cleanup tight—
Forward flows through day and night.
Thump-thump: ACKs in pure delight.

Tip

👮 Agentic pre-merge checks are now available in preview!

Pro plan users can now enable pre-merge checks in their settings to enforce checklists before merging PRs.

  • Built-in checks – Quickly apply ready-made checks to enforce title conventions, require pull request descriptions that follow templates, validate linked issues for compliance, and more.
  • Custom agentic checks – Define your own rules using CodeRabbit’s advanced agentic capabilities to enforce organization-specific policies and workflows. For example, you can instruct CodeRabbit’s agent to verify that API documentation is updated whenever API schema files are modified in a PR. Note: Upto 5 custom checks are currently allowed during the preview period. Pricing for this feature will be announced in a few weeks.

Please see the documentation for more information.

Example:

reviews:
  pre_merge_checks:
    custom_checks:
      - name: "Undocumented Breaking Changes"
        mode: "warning"
        instructions: |
          Pass/fail criteria: All breaking changes to public APIs, CLI flags, environment variables, configuration keys, database schemas, or HTTP/GraphQL endpoints must be documented in the "Breaking Change" section of the PR description and in CHANGELOG.md. Exclude purely internal or private changes (e.g., code not exported from package entry points or explicitly marked as internal).

Please share your feedback with us on this Discord post.

✨ Finishing touches
  • 📝 Generate Docstrings
🧪 Generate unit tests
  • Create PR with unit tests
  • Post copyable unit tests in a comment
  • Commit unit tests in branch cosmo0920-implement-decompression-method-handlers-for-gzip-and-zstd-in_forward

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

@cosmo0920 cosmo0920 force-pushed the cosmo0920-implement-decompression-method-handlers-for-gzip-and-zstd-in_forward branch from 7ec302d to 19b7f47 Compare August 15, 2025 05:05
@cosmo0920 cosmo0920 force-pushed the cosmo0920-implement-decompression-method-handlers-for-gzip-and-zstd-in_forward branch from 19b7f47 to b91489f Compare August 18, 2025 11:52
… of the gzip and zstd compressions

Signed-off-by: Hiroshi Hatake <hiroshi@chronosphere.io>
Signed-off-by: Hiroshi Hatake <hiroshi@chronosphere.io>
Signed-off-by: Hiroshi Hatake <hiroshi@chronosphere.io>
Signed-off-by: Hiroshi Hatake <hiroshi@chronosphere.io>
Signed-off-by: Hiroshi Hatake <hiroshi@chronosphere.io>
Signed-off-by: Hiroshi Hatake <hiroshi@chronosphere.io>
Signed-off-by: Hiroshi Hatake <hiroshi@chronosphere.io>

@coderabbitai coderabbitai Bot left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 3

🧹 Nitpick comments (2)
plugins/in_forward/fw_prot.c (2)

1564-1620: Review decompression error handling and buffer management.

The streaming decompression implementation looks solid with proper buffer resizing and error handling. However, there are a few concerns:

  1. The decompression buffer allocation at line 1585 should be checked against buffer_max_size to prevent potential memory exhaustion
  2. Consider adding debug logging for successful decompression chunks to aid troubleshooting
+                        if (ctx->buffer_chunk_size > ctx->buffer_max_size) {
+                            flb_plg_error(ctx->ins, "buffer_chunk_size exceeds buffer_max_size");
+                            goto cleanup_decompress;
+                        }
                         decomp_buf = flb_malloc(ctx->buffer_chunk_size);
                         if (!decomp_buf) {
                             flb_errno();

                             goto cleanup_decompress;
                         }

                         do {
                             decomp_len = ctx->buffer_chunk_size;
                             decomp_ret = flb_decompress(conn->d_ctx, decomp_buf, &decomp_len);

                             if (decomp_ret == FLB_DECOMPRESSOR_FAILURE) {
                                 if (decomp_len > 0) {
                                     flb_plg_error(ctx->ins, "decompression failed, data may be corrupt");
                                     flb_free(decomp_buf);

                                     goto cleanup_decompress;
                                 }
                                 break;
                             }

                             if (decomp_len > 0) {
+                                flb_plg_trace(ctx->ins, "decompressed %zu bytes", decomp_len);
                                 if (append_log(ins, conn, event_type, out_tag, decomp_buf, decomp_len) == -1) {
                                     flb_free(decomp_buf);

                                     goto cleanup_decompress;
                                 }
                             }
                         } while (decomp_len > 0);

1520-1533: Consider validating event_type before use.

The event type is retrieved early but should be validated to ensure it's within expected bounds before being passed to append_log.

                     event_type = FLB_EVENT_TYPE_LOGS;
                     if (contain_options) {
                         ret = get_chunk_event_type(ins, root.via.array.ptr[2]);
                         if (ret == -1) {
                             flb_plg_error(ctx->ins, "invalid chunk event type");
                             msgpack_unpacked_destroy(&result);
                             flb_sds_destroy(out_tag);
                             msgpack_unpacker_free(unp);
                             return -1;
                         }
                         event_type = ret;
+                        /* Validate event type is within expected range */
+                        if (event_type != FLB_EVENT_TYPE_LOGS && 
+                            event_type != FLB_EVENT_TYPE_METRICS &&
+                            event_type != FLB_EVENT_TYPE_TRACES) {
+                            flb_plg_error(ctx->ins, "unexpected event type: %d", event_type);
+                            msgpack_unpacked_destroy(&result);
+                            flb_sds_destroy(out_tag);
+                            msgpack_unpacker_free(unp);
+                            return -1;
+                        }
                     }
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between e567bcc and c9635e0.

📒 Files selected for processing (4)
  • plugins/in_forward/fw_conn.c (2 hunks)
  • plugins/in_forward/fw_conn.h (2 hunks)
  • plugins/in_forward/fw_prot.c (4 hunks)
  • tests/runtime/in_forward.c (3 hunks)
🧰 Additional context used
🧠 Learnings (7)
📚 Learning: 2025-08-29T06:25:27.250Z
Learnt from: shadowshot-x
PR: fluent/fluent-bit#10794
File: tests/internal/aws_compress.c:93-107
Timestamp: 2025-08-29T06:25:27.250Z
Learning: In Fluent Bit, ZSTD compression is enabled by default and is treated as a core dependency, not requiring conditional compilation guards like `#ifdef FLB_HAVE_ZSTD`. Unlike some other optional components such as ARROW/PARQUET (which use `#ifdef FLB_HAVE_ARROW` guards), ZSTD support is always available and doesn't need build-time conditionals. ZSTD headers are included directly without guards across multiple plugins and core components.

Applied to files:

  • plugins/in_forward/fw_conn.h
  • tests/runtime/in_forward.c
  • plugins/in_forward/fw_prot.c
📚 Learning: 2025-08-29T06:24:26.170Z
Learnt from: shadowshot-x
PR: fluent/fluent-bit#10794
File: tests/internal/aws_compress.c:39-42
Timestamp: 2025-08-29T06:24:26.170Z
Learning: In Fluent Bit, ZSTD compression support is enabled by default and does not require conditional compilation guards (like #ifdef FLB_HAVE_ZSTD) around ZSTD-related code declarations and implementations.

Applied to files:

  • plugins/in_forward/fw_conn.h
  • tests/runtime/in_forward.c
  • plugins/in_forward/fw_prot.c
📚 Learning: 2025-08-29T06:24:55.855Z
Learnt from: shadowshot-x
PR: fluent/fluent-bit#10794
File: src/aws/flb_aws_compress.c:52-56
Timestamp: 2025-08-29T06:24:55.855Z
Learning: ZSTD compression is always available in Fluent Bit and does not require conditional compilation guards. Unlike Arrow/Parquet which use #ifdef FLB_HAVE_ARROW guards, ZSTD is built unconditionally with flb_zstd.c included directly in src/CMakeLists.txt and a bundled ZSTD library at lib/zstd-1.5.7/.

Applied to files:

  • tests/runtime/in_forward.c
📚 Learning: 2025-08-29T06:25:02.561Z
Learnt from: shadowshot-x
PR: fluent/fluent-bit#10794
File: tests/internal/aws_compress.c:7-7
Timestamp: 2025-08-29T06:25:02.561Z
Learning: In Fluent Bit, ZSTD (zstandard) compression library is bundled directly in the source tree at `lib/zstd-1.5.7` and is built unconditionally as a static library. Unlike optional external dependencies, ZSTD does not use conditional compilation guards like `FLB_HAVE_ZSTD` and is always available. Headers like `<fluent-bit/flb_zstd.h>` can be included directly without guards.

Applied to files:

  • tests/runtime/in_forward.c
📚 Learning: 2025-08-29T06:25:27.250Z
Learnt from: shadowshot-x
PR: fluent/fluent-bit#10794
File: tests/internal/aws_compress.c:93-107
Timestamp: 2025-08-29T06:25:27.250Z
Learning: In Fluent Bit, ZSTD compression is enabled by default and is treated as a core dependency, not requiring conditional compilation guards like `#ifdef FLB_HAVE_ZSTD`. Unlike some other optional components, ZSTD support is always available and doesn't need build-time conditionals.

Applied to files:

  • tests/runtime/in_forward.c
📚 Learning: 2025-08-29T06:24:44.797Z
Learnt from: shadowshot-x
PR: fluent/fluent-bit#10794
File: src/aws/flb_aws_compress.c:26-26
Timestamp: 2025-08-29T06:24:44.797Z
Learning: In Fluent Bit, ZSTD support is always available and enabled by default. The build system automatically detects and uses either the system libzstd library or builds the bundled ZSTD version. Unlike other optional dependencies like Arrow which use conditional compilation guards (e.g., FLB_HAVE_ARROW), ZSTD does not require conditional includes or build flags.

Applied to files:

  • tests/runtime/in_forward.c
📚 Learning: 2025-08-31T12:46:11.940Z
Learnt from: ThomasDevoogdt
PR: fluent/fluent-bit#9277
File: .github/workflows/pr-compile-check.yaml:147-151
Timestamp: 2025-08-31T12:46:11.940Z
Learning: In fluent-bit CMakeLists.txt, the system library preference flags are defined as FLB_PREFER_SYSTEM_LIB_ZSTD and FLB_PREFER_SYSTEM_LIB_KAFKA with the FLB_ prefix.

Applied to files:

  • tests/runtime/in_forward.c
🧬 Code graph analysis (3)
plugins/in_forward/fw_conn.c (1)
src/flb_compression.c (1)
  • flb_decompression_context_destroy (125-149)
tests/runtime/in_forward.c (5)
src/flb_pack.c (1)
  • flb_pack_json (335-341)
src/flb_gzip.c (1)
  • flb_gzip_compress (157-252)
include/fluent-bit/flb_mem.h (1)
  • flb_free (126-128)
src/flb_lib.c (2)
  • flb_output_set (515-546)
  • flb_start (914-925)
src/flb_zstd.c (1)
  • flb_zstd_compress (33-57)
plugins/in_forward/fw_prot.c (3)
src/flb_sds.c (1)
  • flb_sds_destroy (389-399)
src/flb_compression.c (6)
  • flb_decompression_context_create (151-219)
  • flb_decompression_context_get_available_space (73-93)
  • flb_decompression_context_resize_buffer (95-122)
  • flb_decompression_context_get_append_buffer (61-71)
  • flb_decompress (221-241)
  • flb_decompression_context_destroy (125-149)
include/fluent-bit/flb_mem.h (1)
  • flb_free (126-128)
🔇 Additional comments (8)
plugins/in_forward/fw_conn.h (1)

53-56: LGTM! Well-structured addition of decompression fields.

The new fields are properly placed within the fw_conn structure and follow the existing conventions. The comments clearly indicate their purpose.

plugins/in_forward/fw_conn.c (2)

192-194: LGTM! Proper initialization of decompression fields.

The initialization correctly sets the compression type to NONE and the decompression context to NULL, establishing a clean initial state.


225-229: Consider adding NULL check before destruction.

While flb_decompression_context_destroy safely handles NULL input (as seen in the relevant code snippets), the current check is good defensive programming practice.

tests/runtime/in_forward.c (1)

571-618: LGTM! Well-structured gzip test payload creation.

The function properly creates a forward-protocol-compliant gzip-compressed MessagePack payload with the correct structure [tag, compressed_events, {options}].

plugins/in_forward/fw_prot.c (4)

93-120: LGTM! Clean implementation of compression type detection.

The function properly detects gzip and zstd compression types from the options map and returns appropriate algorithm constants.


1232-1243: LGTM! Robust magic byte detection.

The function correctly identifies gzip and zstd formats using their magic byte signatures. The gzip check uses the standard 0x1f8b signature, and zstd uses the correct 0x28b52ffd signature.


1665-1676: LGTM! Proper cleanup labels for error handling.

The cleanup labels provide a clear error handling path with proper resource deallocation. The FALLTHRU comment correctly indicates the intentional fall-through behavior.


1534-1563: Compression detection logic validated — no change required.

Logic correctly prefers magic bytes when both option and sniff are present and disagree; the warning only fires in that case and the "zstd"/"gzip" mapping aligns with the current enums.

Comment thread tests/runtime/in_forward.c
Comment on lines +619 to +681
void flb_test_forward_gzip()
{
struct flb_lib_out_cb cb_data;
struct test_ctx *ctx;
flb_sockfd_t fd;
int ret;
int num;
ssize_t w_size;

char *buf;
size_t size;

msgpack_sbuffer sbuf;

clear_output_num();

cb_data.cb = cb_check_result_json;
cb_data.data = "\"test\":\"msg\"";

ctx = test_ctx_create(&cb_data);
if (!TEST_CHECK(ctx != NULL)) {
TEST_MSG("test_ctx_create failed");
exit(EXIT_FAILURE);
}

ret = flb_output_set(ctx->flb, ctx->o_ffd,
"match", "test",
"format", "json",
NULL);
TEST_CHECK(ret == 0);

ret = flb_start(ctx->flb);
TEST_CHECK(ret == 0);

fd = connect_tcp(NULL, -1);
if (!TEST_CHECK(fd >= 0)) {
exit(EXIT_FAILURE);
}

msgpack_sbuffer_init(&sbuf);
create_simple_json_gzip(&sbuf);

w_size = send(fd, sbuf.data, sbuf.size, 0);
if (!TEST_CHECK(w_size == sbuf.size)) {
TEST_MSG("failed to send, errno=%d", errno);
flb_socket_close(fd);
msgpack_sbuffer_destroy(&sbuf);
exit(EXIT_FAILURE);
}

msgpack_sbuffer_destroy(&sbuf);

flb_time_msleep(1500);

num = get_output_num();
if (!TEST_CHECK(num > 0)) {
TEST_MSG("no outputs");
}

flb_socket_close(fd);
test_ctx_destroy(ctx);
}

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Add error handling for msgpack_sbuffer_init.

The function should check the return value of create_simple_json_gzip for errors.

     msgpack_sbuffer_init(&sbuf);
-    create_simple_json_gzip(&sbuf);
+    ret = create_simple_json_gzip(&sbuf);
+    if (!TEST_CHECK(ret == 0)) {
+        TEST_MSG("create_simple_json_gzip failed");
+        msgpack_sbuffer_destroy(&sbuf);
+        flb_socket_close(fd);
+        test_ctx_destroy(ctx);
+        exit(EXIT_FAILURE);
+    }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
void flb_test_forward_gzip()
{
struct flb_lib_out_cb cb_data;
struct test_ctx *ctx;
flb_sockfd_t fd;
int ret;
int num;
ssize_t w_size;
char *buf;
size_t size;
msgpack_sbuffer sbuf;
clear_output_num();
cb_data.cb = cb_check_result_json;
cb_data.data = "\"test\":\"msg\"";
ctx = test_ctx_create(&cb_data);
if (!TEST_CHECK(ctx != NULL)) {
TEST_MSG("test_ctx_create failed");
exit(EXIT_FAILURE);
}
ret = flb_output_set(ctx->flb, ctx->o_ffd,
"match", "test",
"format", "json",
NULL);
TEST_CHECK(ret == 0);
ret = flb_start(ctx->flb);
TEST_CHECK(ret == 0);
fd = connect_tcp(NULL, -1);
if (!TEST_CHECK(fd >= 0)) {
exit(EXIT_FAILURE);
}
msgpack_sbuffer_init(&sbuf);
create_simple_json_gzip(&sbuf);
w_size = send(fd, sbuf.data, sbuf.size, 0);
if (!TEST_CHECK(w_size == sbuf.size)) {
TEST_MSG("failed to send, errno=%d", errno);
flb_socket_close(fd);
msgpack_sbuffer_destroy(&sbuf);
exit(EXIT_FAILURE);
}
msgpack_sbuffer_destroy(&sbuf);
flb_time_msleep(1500);
num = get_output_num();
if (!TEST_CHECK(num > 0)) {
TEST_MSG("no outputs");
}
flb_socket_close(fd);
test_ctx_destroy(ctx);
}
void flb_test_forward_gzip()
{
struct flb_lib_out_cb cb_data;
struct test_ctx *ctx;
flb_sockfd_t fd;
int ret;
int num;
ssize_t w_size;
char *buf;
size_t size;
msgpack_sbuffer sbuf;
clear_output_num();
cb_data.cb = cb_check_result_json;
cb_data.data = "\"test\":\"msg\"";
ctx = test_ctx_create(&cb_data);
if (!TEST_CHECK(ctx != NULL)) {
TEST_MSG("test_ctx_create failed");
exit(EXIT_FAILURE);
}
ret = flb_output_set(ctx->flb, ctx->o_ffd,
"match", "test",
"format", "json",
NULL);
TEST_CHECK(ret == 0);
ret = flb_start(ctx->flb);
TEST_CHECK(ret == 0);
fd = connect_tcp(NULL, -1);
if (!TEST_CHECK(fd >= 0)) {
exit(EXIT_FAILURE);
}
msgpack_sbuffer_init(&sbuf);
ret = create_simple_json_gzip(&sbuf);
if (!TEST_CHECK(ret == 0)) {
TEST_MSG("create_simple_json_gzip failed");
msgpack_sbuffer_destroy(&sbuf);
flb_socket_close(fd);
test_ctx_destroy(ctx);
exit(EXIT_FAILURE);
}
w_size = send(fd, sbuf.data, sbuf.size, 0);
if (!TEST_CHECK(w_size == sbuf.size)) {
TEST_MSG("failed to send, errno=%d", errno);
flb_socket_close(fd);
msgpack_sbuffer_destroy(&sbuf);
exit(EXIT_FAILURE);
}
msgpack_sbuffer_destroy(&sbuf);
flb_time_msleep(1500);
num = get_output_num();
if (!TEST_CHECK(num > 0)) {
TEST_MSG("no outputs");
}
flb_socket_close(fd);
test_ctx_destroy(ctx);
}
🤖 Prompt for AI Agents
In tests/runtime/in_forward.c around lines 619 to 681, the test calls
msgpack_sbuffer_init() then create_simple_json_gzip(&sbuf) but does not check
the return value; add error handling: capture the return of
create_simple_json_gzip, and if it indicates failure, log a TEST_MSG, destroy
the sbuffer (msgpack_sbuffer_destroy(&sbuf)), close the socket, and exit the
test (or return) with failure; ensure any early-exit cleans up ctx and fd the
same way as other error branches so no resources leak.

Comment on lines +730 to +792
void flb_test_forward_zstd()
{
struct flb_lib_out_cb cb_data;
struct test_ctx *ctx;
flb_sockfd_t fd;
int ret;
int num;
ssize_t w_size;

char *buf;
size_t size;

msgpack_sbuffer sbuf;

clear_output_num();

cb_data.cb = cb_check_result_json;
cb_data.data = "\"test\":\"msg\"";

ctx = test_ctx_create(&cb_data);
if (!TEST_CHECK(ctx != NULL)) {
TEST_MSG("test_ctx_create failed");
exit(EXIT_FAILURE);
}

ret = flb_output_set(ctx->flb, ctx->o_ffd,
"match", "test",
"format", "json",
NULL);
TEST_CHECK(ret == 0);

ret = flb_start(ctx->flb);
TEST_CHECK(ret == 0);

fd = connect_tcp(NULL, -1);
if (!TEST_CHECK(fd >= 0)) {
exit(EXIT_FAILURE);
}

msgpack_sbuffer_init(&sbuf);
create_simple_json_zstd(&sbuf);

w_size = send(fd, sbuf.data, sbuf.size, 0);
if (!TEST_CHECK(w_size == sbuf.size)) {
TEST_MSG("failed to send, errno=%d", errno);
flb_socket_close(fd);
msgpack_sbuffer_destroy(&sbuf);
exit(EXIT_FAILURE);
}

msgpack_sbuffer_destroy(&sbuf);

flb_time_msleep(1500);

num = get_output_num();
if (!TEST_CHECK(num > 0)) {
TEST_MSG("no outputs");
}

flb_socket_close(fd);
test_ctx_destroy(ctx);
}

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Add error handling for create_simple_json_zstd.

Similar to the gzip test, the zstd test should check for errors when creating the test payload.

     msgpack_sbuffer_init(&sbuf);
-    create_simple_json_zstd(&sbuf);
+    ret = create_simple_json_zstd(&sbuf);
+    if (!TEST_CHECK(ret == 0)) {
+        TEST_MSG("create_simple_json_zstd failed");
+        msgpack_sbuffer_destroy(&sbuf);
+        flb_socket_close(fd);
+        test_ctx_destroy(ctx);
+        exit(EXIT_FAILURE);
+    }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
void flb_test_forward_zstd()
{
struct flb_lib_out_cb cb_data;
struct test_ctx *ctx;
flb_sockfd_t fd;
int ret;
int num;
ssize_t w_size;
char *buf;
size_t size;
msgpack_sbuffer sbuf;
clear_output_num();
cb_data.cb = cb_check_result_json;
cb_data.data = "\"test\":\"msg\"";
ctx = test_ctx_create(&cb_data);
if (!TEST_CHECK(ctx != NULL)) {
TEST_MSG("test_ctx_create failed");
exit(EXIT_FAILURE);
}
ret = flb_output_set(ctx->flb, ctx->o_ffd,
"match", "test",
"format", "json",
NULL);
TEST_CHECK(ret == 0);
ret = flb_start(ctx->flb);
TEST_CHECK(ret == 0);
fd = connect_tcp(NULL, -1);
if (!TEST_CHECK(fd >= 0)) {
exit(EXIT_FAILURE);
}
msgpack_sbuffer_init(&sbuf);
create_simple_json_zstd(&sbuf);
w_size = send(fd, sbuf.data, sbuf.size, 0);
if (!TEST_CHECK(w_size == sbuf.size)) {
TEST_MSG("failed to send, errno=%d", errno);
flb_socket_close(fd);
msgpack_sbuffer_destroy(&sbuf);
exit(EXIT_FAILURE);
}
msgpack_sbuffer_destroy(&sbuf);
flb_time_msleep(1500);
num = get_output_num();
if (!TEST_CHECK(num > 0)) {
TEST_MSG("no outputs");
}
flb_socket_close(fd);
test_ctx_destroy(ctx);
}
void flb_test_forward_zstd()
{
struct flb_lib_out_cb cb_data;
struct test_ctx *ctx;
flb_sockfd_t fd;
int ret;
int num;
ssize_t w_size;
char *buf;
size_t size;
msgpack_sbuffer sbuf;
clear_output_num();
cb_data.cb = cb_check_result_json;
cb_data.data = "\"test\":\"msg\"";
ctx = test_ctx_create(&cb_data);
if (!TEST_CHECK(ctx != NULL)) {
TEST_MSG("test_ctx_create failed");
exit(EXIT_FAILURE);
}
ret = flb_output_set(ctx->flb, ctx->o_ffd,
"match", "test",
"format", "json",
NULL);
TEST_CHECK(ret == 0);
ret = flb_start(ctx->flb);
TEST_CHECK(ret == 0);
fd = connect_tcp(NULL, -1);
if (!TEST_CHECK(fd >= 0)) {
exit(EXIT_FAILURE);
}
msgpack_sbuffer_init(&sbuf);
ret = create_simple_json_zstd(&sbuf);
if (!TEST_CHECK(ret == 0)) {
TEST_MSG("create_simple_json_zstd failed");
msgpack_sbuffer_destroy(&sbuf);
flb_socket_close(fd);
test_ctx_destroy(ctx);
exit(EXIT_FAILURE);
}
w_size = send(fd, sbuf.data, sbuf.size, 0);
if (!TEST_CHECK(w_size == sbuf.size)) {
TEST_MSG("failed to send, errno=%d", errno);
flb_socket_close(fd);
msgpack_sbuffer_destroy(&sbuf);
exit(EXIT_FAILURE);
}
msgpack_sbuffer_destroy(&sbuf);
flb_time_msleep(1500);
num = get_output_num();
if (!TEST_CHECK(num > 0)) {
TEST_MSG("no outputs");
}
flb_socket_close(fd);
test_ctx_destroy(ctx);
}
🤖 Prompt for AI Agents
In tests/runtime/in_forward.c around lines 730-792, the call to
create_simple_json_zstd is unchecked; add error handling like the gzip test:
after msgpack_sbuffer_init(&sbuf) call, verify create_simple_json_zstd(&sbuf)
returns success, and on failure call TEST_MSG with an error,
msgpack_sbuffer_destroy(&sbuf), flb_socket_close(fd), and exit(EXIT_FAILURE) (or
return) to avoid sending invalid data.

@edsiper edsiper merged commit be25573 into master Sep 18, 2025
55 checks passed
@edsiper edsiper deleted the cosmo0920-implement-decompression-method-handlers-for-gzip-and-zstd-in_forward branch September 18, 2025 02:11
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants