Skip to content
Closed
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ public enum Key {
/** Common Settings for SQL and PPL. */
QUERY_MEMORY_LIMIT("plugins.query.memory_limit"),
QUERY_SIZE_LIMIT("plugins.query.size_limit"),
QUERY_SYSTEM_LIMIT("plugins.query.system_limit"),
Comment thread
LantaoJin marked this conversation as resolved.
ENCYRPTION_MASTER_KEY("plugins.query.datasources.encryption.masterkey"),
DATASOURCES_URI_HOSTS_DENY_LIST("plugins.query.datasources.uri.hosts.denylist"),
DATASOURCES_LIMIT("plugins.query.datasources.limit"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ public class CalcitePlanContext {
public final FunctionProperties functionProperties;
public final QueryType queryType;
public final Integer querySizeLimit;
public final Integer querySysLimit;

@Getter @Setter private boolean isResolvingJoinCondition = false;
@Getter @Setter private boolean isResolvingSubquery = false;
Expand All @@ -52,9 +53,11 @@ public class CalcitePlanContext {

@Getter public Map<String, RexLambdaRef> rexLambdaRefMap;

private CalcitePlanContext(FrameworkConfig config, Integer querySizeLimit, QueryType queryType) {
private CalcitePlanContext(
FrameworkConfig config, Integer querySizeLimit, Integer querySysLimit, QueryType queryType) {
this.config = config;
this.querySizeLimit = querySizeLimit;
this.querySysLimit = querySysLimit;
this.queryType = queryType;
this.connection = CalciteToolsHelper.connect(config, TYPE_FACTORY);
this.relBuilder = CalciteToolsHelper.create(config, TYPE_FACTORY, connection);
Expand Down Expand Up @@ -93,12 +96,12 @@ public Optional<RexCorrelVariable> peekCorrelVar() {
}

public CalcitePlanContext clone() {
return new CalcitePlanContext(config, querySizeLimit, queryType);
return new CalcitePlanContext(config, querySizeLimit, querySysLimit, queryType);
}

public static CalcitePlanContext create(
FrameworkConfig config, Integer querySizeLimit, QueryType queryType) {
return new CalcitePlanContext(config, querySizeLimit, queryType);
FrameworkConfig config, Integer querySizeLimit, Integer querySysLimit, QueryType queryType) {
return new CalcitePlanContext(config, querySizeLimit, querySysLimit, queryType);
}

public void putRexLambdaRefMap(Map<String, RexLambdaRef> candidateMap) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.collect.Streams;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashSet;
Expand All @@ -35,6 +36,7 @@
import java.util.stream.Stream;
import org.apache.calcite.plan.RelOptTable;
import org.apache.calcite.plan.ViewExpanders;
import org.apache.calcite.rel.RelCollations;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.core.Aggregate;
import org.apache.calcite.rel.core.JoinRelType;
Expand Down Expand Up @@ -107,6 +109,7 @@
import org.opensearch.sql.ast.tree.Trendline.TrendlineType;
import org.opensearch.sql.ast.tree.UnresolvedPlan;
import org.opensearch.sql.ast.tree.Window;
import org.opensearch.sql.calcite.plan.LogicalSystemLimit;
import org.opensearch.sql.calcite.plan.OpenSearchConstants;
import org.opensearch.sql.calcite.utils.JoinAndLookupUtils;
import org.opensearch.sql.calcite.utils.PlanUtils;
Expand All @@ -133,6 +136,21 @@ public RelNode analyze(UnresolvedPlan unresolved, CalcitePlanContext context) {
return unresolved.accept(this, context);
}

/** Adds a rel node to the top of the stack while preserving the field names and aliases. */
private void replaceTop(RelBuilder relBuilder, RelNode relNode) {
try {
Method method = RelBuilder.class.getDeclaredMethod("replaceTop", RelNode.class);
method.setAccessible(true);
method.invoke(relBuilder, relNode);
} catch (Exception e) {
throw new IllegalStateException("Unable to invoke RelBuilder.replaceTop", e);
}
}

private RelNode sysLimit(RelNode child, RexNode fetch) {
return LogicalSystemLimit.create(child, RelCollations.EMPTY, null, fetch);
}

@Override
public RelNode visitRelation(Relation node, CalcitePlanContext context) {
context.relBuilder.scan(node.getTableQualifiedName().getParts());
Expand Down Expand Up @@ -642,8 +660,24 @@ private Optional<RexLiteral> extractAliasLiteral(RexNode node) {

@Override
public RelNode visitJoin(Join node, CalcitePlanContext context) {
List<UnresolvedPlan> children = node.getChildren();
children.forEach(c -> analyze(c, context));
// 1. visit left child
analyze(node.getLeft(), context);
// 2. add system limit to left side (main-search) if join type is right outer
if (node.getJoinType() == Join.JoinType.RIGHT) {
replaceTop(
context.relBuilder,
sysLimit(context.relBuilder.peek(), context.relBuilder.literal(context.querySysLimit)));
}
// 3. visit right child
analyze(node.getRight(), context);
// 4. add system limit to right side (subsearch) if join type is not semi and anti
if (node.getJoinType() != Join.JoinType.RIGHT
&& node.getJoinType() != Join.JoinType.SEMI
&& node.getJoinType() != Join.JoinType.ANTI) {
replaceTop(
context.relBuilder,
sysLimit(context.relBuilder.peek(), context.relBuilder.literal(context.querySysLimit)));
}
RexNode joinCondition =
node.getJoinCondition()
.map(c -> rexVisitor.analyzeJoinCondition(c, context))
Expand Down Expand Up @@ -768,22 +802,27 @@ public RelNode visitLookup(Lookup node, CalcitePlanContext context) {
expectedProvidedFieldNames = newExpectedFieldNames;
}

// 5. Resolve join condition. Note, this operation should be done after finishing all analyze.
// 5. Add system limit to right side (subsearch)
replaceTop(
context.relBuilder,
sysLimit(context.relBuilder.peek(), context.relBuilder.literal(context.querySysLimit)));

// 6. Resolve join condition. Note, this operation should be done after finishing all analyze.
JoinAndLookupUtils.addJoinForLookUp(node, context);

// 6. Add projection for coalesce fields if there is.
// 7. Add projection for coalesce fields if there is.
if (!newCoalesceList.isEmpty()) {
context.relBuilder.projectPlus(newCoalesceList);
}

// 7. Add projection to remove unnecessary fields
// 8. Add projection to remove unnecessary fields
// NOTE: Need to lazy invoke projectExcept until finishing all analyzing,
// otherwise the field names may have changed because of field name duplication.
if (!toBeRemovedFields.isEmpty()) {
context.relBuilder.projectExcept(toBeRemovedFields);
}

// 7. Rename the fields to the expected names.
// 9. Rename the fields to the expected names.
JoinAndLookupUtils.renameToExpectedFields(
expectedProvidedFieldNames,
sourceFieldsNames.size() - duplicatedSourceFields.size(),
Expand Down Expand Up @@ -1449,14 +1488,18 @@ private void buildExpandRelNode(
.uncollect(List.of(), false)
.build();

// 6. Perform a nested-loop join (correlate) between the original table and the expanded
// 6. add system limit to right side
RelNode rightNodeWithLimit =
sysLimit(rightNode, context.relBuilder.literal(context.querySysLimit));

// 7. Perform a nested-loop join (correlate) between the original table and the expanded
// array field.
// The last parameter has to refer to the array to be expanded on the left side. It will
// be used by the right side to correlate with the left side.
context
.relBuilder
.push(leftNode)
.push(rightNode)
.push(rightNodeWithLimit)
.correlate(JoinRelType.INNER, correlVariable.get().id, List.of(arrayFieldRex))
// 7. Remove the original array field from the output.
// TODO: RFC: should we keep the original array field when alias is present?
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.calcite.plan;

import java.util.Collections;
import java.util.List;
import org.apache.calcite.plan.Convention;
import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.plan.RelTraitSet;
import org.apache.calcite.rel.RelCollation;
import org.apache.calcite.rel.RelCollationTraitDef;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.core.Sort;
import org.apache.calcite.rel.hint.RelHint;
import org.apache.calcite.rel.logical.LogicalSort;
import org.apache.calcite.rex.RexNode;
import org.checkerframework.checker.nullness.qual.Nullable;

/** System level limit logical plan, comparing to user level plan {@link LogicalSort}. */
public class LogicalSystemLimit extends Sort {

private LogicalSystemLimit(
RelOptCluster cluster,
RelTraitSet traitSet,
RelNode input,
RelCollation collation,
@Nullable RexNode offset,
@Nullable RexNode fetch) {
this(cluster, traitSet, Collections.emptyList(), input, collation, offset, fetch);
}

private LogicalSystemLimit(
RelOptCluster cluster,
RelTraitSet traitSet,
List<RelHint> hints,
RelNode input,
RelCollation collation,
@Nullable RexNode offset,
@Nullable RexNode fetch) {
super(cluster, traitSet, hints, input, collation, offset, fetch);
assert traitSet.containsIfApplicable(Convention.NONE);
}

public static LogicalSystemLimit create(
RelNode input, RelCollation collation, @Nullable RexNode offset, @Nullable RexNode fetch) {
RelOptCluster cluster = input.getCluster();
collation = RelCollationTraitDef.INSTANCE.canonize(collation);
RelTraitSet traitSet = input.getTraitSet().replace(Convention.NONE).replace(collation);
return new LogicalSystemLimit(cluster, traitSet, input, collation, offset, fetch);
}

@Override
public Sort copy(
RelTraitSet traitSet,
RelNode newInput,
RelCollation newCollation,
@Nullable RexNode offset,
@Nullable RexNode fetch) {
return new LogicalSystemLimit(
getCluster(), traitSet, hints, newInput, newCollation, offset, fetch);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,8 @@ public void executeWithCalcite(
CalcitePlanContext context =
CalcitePlanContext.create(
buildFrameworkConfig(),
settings.getSettingValue(Key.QUERY_SIZE_LIMIT),
getQuerySizeLimit(),
getQuerySystemLimit(),
queryType);
RelNode relNode = analyze(plan, context);
RelNode optimized = optimize(relNode);
Expand Down Expand Up @@ -133,7 +134,10 @@ public void explainWithCalcite(
() -> {
CalcitePlanContext context =
CalcitePlanContext.create(
buildFrameworkConfig(), getQuerySizeLimit(), queryType);
buildFrameworkConfig(),
getQuerySizeLimit(),
getQuerySystemLimit(),
queryType);
RelNode relNode = analyze(plan, context);
RelNode optimized = optimize(relNode);
RelNode calcitePlan = convertToCalcitePlan(optimized);
Expand Down Expand Up @@ -276,6 +280,10 @@ private Integer getQuerySizeLimit() {
return settings == null ? null : settings.getSettingValue(Key.QUERY_SIZE_LIMIT);
}

private Integer getQuerySystemLimit() {
return settings == null ? null : settings.getSettingValue(Key.QUERY_SYSTEM_LIMIT);
}

// TODO https://github.com/opensearch-project/sql/issues/3457
// Calcite is not available for SQL query now. Maybe release in 3.1.0?
private boolean shouldUseCalcite(QueryType queryType) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ public void setUpContext() {

mockedStatic.when(() -> CalciteToolsHelper.create(any(), any(), any())).thenReturn(relBuilder);

context = CalcitePlanContext.create(frameworkConfig, 100, QueryType.PPL);
context = CalcitePlanContext.create(frameworkConfig, 100, 50000, QueryType.PPL);
}

@AfterEach
Expand Down
61 changes: 60 additions & 1 deletion docs/user/admin/settings.rst
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ plugins.query.size_limit
Description
-----------

The new engine fetches a default size of index from OpenSearch set by this setting, the default value equals to max result window in index level (10000 by default). You can change the value to any value not greater than the max result window value in index level (`index.max_result_window`), here is an example::
The size configures the maximum amount of rows to be fetched from query execution results. The default value is: 10000. You can change the value::

>> curl -H 'Content-Type: application/json' -X PUT localhost:9200/_plugins/_query/settings -d '{
"transient" : {
Expand All @@ -188,6 +188,65 @@ Result set::
}
}

plugins.query.system_limit
==========================

Description
-----------

The size configures the maximum of rows in the subsearch to data-intensive operations against (e.g. join, lookup). The default value is: 50000. Value range is from 0 to 2147483647 (Int.MaxValue).

PPL commands includes ``join``, ``lookup`` and ``expand`` will be affected by this configuration. In future, we can add more command argument to control specific command.

For Join, with join type

* SEMI, ANTI: no affect
* RIGHT: add a LogicalSystemLimit operator to left side (main-search)
* Others: add a LogicalSystemLimit operator to right side (sub-search)

For Lookup

* add a LogicalSystemLimit operator to right side (sub-search)

For expand

* add a LogicalSystemLimit operator to right side (sub-search)

Version
-------
3.1.0

Example
-------

Change the system_limit to 2147483647 (max)::

sh$ curl -sS -H 'Content-Type: application/json' \
... -X PUT localhost:9200/_plugins/_query/settings \
... -d '{"persistent" : {"plugins.query.system_limit" : "2147483647"}}'
{
"acknowledged": true,
"persistent": {
"plugins": {
"query": {
"system_limit": "2147483647"
}
}
},
"transient": {}
}

Rollback to default value::

sh$ curl -sS -H 'Content-Type: application/json' \
... -X PUT localhost:9200/_plugins/_query/settings \
... -d '{"persistent" : {"plugins.query.system_limit" : null}}'
{
"acknowledged": true,
"persistent": {},
"transient": {}
}

plugins.query.memory_limit
==========================

Expand Down
Loading
Loading