Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.doris.nereids.trees.expressions.SlotReference;
import org.apache.doris.nereids.trees.expressions.functions.agg.MultiDistinction;
import org.apache.doris.nereids.trees.plans.AggMode;
import org.apache.doris.nereids.trees.plans.GroupPlan;
import org.apache.doris.nereids.trees.plans.JoinType;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.SortPhase;
Expand All @@ -39,6 +40,7 @@
import org.apache.doris.nereids.trees.plans.physical.PhysicalHashAggregate;
import org.apache.doris.nereids.trees.plans.physical.PhysicalHashJoin;
import org.apache.doris.nereids.trees.plans.physical.PhysicalNestedLoopJoin;
import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapScan;
import org.apache.doris.nereids.trees.plans.physical.PhysicalPartitionTopN;
import org.apache.doris.nereids.trees.plans.physical.PhysicalProject;
import org.apache.doris.nereids.trees.plans.physical.PhysicalSetOperation;
Expand Down Expand Up @@ -201,12 +203,53 @@ public Boolean visitPhysicalFilter(PhysicalFilter<? extends Plan> filter, Void c
return true;
}

// NOTE(review): this span is a rendered diff hunk with removed and added lines interleaved.
// The one-argument signature and the two statements right below it, as well as the later
// `return srcSideSpec...` statement, appear to belong to the removed version (they reference
// srcSideSpec and isEnableBucketShuffleDownGrade(), neither of which the two-argument version
// declares or uses) -- confirm against the merged file.
private boolean isBucketShuffleDownGrade(DistributionSpecHash srcSideSpec) {
boolean isBucketShuffleDownGrade = ConnectContext.get().getSessionVariable().isEnableBucketShuffleDownGrade();
if (!isBucketShuffleDownGrade) {
/**
 * Decides whether a bucket shuffle join on the given pair of children should be downgraded.
 *
 * <p>Result, in evaluation order:
 * <ul>
 *   <li>{@code true} when the session variable behind {@code isEnableBucketShuffleJoin()} is off;</li>
 *   <li>{@code false} when {@code otherSideSpec} is not {@code ShuffleType.EXECUTION_BUCKETED}
 *       or {@code oneSidePlan} is not a {@link GroupPlan};</li>
 *   <li>{@code false} when no olap scan candidate is found under {@code oneSidePlan}, or the
 *       candidate has no table or no default distribution info;</li>
 *   <li>otherwise, whether the candidate's estimated bucket count is smaller than the
 *       parallelism estimate computed below.</li>
 * </ul>
 *
 * @param oneSidePlan the join child inspected for a small base-table scan
 * @param otherSideSpec the hash distribution spec of the opposite join child
 * @return whether the downgrade applies
 */
private boolean isBucketShuffleDownGrade(Plan oneSidePlan, DistributionSpecHash otherSideSpec) {
// Bucket shuffle join is considered improper when both hold:
// oneSide:
// - is a base table whose total tablet count is small enough
//   (< min(10, backendNum * parallelExecInstanceNum), see below)
// otherSide:
// - has ShuffleType.EXECUTION_BUCKETED
boolean isEnableBucketShuffleJoin = ConnectContext.get().getSessionVariable().isEnableBucketShuffleJoin();
if (!isEnableBucketShuffleJoin) {
// Bucket shuffle join is switched off for this session.
return true;
} else if (otherSideSpec.getShuffleType() != ShuffleType.EXECUTION_BUCKETED
|| !(oneSidePlan instanceof GroupPlan)) {
return false;
} else {
return srcSideSpec.getShuffleType() == ShuffleType.EXECUTION_BUCKETED;
// The instanceof check in the branch above makes this cast safe.
PhysicalOlapScan candidate = findDownGradeBucketShuffleCandidate((GroupPlan) oneSidePlan);
if (candidate == null || candidate.getTable() == null
|| candidate.getTable().getDefaultDistributionInfo() == null) {
return false;
} else {
// Estimated tablet count: selected partition count times the bucket number of the
// table's default distribution info.
// NOTE(review): presumably an approximation if partitions can have differing bucket
// numbers -- confirm.
int prunedPartNum = candidate.getSelectedPartitionIds().size();
int bucketNum = candidate.getTable().getDefaultDistributionInfo().getBucketNum();
int totalBucketNum = prunedPartNum * bucketNum;
// Both factors are clamped to at least 1 so the product is never zero.
// NOTE(review): the `true` argument presumably restricts the count to alive backends -- confirm.
int backEndNum = Math.max(1, ConnectContext.get().getEnv().getClusterInfo()
.getBackendsNumber(true));
int paraNum = Math.max(1, ConnectContext.get().getSessionVariable().getParallelExecInstanceNum());
// The parallelism estimate is capped at 10, so the result can only be true when
// totalBucketNum is below 10.
int totalParaNum = Math.min(10, backEndNum * paraNum);
return totalBucketNum < totalParaNum;
}
}
}

/**
 * Looks for the olap scan that sits under the given group plan, skipping over projects and filters.
 *
 * <p>Only the first physical expression of each group is followed. Starting from the first
 * physical expression of {@code groupPlan}'s group, the walk descends through the first child
 * of every {@link PhysicalProject} / {@link PhysicalFilter} and stops at the first plan of any
 * other kind, or when a child group has no physical expression.
 *
 * @param groupPlan the group plan to inspect; may be {@code null}
 * @return the plan the walk stopped at if it is a {@link PhysicalOlapScan}; {@code null} when
 *         {@code groupPlan} or its group is {@code null}, the group has no physical
 *         expression, or the walk stopped at any other kind of plan
 */
private PhysicalOlapScan findDownGradeBucketShuffleCandidate(GroupPlan groupPlan) {
    if (groupPlan == null || groupPlan.getGroup() == null) {
        return null;
    }
    if (groupPlan.getGroup().getPhysicalExpressions().isEmpty()) {
        return null;
    }

    Plan current = groupPlan.getGroup().getPhysicalExpressions().get(0).getPlan();
    // instanceof is false for null, so no separate null check is needed here.
    while (current instanceof PhysicalProject || current instanceof PhysicalFilter) {
        // The child is cast without a type check, exactly as before.
        GroupPlan childGroupPlan = (GroupPlan) current.child(0);
        if (childGroupPlan.getGroup().getPhysicalExpressions().isEmpty()) {
            break;
        }
        current = childGroupPlan.getGroup().getPhysicalExpressions().get(0).getPlan();
    }

    return current instanceof PhysicalOlapScan ? (PhysicalOlapScan) current : null;
}

Expand Down Expand Up @@ -243,6 +286,9 @@ public Boolean visitPhysicalHashJoin(PhysicalHashJoin<? extends Plan, ? extends
throw new RuntimeException("should not come here, two children of shuffle join should all be shuffle");
}

Plan leftChild = hashJoin.child(0);
Plan rightChild = hashJoin.child(1);

DistributionSpecHash leftHashSpec = (DistributionSpecHash) leftDistributionSpec;
DistributionSpecHash rightHashSpec = (DistributionSpecHash) rightDistributionSpec;

Expand All @@ -263,7 +309,7 @@ public Boolean visitPhysicalHashJoin(PhysicalHashJoin<? extends Plan, ? extends
ShuffleType.EXECUTION_BUCKETED, leftHashSpec, rightHashSpec,
(DistributionSpecHash) requiredProperties.get(0).getDistributionSpec(),
(DistributionSpecHash) requiredProperties.get(1).getDistributionSpec()));
} else if (isBucketShuffleDownGrade(rightHashSpec)) {
} else if (isBucketShuffleDownGrade(leftChild, rightHashSpec)) {
updatedForLeft = Optional.of(calAnotherSideRequired(
ShuffleType.EXECUTION_BUCKETED, leftHashSpec, leftHashSpec,
(DistributionSpecHash) requiredProperties.get(0).getDistributionSpec(),
Expand All @@ -272,7 +318,7 @@ public Boolean visitPhysicalHashJoin(PhysicalHashJoin<? extends Plan, ? extends
ShuffleType.EXECUTION_BUCKETED, leftHashSpec, rightHashSpec,
(DistributionSpecHash) requiredProperties.get(0).getDistributionSpec(),
(DistributionSpecHash) requiredProperties.get(1).getDistributionSpec()));
} else if (isBucketShuffleDownGrade(leftHashSpec)) {
} else if (isBucketShuffleDownGrade(rightChild, leftHashSpec)) {
updatedForLeft = Optional.of(calAnotherSideRequired(
ShuffleType.EXECUTION_BUCKETED, rightHashSpec, leftHashSpec,
(DistributionSpecHash) requiredProperties.get(1).getDistributionSpec(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -260,8 +260,6 @@ public class SessionVariable implements Serializable, Writable {

public static final String ENABLE_AGG_STATE = "enable_agg_state";

public static final String ENABLE_BUCKET_SHUFFLE_DOWNGRADE = "enable_bucket_shuffle_downgrade";

public static final String ENABLE_RPC_OPT_FOR_PIPELINE = "enable_rpc_opt_for_pipeline";

public static final String ENABLE_SINGLE_DISTINCT_COLUMN_OPT = "enable_single_distinct_column_opt";
Expand Down Expand Up @@ -850,9 +848,6 @@ public class SessionVariable implements Serializable, Writable {
@VariableMgr.VarAttr(name = ENABLE_BUCKET_SHUFFLE_JOIN, varType = VariableAnnotation.EXPERIMENTAL_ONLINE)
public boolean enableBucketShuffleJoin = true;

@VariableMgr.VarAttr(name = ENABLE_BUCKET_SHUFFLE_DOWNGRADE, needForward = true)
public boolean enableBucketShuffleDownGrade = false;

/**
* explode function row count enlarge factor.
*/
Expand Down Expand Up @@ -2552,10 +2547,6 @@ public boolean isEnableBucketShuffleJoin() {
return enableBucketShuffleJoin;
}

public boolean isEnableBucketShuffleDownGrade() {
return enableBucketShuffleDownGrade;
}

public boolean isEnableOdbcTransaction() {
return enableOdbcTransaction;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,8 @@ PhysicalResultSink
------------------hashJoin[INNER_JOIN] hashCondition=((store_sales.ss_store_sk = store.s_store_sk)) otherCondition=(( not (substring(ca_zip, 1, 5) = substring(s_zip, 1, 5)))) build RFs:RF4 s_store_sk->[ss_store_sk]
--------------------PhysicalProject
----------------------hashJoin[INNER_JOIN] hashCondition=((customer.c_current_addr_sk = customer_address.ca_address_sk)) otherCondition=() build RFs:RF3 c_current_addr_sk->[ca_address_sk]
------------------------PhysicalDistribute[DistributionSpecHash]
--------------------------PhysicalProject
----------------------------PhysicalOlapScan[customer_address] apply RFs: RF3
------------------------PhysicalProject
--------------------------PhysicalOlapScan[customer_address] apply RFs: RF3
------------------------PhysicalDistribute[DistributionSpecHash]
--------------------------PhysicalProject
----------------------------hashJoin[INNER_JOIN] hashCondition=((store_sales.ss_customer_sk = customer.c_customer_sk)) otherCondition=() build RFs:RF2 ss_customer_sk->[c_customer_sk]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ PhysicalResultSink
----------------hashJoin[INNER_JOIN] hashCondition=((i1.i_item_sk = asceding.item_sk)) otherCondition=() build RFs:RF1 item_sk->[i_item_sk]
------------------PhysicalProject
--------------------PhysicalOlapScan[item] apply RFs: RF1
------------------PhysicalDistribute[DistributionSpecReplicated]
------------------PhysicalDistribute[DistributionSpecHash]
--------------------PhysicalProject
----------------------filter((rnk < 11))
------------------------PhysicalWindow
Expand Down Expand Up @@ -44,7 +44,7 @@ PhysicalResultSink
----------------hashJoin[INNER_JOIN] hashCondition=((i2.i_item_sk = descending.item_sk)) otherCondition=() build RFs:RF0 item_sk->[i_item_sk]
------------------PhysicalProject
--------------------PhysicalOlapScan[item] apply RFs: RF0
------------------PhysicalDistribute[DistributionSpecReplicated]
------------------PhysicalDistribute[DistributionSpecHash]
--------------------PhysicalProject
----------------------filter((rnk < 11))
------------------------PhysicalWindow
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ PhysicalResultSink
----------------------------------------------hashJoin[INNER_JOIN] hashCondition=((my_customers.c_current_addr_sk = customer_address.ca_address_sk)) otherCondition=() build RFs:RF3 c_current_addr_sk->[ca_address_sk]
------------------------------------------------PhysicalProject
--------------------------------------------------PhysicalOlapScan[customer_address] apply RFs: RF3 RF4 RF5
------------------------------------------------PhysicalDistribute[DistributionSpecReplicated]
------------------------------------------------PhysicalDistribute[DistributionSpecHash]
--------------------------------------------------PhysicalProject
----------------------------------------------------hashAgg[GLOBAL]
------------------------------------------------------PhysicalDistribute[DistributionSpecHash]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,10 +70,9 @@ PhysicalResultSink
----------------------hashAgg[LOCAL]
------------------------PhysicalProject
--------------------------hashJoin[INNER_JOIN] hashCondition=((web_sales.ws_bill_addr_sk = customer_address.ca_address_sk)) otherCondition=() build RFs:RF11 ws_bill_addr_sk->[ca_address_sk]
----------------------------PhysicalDistribute[DistributionSpecHash]
------------------------------PhysicalProject
--------------------------------filter((customer_address.ca_gmt_offset = -6.00))
----------------------------------PhysicalOlapScan[customer_address] apply RFs: RF11
----------------------------PhysicalProject
------------------------------filter((customer_address.ca_gmt_offset = -6.00))
--------------------------------PhysicalOlapScan[customer_address] apply RFs: RF11
----------------------------PhysicalDistribute[DistributionSpecHash]
------------------------------hashJoin[INNER_JOIN] hashCondition=((web_sales.ws_item_sk = item.i_item_sk)) otherCondition=() build RFs:RF10 i_item_sk->[ws_item_sk]
--------------------------------PhysicalProject
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,8 @@ PhysicalResultSink
----------------hashAgg[LOCAL]
------------------PhysicalProject
--------------------hashJoin[INNER_JOIN] hashCondition=((a.ca_address_sk = c.c_current_addr_sk)) otherCondition=() build RFs:RF5 c_current_addr_sk->[ca_address_sk]
----------------------PhysicalDistribute[DistributionSpecHash]
------------------------PhysicalProject
--------------------------PhysicalOlapScan[customer_address] apply RFs: RF5
----------------------PhysicalProject
------------------------PhysicalOlapScan[customer_address] apply RFs: RF5
----------------------PhysicalDistribute[DistributionSpecHash]
------------------------PhysicalProject
--------------------------hashJoin[INNER_JOIN] hashCondition=((c.c_customer_sk = s.ss_customer_sk)) otherCondition=() build RFs:RF4 ss_customer_sk->[c_customer_sk]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,9 @@ PhysicalResultSink
------------hashAgg[LOCAL]
--------------PhysicalProject
----------------hashJoin[INNER_JOIN] hashCondition=((customer_address.ca_address_sk = customer.c_current_addr_sk)) otherCondition=() build RFs:RF10 c_current_addr_sk->[ca_address_sk]
------------------PhysicalDistribute[DistributionSpecHash]
--------------------PhysicalProject
----------------------filter((customer_address.ca_gmt_offset = -7.00))
------------------------PhysicalOlapScan[customer_address] apply RFs: RF10
------------------PhysicalProject
--------------------filter((customer_address.ca_gmt_offset = -7.00))
----------------------PhysicalOlapScan[customer_address] apply RFs: RF10
------------------PhysicalDistribute[DistributionSpecHash]
--------------------PhysicalProject
----------------------hashJoin[INNER_JOIN] hashCondition=((store_sales.ss_customer_sk = customer.c_customer_sk)) otherCondition=() build RFs:RF9 ss_customer_sk->[c_customer_sk]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,8 @@ PhysicalResultSink
------PhysicalTopN[LOCAL_SORT]
--------PhysicalProject
----------hashJoin[INNER_JOIN] hashCondition=((customer.c_current_addr_sk = current_addr.ca_address_sk)) otherCondition=(( not (ca_city = bought_city))) build RFs:RF5 c_current_addr_sk->[ca_address_sk]
------------PhysicalDistribute[DistributionSpecHash]
--------------PhysicalProject
----------------PhysicalOlapScan[customer_address] apply RFs: RF5
------------PhysicalProject
--------------PhysicalOlapScan[customer_address] apply RFs: RF5
------------PhysicalDistribute[DistributionSpecHash]
--------------PhysicalProject
----------------hashJoin[INNER_JOIN] hashCondition=((dn.ss_customer_sk = customer.c_customer_sk)) otherCondition=() build RFs:RF4 ss_customer_sk->[c_customer_sk]
Expand All @@ -20,9 +19,8 @@ PhysicalResultSink
----------------------hashAgg[LOCAL]
------------------------PhysicalProject
--------------------------hashJoin[INNER_JOIN] hashCondition=((store_sales.ss_addr_sk = customer_address.ca_address_sk)) otherCondition=() build RFs:RF3 ss_addr_sk->[ca_address_sk]
----------------------------PhysicalDistribute[DistributionSpecHash]
------------------------------PhysicalProject
--------------------------------PhysicalOlapScan[customer_address] apply RFs: RF3
----------------------------PhysicalProject
------------------------------PhysicalOlapScan[customer_address] apply RFs: RF3
----------------------------PhysicalDistribute[DistributionSpecHash]
------------------------------PhysicalProject
--------------------------------hashJoin[INNER_JOIN] hashCondition=((store_sales.ss_hdemo_sk = household_demographics.hd_demo_sk)) otherCondition=() build RFs:RF2 hd_demo_sk->[ss_hdemo_sk]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,8 @@ PhysicalResultSink
----------------------------------hashAgg[LOCAL]
------------------------------------PhysicalProject
--------------------------------------hashJoin[INNER_JOIN] hashCondition=((customer_address.ca_address_sk = customer.c_current_addr_sk)) otherCondition=() build RFs:RF0 c_current_addr_sk->[ca_address_sk]
----------------------------------------PhysicalDistribute[DistributionSpecHash]
------------------------------------------PhysicalProject
--------------------------------------------PhysicalOlapScan[customer_address] apply RFs: RF0
----------------------------------------PhysicalProject
------------------------------------------PhysicalOlapScan[customer_address] apply RFs: RF0
----------------------------------------PhysicalDistribute[DistributionSpecHash]
------------------------------------------PhysicalProject
--------------------------------------------filter((customer.c_preferred_cust_flag = 'Y'))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,9 @@ PhysicalResultSink
------------------------------PhysicalOlapScan[catalog_returns] apply RFs: RF3 RF4 RF5
----------------------------PhysicalDistribute[DistributionSpecReplicated]
------------------------------hashJoin[INNER_JOIN] hashCondition=((customer_address.ca_address_sk = customer.c_current_addr_sk)) otherCondition=() build RFs:RF2 c_current_addr_sk->[ca_address_sk]
--------------------------------PhysicalDistribute[DistributionSpecHash]
----------------------------------PhysicalProject
------------------------------------filter((customer_address.ca_gmt_offset = -7.00))
--------------------------------------PhysicalOlapScan[customer_address] apply RFs: RF2
--------------------------------PhysicalProject
----------------------------------filter((customer_address.ca_gmt_offset = -7.00))
------------------------------------PhysicalOlapScan[customer_address] apply RFs: RF2
--------------------------------PhysicalDistribute[DistributionSpecHash]
----------------------------------hashJoin[INNER_JOIN] hashCondition=((household_demographics.hd_demo_sk = customer.c_current_hdemo_sk)) otherCondition=() build RFs:RF1 hd_demo_sk->[c_current_hdemo_sk]
------------------------------------hashJoin[INNER_JOIN] hashCondition=((customer_demographics.cd_demo_sk = customer.c_current_cdemo_sk)) otherCondition=() build RFs:RF0 cd_demo_sk->[c_current_cdemo_sk]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,13 @@ PhysicalCteAnchor ( cteId=CTEId#0 )
--------------hashAgg[LOCAL]
----------------PhysicalProject
------------------hashJoin[RIGHT_SEMI_JOIN] hashCondition=((ws1.ws_order_number = web_returns.wr_order_number)) otherCondition=() build RFs:RF6 ws_order_number->[wr_order_number,ws_order_number]
--------------------PhysicalDistribute[DistributionSpecHash]
----------------------PhysicalProject
------------------------hashJoin[INNER_JOIN] hashCondition=((web_returns.wr_order_number = ws_wh.ws_order_number)) otherCondition=() build RFs:RF5 wr_order_number->[ws_order_number]
--------------------------PhysicalDistribute[DistributionSpecHash]
----------------------------PhysicalCteConsumer ( cteId=CTEId#0 ) apply RFs: RF5 RF6
--------------------------PhysicalDistribute[DistributionSpecHash]
----------------------------PhysicalProject
------------------------------PhysicalOlapScan[web_returns] apply RFs: RF6
--------------------PhysicalProject
----------------------hashJoin[INNER_JOIN] hashCondition=((web_returns.wr_order_number = ws_wh.ws_order_number)) otherCondition=() build RFs:RF5 wr_order_number->[ws_order_number]
------------------------PhysicalDistribute[DistributionSpecHash]
--------------------------PhysicalCteConsumer ( cteId=CTEId#0 ) apply RFs: RF5 RF6
------------------------PhysicalDistribute[DistributionSpecHash]
--------------------------PhysicalProject
----------------------------PhysicalOlapScan[web_returns] apply RFs: RF6
--------------------PhysicalProject
----------------------hashJoin[RIGHT_SEMI_JOIN] hashCondition=((ws1.ws_order_number = ws_wh.ws_order_number)) otherCondition=() build RFs:RF7 ws_order_number->[ws_order_number,ws_order_number]
------------------------PhysicalDistribute[DistributionSpecHash]
Expand Down
Loading