feat(datafusion): DataFusion metrics query layer #6276

alexanderbianchi wants to merge 12 commits into quickwit-oss:main
Conversation
Introduces a generic DataFusion execution layer with a pluggable QuickwitDataSource trait. No data-source-specific code.

- QuickwitDataSource trait + DataSourceContributions (contribution-return pattern)
- DataFusionSessionBuilder with shared RuntimeEnv, check_invariants
- QuickwitSchemaProvider backed by DataFusion MemorySchemaProvider for DDL tables
- QuickwitWorkerSessionBuilder + build_quickwit_worker for distributed execution
- QuickwitWorkerResolver, QuickwitTaskEstimator
- QuickwitObjectStore: quickwit_storage::Storage → object_store::ObjectStore bridge
- DataFusionService::execute_sql (streaming Arrow IPC responses)

Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
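A minimal sketch of the contribution-return pattern named above: each data source returns a plain value describing what it wants to add to the session, and the builder merges those values, failing fast on name collisions (check_invariants-style). All types and names here are toy stand-ins, not the actual Quickwit or DataFusion APIs.

```rust
// Toy contribution value: stand-ins for UDF and optimizer-rule registrations.
#[derive(Default)]
struct Contributions {
    udf_names: Vec<String>,
    optimizer_rules: Vec<String>,
}

trait DataSource {
    fn name(&self) -> &str;
    fn contributions(&self) -> Contributions;
}

struct Builder {
    merged: Contributions,
}

impl Builder {
    fn new() -> Self {
        Builder { merged: Contributions::default() }
    }

    // Merge one source's contributions; reject duplicate UDF names up front,
    // mimicking a check_invariants pass at startup.
    fn add_source(mut self, source: &dyn DataSource) -> Result<Self, String> {
        let contributions = source.contributions();
        for udf in contributions.udf_names {
            if self.merged.udf_names.contains(&udf) {
                return Err(format!("duplicate UDF `{udf}` from source `{}`", source.name()));
            }
            self.merged.udf_names.push(udf);
        }
        self.merged.optimizer_rules.extend(contributions.optimizer_rules);
        Ok(self)
    }
}

struct Metrics;
impl DataSource for Metrics {
    fn name(&self) -> &str { "metrics" }
    fn contributions(&self) -> Contributions {
        Contributions {
            udf_names: vec!["rate".into()],
            optimizer_rules: vec!["split_pruning".into()],
        }
    }
}

fn main() {
    let builder = Builder::new().add_source(&Metrics).unwrap();
    assert_eq!(builder.merged.udf_names, vec!["rate".to_string()]);
    // Registering the same source twice collides on the `rate` UDF name.
    assert!(builder.add_source(&Metrics).is_err());
}
```

The value-returning shape (rather than sources mutating a shared registry) is what makes a collision check possible before anything touches the session.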
Implements QuickwitDataSource for the parquet metrics pipeline from PR quickwit-oss#6237.

- MetricsDataSource: production (metastore-backed) and test (SimpleIndexResolver)
- MetricsTableProvider: filter pushdown with CAST-unwrapping fix for timestamp
- MetastoreSplitProvider: converts MetricsSplitQuery → ListMetricsSplitsQuery
- MetastoreIndexResolver: resolves index URI → QuickwitObjectStore per query
- MetricsSplitQuery + extract_split_filters: predicate extraction for split pruning
- MetricsTableProviderFactory: CREATE EXTERNAL TABLE … STORED AS metrics support
- test_utils: make_batch, TestSplitProvider, MetricsTestbed for integration tests

Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
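The CAST-unwrapping fix mentioned above can be illustrated with a toy expression tree (not the real DataFusion Expr type): type coercion may wrap a pushed-down filter's column in a CAST, e.g. CAST(timestamp_secs AS Int64) >= 1700000000, so extracting the column name must look through cast wrappers or the time-range predicate is never recognized.

```rust
// Toy expression tree; the real code operates on DataFusion's Expr.
enum Expr {
    Column(String),
    Cast(Box<Expr>),
    // Toy binary predicate: <expr> >= <integer literal>.
    BinaryGtEq(Box<Expr>, i64),
}

// Strip any number of CAST wrappers around an expression.
fn unwrap_casts(expr: &Expr) -> &Expr {
    match expr {
        Expr::Cast(inner) => unwrap_casts(inner),
        other => other,
    }
}

// Column name on the predicate's left-hand side, looking through casts.
fn column_name(expr: &Expr) -> Option<&str> {
    match expr {
        Expr::BinaryGtEq(lhs, _) => match unwrap_casts(lhs) {
            Expr::Column(name) => Some(name),
            _ => None,
        },
        _ => None,
    }
}

fn main() {
    // Models CAST(timestamp_secs AS Int64) >= 1700000000 after type coercion.
    let filter = Expr::BinaryGtEq(
        Box::new(Expr::Cast(Box::new(Expr::Column("timestamp_secs".into())))),
        1_700_000_000,
    );
    // Without unwrap_casts, the lhs is a Cast node and the column is invisible.
    assert_eq!(column_name(&filter), Some("timestamp_secs"));
}
```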
Routes Substrait ReadRel nodes to registered QuickwitDataSource implementations. Standard NamedTable reads resolve via MetricsDataSource::try_consume_read_rel. ExtensionTable reads (custom protos) can be handled by downstream callers.

- QuickwitSubstraitConsumer implementing datafusion-substrait SubstraitConsumer
- execute_substrait_plan / execute_substrait_plan_streaming entry points
- DataFusionService::execute_substrait (bytes) and execute_substrait_json (dev path)
- session.rs: DataFusionSessionBuilder::execute_substrait convenience method

Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
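The routing described above — offer each named-table read to registered sources, first claimant wins, otherwise fall back to the catalog — can be sketched as below. The trait and function names mirror the PR's vocabulary but the types are illustrative stand-ins, not the datafusion-substrait API.

```rust
// Toy source: returns Some(provider description) if it claims the table.
trait Source {
    fn try_consume_read_rel(&self, table: &str) -> Option<String>;
}

struct MetricsSource;
impl Source for MetricsSource {
    fn try_consume_read_rel(&self, table: &str) -> Option<String> {
        // Hypothetical rule: the metrics source claims otel-metrics tables.
        table
            .starts_with("otel-metrics")
            .then(|| format!("metrics_provider({table})"))
    }
}

// Offer the read to each registered source in order; first claim wins.
fn resolve_table(sources: &[&dyn Source], table: &str) -> String {
    for source in sources {
        if let Some(provider) = source.try_consume_read_rel(table) {
            return provider;
        }
    }
    // No source claimed the table: fall back to the session catalog.
    format!("catalog({table})")
}

fn main() {
    let sources: Vec<&dyn Source> = vec![&MetricsSource];
    assert_eq!(
        resolve_table(&sources, "otel-metrics-v0_9"),
        "metrics_provider(otel-metrics-v0_9)"
    );
    assert_eq!(resolve_table(&sources, "logs"), "catalog(logs)");
}
```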
…tegration tests

- Add datafusion.proto with DataFusionService (ExecuteSubstrait, ExecuteSql RPCs)
- Generate codegen and mod.rs for the new proto service
- Wire DataFusionService and WorkerService into quickwit-serve gRPC layer
- Add DataFusionServiceGrpcImpl handler
- Auto-create otel-metrics-v0_9 index on startup alongside logs/traces
- Add metrics_datafusion_tests: in-process SQL + Substrait over parquet splits
- Add metrics_distributed_tests: multi-node distributed execution
- Add rollup_substrait.json fixture for Substrait plan testing

Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
Remove extra schema argument from ParquetWriter::new; the API only accepts a ParquetWriterConfig. Remove unused ParquetSchema import. Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
…test

- Remove erroneous ParquetSchema argument from ParquetWriter::new calls in integration tests (API takes only ParquetWriterConfig)
- Mark test_rest_ingest_then_in_process_query as #[ignore] until the /ingest-metrics REST endpoint is wired in quickwit-serve

Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
Force-pushed f57d0b1 to 07f8548
mattmkim left a comment
Disclaimer: I only really reviewed the metrics-specific logic; not a DF expert. Made comments here.

What would give us more confidence in merging this? Should we feature-flag the datafusion code path, so you have to "opt in" to compiling an image with it? We could also add an env var to actually enable the endpoint?
// gRPC handler. No downstream-specific code needed here.
let datafusion_session_builder = if node_config
    .is_service_enabled(QuickwitService::Searcher)
    && quickwit_common::get_bool_from_env("QW_ENABLE_DATAFUSION_ENDPOINT", false)
@mattmkim the session is already behind an env var, FYI. Open to other recommendations for isolating it.
@@ -0,0 +1,252 @@
// Copyright 2021-Present Datadog, Inc.
@guilload it would be good to have a Quickwit expert review this part.
Force-pushed c850574 to 7245f1e
- fix: CAST unwrapping in classify_filter — reuse predicate::column_name so time-range predicates are correctly classified as Inexact and passed to scan(); previously CAST-wrapped filters were silently dropped
- fix: declare parquet sort order (metric_name, timestamp_secs ASC) on FileScanConfig so DataFusion avoids redundant sort operators
- fix: get_opts now respects GetOptions.range — dispatches to get_slice for Bounded/Suffix ranges instead of always downloading the full file
- fix: to_object_store_error propagates file path on NotFound
- fix: register_for_worker made a no-op; lazy scan-path registration is sufficient and avoids O(indexes) metastore RPCs per worker task; removes stale comment claiming a non-existent object-store cache
- fix: extract is_index_not_found helper, removing duplicated downcast block from try_consume_read_rel and create_default_table_provider
- fix: sort before dedup in QuickwitSchemaProvider::table_names
- fix: empty searcher pool returns Ok(vec![]) for local execution fallback
- fix: remove dead builder methods with_udf_batch, with_codec_applier, with_physical_optimizer_rule from DataSourceContributions
- feat: add tracing spans to execute_substrait and execute_sql
- feat: wire 4 GiB memory limit on DataFusionSessionBuilder in serve
- refactor: extract stream_to_receiver helper in gRPC handler

Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
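The get_opts range fix can be modeled in miniature: instead of always downloading the whole object, the requested range is dispatched to a sliced read. The GetRange shape below mirrors the object_store crate's enum (Bounded / Offset / Suffix), but the byte buffer standing in for remote storage and the function itself are illustrative.

```rust
// Range shapes modeled on object_store::GetRange.
enum GetRange {
    Bounded(std::ops::Range<usize>), // explicit byte range
    Offset(usize),                   // from offset to end of object
    Suffix(usize),                   // last n bytes
}

// Toy get_opts: `data` stands in for the remote object; only the requested
// byte range is "downloaded" (sliced) when a range is present.
fn get_opts(data: &[u8], range: Option<GetRange>) -> &[u8] {
    match range {
        // Full download only when no range was requested.
        None => data,
        Some(GetRange::Bounded(r)) => &data[r.start..r.end.min(data.len())],
        Some(GetRange::Offset(start)) => &data[start.min(data.len())..],
        Some(GetRange::Suffix(n)) => &data[data.len().saturating_sub(n)..],
    }
}

fn main() {
    let data = b"0123456789";
    assert_eq!(get_opts(data, None), b"0123456789");
    assert_eq!(get_opts(data, Some(GetRange::Bounded(2..5))), b"234");
    assert_eq!(get_opts(data, Some(GetRange::Suffix(3))), b"789");
}
```

Honoring the range matters for parquet readers, which issue many small footer and page reads; downloading the full file on each would defeat page-index and bloom-filter pruning.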
Force-pushed 7245f1e to 1a5a11b
Hi @alexanderbianchi, can you please give more context on this effort? Are you trying to query parquet files in DataFusion via Quickwit? How is this supposed to work for a user? How does Quickwit end up with the parquet files being queried?
Force-pushed e97acde to e61b936
Adds a DataFusion-based query execution layer for OSS parquet metrics. Review commit by commit. Everything is disabled by default.
Commits
f8de0eca — quickwit-datafusion core framework

- QuickwitDataSource trait: plugin point for data sources (contributions, create_default_table_provider, list_index_names, optional init/register_for_worker/try_consume_read_rel hooks)
- DataSourceContributions: additive value type for optimizer rules, UDFs, codec appliers; check_invariants detects name collisions at startup
- QuickwitSchemaProvider: routes table resolution to DDL-registered tables (via MemorySchemaProvider), then source create_default_table_provider
- DataFusionSessionBuilder: shared Arc<RuntimeEnv> across sessions; installs DistributedPhysicalOptimizerRule + QuickwitTaskEstimator when a resolver is present
- build_quickwit_worker / QuickwitWorkerSessionBuilder: shares coordinator RuntimeEnv so registered object stores are visible on workers without re-registration
- QuickwitWorkerResolver: maps SearcherPool addresses to worker gRPC URLs
- QuickwitObjectStore: read-only quickwit_storage::Storage → object_store::ObjectStore bridge
- DataFusionService: streaming SQL execution; handles semicolon-separated DDL+query batches

bce31c87 — MetricsDataSource

- MetricsDataSource implements QuickwitDataSource; delegates to MetricsIndexResolver, registers object stores on workers concurrently
- MetricsTableProvider: ParquetSource-backed scan with bloom filter, page-index, and filter pushdown enabled; one file group per split
- predicate.rs: extracts MetricsSplitQuery from DataFusion filters; unwraps CAST/TryCast nodes so time-range predicates survive DataFusion's type-coercion pass
- MetastoreSplitProvider: multi-value IN lists on tag columns are not forwarded to the metastore (single Option<String> per field); parquet-level filter handles correctness
- MetastoreIndexResolver: index_metadata RPC + StorageResolver::resolve; no cache (follow-up)
- MetricsTableProviderFactory: STORED AS metrics DDL support

8359a4a4 — QuickwitSubstraitConsumer

- Implements SubstraitConsumer from datafusion-substrait; routes each ReadRel to registered sources via try_consume_read_rel, falls back to catalog
- WithCustomProvider shim: injects a pre-built provider for one table without rebuilding the session
- Rewrites ExtensionTable rels to NamedTable after a source claims them so from_read_rel applies standard filter/projection handling
- execute_substrait_plan_streaming takes full SessionContext (not SessionState) so catalog registrations from build_session remain visible
- Adds execute_substrait and execute_substrait_json to DataFusionService

feaceece7 — gRPC wiring + integration tests

- datafusion.proto: DataFusionService with ExecuteSubstrait and ExecuteSql server-streaming RPCs; each response carries one RecordBatch as Arrow IPC bytes
- DataFusionServiceGrpcImpl: tonic adapter streaming via mpsc + ReceiverStream; InvalidArgument for plan/schema errors, Internal otherwise
- Wired into quickwit-serve when datafusion_session_builder is Some; gRPC reflection registered only when enabled
- Integration tests: GROUP BY, distributed tasks (PartitionIsolatorExec, no shuffles), NULL fill for missing parquet columns, Substrait named-table queries, rollup from JSON plan

🤖 Generated with Claude Code
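The schema-provider lookup order described in the commit summary (DDL-registered tables first, then the source's create_default_table_provider) and the sort-before-dedup fix to table_names can be sketched together. Everything below is a toy model: a HashMap stands in for MemorySchemaProvider, strings stand in for table providers, and the otel-metrics naming rule is a hypothetical example.

```rust
use std::collections::HashMap;

struct SchemaProvider {
    // Stand-in for DataFusion's MemorySchemaProvider holding DDL tables.
    ddl_tables: HashMap<String, String>,
}

impl SchemaProvider {
    // DDL registrations take precedence; otherwise fall back to the
    // data source's default provider.
    fn table(&self, name: &str) -> Option<String> {
        if let Some(provider) = self.ddl_tables.get(name) {
            return Some(provider.clone());
        }
        self.create_default_table_provider(name)
    }

    // Hypothetical default-provider rule: only otel-metrics tables resolve.
    fn create_default_table_provider(&self, name: &str) -> Option<String> {
        name.starts_with("otel-metrics").then(|| format!("default({name})"))
    }

    // Names from both layers; sort BEFORE dedup, since Vec::dedup only
    // removes adjacent duplicates.
    fn table_names(&self) -> Vec<String> {
        let mut names: Vec<String> = self
            .ddl_tables
            .keys()
            .cloned()
            .chain(["otel-metrics-v0_9".to_string()])
            .collect();
        names.sort();
        names.dedup();
        names
    }
}

fn main() {
    let mut ddl_tables = HashMap::new();
    ddl_tables.insert("rollup".to_string(), "ddl(rollup)".to_string());
    let schema = SchemaProvider { ddl_tables };

    assert_eq!(schema.table("rollup").as_deref(), Some("ddl(rollup)"));
    assert_eq!(
        schema.table("otel-metrics-v0_9").as_deref(),
        Some("default(otel-metrics-v0_9)")
    );
    assert_eq!(schema.table("unknown"), None);
    assert_eq!(
        schema.table_names(),
        vec!["otel-metrics-v0_9".to_string(), "rollup".to_string()]
    );
}
```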