order_by not respected for window functions using udaf #11981

@timsaucer

Describe the bug

When an aggregate function (UDAF) is used as a window function, its order_by is not respected. I tried several combinations of setting the ordering directly on the WindowFunction struct and setting it through the expression function builder.

To Reproduce

Note: In the snippet below I tried with and without calling .order_by(), and with and without setting order_by in the WindowFunction struct.

Setting it through the expression function builder produces this error:
Error: ArrowError(InvalidArgumentError("Sort requires at least one column"), None)

use arrow::record_batch::RecordBatch;
use arrow::array::Int32Array;
use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::datasource::MemTable;
use datafusion::error::Result;
use datafusion::functions_aggregate::first_last::last_value_udaf;
use datafusion::prelude::*;
use datafusion_expr::expr::WindowFunction;
use datafusion_expr::{
    WindowFrame, WindowFrameBound, WindowFrameUnits, WindowFunctionDefinition,
};
use datafusion_sql::sqlparser::ast::NullTreatment;
use std::sync::Arc;
use datafusion::common::ScalarValue;

#[tokio::main]
async fn main() -> Result<()> {
    let schema = Schema::new(vec![
        Field::new("a", DataType::Int32, true),
        Field::new("b", DataType::Int32, true),
    ]);

    let batch = RecordBatch::try_new(
        Arc::new(schema.clone()),
        vec![
            Arc::new(Int32Array::from(vec![1, 5, 3, 2, 4])),
            Arc::new(Int32Array::from(vec![
                Some(1),
                None,
                Some(3),
                None,
                Some(4),
            ])),
        ],
    )?;

    let ctx = SessionContext::new();

    let provider = MemTable::try_new(Arc::new(schema), vec![vec![batch]])?;
    ctx.register_table("t", Arc::new(provider))?;

    let df = ctx.table("t").await?;

    df.clone().show().await?;

    let w = WindowFunction::new(
        WindowFunctionDefinition::AggregateUDF(last_value_udaf()),
        vec![col("b")],
    );

    let func = Expr::WindowFunction(w)
        .null_treatment(NullTreatment::IgnoreNulls)
        .order_by(vec![col("a").sort(true, true)])
        .window_frame(WindowFrame::new_bounds(
            WindowFrameUnits::Rows,
            WindowFrameBound::Preceding(ScalarValue::UInt32(None)),
            WindowFrameBound::CurrentRow,
        ))
        .build()?
        .alias("last_val");

    df.select(vec![col("a"), col("b"), func])?.show().await?;

    Ok(())
}

    // Expectation
    // +---+---+----------+
    // | a | b | last_val |
    // +---+---+----------+
    // | 1 | 1 | 1        |
    // | 2 |   | 1        |
    // | 3 | 3 | 3        |
    // | 4 | 4 | 4        |
    // | 5 |   | 4        |
    // +---+---+----------+

    // Actual
    // +---+---+----------+
    // | a | b | last_val |
    // +---+---+----------+
    // | 1 | 1 | 1        |
    // | 5 |   | 1        |
    // | 3 | 3 | 3        |
    // | 2 |   | 3        |
    // | 4 | 4 | 4        |
    // +---+---+----------+

Expected behavior

In the example above, setting a sort order should be possible, so that last_value used as a window function is evaluated over rows ordered by column a.
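
To make the expected semantics concrete, here is a minimal reference sketch in plain Rust with no DataFusion involved. It models what the reproduction asks for: sort by a, then carry forward the most recent non-null b over an unbounded-preceding-to-current-row frame. The function name `last_value_ignore_nulls` is hypothetical and exists only for illustration; it is not part of any DataFusion API.

```rust
/// Hypothetical reference model (not DataFusion code) of
/// last_value(b) IGNORE NULLS, ordered by `a`, with a frame of
/// ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW.
fn last_value_ignore_nulls(
    rows: &[(i32, Option<i32>)],
) -> Vec<(i32, Option<i32>, Option<i32>)> {
    // Apply the window ordering first: sort rows by column `a`.
    let mut sorted = rows.to_vec();
    sorted.sort_by_key(|&(a, _)| a);

    // Walk the ordered rows, remembering the latest non-null `b`.
    let mut last = None;
    sorted
        .into_iter()
        .map(|(a, b)| {
            if b.is_some() {
                last = b;
            }
            (a, b, last)
        })
        .collect()
}

fn main() {
    // Same data as the reproduction, in the same (unsorted) input order.
    let rows = [
        (1, Some(1)),
        (5, None),
        (3, Some(3)),
        (2, None),
        (4, Some(4)),
    ];
    for (a, b, last_val) in last_value_ignore_nulls(&rows) {
        println!("a={a} b={b:?} last_val={last_val:?}");
    }
}
```

With the input from the reproduction, this model yields the rows listed under "Expectation" above, whereas the "Actual" output matches a walk over the rows in their original, unsorted order.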

Additional context

No response

Labels: bug (Something isn't working)