Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 9 additions & 8 deletions include/fluent-bit/aws/flb_aws_msk_iam.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,20 +28,21 @@

struct flb_aws_msk_iam;

struct flb_msk_iam_cb {
void *plugin_ctx;
struct flb_aws_msk_iam *iam;
char *broker_host; /* Store the actual broker hostname */
};

/*
* Register the oauthbearer refresh callback for MSK IAM authentication.
* Parameters:
* - config: Fluent Bit configuration
* - kconf: rdkafka configuration
* - opaque: Kafka opaque context (will be set with MSK IAM context)
* - brokers: Comma-separated list of broker addresses (used to extract AWS region if region is NULL)
* - region: Optional AWS region (if NULL, will be auto-detected from brokers)
* Returns context pointer on success or NULL on failure.
*/
struct flb_aws_msk_iam *flb_aws_msk_iam_register_oauth_cb(struct flb_config *config,
rd_kafka_conf_t *kconf,
const char *cluster_arn,
struct flb_kafka_opaque *opaque);
struct flb_kafka_opaque *opaque,
const char *brokers,
const char *region);
void flb_aws_msk_iam_destroy(struct flb_aws_msk_iam *ctx);

#endif
183 changes: 122 additions & 61 deletions plugins/in_kafka/in_kafka.c
Original file line number Diff line number Diff line change
Expand Up @@ -268,40 +268,48 @@ static int in_kafka_init(struct flb_input_instance *ins,
return -1;
}

#ifdef FLB_HAVE_AWS_MSK_IAM
/*
* When MSK IAM auth is enabled, default the required
* security settings so users don't need to specify them.
*/
if (ctx->aws_msk_iam && ctx->aws_msk_iam_cluster_arn) {
conf = flb_input_get_property("rdkafka.security.protocol", ins);
if (!conf) {
flb_input_set_property(ins, "rdkafka.security.protocol", "SASL_SSL");
/* Retrieve SASL mechanism if configured */
conf = flb_input_get_property("rdkafka.sasl.mechanism", ins);
if (conf) {
ctx->sasl_mechanism = flb_sds_create(conf);
if (!ctx->sasl_mechanism) {
flb_plg_error(ins, "failed to allocate SASL mechanism string");
flb_free(ctx);
return -1;
}

conf = flb_input_get_property("rdkafka.sasl.mechanism", ins);
if (!conf) {
flb_plg_info(ins, "SASL mechanism configured: %s", ctx->sasl_mechanism);

#ifdef FLB_HAVE_AWS_MSK_IAM
/* Check if using aws_msk_iam as SASL mechanism */
if (strcasecmp(conf, "aws_msk_iam") == 0) {
flb_sds_t new_sasl;

/* Mark that user explicitly requested AWS MSK IAM */
ctx->aws_msk_iam = FLB_TRUE;

/* Set SASL mechanism to OAUTHBEARER for librdkafka */
new_sasl = flb_sds_create("OAUTHBEARER");
if (!new_sasl) {
flb_plg_error(ins, "failed to allocate SASL mechanism string");
flb_sds_destroy(ctx->sasl_mechanism);
flb_free(ctx);
return -1;
}

flb_input_set_property(ins, "rdkafka.sasl.mechanism", "OAUTHBEARER");
ctx->sasl_mechanism = flb_sds_create("OAUTHBEARER");
}
else {
ctx->sasl_mechanism = flb_sds_create(conf);
flb_plg_info(ins, "SASL mechanism configured: %s", ctx->sasl_mechanism);
flb_sds_destroy(ctx->sasl_mechanism);
ctx->sasl_mechanism = new_sasl;

/* Ensure security protocol is set */
conf = flb_input_get_property("rdkafka.security.protocol", ins);
if (!conf) {
flb_input_set_property(ins, "rdkafka.security.protocol", "SASL_SSL");
}

flb_plg_info(ins, "AWS MSK IAM authentication enabled via rdkafka.sasl.mechanism");
}
}
else {
#endif

/* Retrieve SASL mechanism if configured */
conf = flb_input_get_property("rdkafka.sasl.mechanism", ins);
if (conf) {
ctx->sasl_mechanism = flb_sds_create(conf);
flb_plg_info(ins, "SASL mechanism configured: %s", ctx->sasl_mechanism);
}

#ifdef FLB_HAVE_AWS_MSK_IAM
}
#endif

kafka_conf = flb_kafka_conf_create(&ctx->kafka, &ins->properties, 1);
if (!kafka_conf) {
Expand Down Expand Up @@ -351,26 +359,47 @@ static int in_kafka_init(struct flb_input_instance *ins,
flb_kafka_opaque_set(ctx->opaque, ctx, NULL);
rd_kafka_conf_set_opaque(kafka_conf, ctx->opaque);

/*
* Enable SASL queue for all OAUTHBEARER configurations.
* This allows librdkafka to handle OAuth token refresh in a background thread,
* which is essential for idle connections or when poll intervals are large.
* This benefits all OAUTHBEARER methods: AWS IAM, OIDC, custom OAuth, etc.
*/
if (ctx->sasl_mechanism && strcasecmp(ctx->sasl_mechanism, "OAUTHBEARER") == 0) {
rd_kafka_conf_enable_sasl_queue(kafka_conf, 1);
flb_plg_debug(ins, "SASL queue enabled for OAUTHBEARER mechanism");
}

#ifdef FLB_HAVE_AWS_MSK_IAM
if (ctx->aws_msk_iam && ctx->aws_msk_iam_cluster_arn && ctx->sasl_mechanism &&
/* Only register MSK IAM if user explicitly requested it via rdkafka.sasl.mechanism=aws_msk_iam */
if (ctx->aws_msk_iam && ctx->sasl_mechanism &&
strcasecmp(ctx->sasl_mechanism, "OAUTHBEARER") == 0) {
flb_plg_info(ins, "registering MSK IAM authentication with cluster ARN: %s",
ctx->aws_msk_iam_cluster_arn);
ctx->msk_iam = flb_aws_msk_iam_register_oauth_cb(config,
kafka_conf,
ctx->aws_msk_iam_cluster_arn,
ctx->opaque);
if (!ctx->msk_iam) {
flb_plg_error(ins, "failed to setup MSK IAM authentication");
/* Register MSK IAM OAuth callback */
if (ctx->kafka.brokers) {
flb_plg_info(ins, "registering AWS MSK IAM authentication OAuth callback");
ctx->msk_iam = flb_aws_msk_iam_register_oauth_cb(config,
kafka_conf,
ctx->opaque,
ctx->kafka.brokers,
ctx->aws_region);

if (!ctx->msk_iam) {
flb_plg_error(ins, "failed to setup MSK IAM authentication OAuth callback");
goto init_error;
}
else {
res = rd_kafka_conf_set(kafka_conf, "sasl.oauthbearer.config",
"principal=admin", errstr, sizeof(errstr));
if (res != RD_KAFKA_CONF_OK) {
flb_plg_error(ins,
"failed to set sasl.oauthbearer.config: %s",
errstr);
}
}
}
else {
res = rd_kafka_conf_set(kafka_conf, "sasl.oauthbearer.config",
"principal=admin", errstr, sizeof(errstr));
if (res != RD_KAFKA_CONF_OK) {
flb_plg_error(ins,
"failed to set sasl.oauthbearer.config: %s",
errstr);
}
flb_plg_error(ins, "brokers configuration is required for MSK IAM authentication");
goto init_error;
}
}
#endif
Expand All @@ -380,9 +409,36 @@ static int in_kafka_init(struct flb_input_instance *ins,
/* Create Kafka consumer handle */
if (!ctx->kafka.rk) {
flb_plg_error(ins, "Failed to create new consumer: %s", errstr);
/* rd_kafka_new() did NOT take ownership on failure; kafka_conf is
* still valid and will be destroyed by init_error cleanup path. */
goto init_error;
}

/* rd_kafka_new() takes ownership of kafka_conf on success */
kafka_conf = NULL;

/*
* Enable SASL background callbacks for all OAUTHBEARER configurations.
* This ensures OAuth tokens are refreshed automatically even when:
* - Poll intervals are large
* - Topics have no messages
* - Collector is paused
* This benefits all OAUTHBEARER methods: AWS IAM, OIDC, custom OAuth, etc.
*/
if (ctx->sasl_mechanism && strcasecmp(ctx->sasl_mechanism, "OAUTHBEARER") == 0) {
rd_kafka_error_t *error;
error = rd_kafka_sasl_background_callbacks_enable(ctx->kafka.rk);
if (error) {
flb_plg_warn(ins, "failed to enable SASL background callbacks: %s. "
"OAuth tokens may not refresh during idle periods.",
rd_kafka_error_string(error));
rd_kafka_error_destroy(error);
}
else {
flb_plg_info(ins, "OAUTHBEARER: SASL background callbacks enabled");
}
}

/* Trigger initial token refresh for OAUTHBEARER */
rd_kafka_poll(ctx->kafka.rk, 0);

Expand Down Expand Up @@ -449,15 +505,23 @@ static int in_kafka_init(struct flb_input_instance *ins,
}
if (ctx->kafka.rk) {
rd_kafka_consumer_close(ctx->kafka.rk);
/* rd_kafka_destroy also destroys the conf that was passed to rd_kafka_new */
rd_kafka_destroy(ctx->kafka.rk);
}
else if (kafka_conf) {
/* If rd_kafka was never created, we need to destroy conf manually */
rd_kafka_conf_destroy(kafka_conf);
}
if (ctx->opaque) {
flb_kafka_opaque_destroy(ctx->opaque);
}
else if (kafka_conf) {
/* conf is already destroyed when rd_kafka is initialized */
rd_kafka_conf_destroy(kafka_conf);

#ifdef FLB_HAVE_AWS_MSK_IAM
if (ctx->msk_iam) {
flb_aws_msk_iam_destroy(ctx->msk_iam);
}
#endif

flb_sds_destroy(ctx->sasl_mechanism);
flb_free(ctx);

Expand Down Expand Up @@ -552,6 +616,16 @@ static struct flb_config_map config_map[] = {
0, FLB_FALSE, 0,
"Set the librdkafka options"
},
#ifdef FLB_HAVE_AWS_MSK_IAM
{
FLB_CONFIG_MAP_STR, "aws_region", (char *)NULL,
0, FLB_TRUE, offsetof(struct flb_in_kafka_config, aws_region),
"AWS region for MSK IAM authentication. If not set, region will be "
"auto-detected from broker hostname (supports standard MSK, Serverless, "
"and VPC endpoint formats). Required when using custom DNS names "
"(e.g., PrivateLink) with MSK IAM."
},
Comment thread
coderabbitai[bot] marked this conversation as resolved.
#endif
{
FLB_CONFIG_MAP_SIZE, "buffer_max_size", FLB_IN_KAFKA_BUFFER_MAX_SIZE,
0, FLB_TRUE, offsetof(struct flb_in_kafka_config, buffer_max_size),
Expand All @@ -571,19 +645,6 @@ static struct flb_config_map config_map[] = {
"Rely on kafka auto-commit and commit messages in batches"
},

#ifdef FLB_HAVE_AWS_MSK_IAM
{
FLB_CONFIG_MAP_STR, "aws_msk_iam_cluster_arn", (char *)NULL,
0, FLB_TRUE, offsetof(struct flb_in_kafka_config, aws_msk_iam_cluster_arn),
"ARN of the MSK cluster when using AWS IAM authentication"
},
{
FLB_CONFIG_MAP_BOOL, "aws_msk_iam", "false",
0, FLB_TRUE, offsetof(struct flb_in_kafka_config, aws_msk_iam),
"Enable AWS MSK IAM authentication"
},
#endif
Comment thread
cosmo0920 marked this conversation as resolved.

/* EOF */
{0}
};
Expand Down
4 changes: 2 additions & 2 deletions plugins/in_kafka/in_kafka.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,12 +55,12 @@ struct flb_in_kafka_config {
struct flb_kafka_opaque *opaque;

#ifdef FLB_HAVE_AWS_MSK_IAM
flb_sds_t aws_msk_iam_cluster_arn;
struct flb_aws_msk_iam *msk_iam;
int aws_msk_iam; /* Flag to indicate user explicitly requested AWS MSK IAM */
char *aws_region; /* AWS region for MSK IAM (optional, auto-detected if not set) */
#endif

/* SASL mechanism configured in rdkafka.sasl.mechanism */
int aws_msk_iam;
flb_sds_t sasl_mechanism;
};

Expand Down
22 changes: 9 additions & 13 deletions plugins/out_kafka/kafka.c
Original file line number Diff line number Diff line change
Expand Up @@ -1524,6 +1524,15 @@ static struct flb_config_map config_map[] = {
0, FLB_FALSE, 0,
"Set the kafka group_id."
},
#ifdef FLB_HAVE_AWS_MSK_IAM
{
FLB_CONFIG_MAP_STR, "aws_region", (char *)NULL,
0, FLB_TRUE, offsetof(struct flb_out_kafka, aws_region),
"AWS region for MSK IAM authentication. If not set, region will be "
"auto-detected from broker hostname (only works for standard MSK endpoints). "
"Required when using custom DNS names (e.g., PrivateLink) with MSK IAM."
},
#endif
{
FLB_CONFIG_MAP_STR, "raw_log_key", NULL,
0, FLB_TRUE, offsetof(struct flb_out_kafka, raw_log_key),
Expand All @@ -1532,19 +1541,6 @@ static struct flb_config_map config_map[] = {
"that key will be sent to Kafka."
},

#ifdef FLB_HAVE_AWS_MSK_IAM
{
FLB_CONFIG_MAP_STR, "aws_msk_iam_cluster_arn", NULL,
0, FLB_TRUE, offsetof(struct flb_out_kafka, aws_msk_iam_cluster_arn),
"ARN of the MSK cluster when using AWS IAM authentication"
},
{
FLB_CONFIG_MAP_BOOL, "aws_msk_iam", "false",
0, FLB_TRUE, offsetof(struct flb_out_kafka, aws_msk_iam),
"Enable AWS MSK IAM authentication"
},
#endif
Comment thread
kalavt marked this conversation as resolved.

/* EOF */
{0}
};
Expand Down
Loading