Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,10 @@

package org.apache.doris.nereids.rules.exploration.mv;

import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.MTMV;
import org.apache.doris.common.Pair;
import org.apache.doris.mtmv.BaseTableInfo;
import org.apache.doris.nereids.CascadesContext;
import org.apache.doris.nereids.jobs.executor.Rewriter;
import org.apache.doris.nereids.rules.analysis.NormalizeRepeat;
Expand All @@ -37,6 +40,7 @@
import org.apache.doris.nereids.trees.expressions.Expression;
import org.apache.doris.nereids.trees.expressions.NamedExpression;
import org.apache.doris.nereids.trees.expressions.Slot;
import org.apache.doris.nereids.trees.expressions.SlotReference;
import org.apache.doris.nereids.trees.expressions.VirtualSlotReference;
import org.apache.doris.nereids.trees.expressions.functions.Function;
import org.apache.doris.nereids.trees.expressions.functions.agg.AggregateFunction;
Expand All @@ -60,6 +64,7 @@
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.function.Supplier;
Expand Down Expand Up @@ -321,6 +326,51 @@ protected Expression tryRewriteExpression(StructInfo queryStructInfo, Expression
return rewrittenExpression;
}

/**
* Not all query after rewritten successfully can compensate union all
* Such as:
* mv def sql is as following, partition column is a
* select a, b, count(*) from t1 group by a, b
* Query is as following:
* select b, count(*) from t1 group by b, after rewritten by materialized view successfully
* If mv part partition is invalid, can not compensate union all, because result is wrong after
* compensate union all.
*/
@Override
protected boolean canUnionRewrite(Plan queryPlan, MTMV mtmv, CascadesContext cascadesContext) {
// Check query plan is contain the partition column
// Query plan in the current rule must contain aggregate node, because the rule pattern is
//
Optional<LogicalAggregate<Plan>> logicalAggregateOptional =
queryPlan.collectFirst(planTreeNode -> planTreeNode instanceof LogicalAggregate);
if (!logicalAggregateOptional.isPresent()) {
return true;
}
List<Expression> groupByExpressions = logicalAggregateOptional.get().getGroupByExpressions();
if (groupByExpressions.isEmpty()) {
// Scalar aggregate can not compensate union all
return false;
}
final String relatedCol = mtmv.getMvPartitionInfo().getRelatedCol();
final BaseTableInfo relatedTableInfo = mtmv.getMvPartitionInfo().getRelatedTableInfo();
boolean canUnionRewrite = false;
// Check the query plan group by expression contains partition col or not
List<? extends Expression> groupByShuttledExpressions =
ExpressionUtils.shuttleExpressionWithLineage(groupByExpressions, queryPlan, new BitSet());
for (Expression expression : groupByShuttledExpressions) {
canUnionRewrite = !expression.collectToSet(expr -> expr instanceof SlotReference
&& ((SlotReference) expr).isColumnFromTable()
&& Objects.equals(((SlotReference) expr).getColumn().map(Column::getName).orElse(null),
relatedCol)
&& Objects.equals(((SlotReference) expr).getTable().map(BaseTableInfo::new).orElse(null),
relatedTableInfo)).isEmpty();
if (canUnionRewrite) {
break;
}
}
return canUnionRewrite;
}

/**
* Check query and view aggregate compatibility
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,17 @@ protected List<Plan> doRewrite(StructInfo queryStructInfo, CascadesContext casca
return rewriteResults;
}
boolean partitionNeedUnion = needUnionRewrite(invalidPartitions, cascadesContext);
boolean canUnionRewrite = canUnionRewrite(queryPlan,
((AsyncMaterializationContext) materializationContext).getMtmv(),
cascadesContext);
if (partitionNeedUnion && !canUnionRewrite) {
materializationContext.recordFailReason(queryStructInfo,
"need compensate union all, but can not, because the query structInfo",
() -> String.format("mv partition info is %s, and the query plan is %s",
((AsyncMaterializationContext) materializationContext).getMtmv()
.getMvPartitionInfo(), queryPlan.treeString()));
return rewriteResults;
}
final Pair<Map<BaseTableInfo, Set<String>>, Map<BaseTableInfo, Set<String>>> finalInvalidPartitions =
invalidPartitions;
if (partitionNeedUnion) {
Expand Down Expand Up @@ -372,6 +383,20 @@ protected boolean needUnionRewrite(
&& (!invalidPartitions.key().isEmpty() || !invalidPartitions.value().isEmpty());
}

/**
* Not all query after rewritten successfully can compensate union all
* Such as:
* mv def sql is as following, partition column is a
* select a, b, count(*) from t1 group by a, b
* Query is as following:
* select b, count(*) from t1 group by b, after rewritten by materialized view successfully
* If mv part partition is invalid, can not compensate union all, because result is wrong after
* compensate union all.
*/
protected boolean canUnionRewrite(Plan queryPlan, MTMV mtmv, CascadesContext cascadesContext) {
return true;
}

// Normalize expression such as nullable property and output slot id
protected Plan normalizeExpressions(Plan rewrittenPlan, Plan originPlan) {
if (rewrittenPlan.getOutput().size() != originPlan.getOutput().size()) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,201 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !query1_0_before --
28

-- !query1_0_after --
28

-- !query1_1_before --
32

-- !query1_1_after --
32

-- !query2_0_before --
a 4
b 28

-- !query2_0_after --
a 2
b 26

-- !query3_0_before --
a 4
b 28

-- !query3_0_after --
a 4
b 28

-- !query4_0_before --
2024-09-12 8
2024-09-13 8
2024-09-14 8
2024-09-15 8

-- !query4_0_after --
2024-09-12 4
2024-09-13 8
2024-09-14 8
2024-09-15 8

-- !query5_0_before --
2024-09-12 8
2024-09-13 8
2024-09-14 8
2024-09-15 8

-- !query5_0_after --
2024-09-12 8
2024-09-13 8
2024-09-14 8
2024-09-15 8

-- !query6_0_before --
a 1
a 1
a 1
a 1
a 1
a 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1

-- !query6_0_after --
a 1
a 1
a 1
a 1
a 1
a 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1

-- !query7_0_before --
a 1
a 1
a 1
a 1
a 1
a 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1

-- !query7_0_after --
a 1
a 1
a 1
a 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1

Original file line number Diff line number Diff line change
Expand Up @@ -1472,6 +1472,13 @@ class Suite implements GroovyInterceptable {
}
}

def mv_rewrite_fail = { query_sql, mv_name ->
explain {
sql("${query_sql}")
notContains("${mv_name}(${mv_name})")
}
}

def check_mv_rewrite_fail = { db, mv_sql, query_sql, mv_name ->

sql """DROP MATERIALIZED VIEW IF EXISTS ${mv_name}"""
Expand Down
Loading