[metrics] Materialize all metrics tags into top level columns #6237
Conversation
Force-pushed: fd2f214 → 7621e22
Force-pushed: 7621e22 → 1b50f21
Force-pushed: 1b50f21 → 0b642de
Force-pushed: 0b642de → 7a5979f
@codex review
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 5eae799d9a
ℹ️ About Codex in GitHub
Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".
-            let empty_batch =
-                RecordBatch::new_empty(self.processor.schema().arrow_schema().clone());
+            let empty_batch =
+                RecordBatch::new_empty(std::sync::Arc::new(arrow::datatypes::Schema::empty()));
Preserve required schema on checkpoint-only flush batches
Constructing the fallback checkpoint batch with Schema::empty() causes a hard failure when force_commit is true and no valid docs were forwarded (for example, all docs in the raw batch failed parsing): the indexer still flushes this batch, and ParquetWriter::write_to_file now rejects it because required columns are missing. In that path, the packager exits with an error instead of forwarding the checkpoint, which can stall ingestion progress for that shard.
Fixed an issue in this PR: we need to send a None batch to the packager so that it skips the write. We were sending an empty batch with an empty schema, so the packager was still attempting to write.
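The fix can be sketched as follows (stub types only; the real code passes an `Option` around arrow's `RecordBatch`, and `plan_flush` is an illustrative name, not the PR's API):

```rust
// Hypothetical stub standing in for arrow's RecordBatch; the real fix passes
// Option<RecordBatch> so the packager can tell "nothing to write" apart from
// "write this (possibly empty) batch".
struct RecordBatchStub {
    num_rows: usize,
}

// Decide what the packager should do for one flush. A force_commit flush with
// no valid docs arrives as `None`: skip the parquet write, but still forward
// the checkpoint so shard ingestion progress is not stalled.
fn plan_flush(batch: &Option<RecordBatchStub>) -> &'static str {
    match batch {
        Some(batch) if batch.num_rows > 0 => "write_and_checkpoint",
        _ => "checkpoint_only",
    }
}

fn main() {
    assert_eq!(plan_flush(&None), "checkpoint_only");
    assert_eq!(
        plan_flush(&Some(RecordBatchStub { num_rows: 3 })),
        "write_and_checkpoint"
    );
    println!("ok");
}
```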
     for scope_metrics in resource_metrics.scope_metrics {
         for metric in scope_metrics.metrics {
-            parse_metric(
-                &metric,
-                &service_name,
-                &resource_attributes,
-                &mut data_points,
-            )?;
+            parse_metric(&metric, &service_name, &mut data_points, &mut num_rejected);
         }
Materialize resource-level metric attributes into tags
Resource attributes are parsed but no longer propagated into per-point tags: after extracting service.name, metrics are parsed with only service_name, so dimensions that commonly live at resource scope (like env, region, host, custom resource tags) are dropped entirely. This is a data-loss regression versus the previous behavior and breaks filtering/grouping on those tags for OTLP senders that attach dimensions at the resource level.
Yeah. FWIW we're not using the OTLP endpoint, but fixing this.
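The fix discussed above, materializing resource-level attributes into per-point tags, can be sketched like this (std-only; `merge_tags` and the string-valued maps are illustrative, not the PR's actual API):

```rust
use std::collections::HashMap;

// Merge resource-scope attributes (env, region, host, custom resource tags)
// into each data point's tag map, so they survive into the wide-schema
// columns. Point-level attributes win on key collisions.
fn merge_tags(
    resource_attrs: &HashMap<String, String>,
    point_attrs: &HashMap<String, String>,
) -> HashMap<String, String> {
    let mut tags = resource_attrs.clone();
    // Point-level attributes override resource-level ones.
    tags.extend(point_attrs.clone());
    tags
}

fn main() {
    let mut resource = HashMap::new();
    resource.insert("env".to_string(), "prod".to_string());
    resource.insert("region".to_string(), "us-east-1".to_string());

    let mut point = HashMap::new();
    point.insert("region".to_string(), "eu-west-1".to_string());

    let tags = merge_tags(&resource, &point);
    // Resource-level tag is preserved; the point-level value wins on conflict.
    assert_eq!(tags["env"], "prod");
    assert_eq!(tags["region"], "eu-west-1");
    println!("ok");
}
```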
g-talbot left a comment
This looks OK to me, modulo the fix to the O(m*n) algorithm that should be O(n), which I talk about in there. I'll mark this as LGTM, but fix that, please.
-    pub fn append(&mut self, data_point: &MetricDataPoint) {
+    pub fn append(&mut self, data_point: MetricDataPoint) {
         self.metric_name.append_value(&data_point.metric_name);
         self.metric_type.append_value(data_point.metric_type as u8);
Why just one point? Don't points come in batches?
Yeah, we could just append a vec of points. I'll change this later, since future PRs depend on this call and I don't want to deal with rebase issues.
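A batched variant could look like this (stub types standing in for the real arrow array builders; `append_batch` is the reviewer's suggestion, not code from the PR):

```rust
// Stub point and builder types; the real builder appends into arrow builders.
struct MetricDataPoint {
    metric_name: String,
    value: f64,
}

#[derive(Default)]
struct ColumnsBuilder {
    metric_names: Vec<String>,
    values: Vec<f64>,
}

impl ColumnsBuilder {
    // Per-point path, as in the PR (takes ownership to avoid clones).
    fn append(&mut self, dp: MetricDataPoint) {
        self.metric_names.push(dp.metric_name);
        self.values.push(dp.value);
    }

    // Batched entry point: one call per batch of points instead of one call
    // per point, which keeps the caller's loop out of the builder's API.
    fn append_batch(&mut self, dps: Vec<MetricDataPoint>) {
        for dp in dps {
            self.append(dp);
        }
    }
}

fn main() {
    let mut builder = ColumnsBuilder::default();
    builder.append_batch(vec![
        MetricDataPoint { metric_name: "cpu".into(), value: 0.5 },
        MetricDataPoint { metric_name: "mem".into(), value: 0.7 },
    ]);
    assert_eq!(builder.metric_names, vec!["cpu", "mem"]);
    assert_eq!(builder.values.len(), 2);
    println!("ok");
}
```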
     timestamp_secs_builder.append_value(dp.timestamp_secs);
     value_builder.append_value(dp.value);
     for (tag_idx, tag_key) in sorted_tag_keys.iter().enumerate() {
Whoa this double nested loop could be quite inefficient. It's O(m*n) where m is the number of columns and n is number of points. Why not just go through dp.tags and append the point to each column. You'll have to keep track of "last row since I appended a value" so you know how many nulls to append "in front" and "at the end", but those can be appended in a single operation ("append X nulls") which is much more efficient. That's ~O(n), instead. (This was a very important optimization in the Husky column handling, as a data point.)
👍 For each point, we'll only iterate through the tags that exist on that point, and bulk-append nulls where we can.
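The O(n) scheme described above can be sketched with plain vectors standing in for arrow builders (`TagColumns` and its fields are illustrative, not the PR's types):

```rust
use std::collections::HashMap;

// For each point, touch only the tags that point carries. Each column's
// current length records the last row that received a value, so missing
// rows can be back-filled with nulls in one bulk resize instead of one
// null per (column, row) pair.
#[derive(Default)]
struct TagColumns {
    columns: HashMap<String, Vec<Option<String>>>,
    num_rows: usize,
}

impl TagColumns {
    fn append_point(&mut self, tags: &HashMap<String, String>) {
        for (key, value) in tags {
            let col = self.columns.entry(key.clone()).or_default();
            // Bulk back-fill nulls for the rows where this tag was absent.
            col.resize(self.num_rows, None);
            col.push(Some(value.clone()));
        }
        self.num_rows += 1;
    }

    // Pad every column to the final row count before building arrays.
    fn finish(mut self) -> HashMap<String, Vec<Option<String>>> {
        for col in self.columns.values_mut() {
            col.resize(self.num_rows, None);
        }
        self.columns
    }
}

fn main() {
    let mut builder = TagColumns::default();
    builder.append_point(&HashMap::from([("env".to_string(), "prod".to_string())]));
    builder.append_point(&HashMap::from([("host".to_string(), "a1".to_string())]));
    let cols = builder.finish();
    assert_eq!(cols["env"], vec![Some("prod".to_string()), None]);
    assert_eq!(cols["host"], vec![None, Some("a1".to_string())]);
    println!("ok");
}
```

With arrow builders the two `resize` calls become a single "append N nulls" operation per column, which is what makes the whole pass ~O(n) in the number of (point, tag) pairs.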
💡 Codex Review
Reviewed commit: 7593bc24e5
… crate)

Adds a DataFusion-based query execution layer on top of the wide-schema parquet metrics pipeline from PR quickwit-oss#6237.
- New crate `quickwit-datafusion` with pluggable `QuickwitDataSource` trait, `DataFusionSessionBuilder`, `QuickwitSchemaProvider`, and distributed worker session setup via datafusion-distributed WorkerService
- `MetricsDataSource` implements `QuickwitDataSource` for OSS parquet splits: metastore-backed split discovery, object-store caching, filter pushdown with CAST unwrapping fix, Substrait `ReadRel` consumption
- `DataFusionService` gRPC (ExecuteSql + ExecuteSubstrait streaming) wired into quickwit-serve alongside the existing searcher and OTLP services
- Distributed execution: DistributedPhysicalOptimizerRule produces PartitionIsolatorExec tasks (not shuffles) across multiple searcher nodes
- Integration tests covering: pruning, aggregation, time range, GROUP BY, distributed tasks, NULL column fill for missing parquet fields, Substrait named-table queries, rollup from file, partial schema projection
- dev/ local cluster setup with start-cluster, ingest-metrics, query-metrics

Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
💡 Codex Review
Reviewed commit: 6e2f0c999c
     let attributes = extract_attributes(dp.attributes.clone());
     for (key, json_val) in attributes {
         tags.insert(key, json_value_to_string(json_val));
     }
Filter reserved metric columns from dynamic tag keys
This path inserts every OTLP attribute key into tags without excluding reserved names (metric_name, metric_type, timestamp_secs, value), and those keys are later materialized as top-level columns. If a sender includes one of these attribute names, the batch can end up with duplicate column names, which makes downstream name-based lookups (index_of/schema validation/split metadata extraction) ambiguous and can cause rejected batches or incorrect field resolution. Please strip or rename reserved keys before adding them to tags.
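One way to implement the suggested guard (std-only sketch; the `_tag` rename suffix and `sanitize_tags` are illustrative choices, not necessarily what the PR ships):

```rust
use std::collections::{HashMap, HashSet};

// Reserved top-level columns from the wide metrics schema; an attribute key
// that collides with one of these would create a duplicate column name.
const RESERVED: [&str; 4] = ["metric_name", "metric_type", "timestamp_secs", "value"];

// Rename colliding keys instead of dropping them, so the sender's
// dimension still survives as a (differently named) tag column.
fn sanitize_tags(attrs: HashMap<String, String>) -> HashMap<String, String> {
    let reserved: HashSet<&str> = RESERVED.iter().copied().collect();
    attrs
        .into_iter()
        .map(|(key, value)| {
            if reserved.contains(key.as_str()) {
                (format!("{key}_tag"), value)
            } else {
                (key, value)
            }
        })
        .collect()
}

fn main() {
    let attrs = HashMap::from([
        ("value".to_string(), "primary".to_string()),
        ("region".to_string(), "eu".to_string()),
    ]);
    let tags = sanitize_tags(attrs);
    assert!(tags.contains_key("value_tag"));
    assert!(!tags.contains_key("value"));
    assert_eq!(tags["region"], "eu");
    println!("ok");
}
```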
…quired fields in otel metrics
💡 Codex Review
Reviewed commit: 24204fada3
💡 Codex Review
Reviewed commit: f6a9163b83
Implements QuickwitDataSource for the parquet metrics pipeline from PR quickwit-oss#6237.
- MetricsDataSource: production (metastore-backed) and test (SimpleIndexResolver)
- MetricsTableProvider: filter pushdown with CAST-unwrapping fix for timestamp
- MetastoreSplitProvider: converts MetricsSplitQuery → ListMetricsSplitsQuery
- MetastoreIndexResolver: resolves index URI → QuickwitObjectStore per query
- MetricsSplitQuery + extract_split_filters: predicate extraction for split pruning
- MetricsTableProviderFactory: CREATE EXTERNAL TABLE … STORED AS metrics support
- test_utils: make_batch, TestSplitProvider, MetricsTestbed for integration tests

Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
* feat(datafusion): add quickwit-datafusion core execution framework

  Introduces a generic DataFusion execution layer with a pluggable QuickwitDataSource trait. No data-source-specific code.
  - QuickwitDataSource trait + DataSourceContributions (contribution-return pattern)
  - DataFusionSessionBuilder with shared RuntimeEnv, check_invariants
  - QuickwitSchemaProvider backed by DataFusion MemorySchemaProvider for DDL tables
  - QuickwitWorkerSessionBuilder + build_quickwit_worker for distributed execution
  - QuickwitWorkerResolver, QuickwitTaskEstimator
  - QuickwitObjectStore: quickwit_storage::Storage → object_store::ObjectStore bridge
  - DataFusionService::execute_sql (streaming Arrow IPC responses)

* feat(datafusion): add MetricsDataSource for OSS parquet metrics

  Implements QuickwitDataSource for the parquet metrics pipeline from PR #6237.
  - MetricsDataSource: production (metastore-backed) and test (SimpleIndexResolver)
  - MetricsTableProvider: filter pushdown with CAST-unwrapping fix for timestamp
  - MetastoreSplitProvider: converts MetricsSplitQuery → ListMetricsSplitsQuery
  - MetastoreIndexResolver: resolves index URI → QuickwitObjectStore per query
  - MetricsSplitQuery + extract_split_filters: predicate extraction for split pruning
  - MetricsTableProviderFactory: CREATE EXTERNAL TABLE … STORED AS metrics support
  - test_utils: make_batch, TestSplitProvider, MetricsTestbed for integration tests

* feat(datafusion): add QuickwitSubstraitConsumer

  Routes Substrait ReadRel nodes to registered QuickwitDataSource implementations. Standard NamedTable reads resolve via MetricsDataSource::try_consume_read_rel. ExtensionTable reads (custom protos) can be handled by downstream callers.
  - QuickwitSubstraitConsumer implementing datafusion-substrait SubstraitConsumer
  - execute_substrait_plan / execute_substrait_plan_streaming entry points
  - DataFusionService::execute_substrait (bytes) and execute_substrait_json (dev path)
  - session.rs: DataFusionSessionBuilder::execute_substrait convenience method

* feat(datafusion): wire DataFusion service into gRPC, add proto and integration tests
  - Add datafusion.proto with DataFusionService (ExecuteSubstrait, ExecuteSql RPCs)
  - Generate codegen and mod.rs for the new proto service
  - Wire DataFusionService and WorkerService into quickwit-serve gRPC layer
  - Add DataFusionServiceGrpcImpl handler
  - Auto-create otel-metrics-v0_9 index on startup alongside logs/traces
  - Add metrics_datafusion_tests: in-process SQL + Substrait over parquet splits
  - Add metrics_distributed_tests: multi-node distributed execution
  - Add rollup_substrait.json fixture for Substrait plan testing

* fix(datafusion): fix ParquetWriter::new call signature in test_utils

  Remove extra schema argument from ParquetWriter::new; the API only accepts a ParquetWriterConfig. Remove unused ParquetSchema import.

* fix(datafusion): fix ParquetWriter API and skip unimplemented ingest test
  - Remove erroneous ParquetSchema argument from ParquetWriter::new calls in integration tests (API takes only ParquetWriterConfig)
  - Mark test_rest_ingest_then_in_process_query as #[ignore] until the /ingest-metrics REST endpoint is wired in quickwit-serve

* test(datafusion): remove unimplemented REST ingest test

* fix(datafusion): address review findings
  - fix: CAST unwrapping in classify_filter — reuse predicate::column_name so time-range predicates are correctly classified as Inexact and passed to scan(); previously CAST-wrapped filters were silently dropped
  - fix: declare parquet sort order (metric_name, timestamp_secs ASC) on FileScanConfig so DataFusion avoids redundant sort operators
  - fix: get_opts now respects GetOptions.range — dispatches to get_slice for Bounded/Suffix ranges instead of always downloading the full file
  - fix: to_object_store_error propagates file path on NotFound
  - fix: register_for_worker made a no-op; lazy scan-path registration is sufficient and avoids O(indexes) metastore RPCs per worker task; removes stale comment claiming a non-existent object-store cache
  - fix: extract is_index_not_found helper, removing duplicated downcast block from try_consume_read_rel and create_default_table_provider
  - fix: sort before dedup in QuickwitSchemaProvider::table_names
  - fix: empty searcher pool returns Ok(vec![]) for local execution fallback
  - fix: remove dead builder methods with_udf_batch, with_codec_applier, with_physical_optimizer_rule from DataSourceContributions
  - feat: add tracing spans to execute_substrait and execute_sql
  - feat: wire 4 GiB memory limit on DataFusionSessionBuilder in serve
  - refactor: extract stream_to_receiver helper in gRPC handler

* seperate dependencies with rust FF
* integration testing refactor
* object store refactor + testing
* check metrics index
* add explain for substrait
* wire properties
* rename metrics to parquet
* cleanup
* some cleanup
* refactor to use more datafusion API's
* license
* add ci
* upgrade to df 53
* assert no network shuffle
* couple more comments + licenses and clippy
* more license stuff

Co-authored-by: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
Description
This PR can be reviewed commit by commit.
Currently, we define a static schema for metrics data in parquet. This PR makes the schema dynamic: all tags are put into their own columns during ingestion. Incoming metrics points must have "metric_name", "metric_type", "timestamp_secs", and "value" as fields.

Again, a lot of metrics parsing/arrow logic lives in quickwit-opentelemetry, when it should not. We will refactor this, eventually :)

How was this PR tested?
Describe how you tested this PR.
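The required-fields contract from the description can be sketched as a small validation helper (the function name and string-typed point map are illustrative, not the PR's code):

```rust
use std::collections::HashMap;

// The four fields every incoming metrics point must carry, per the PR
// description; everything else becomes a dynamic tag column.
const REQUIRED_FIELDS: [&str; 4] = ["metric_name", "metric_type", "timestamp_secs", "value"];

// Returns the required fields that are absent from a point, in schema order,
// so the caller can reject the point with a precise error.
fn missing_required_fields(point: &HashMap<String, String>) -> Vec<&'static str> {
    REQUIRED_FIELDS
        .iter()
        .copied()
        .filter(|field| !point.contains_key(*field))
        .collect()
}

fn main() {
    let point = HashMap::from([
        ("metric_name".to_string(), "cpu_usage".to_string()),
        ("value".to_string(), "0.93".to_string()),
    ]);
    let missing = missing_required_fields(&point);
    assert_eq!(missing, vec!["metric_type", "timestamp_secs"]);
    println!("ok");
}
```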