Support Aggregating by RunArrays #16011

@brancz

Is your feature request related to a problem or challenge?

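It's currently not possible to aggregate by `RunArray` columns (Arrow's run-end encoded, or REE, arrays) in DataFusion, for example by using one as a `GROUP BY` key.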

Example code grouping by a `RunArray`
use arrow::array::{Array, Int32Array, RunArray, StringViewArray};
use arrow::datatypes::{DataType, Field, Schema, Int32Type};
use arrow::record_batch::RecordBatch;
use datafusion::datasource::MemTable;
use datafusion::prelude::*;
use std::sync::Arc;

#[tokio::main]
async fn main() -> Result<(), datafusion::error::DataFusionError> {
    // Create a new DataFusion context
    let ctx = SessionContext::new();

    // First, let's create our data
    // We'll have temperature readings where multiple consecutive readings come from the same sensor

    // Temperature values (not run-length encoded)
    // This represents all temperature readings in sequence
    let temperatures = Int32Array::from(vec![
        22, 23, 24, 25, 22, 21, 20, 21, 22, 23, 24, 25, 26, 27, 28
    ]);

    // Create the string values for sensor IDs
    let sensor_id_values = StringViewArray::from(vec!["sensor_A", "sensor_B", "sensor_C", "sensor_D"]);

    // Create the run ends array (positions where each run ends)
    let sensor_id_run_ends = Int32Array::from(vec![4, 7, 12, 15]);

    // Create RunArray for sensor IDs with Int32Type as run end type
    let sensor_id_ree = RunArray::<Int32Type>::try_new(&sensor_id_run_ends, &sensor_id_values)
        .expect("Failed to create sensor ID RunArray");

    // Get the exact data type of the RunArray for the schema
    let sensor_id_type = sensor_id_ree.data_type().clone();

    // Create schema
    let schema = Arc::new(Schema::new(vec![
        Field::new("sensor_id", sensor_id_type, false),
        Field::new("temperature", DataType::Int32, false),
    ]));

    // Create record batch
    let batch = RecordBatch::try_new(
        schema.clone(),
        vec![Arc::new(sensor_id_ree), Arc::new(temperatures)],
    )?;

    // Register as a table
    let provider = MemTable::try_new(schema, vec![vec![batch]])?;
    ctx.register_table("sensor_readings", Arc::new(provider))?;

    // Run aggregation query
    // Group by sensor ID and calculate statistics
    let sql = "
        SELECT
            sensor_id,
            AVG(temperature) AS avg_temp,
            MIN(temperature) AS min_temp,
            MAX(temperature) AS max_temp,
            COUNT(temperature) AS reading_count
        FROM sensor_readings
        GROUP BY sensor_id
        ORDER BY sensor_id
    ";

    let results = ctx.sql(sql).await?.collect().await?;
    for batch in results {
        println!("{:?}", batch);
    }

    Ok(())
}
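
For readers less familiar with run-end encoding: the `sensor_id` column above stores only 4 values and 4 run ends, yet it describes 15 logical rows. The following is a minimal plain-Rust sketch of that mapping. It deliberately avoids arrow types, and `expand_runs` is a hypothetical helper written for illustration, not an arrow or DataFusion API.

```rust
/// Expand a run-end encoded column into one value per logical row.
///
/// `run_ends[i]` is the exclusive end index of run `i`, so run `i` covers
/// the logical rows `run_ends[i - 1]..run_ends[i]` (starting at 0 for the first run).
/// Hypothetical helper for illustration only.
fn expand_runs<T: Clone>(run_ends: &[usize], values: &[T]) -> Vec<T> {
    assert_eq!(run_ends.len(), values.len(), "one run end per value");
    let mut out = Vec::with_capacity(run_ends.last().copied().unwrap_or(0));
    let mut start = 0;
    for (&end, value) in run_ends.iter().zip(values) {
        assert!(end > start, "run ends must be strictly increasing");
        // Repeat the run's value once for every logical row it covers.
        out.extend(std::iter::repeat(value.clone()).take(end - start));
        start = end;
    }
    out
}
```

Calling `expand_runs(&[4, 7, 12, 15], &["sensor_A", "sensor_B", "sensor_C", "sensor_D"])` yields 15 entries: rows 0 to 3 are `sensor_A`, rows 4 to 6 are `sensor_B`, rows 7 to 11 are `sensor_C`, and rows 12 to 14 are `sensor_D`.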

Describe the solution you'd like

Several changes are needed to make this work:

Describe alternatives you've considered

We currently expand REE arrays into their flat form before pushing them through DataFusion query plans, but aggregating on them directly, with zero-copy, would be much better for performance.
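
To illustrate why skipping the expansion helps, here is a rough plain-Rust sketch of grouping directly on the runs. It is not how DataFusion implements (or would implement) this. `Stats` and `aggregate_by_runs` are hypothetical names, and plain slices stand in for the arrow arrays. The point is that the group key is looked up once per run rather than once per row, and no flat copy of the key column is ever built.

```rust
use std::collections::BTreeMap;

/// Running statistics for one group (hypothetical type for this sketch).
#[derive(Debug, Clone, PartialEq)]
struct Stats {
    count: u64,
    sum: i64,
    min: i32,
    max: i32,
}

/// Group `measurements` by a run-end encoded key column without expanding it.
///
/// `run_ends[i]` is the exclusive end row of run `i`, and `keys[i]` is its value.
/// Panics if a run end exceeds `measurements.len()` or the run ends decrease.
fn aggregate_by_runs<'a>(
    run_ends: &[usize],
    keys: &[&'a str],
    measurements: &[i32],
) -> BTreeMap<&'a str, Stats> {
    let mut groups: BTreeMap<&'a str, Stats> = BTreeMap::new();
    let mut start = 0;
    for (&end, &key) in run_ends.iter().zip(keys) {
        let rows = &measurements[start..end];
        start = end;
        if rows.is_empty() {
            continue;
        }
        // One map lookup per run, not one per logical row.
        let stats = groups.entry(key).or_insert(Stats {
            count: 0,
            sum: 0,
            min: i32::MAX,
            max: i32::MIN,
        });
        for &m in rows {
            stats.count += 1;
            stats.sum += i64::from(m);
            stats.min = stats.min.min(m);
            stats.max = stats.max.max(m);
        }
    }
    groups
}
```

With the sensor data from the example (run ends `[4, 7, 12, 15]` and the 15 temperature readings), this performs 4 map lookups instead of 15. The average for a group is `sum as f64 / count as f64`.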

Additional context

I've already got all the pieces implemented, but I'm opening this for context and easier tracking.

@alamb

Labels: enhancement (new feature or request)
