Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import org.apache.doris.catalog.MTMV;
import org.apache.doris.catalog.PartitionType;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.common.Pair;
import org.apache.doris.catalog.constraint.TableIdentifier;
import org.apache.doris.mtmv.BaseTableInfo;
import org.apache.doris.mtmv.MTMVRelatedTableIf;
import org.apache.doris.nereids.CascadesContext;
Expand All @@ -36,6 +36,7 @@
import org.apache.doris.nereids.trees.plans.JoinType;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.PreAggStatus;
import org.apache.doris.nereids.trees.plans.algebra.CatalogRelation;
import org.apache.doris.nereids.trees.plans.logical.LogicalAggregate;
import org.apache.doris.nereids.trees.plans.logical.LogicalCatalogRelation;
import org.apache.doris.nereids.trees.plans.logical.LogicalFileScan;
Expand All @@ -50,11 +51,12 @@

import com.google.common.collect.HashMultimap;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMultimap;
import com.google.common.collect.Maps;
import com.google.common.collect.Multimap;

import java.util.ArrayList;
import java.util.BitSet;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -95,17 +97,29 @@ public static Optional<RelatedTableInfo> getRelatedTableInfo(String column, Plan
if (!columnSlot.isColumnFromTable()) {
return Optional.empty();
}
// check sql pattern
IncrementCheckerContext context = new IncrementCheckerContext(columnSlot);
materializedViewPlan.accept(MaterializedViewIncrementChecker.INSTANCE, context);
if (context.getPartitionRelatedTableAndColumnList().isEmpty() || !context.isPctPossible()) {
// Collect table relation map which is used to identify self join
List<CatalogRelation> catalogRelationObjs =
materializedViewPlan.collectToList(CatalogRelation.class::isInstance);
ImmutableMultimap.Builder<TableIdentifier, CatalogRelation> tableCatalogRelationMultimapBuilder =
ImmutableMultimap.builder();
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

use expectedSize builder

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ImmutableMultimap.Builder seems doesn't have expectedSize builder

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ImmutableMap.builderWithExpectedSize()

for (CatalogRelation catalogRelation : catalogRelationObjs) {
tableCatalogRelationMultimapBuilder.put(new TableIdentifier(catalogRelation.getTable()), catalogRelation);
}
// Check sql pattern
IncrementCheckerContext checkContext =
new IncrementCheckerContext(columnSlot, tableCatalogRelationMultimapBuilder.build());
materializedViewPlan.accept(MaterializedViewIncrementChecker.INSTANCE, checkContext);
Multimap<TableIf, Column> partitionRelatedTableAndColumnMap =
checkContext.getPartitionRelatedTableAndColumnMap();
if (partitionRelatedTableAndColumnMap.isEmpty()) {
return Optional.empty();
}
// TODO support to return only one related table info, support multi later
Pair<TableIf, Column> tableIfColumnPair = context.getPartitionRelatedTableAndColumnList().get(0);
return Optional.of(new RelatedTableInfo(new BaseTableInfo(tableIfColumnPair.key()),
context.isPctPossible(),
tableIfColumnPair.value().getName()));
for (Map.Entry<TableIf, Column> entry : partitionRelatedTableAndColumnMap.entries()) {
return Optional.of(new RelatedTableInfo(new BaseTableInfo(entry.getKey()), true,
entry.getValue().getName()));
}
return Optional.empty();
}

/**
Expand Down Expand Up @@ -289,7 +303,6 @@ public Void visitLogicalFilter(LogicalFilter<? extends Plan> filter, IncrementCh
public Void visitLogicalJoin(LogicalJoin<? extends Plan, ? extends Plan> join,
IncrementCheckerContext context) {
if (join.isMarkJoin()) {
context.setPctPossible(false);
return null;
}
Plan left = join.child(0);
Expand All @@ -301,20 +314,17 @@ public Void visitLogicalJoin(LogicalJoin<? extends Plan, ? extends Plan> join,
boolean useLeft = leftColumnSet.contains(context.getMvPartitionColumn().getColumn().get());
JoinType joinType = join.getJoinType();
if (joinType.isInnerJoin() || joinType.isCrossJoin()) {
context.setPctPossible(true);
} else if (joinType.isLeftJoin()
return visit(join, context);
} else if ((joinType.isLeftJoin()
|| joinType.isLefSemiJoin()
|| joinType.isLeftAntiJoin()) {
context.setPctPossible(useLeft);
} else if (joinType.isRightJoin()
|| joinType.isLeftAntiJoin()) && useLeft) {
return visit(join.left(), context);
} else if ((joinType.isRightJoin()
|| joinType.isRightAntiJoin()
|| joinType.isRightSemiJoin()) {
context.setPctPossible(!useLeft);
} else {
// un supported join type
context.setPctPossible(false);
|| joinType.isRightSemiJoin()) && !useLeft) {
return visit(join.right(), context);
}
return visit(join, context);
return null;
}

@Override
Expand All @@ -324,15 +334,13 @@ public Void visitLogicalRelation(LogicalRelation relation, IncrementCheckerConte
}
LogicalCatalogRelation logicalCatalogRelation = (LogicalCatalogRelation) relation;
TableIf table = logicalCatalogRelation.getTable();
// if self join, can't infer partition column
if (!context.getTableIdAndRelationMapping().get(table.getId()).isEmpty()) {
context.setPctPossible(false);
// if self join, self join can not partition track now, remove the partition column correspondingly
if (context.getRelationByTable(table).size() > 1) {
context.getPartitionRelatedTableAndColumnMap().removeAll(table);
return null;
}
// record tableId and relation, to check the self join
context.addTableIdAndRelation(((LogicalCatalogRelation) relation).getTable().getId(), relation);
// TODO: 2024/1/31 support only one partition referenced column, support multi later
if (!context.getPartitionRelatedTableAndColumnList().isEmpty()) {
if (!context.getPartitionRelatedTableAndColumnMap().isEmpty()) {
return null;
}
if (!(table instanceof MTMVRelatedTableIf)) {
Expand All @@ -345,9 +353,9 @@ public Void visitLogicalRelation(LogicalRelation relation, IncrementCheckerConte
}
Set<Column> partitionColumnSet = new HashSet<>(relatedTable.getPartitionColumns());
Column mvReferenceColumn = context.getMvPartitionColumn().getColumn().get();
if (partitionColumnSet.contains(mvReferenceColumn)) {
if (partitionColumnSet.contains(mvReferenceColumn)
&& (!mvReferenceColumn.isAllowNull() || relatedTable.isPartitionColumnAllowNull())) {
context.addTableColumn(table, mvReferenceColumn);
context.setPctPossible(!mvReferenceColumn.isAllowNull() || relatedTable.isPartitionColumnAllowNull());
}
return visit(relation, context);
}
Expand All @@ -357,7 +365,6 @@ public Void visitLogicalAggregate(LogicalAggregate<? extends Plan> aggregate,
IncrementCheckerContext context) {
Set<Expression> groupByExprSet = new HashSet<>(aggregate.getGroupByExpressions());
if (groupByExprSet.isEmpty()) {
context.setPctPossible(false);
return null;
}
Set<Column> originalGroupbyExprSet = new HashSet<>();
Expand All @@ -367,7 +374,6 @@ public Void visitLogicalAggregate(LogicalAggregate<? extends Plan> aggregate,
}
});
if (!originalGroupbyExprSet.contains(context.getMvPartitionColumn().getColumn().get())) {
context.setPctPossible(false);
return null;
}
return visit(aggregate, context);
Expand All @@ -379,15 +385,16 @@ public Void visitLogicalWindow(LogicalWindow<? extends Plan> window, IncrementCh
if (windowExpressions.isEmpty()) {
return visit(window, context);
}
windowExpressions.forEach(expr -> checkWindowPartition(expr, context));
for (NamedExpression namedExpression : windowExpressions) {
if (!checkWindowPartition(namedExpression, context)) {
return null;
}
}
return super.visitLogicalWindow(window, context);
}

@Override
public Void visit(Plan plan, IncrementCheckerContext context) {
if (!context.isPctPossible()) {
return null;
}
if (plan instanceof LogicalProject
|| plan instanceof LogicalFilter
|| plan instanceof LogicalJoin
Expand All @@ -397,65 +404,58 @@ public Void visit(Plan plan, IncrementCheckerContext context) {
|| plan instanceof LogicalWindow) {
return super.visit(plan, context);
}
context.setPctPossible(false);
return null;
}

private void checkWindowPartition(Expression expression, IncrementCheckerContext context) {
expression.collectToList(expressionTreeNode -> expressionTreeNode instanceof WindowExpression)
.forEach(windowObj -> {
WindowExpression windowExpression = (WindowExpression) windowObj;
List<Expression> partitionKeys = windowExpression.getPartitionKeys();
Set<Column> originalPartitionbyExprSet = new HashSet<>();
partitionKeys.forEach(groupExpr -> {
if (groupExpr instanceof SlotReference && groupExpr.isColumnFromTable()) {
originalPartitionbyExprSet.add(((SlotReference) groupExpr).getColumn().get());
}
});
if (!originalPartitionbyExprSet.contains(context.getMvPartitionColumn().getColumn().get())) {
context.setPctPossible(false);
}
});
private boolean checkWindowPartition(Expression expression, IncrementCheckerContext context) {
List<Object> windowExpressions =
expression.collectToList(expressionTreeNode -> expressionTreeNode instanceof WindowExpression);
for (Object windowExpressionObj : windowExpressions) {
WindowExpression windowExpression = (WindowExpression) windowExpressionObj;
List<Expression> partitionKeys = windowExpression.getPartitionKeys();
Set<Column> originalPartitionbyExprSet = new HashSet<>();
partitionKeys.forEach(groupExpr -> {
if (groupExpr instanceof SlotReference && groupExpr.isColumnFromTable()) {
originalPartitionbyExprSet.add(((SlotReference) groupExpr).getColumn().get());
}
});
if (!originalPartitionbyExprSet.contains(context.getMvPartitionColumn().getColumn().get())) {
return false;
}
}
return true;
}
}

private static final class IncrementCheckerContext {
private final SlotReference mvPartitionColumn;
private boolean pctPossible = true;
private final List<Pair<TableIf, Column>> partitionRelatedTableAndColumnList = new ArrayList<>();
// This record the table id and relation mapping, because a table maybe used repeatedly.
private final Multimap<Long, LogicalRelation> tableIdAndRelationMapping = HashMultimap.create();
private final Multimap<TableIdentifier, CatalogRelation> tableAndCatalogRelationMap;
private final Multimap<TableIf, Column> partitionRelatedTableAndColumnMap = HashMultimap.create();

public IncrementCheckerContext(SlotReference mvPartitionColumn) {
public IncrementCheckerContext(SlotReference mvPartitionColumn,
Multimap<TableIdentifier, CatalogRelation> tableAndCatalogRelationMap) {
this.mvPartitionColumn = mvPartitionColumn;
this.tableAndCatalogRelationMap = tableAndCatalogRelationMap;
}

public SlotReference getMvPartitionColumn() {
return mvPartitionColumn;
}

public boolean isPctPossible() {
return pctPossible;
}

public void setPctPossible(boolean pctPossible) {
this.pctPossible = pctPossible;
}

public void addTableColumn(TableIf relatedTable, Column partitionColumn) {
partitionRelatedTableAndColumnList.add(Pair.of(relatedTable, partitionColumn));
partitionRelatedTableAndColumnMap.put(relatedTable, partitionColumn);
}

public List<Pair<TableIf, Column>> getPartitionRelatedTableAndColumnList() {
return partitionRelatedTableAndColumnList;
public Multimap<TableIf, Column> getPartitionRelatedTableAndColumnMap() {
return partitionRelatedTableAndColumnMap;
}

public Multimap<Long, LogicalRelation> getTableIdAndRelationMapping() {
return tableIdAndRelationMapping;
public Collection<CatalogRelation> getRelationByTable(TableIf tableIf) {
return tableAndCatalogRelationMap.get(new TableIdentifier(tableIf));
}

public void addTableIdAndRelation(Long tableId, LogicalRelation relation) {
tableIdAndRelationMapping.put(tableId, relation);
public void addTableAndRelation(TableIf tableIf, CatalogRelation relation) {
tableAndCatalogRelationMap.put(new TableIdentifier(tableIf), relation);
}
}

Expand Down
Loading