Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.apache.doris.thrift.TFileScanRangeParams;
import org.apache.doris.thrift.TPlanNode;
import org.apache.doris.thrift.TPlanNodeType;
import org.apache.doris.thrift.TPushAggOp;
import org.apache.doris.thrift.TScanRangeLocations;

import com.google.common.base.Preconditions;
Expand Down Expand Up @@ -70,6 +71,7 @@ public abstract class FileScanNode extends ExternalScanNode {
protected long totalPartitionNum = 0;
protected long readPartitionNum = 0;
protected long fileSplitSize;
public long rowCount = 0;

public FileScanNode(PlanNodeId id, TupleDescriptor desc, String planNodeName, StatisticalType statisticalType,
boolean needCheckColumnPriv) {
Expand All @@ -95,6 +97,10 @@ protected void toThrift(TPlanNode planNode) {
planNode.setFileScanNode(fileScanNode);
}

/**
 * Row count to report when a no-grouping count(*) aggregate has been pushed
 * down to this scan node. Base implementation knows nothing about the data,
 * so it returns 0; subclasses that can derive an exact count from table
 * metadata (e.g. IcebergScanNode reading the snapshot summary) override this.
 */
public long getPushDownCount() {
return 0;
}

@Override
public String getNodeExplainString(String prefix, TExplainLevel detailLevel) {
StringBuilder output = new StringBuilder();
Expand Down Expand Up @@ -170,7 +176,13 @@ public int compare(TFileRangeDesc o1, TFileRangeDesc o2) {
output.append(String.format("avgRowSize=%s, ", avgRowSize));
}
output.append(String.format("numNodes=%s", numNodes)).append("\n");
output.append(prefix).append(String.format("pushdown agg=%s", pushDownAggNoGroupingOp)).append("\n");

// pushdown agg
output.append(prefix).append(String.format("pushdown agg=%s", pushDownAggNoGroupingOp));
if (pushDownAggNoGroupingOp.equals(TPushAggOp.COUNT)) {
output.append(" (").append(getPushDownCount()).append(")");
}
output.append("\n");

return output.toString();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,6 @@
import java.nio.ByteBuffer;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -213,6 +212,12 @@ private List<Split> doGetSplits() throws UserException {
HashSet<String> partitionPathSet = new HashSet<>();
boolean isPartitionedTable = icebergTable.spec().isPartitioned();

long rowCount = getCountFromSnapshot();
if (getPushDownAggNoGroupingOp().equals(TPushAggOp.COUNT) && rowCount >= 0) {
this.rowCount = rowCount;
return new ArrayList<>();
}

CloseableIterable<FileScanTask> fileScanTasks = TableScanUtil.splitFiles(scan.planFiles(), splitSize);
try (CloseableIterable<CombinedScanTask> combinedScanTasks =
TableScanUtil.planTasks(fileScanTasks, splitSize, 1, 0)) {
Expand Down Expand Up @@ -266,12 +271,6 @@ private List<Split> doGetSplits() throws UserException {
throw new UserException(e.getMessage(), e.getCause());
}

TPushAggOp aggOp = getPushDownAggNoGroupingOp();
if (aggOp.equals(TPushAggOp.COUNT) && getCountFromSnapshot() > 0) {
// we can create a special empty split and skip the plan process
return Collections.singletonList(splits.get(0));
}

readPartitionNum = partitionPathSet.size();

return splits;
Expand Down Expand Up @@ -425,7 +424,7 @@ private long getCountFromSnapshot() {

// empty table
if (snapshot == null) {
return -1;
return 0;
}

Map<String, String> summary = snapshot.summary();
Expand All @@ -442,12 +441,17 @@ protected void toThrift(TPlanNode planNode) {
super.toThrift(planNode);
if (getPushDownAggNoGroupingOp().equals(TPushAggOp.COUNT)) {
long countFromSnapshot = getCountFromSnapshot();
if (countFromSnapshot > 0) {
if (countFromSnapshot >= 0) {
planNode.setPushDownCount(countFromSnapshot);
}
}
}

/**
 * Exact row count for a pushed-down count(*), taken from the current
 * Iceberg snapshot summary via getCountFromSnapshot(). Per the visible
 * snippet at the call sites above, a return of 0 covers the empty-table
 * (null snapshot) case; negative means the count is unavailable.
 */
@Override
public long getPushDownCount() {
return getCountFromSnapshot();
}

@Override
public String getNodeExplainString(String prefix, TExplainLevel detailLevel) {
if (pushdownIcebergPredicates.isEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,12 @@
import org.apache.doris.analysis.DescriptorTable;
import org.apache.doris.analysis.ExplainOptions;
import org.apache.doris.analysis.StatementBase;
import org.apache.doris.catalog.Column;
import org.apache.doris.common.FormatOptions;
import org.apache.doris.common.NereidsException;
import org.apache.doris.common.Pair;
import org.apache.doris.common.profile.SummaryProfile;
import org.apache.doris.datasource.iceberg.source.IcebergScanNode;
import org.apache.doris.nereids.CascadesContext.Lock;
import org.apache.doris.nereids.exceptions.AnalysisException;
import org.apache.doris.nereids.glue.LogicalPlanAdapter;
Expand All @@ -49,14 +51,18 @@
import org.apache.doris.nereids.trees.plans.commands.ExplainCommand.ExplainLevel;
import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
import org.apache.doris.nereids.trees.plans.logical.LogicalSqlCache;
import org.apache.doris.nereids.trees.plans.physical.PhysicalHashAggregate;
import org.apache.doris.nereids.trees.plans.physical.PhysicalPlan;
import org.apache.doris.nereids.trees.plans.physical.PhysicalResultSink;
import org.apache.doris.nereids.trees.plans.physical.PhysicalSqlCache;
import org.apache.doris.planner.PlanFragment;
import org.apache.doris.planner.Planner;
import org.apache.doris.planner.RuntimeFilter;
import org.apache.doris.planner.ScanNode;
import org.apache.doris.qe.CommonResultSet;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.ResultSet;
import org.apache.doris.qe.ResultSetMetaData;
import org.apache.doris.qe.SessionVariable;

import com.google.common.annotations.VisibleForTesting;
Expand All @@ -66,6 +72,7 @@

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.function.Function;
Expand Down Expand Up @@ -540,7 +547,23 @@ public Optional<ResultSet> handleQueryInFe(StatementBase parsedStmt) {
}
}

return Optional.empty();
if (physicalPlan instanceof PhysicalResultSink
&& physicalPlan.child(0) instanceof PhysicalHashAggregate && !getScanNodes().isEmpty()
&& getScanNodes().get(0) instanceof IcebergScanNode) {
List<Column> columns = Lists.newArrayList();
NamedExpression output = physicalPlan.getOutput().get(0);
columns.add(new Column(output.getName(), output.getDataType().toCatalogDataType()));
if (((IcebergScanNode) getScanNodes().get(0)).rowCount > 0) {
ResultSetMetaData metadata = new CommonResultSet.CommonResultSetMetaData(columns);
ResultSet resultSet = new CommonResultSet(metadata, Collections.singletonList(
Lists.newArrayList(String.valueOf(((IcebergScanNode) getScanNodes().get(0)).rowCount))));
// only support one iceberg scan node and one count, e.g. select count(*) from icetbl;
return Optional.of(resultSet);
}
return Optional.empty();
} else {
return Optional.empty();
}
}

private void setFormatOptions() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,7 @@
import org.apache.doris.planner.SortNode;
import org.apache.doris.planner.TableFunctionNode;
import org.apache.doris.planner.UnionNode;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.statistics.StatisticConstants;
import org.apache.doris.tablefunction.TableValuedFunctionIf;
import org.apache.doris.thrift.TFetchOption;
Expand Down Expand Up @@ -1087,6 +1088,12 @@ public PlanFragment visitPhysicalStorageLayerAggregate(
+ storageLayerAggregate.getAggOp());
}

if (storageLayerAggregate.getRelation() instanceof PhysicalFileScan
&& pushAggOp.equals(TPushAggOp.COUNT)
&& !ConnectContext.get().getSessionVariable().isEnableCountPushDownForExternalTable()) {
pushAggOp = TPushAggOp.NONE;
}

context.setRelationPushAggOp(
storageLayerAggregate.getRelation().getRelationId(), pushAggOp);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.apache.doris.common.Config;
import org.apache.doris.common.FormatOptions;
import org.apache.doris.common.UserException;
import org.apache.doris.datasource.iceberg.source.IcebergScanNode;
import org.apache.doris.nereids.PlannerHook;
import org.apache.doris.qe.CommonResultSet;
import org.apache.doris.qe.ConnectContext;
Expand Down Expand Up @@ -641,6 +642,21 @@ public Optional<ResultSet> handleQueryInFe(StatementBase parsedStmt) {
List<Column> columns = new ArrayList<>(selectItems.size());
List<String> columnLabels = parsedSelectStmt.getColLabels();
List<String> data = new ArrayList<>();
if ((singleNodePlanner.getScanNodes().size() > 0 && singleNodePlanner.getScanNodes().get(0)
instanceof IcebergScanNode) && (((IcebergScanNode) getScanNodes().get(0)).rowCount > 0)) {
SelectListItem item = selectItems.get(0);
Expr expr = item.getExpr();
String columnName = columnLabels.get(0);
columns.add(new Column(columnName, expr.getType()));
data.add(String.valueOf(((IcebergScanNode) getScanNodes().get(0)).rowCount));
ResultSetMetaData metadata = new CommonResultSet.CommonResultSetMetaData(columns);
ResultSet resultSet = new CommonResultSet(metadata, Collections.singletonList(data));
// only support one iceberg scan node and one count, e.g. select count(*) from icetbl;
return Optional.of(resultSet);
}
if (!parsedSelectStmt.getTableRefs().isEmpty()) {
return Optional.empty();
}
FormatOptions options = FormatOptions.getDefault();
for (int i = 0; i < selectItems.size(); i++) {
SelectListItem item = selectItems.get(i);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -564,6 +564,8 @@ public class SessionVariable implements Serializable, Writable {

public static final String FORCE_JNI_SCANNER = "force_jni_scanner";

public static final String ENABLE_COUNT_PUSH_DOWN_FOR_EXTERNAL_TABLE = "enable_count_push_down_for_external_table";

public static final String SHOW_ALL_FE_CONNECTION = "show_all_fe_connection";

public static final String MAX_MSG_SIZE_OF_RESULT_RECEIVER = "max_msg_size_of_result_receiver";
Expand Down Expand Up @@ -1755,6 +1757,10 @@ public void setEnableLeftZigZag(boolean enableLeftZigZag) {
description = {"强制使用jni方式读取外表", "Force the use of jni mode to read external table"})
private boolean forceJniScanner = false;

@VariableMgr.VarAttr(name = ENABLE_COUNT_PUSH_DOWN_FOR_EXTERNAL_TABLE,
description = {"对外表启用 count(*) 下推优化", "enable count(*) pushdown optimization for external table"})
private boolean enableCountPushDownForExternalTable = true;

public static final String IGNORE_RUNTIME_FILTER_IDS = "ignore_runtime_filter_ids";

public Set<Integer> getIgnoredRuntimeFilterIds() {
Expand Down Expand Up @@ -3899,6 +3905,10 @@ public void setForceJniScanner(boolean force) {
forceJniScanner = force;
}

/**
 * Whether the count(*) pushdown optimization is enabled for external
 * tables (session variable enable_count_push_down_for_external_table,
 * default true per the field declaration above).
 */
public boolean isEnableCountPushDownForExternalTable() {
return enableCountPushDownForExternalTable;
}

public boolean isForceToLocalShuffle() {
return getEnablePipelineXEngine() && enableLocalShuffle && enableNereidsPlanner && forceToLocalShuffle;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !q01 --
1000

-- !q02 --
1000

-- !q03 --
1000

-- !q04 --
1000

-- !q05 --
1000

-- !q06 --
1000

-- !q07 --
1000

-- !q08 --
1000

Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

// Regression test for count(*) pushdown on Iceberg external tables.
// Verifies both query results and the explain plan marker
// ("pushdown agg=COUNT (n)" vs "pushdown agg=NONE") with the session
// variable enable_count_push_down_for_external_table toggled on and off.
suite("test_iceberg_optimize_count", "p0,external,doris,external_docker,external_docker_doris") {
// Skip entirely unless the Iceberg docker environment is enabled in config.
String enabled = context.config.otherConfigs.get("enableIcebergTest")
if (enabled == null || !enabled.equalsIgnoreCase("true")) {
logger.info("disable iceberg test.")
return
}

// Connection details for the dockerized Iceberg REST catalog + MinIO S3.
String rest_port = context.config.otherConfigs.get("iceberg_rest_uri_port")
String minio_port = context.config.otherConfigs.get("iceberg_minio_port")
String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
String catalog_name = "test_iceberg_optimize_count"

try {

// Recreate the catalog from scratch so the test is self-contained.
sql """drop catalog if exists ${catalog_name}"""
sql """CREATE CATALOG ${catalog_name} PROPERTIES (
'type'='iceberg',
'iceberg.catalog.type'='rest',
'uri' = 'http://${externalEnvIp}:${rest_port}',
"s3.access_key" = "admin",
"s3.secret_key" = "password",
"s3.endpoint" = "http://${externalEnvIp}:${minio_port}",
"s3.region" = "us-east-1"
);"""

sql """ switch ${catalog_name} """
sql """ use format_v2 """

// Same count(*) over four table layouts: copy-on-write / merge-on-read
// crossed with ORC / Parquet. All are expected to hold 1000 rows
// (see the paired .out file).
sqlstr1 = """ select count(*) from sample_cow_orc; """
sqlstr2 = """ select count(*) from sample_cow_parquet; """
sqlstr3 = """ select count(*) from sample_mor_orc; """
sqlstr4 = """ select count(*) from sample_mor_parquet; """

// use push down count
sql """ set enable_count_push_down_for_external_table=true; """

qt_q01 """${sqlstr1}"""
qt_q02 """${sqlstr2}"""
qt_q03 """${sqlstr3}"""
qt_q04 """${sqlstr4}"""

// With pushdown on, the plan must show the snapshot-derived count.
explain {
sql("""${sqlstr1}""")
contains """pushdown agg=COUNT (1000)"""
}
explain {
sql("""${sqlstr2}""")
contains """pushdown agg=COUNT (1000)"""
}
explain {
sql("""${sqlstr3}""")
contains """pushdown agg=COUNT (1000)"""
}
explain {
sql("""${sqlstr4}""")
contains """pushdown agg=COUNT (1000)"""
}

// don't use push down count
sql """ set enable_count_push_down_for_external_table=false; """

// Results must be identical with the optimization disabled.
qt_q05 """${sqlstr1}"""
qt_q06 """${sqlstr2}"""
qt_q07 """${sqlstr3}"""
qt_q08 """${sqlstr4}"""

// With pushdown off, the plan must fall back to no pushed aggregate.
explain {
sql("""${sqlstr1}""")
contains """pushdown agg=NONE"""
}
explain {
sql("""${sqlstr2}""")
contains """pushdown agg=NONE"""
}
explain {
sql("""${sqlstr3}""")
contains """pushdown agg=NONE"""
}
explain {
sql("""${sqlstr4}""")
contains """pushdown agg=NONE"""
}

} finally {
// Restore the session default and drop the test catalog even on failure.
sql """ set enable_count_push_down_for_external_table=true; """
sql """drop catalog if exists ${catalog_name}"""
}
}