Merged
2 changes: 1 addition & 1 deletion datafusion/core/tests/sql/explain_analyze.rs
@@ -787,7 +787,7 @@ async fn explain_logical_plan_only() {
"logical_plan",
"Aggregate: groupBy=[[]], aggr=[[COUNT(UInt8(1)) AS COUNT(*)]]\
\n SubqueryAlias: t\
\n Projection: column1\
\n Projection: column2\
\n Values: (Utf8(\"a\"), Int64(1), Int64(100)), (Utf8(\"a\"), Int64(2), Int64(150))"
]];
assert_eq!(expected, actual);
201 changes: 174 additions & 27 deletions datafusion/optimizer/src/push_down_projection.rs
@@ -23,6 +23,7 @@ use crate::merge_projection::merge_projection;
use crate::optimizer::ApplyOrder;
use crate::push_down_filter::replace_cols_by_name;
use crate::{OptimizerConfig, OptimizerRule};
use arrow::datatypes::DataType;
use arrow::error::Result as ArrowResult;
use datafusion_common::ScalarValue::UInt8;
use datafusion_common::{
@@ -148,8 +149,10 @@ impl OptimizerRule for PushDownProjection {
{
let mut used_columns: HashSet<Column> = HashSet::new();
if projection_is_empty {
used_columns
.insert(scan.projected_schema.fields()[0].qualified_column());
let field = find_small_field(scan.projected_schema.fields()).ok_or(
DataFusionError::Internal("Scan with empty schema".to_string()),
)?;
used_columns.insert(field.qualified_column());
push_down_scan(&used_columns, scan, true)?
} else {
for expr in projection.expr.iter() {
@@ -161,10 +164,13 @@
}
}
LogicalPlan::Values(values) if projection_is_empty => {
let first_col =
Expr::Column(values.schema.fields()[0].qualified_column());
let field = find_small_field(values.schema.fields()).ok_or(
DataFusionError::Internal("Values with empty schema".to_string()),
)?;
let column = Expr::Column(field.qualified_column());

LogicalPlan::Projection(Projection::try_new(
vec![first_col],
vec![column],
Arc::new(child_plan.clone()),
)?)
}
@@ -423,7 +429,88 @@ pub fn collect_projection_expr(projection: &Projection) -> HashMap<String, Expr>
.collect::<HashMap<_, _>>()
}

// Get the projection exprs from columns in the order of the schema
/// Accumulate the memory size of a data type, measured in bits.
///
/// Types with a variable size are assigned a fixed size that is greater than that of most
/// primitive types.
///
/// While traversing nested types, `nesting` is incremented on every level.
fn nested_size(data_type: &DataType, nesting: &mut usize) -> usize {
use DataType::*;
if data_type.is_primitive() {
return data_type.primitive_width().unwrap_or(1) * 8;
}

if data_type.is_nested() {
*nesting += 1;
}

match data_type {
Null => 0,
Boolean => 1,
Binary | Utf8 => 128,
LargeBinary | LargeUtf8 => 256,
FixedSizeBinary(bytes) => (*bytes * 8) as usize,
// primitive types
Int8
| Int16
| Int32
| Int64
| UInt8
| UInt16
| UInt32
| UInt64
| Float16
| Float32
| Float64
| Timestamp(_, _)
| Date32
| Date64
| Time32(_)
| Time64(_)
| Duration(_)
| Interval(_)
| Dictionary(_, _)
| Decimal128(_, _)
| Decimal256(_, _) => data_type.primitive_width().unwrap_or(1) * 8,
// nested types
List(f) => nested_size(f.data_type(), nesting),
FixedSizeList(_, s) => (s * 8) as usize,
LargeList(f) => nested_size(f.data_type(), nesting),
Struct(fields) => fields
.iter()
.map(|f| nested_size(f.data_type(), nesting))
[Inline review comment on the `Struct(fields)` arm]
Contributor: In principle we could project a sub-field from a struct instead of the entire struct (all columns).
ch-sc (Contributor, Author), Oct 18, 2023: Good idea, I will play around with it. Though it sounds like a rare edge case to me where no other "smaller" type would be present in the schema!?
Contributor: Yeah indeed :)
(A hedged sketch of this idea follows this file's diff, below.)
.sum(),
Union(fields, _) => fields
.iter()
.map(|(_, f)| nested_size(f.data_type(), nesting))
.sum(),
Map(field, _) => nested_size(field.data_type(), nesting),
RunEndEncoded(run_ends, values) => {
nested_size(run_ends.data_type(), nesting)
+ nested_size(values.data_type(), nesting)
}
}
}

/// Find a field with a presumably small memory footprint, based on its data type's memory
/// size and the level of nesting.
fn find_small_field(fields: &[DFField]) -> Option<DFField> {
fields
.iter()
.map(|f| {
let nesting = &mut 0;
let size = nested_size(f.data_type(), nesting);
(*nesting, size)
})
.enumerate()
.min_by(|(_, (nesting_a, size_a)), (_, (nesting_b, size_b))| {
nesting_a.cmp(nesting_b).then(size_a.cmp(size_b))
})
.map(|(i, _)| fields[i].clone())
}

/// Get the projection exprs from columns in the order of the schema
fn get_expr(columns: &HashSet<Column>, schema: &DFSchemaRef) -> Result<Vec<Expr>> {
let expr = schema
.fields()
@@ -489,23 +576,14 @@ fn push_down_scan(
.filter_map(ArrowResult::ok)
.collect();

if projection.is_empty() {
if has_projection && !schema.fields().is_empty() {
// Ensure that we are reading at least one column from the table in case the query
// does not reference any columns directly such as "SELECT COUNT(1) FROM table",
// except when the table is empty (no column)
projection.insert(0);
} else {
// for table scan without projection, we default to return all columns
projection = scan
.source
.schema()
.fields()
.iter()
.enumerate()
.map(|(i, _)| i)
.collect::<BTreeSet<usize>>();
}
if !has_projection && projection.is_empty() {
// for table scan without projection, we default to return all columns
projection = schema
.fields()
.iter()
.enumerate()
.map(|(i, _)| i)
.collect::<BTreeSet<usize>>();
}

// Building new projection from BTreeSet
@@ -562,7 +640,7 @@
use crate::optimizer::Optimizer;
use crate::test::*;
use crate::OptimizerContext;
use arrow::datatypes::{DataType, Field, Schema};
use arrow::datatypes::{DataType, Field, Schema, TimeUnit};
use datafusion_common::DFSchema;
use datafusion_expr::builder::table_scan_with_filters;
use datafusion_expr::expr;
@@ -922,7 +1000,7 @@ mod tests {
.project(vec![lit(1_i64), lit(2_i64)])?
.build()?;
let expected = "Projection: Int64(1), Int64(2)\
\n TableScan: test projection=[a]";
\n TableScan: test projection=[]";
assert_optimized_plan_eq(&plan, expected)
}

@@ -969,7 +1047,7 @@

let expected = "\
Projection: Int32(1) AS a\
\n TableScan: test projection=[a]";
\n TableScan: test projection=[]";

assert_optimized_plan_eq(&plan, expected)
}
@@ -998,7 +1076,7 @@

let expected = "\
Projection: Int32(1) AS a\
\n TableScan: test projection=[a], full_filters=[b = Int32(1)]";
\n TableScan: test projection=[], full_filters=[b = Int32(1)]";

assert_optimized_plan_eq(&plan, expected)
}
@@ -1154,4 +1232,73 @@ mod tests {
.unwrap_or(optimized_plan);
Ok(optimized_plan)
}

#[test]
fn test_nested_size() {
use DataType::*;
let nesting = &mut 0;
assert_eq!(nested_size(&Null, nesting), 0);
assert_eq!(*nesting, 0);
assert_eq!(nested_size(&Boolean, nesting), 1);
assert_eq!(*nesting, 0);
assert_eq!(nested_size(&UInt8, nesting), 8);
assert_eq!(*nesting, 0);
assert_eq!(nested_size(&Int64, nesting), 64);
assert_eq!(*nesting, 0);
assert_eq!(nested_size(&Decimal256(5, 2), nesting), 256);
assert_eq!(*nesting, 0);
assert_eq!(
nested_size(&List(Arc::new(Field::new("A", Int64, true))), nesting),
64
);
assert_eq!(*nesting, 1);
*nesting = 0;
assert_eq!(
nested_size(
&List(Arc::new(Field::new(
"A",
List(Arc::new(Field::new("AA", Int64, true))),
true
))),
nesting
),
64
);
assert_eq!(*nesting, 2);
}

#[test]
fn test_find_small_field() {
use DataType::*;
let int32 = DFField::from(Field::new("a", Int32, false));
let bin = DFField::from(Field::new("b", Binary, false));
let list_i64 = DFField::from(Field::new(
"c",
List(Arc::new(Field::new("c_1", Int64, true))),
false,
));
let time_s = DFField::from(Field::new("d", Time32(TimeUnit::Second), false));

assert_eq!(
find_small_field(&[
int32.clone(),
bin.clone(),
list_i64.clone(),
time_s.clone()
]),
Some(int32.clone())
);
assert_eq!(
find_small_field(&[bin.clone(), list_i64.clone(), time_s.clone()]),
Some(time_s.clone())
);
assert_eq!(
find_small_field(&[time_s.clone(), int32.clone()]),
Some(time_s.clone())
);
assert_eq!(
find_small_field(&[bin.clone(), list_i64.clone()]),
Some(bin.clone())
);
}
}
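The resolved review thread above suggested descending into a struct and projecting just one of its sub-fields instead of the whole struct. That idea is not implemented in this PR; the snippet below is only a minimal sketch of what the size heuristic for it could look like, reusing the arrow `DataType` and `primitive_width` calls already present in this diff. The helper name `smallest_leaf_size` is hypothetical and not part of DataFusion.

```rust
use arrow::datatypes::DataType;

/// Hypothetical sketch (not part of this PR): instead of summing the sizes of all
/// struct children as `nested_size` does, descend into the struct and keep only the
/// smallest leaf, which is what would later allow projecting a single sub-field.
fn smallest_leaf_size(data_type: &DataType, nesting: &mut usize) -> usize {
    match data_type {
        DataType::Struct(fields) => {
            // Each struct level counts as one additional level of nesting.
            *nesting += 1;
            fields
                .iter()
                .map(|f| smallest_leaf_size(f.data_type(), nesting))
                .min()
                .unwrap_or(0)
        }
        // Everything else falls back to a flat estimate (the primitive width in bits),
        // mirroring the fallback already used in `nested_size`.
        other => other.primitive_width().unwrap_or(1) * 8,
    }
}
```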
4 changes: 2 additions & 2 deletions datafusion/sqllogictest/test_files/avro.slt
@@ -253,10 +253,10 @@ EXPLAIN SELECT count(*) from alltypes_plain
----
logical_plan
Aggregate: groupBy=[[]], aggr=[[COUNT(UInt8(1)) AS COUNT(*)]]
--TableScan: alltypes_plain projection=[id]
--TableScan: alltypes_plain projection=[bool_col]
physical_plan
AggregateExec: mode=Final, gby=[], aggr=[COUNT(*)]
--CoalescePartitionsExec
----AggregateExec: mode=Partial, gby=[], aggr=[COUNT(*)]
------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
--------AvroExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/avro/alltypes_plain.avro]]}, projection=[id]
--------AvroExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/avro/alltypes_plain.avro]]}, projection=[bool_col]
4 changes: 2 additions & 2 deletions datafusion/sqllogictest/test_files/json.slt
@@ -50,13 +50,13 @@ EXPLAIN SELECT count(*) from json_test
----
logical_plan
Aggregate: groupBy=[[]], aggr=[[COUNT(UInt8(1)) AS COUNT(*)]]
--TableScan: json_test projection=[a]
--TableScan: json_test projection=[c]
physical_plan
AggregateExec: mode=Final, gby=[], aggr=[COUNT(*)]
--CoalescePartitionsExec
----AggregateExec: mode=Partial, gby=[], aggr=[COUNT(*)]
------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
--------JsonExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/2.json]]}, projection=[a]
--------JsonExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/2.json]]}, projection=[c]

query error DataFusion error: Schema error: No field named mycol\.
SELECT mycol FROM single_nan
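For reference, the new scan projection can also be observed outside of sqllogictest with a small program. This is only an illustrative sketch, assuming the tokio runtime and a hypothetical local file `test.csv` registered as table `test`; it relies only on the standard `SessionContext`, `register_csv`, and `sql` APIs.

```rust
use datafusion::error::Result;
use datafusion::prelude::{CsvReadOptions, SessionContext};

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();
    // Hypothetical input file; any table with more than one column works.
    ctx.register_csv("test", "test.csv", CsvReadOptions::new()).await?;

    // With this change, the logical plan should show a TableScan whose projection
    // contains a single, cheap-to-scan column picked by `find_small_field`
    // (previously the first column of the schema was always used).
    let df = ctx.sql("EXPLAIN SELECT COUNT(*) FROM test").await?;
    df.show().await?;
    Ok(())
}
```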