DataFusion not using all cores on a TPCH-like query with a repartition #6290

@alamb

Describe the bug

DataFusion is not taking advantage of all the cores on my machine, even though the physical plan contains a RepartitionExec that fans the single input partition out to 16.

I ran this at 2e9beeb on main while working on #6278.

To Reproduce

-- step 1: save this as script.sql in an arrow-datafusion checkout
--
-- step 2: generate data:
--  (cd benchmarks && ./bench.sh data all)
--
-- step 3: run this script:
-- datafusion-cli -f script.sql
--
-- Expected: all cores are kept busy processing the query
-- Actual: only one core seems to be busy

-- load the data from lineitem a few times
create table lineitem as select * from 'benchmarks/data/lineitem';

insert into lineitem select * from lineitem;
insert into lineitem select * from lineitem;
insert into lineitem select * from lineitem;
insert into lineitem select * from lineitem;


-- +---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
-- | plan_type     | plan                                                                                                                                                                                                                                                                                            |
-- +---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
-- | logical_plan  | Sort: lineitem.l_returnflag ASC NULLS LAST, lineitem.l_linestatus ASC NULLS LAST                                                                                                                                                                                                                |
-- |               |   Projection: lineitem.l_returnflag, lineitem.l_linestatus, SUM(lineitem.l_quantity) AS sum_qty, AVG(lineitem.l_quantity) AS avg_qty, AVG(lineitem.l_extendedprice) AS avg_price, AVG(lineitem.l_discount) AS avg_disc, COUNT(UInt8(1)) AS count_order                                          |
-- |               |     Aggregate: groupBy=[[lineitem.l_returnflag, lineitem.l_linestatus]], aggr=[[SUM(lineitem.l_quantity), AVG(lineitem.l_quantity), AVG(lineitem.l_extendedprice), AVG(lineitem.l_discount), COUNT(UInt8(1))]]                                                                                  |
-- |               |       Projection: lineitem.l_quantity, lineitem.l_extendedprice, lineitem.l_discount, lineitem.l_returnflag, lineitem.l_linestatus                                                                                                                                                              |
-- |               |         Filter: lineitem.l_shipdate <= Date32("10471")                                                                                                                                                                                                                                          |
-- |               |           TableScan: lineitem projection=[l_quantity, l_extendedprice, l_discount, l_returnflag, l_linestatus, l_shipdate]                                                                                                                                                                      |
-- | physical_plan | SortPreservingMergeExec: [l_returnflag@0 ASC NULLS LAST,l_linestatus@1 ASC NULLS LAST]                                                                                                                                                                                                          |
-- |               |   SortExec: expr=[l_returnflag@0 ASC NULLS LAST,l_linestatus@1 ASC NULLS LAST]                                                                                                                                                                                                                  |
-- |               |     ProjectionExec: expr=[l_returnflag@0 as l_returnflag, l_linestatus@1 as l_linestatus, SUM(lineitem.l_quantity)@2 as sum_qty, AVG(lineitem.l_quantity)@3 as avg_qty, AVG(lineitem.l_extendedprice)@4 as avg_price, AVG(lineitem.l_discount)@5 as avg_disc, COUNT(UInt8(1))@6 as count_order] |
-- |               |       AggregateExec: mode=FinalPartitioned, gby=[l_returnflag@0 as l_returnflag, l_linestatus@1 as l_linestatus], aggr=[SUM(lineitem.l_quantity), AVG(lineitem.l_quantity), AVG(lineitem.l_extendedprice), AVG(lineitem.l_discount), COUNT(UInt8(1))]                                           |
-- |               |         CoalesceBatchesExec: target_batch_size=8192                                                                                                                                                                                                                                             |
-- |               |           RepartitionExec: partitioning=Hash([Column { name: "l_returnflag", index: 0 }, Column { name: "l_linestatus", index: 1 }], 16), input_partitions=16                                                                                                                                   |
-- |               |             AggregateExec: mode=Partial, gby=[l_returnflag@3 as l_returnflag, l_linestatus@4 as l_linestatus], aggr=[SUM(lineitem.l_quantity), AVG(lineitem.l_quantity), AVG(lineitem.l_extendedprice), AVG(lineitem.l_discount), COUNT(UInt8(1))]                                              |
-- |               |               ProjectionExec: expr=[l_quantity@0 as l_quantity, l_extendedprice@1 as l_extendedprice, l_discount@2 as l_discount, l_returnflag@3 as l_returnflag, l_linestatus@4 as l_linestatus]                                                                                               |
-- |               |                 CoalesceBatchesExec: target_batch_size=8192                                                                                                                                                                                                                                     |
-- |               |                   FilterExec: l_shipdate@5 <= 10471                                                                                                                                                                                                                                             |
-- |               |                     RepartitionExec: partitioning=RoundRobinBatch(16), input_partitions=1                                                                                                                                                                                                       |
-- |               |                       MemoryExec: partitions=1, partition_sizes=[11728]                                                                                                                                                                                                                         |
-- |               |                                                                                                                                                                                                                                                                                                 |
-- +---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+

EXPLAIN select
    l_returnflag,
    l_linestatus,
    sum(l_quantity) as sum_qty,
    avg(l_quantity) as avg_qty,
    avg(l_extendedprice) as avg_price,
    avg(l_discount) as avg_disc,
    count(*) as count_order
from
    lineitem
where
        l_shipdate <= date '1998-09-02'
group by
    l_returnflag,
    l_linestatus
order by
    l_returnflag,
    l_linestatus;

-- Run the actual query:
--
-- 0 rows in set. Query took 1.570 seconds.
-- 0 rows in set. Query took 0.004 seconds.
-- 0 rows in set. Query took 0.005 seconds.
-- 0 rows in set. Query took 0.009 seconds.
-- 0 rows in set. Query took 0.023 seconds.
-- +--------------+--------------+---------------+-----------+--------------+----------+-------------+
-- | l_returnflag | l_linestatus | sum_qty       | avg_qty   | avg_price    | avg_disc | count_order |
-- +--------------+--------------+---------------+-----------+--------------+----------+-------------+
-- | A            | F            | 603745712.00  | 25.522005 | 38273.129734 | 0.049985 | 23655888    |
-- | N            | F            | 15862672.00   | 25.516471 | 38284.467760 | 0.050093 | 621664      |
-- | N            | O            | 1191616640.00 | 25.502226 | 38249.117988 | 0.049996 | 46725984    |
-- | R            | F            | 603516048.00  | 25.505793 | 38250.854626 | 0.050009 | 23661920    |
-- +--------------+--------------+---------------+-----------+--------------+----------+-------------+
-- 4 rows in set. Query took 9.980 seconds.

select
    l_returnflag,
    l_linestatus,
    sum(l_quantity) as sum_qty,
    avg(l_quantity) as avg_qty,
    avg(l_extendedprice) as avg_price,
    avg(l_discount) as avg_disc,
    count(*) as count_order
from
    lineitem
where
        l_shipdate <= date '1998-09-02'
group by
    l_returnflag,
    l_linestatus
order by
    l_returnflag,
    l_linestatus;

Expected behavior

I expect all the cores on my machine to be in use while the query runs, but instead only a single core is busy.
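
To make the expectation concrete, here is a minimal sketch of what round-robin batch distribution implies for the numbers in the plan above (one MemoryExec partition with 11728 batches, RoundRobinBatch(16)). This is a toy model, not DataFusion's implementation; the function name `round_robin` and the rule "batch i goes to output i % n" are assumptions made for illustration.

```python
from collections import Counter


def round_robin(num_batches: int, num_outputs: int) -> Counter:
    """Toy model: send batch i to output partition i % num_outputs.

    Returns the number of batches each output partition receives.
    """
    counts = Counter()
    for i in range(num_batches):
        counts[i % num_outputs] += 1
    return counts


# Figures from the EXPLAIN output: partition_sizes=[11728], RoundRobinBatch(16)
counts = round_robin(11728, 16)
print(len(counts), min(counts.values()), max(counts.values()))
```

Under this model every one of the 16 output partitions receives the same share of batches, so the FilterExec and the partial AggregateExec above the repartition should have work available on all 16 partitions. That is why seeing a single busy core is surprising.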

Additional context

No response

Labels

bug (Something isn't working), performance (Make DataFusion faster)
