diff --git a/common/src/main/java/org/opensearch/sql/common/setting/Settings.java b/common/src/main/java/org/opensearch/sql/common/setting/Settings.java index a56ac1d30c3..5a21cdb3779 100644 --- a/common/src/main/java/org/opensearch/sql/common/setting/Settings.java +++ b/common/src/main/java/org/opensearch/sql/common/setting/Settings.java @@ -43,6 +43,7 @@ public enum Key { /** Common Settings for SQL and PPL. */ QUERY_MEMORY_LIMIT("plugins.query.memory_limit"), QUERY_SIZE_LIMIT("plugins.query.size_limit"), + QUERY_SYSTEM_LIMIT("plugins.query.system_limit"), ENCYRPTION_MASTER_KEY("plugins.query.datasources.encryption.masterkey"), DATASOURCES_URI_HOSTS_DENY_LIST("plugins.query.datasources.uri.hosts.denylist"), DATASOURCES_LIMIT("plugins.query.datasources.limit"), diff --git a/core/src/main/java/org/opensearch/sql/calcite/CalcitePlanContext.java b/core/src/main/java/org/opensearch/sql/calcite/CalcitePlanContext.java index 7c0872b67fc..5114884a642 100644 --- a/core/src/main/java/org/opensearch/sql/calcite/CalcitePlanContext.java +++ b/core/src/main/java/org/opensearch/sql/calcite/CalcitePlanContext.java @@ -35,6 +35,7 @@ public class CalcitePlanContext { public final FunctionProperties functionProperties; public final QueryType queryType; public final Integer querySizeLimit; + public final Integer querySysLimit; @Getter @Setter private boolean isResolvingJoinCondition = false; @Getter @Setter private boolean isResolvingSubquery = false; @@ -52,9 +53,11 @@ public class CalcitePlanContext { @Getter public Map rexLambdaRefMap; - private CalcitePlanContext(FrameworkConfig config, Integer querySizeLimit, QueryType queryType) { + private CalcitePlanContext( + FrameworkConfig config, Integer querySizeLimit, Integer querySysLimit, QueryType queryType) { this.config = config; this.querySizeLimit = querySizeLimit; + this.querySysLimit = querySysLimit; this.queryType = queryType; this.connection = CalciteToolsHelper.connect(config, TYPE_FACTORY); this.relBuilder = CalciteToolsHelper.create(config, TYPE_FACTORY, connection); @@ -93,12 +96,12 @@ public Optional peekCorrelVar() { } public CalcitePlanContext clone() { - return new CalcitePlanContext(config, querySizeLimit, queryType); + return new CalcitePlanContext(config, querySizeLimit, querySysLimit, queryType); } public static CalcitePlanContext create( - FrameworkConfig config, Integer querySizeLimit, QueryType queryType) { - return new CalcitePlanContext(config, querySizeLimit, queryType); + FrameworkConfig config, Integer querySizeLimit, Integer querySysLimit, QueryType queryType) { + return new CalcitePlanContext(config, querySizeLimit, querySysLimit, queryType); } public void putRexLambdaRefMap(Map candidateMap) { diff --git a/core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java b/core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java index 94cfe3c6e0c..ab93d054767 100644 --- a/core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java +++ b/core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java @@ -23,6 +23,7 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.Iterables; import com.google.common.collect.Streams; +import java.lang.reflect.Method; import java.util.ArrayList; import java.util.Comparator; import java.util.HashSet; @@ -35,6 +36,7 @@ import java.util.stream.Stream; import org.apache.calcite.plan.RelOptTable; import org.apache.calcite.plan.ViewExpanders; +import org.apache.calcite.rel.RelCollations; import org.apache.calcite.rel.RelNode; import org.apache.calcite.rel.core.Aggregate; import org.apache.calcite.rel.core.JoinRelType; @@ -107,6 +109,7 @@ import org.opensearch.sql.ast.tree.Trendline.TrendlineType; import org.opensearch.sql.ast.tree.UnresolvedPlan; import org.opensearch.sql.ast.tree.Window; +import org.opensearch.sql.calcite.plan.LogicalSystemLimit; import org.opensearch.sql.calcite.plan.OpenSearchConstants; import org.opensearch.sql.calcite.utils.JoinAndLookupUtils; import org.opensearch.sql.calcite.utils.PlanUtils; @@ -133,6 +136,21 @@ public RelNode analyze(UnresolvedPlan unresolved, CalcitePlanContext context) { return unresolved.accept(this, context); } + /** Adds a rel node to the top of the stack while preserving the field names and aliases. */ + private void replaceTop(RelBuilder relBuilder, RelNode relNode) { + try { + Method method = RelBuilder.class.getDeclaredMethod("replaceTop", RelNode.class); + method.setAccessible(true); + method.invoke(relBuilder, relNode); + } catch (Exception e) { + throw new IllegalStateException("Unable to invoke RelBuilder.replaceTop", e); + } + } + + private RelNode sysLimit(RelNode child, RexNode fetch) { + return LogicalSystemLimit.create(child, RelCollations.EMPTY, null, fetch); + } + @Override public RelNode visitRelation(Relation node, CalcitePlanContext context) { context.relBuilder.scan(node.getTableQualifiedName().getParts()); @@ -642,8 +660,24 @@ private Optional extractAliasLiteral(RexNode node) { @Override public RelNode visitJoin(Join node, CalcitePlanContext context) { - List children = node.getChildren(); - children.forEach(c -> analyze(c, context)); + // 1. visit left child + analyze(node.getLeft(), context); + // 2. add system limit to left side (main-search) if join type is right outer + if (node.getJoinType() == Join.JoinType.RIGHT) { + replaceTop( + context.relBuilder, + sysLimit(context.relBuilder.peek(), context.relBuilder.literal(context.querySysLimit))); + } + // 3. visit right child + analyze(node.getRight(), context); + // 4. add system limit to right side (subsearch) if join type is not semi and anti + if (node.getJoinType() != Join.JoinType.RIGHT + && node.getJoinType() != Join.JoinType.SEMI + && node.getJoinType() != Join.JoinType.ANTI) { + replaceTop( + context.relBuilder, + sysLimit(context.relBuilder.peek(), context.relBuilder.literal(context.querySysLimit))); + } RexNode joinCondition = node.getJoinCondition() .map(c -> rexVisitor.analyzeJoinCondition(c, context)) @@ -768,22 +802,27 @@ public RelNode visitLookup(Lookup node, CalcitePlanContext context) { expectedProvidedFieldNames = newExpectedFieldNames; } - // 5. Resolve join condition. Note, this operation should be done after finishing all analyze. + // 5. Add system limit to right side (subsearch) + replaceTop( + context.relBuilder, + sysLimit(context.relBuilder.peek(), context.relBuilder.literal(context.querySysLimit))); + + // 6. Resolve join condition. Note, this operation should be done after finishing all analyze. JoinAndLookupUtils.addJoinForLookUp(node, context); - // 6. Add projection for coalesce fields if there is. + // 7. Add projection for coalesce fields if there is. if (!newCoalesceList.isEmpty()) { context.relBuilder.projectPlus(newCoalesceList); } - // 7. Add projection to remove unnecessary fields + // 8. Add projection to remove unnecessary fields // NOTE: Need to lazy invoke projectExcept until finishing all analyzing, // otherwise the field names may have changed because of field name duplication. if (!toBeRemovedFields.isEmpty()) { context.relBuilder.projectExcept(toBeRemovedFields); } - // 7. Rename the fields to the expected names. + // 9. Rename the fields to the expected names. JoinAndLookupUtils.renameToExpectedFields( expectedProvidedFieldNames, sourceFieldsNames.size() - duplicatedSourceFields.size(), @@ -1449,14 +1488,18 @@ private void buildExpandRelNode( .uncollect(List.of(), false) .build(); - // 6. Perform a nested-loop join (correlate) between the original table and the expanded + // 6. add system limit to right side + RelNode rightNodeWithLimit = + sysLimit(rightNode, context.relBuilder.literal(context.querySysLimit)); + + // 7. Perform a nested-loop join (correlate) between the original table and the expanded // array field. // The last parameter has to refer to the array to be expanded on the left side. It will // be used by the right side to correlate with the left side. context .relBuilder .push(leftNode) - .push(rightNode) + .push(rightNodeWithLimit) .correlate(JoinRelType.INNER, correlVariable.get().id, List.of(arrayFieldRex)) // 7. Remove the original array field from the output. // TODO: RFC: should we keep the original array field when alias is present? diff --git a/core/src/main/java/org/opensearch/sql/calcite/plan/LogicalSystemLimit.java b/core/src/main/java/org/opensearch/sql/calcite/plan/LogicalSystemLimit.java new file mode 100644 index 00000000000..7205894a313 --- /dev/null +++ b/core/src/main/java/org/opensearch/sql/calcite/plan/LogicalSystemLimit.java @@ -0,0 +1,65 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.calcite.plan; + +import java.util.Collections; +import java.util.List; +import org.apache.calcite.plan.Convention; +import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.rel.RelCollation; +import org.apache.calcite.rel.RelCollationTraitDef; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.core.Sort; +import org.apache.calcite.rel.hint.RelHint; +import org.apache.calcite.rel.logical.LogicalSort; +import org.apache.calcite.rex.RexNode; +import org.checkerframework.checker.nullness.qual.Nullable; + +/** System level limit logical plan, comparing to user level plan {@link LogicalSort}. */ +public class LogicalSystemLimit extends Sort { + + private LogicalSystemLimit( + RelOptCluster cluster, + RelTraitSet traitSet, + RelNode input, + RelCollation collation, + @Nullable RexNode offset, + @Nullable RexNode fetch) { + this(cluster, traitSet, Collections.emptyList(), input, collation, offset, fetch); + } + + private LogicalSystemLimit( + RelOptCluster cluster, + RelTraitSet traitSet, + List hints, + RelNode input, + RelCollation collation, + @Nullable RexNode offset, + @Nullable RexNode fetch) { + super(cluster, traitSet, hints, input, collation, offset, fetch); + assert traitSet.containsIfApplicable(Convention.NONE); + } + + public static LogicalSystemLimit create( + RelNode input, RelCollation collation, @Nullable RexNode offset, @Nullable RexNode fetch) { + RelOptCluster cluster = input.getCluster(); + collation = RelCollationTraitDef.INSTANCE.canonize(collation); + RelTraitSet traitSet = input.getTraitSet().replace(Convention.NONE).replace(collation); + return new LogicalSystemLimit(cluster, traitSet, input, collation, offset, fetch); + } + + @Override + public Sort copy( + RelTraitSet traitSet, + RelNode newInput, + RelCollation newCollation, + @Nullable RexNode offset, + @Nullable RexNode fetch) { + return new LogicalSystemLimit( + getCluster(), traitSet, hints, newInput, newCollation, offset, fetch); + } +} diff --git a/core/src/main/java/org/opensearch/sql/executor/QueryService.java b/core/src/main/java/org/opensearch/sql/executor/QueryService.java index ef8876a9275..8718f22e4a5 100644 --- a/core/src/main/java/org/opensearch/sql/executor/QueryService.java +++ b/core/src/main/java/org/opensearch/sql/executor/QueryService.java @@ -99,7 +99,8 @@ public void executeWithCalcite( CalcitePlanContext context = CalcitePlanContext.create( buildFrameworkConfig(), - settings.getSettingValue(Key.QUERY_SIZE_LIMIT), + getQuerySizeLimit(), + getQuerySystemLimit(), queryType); RelNode relNode = analyze(plan, context); RelNode optimized = optimize(relNode); @@ -133,7 +134,10 @@ public void explainWithCalcite( () -> { CalcitePlanContext context = CalcitePlanContext.create( - buildFrameworkConfig(), getQuerySizeLimit(), queryType); + buildFrameworkConfig(), + getQuerySizeLimit(), + getQuerySystemLimit(), + queryType); RelNode relNode = analyze(plan, context); RelNode optimized = optimize(relNode); RelNode calcitePlan = convertToCalcitePlan(optimized); @@ -276,6 +280,10 @@ private Integer getQuerySizeLimit() { return settings == null ? null : settings.getSettingValue(Key.QUERY_SIZE_LIMIT); } + private Integer getQuerySystemLimit() { + return settings == null ? null : settings.getSettingValue(Key.QUERY_SYSTEM_LIMIT); + } + // TODO https://github.com/opensearch-project/sql/issues/3457 // Calcite is not available for SQL query now. Maybe release in 3.1.0? private boolean shouldUseCalcite(QueryType queryType) { diff --git a/core/src/test/java/org/opensearch/sql/calcite/CalciteRexNodeVisitorTest.java b/core/src/test/java/org/opensearch/sql/calcite/CalciteRexNodeVisitorTest.java index 62f3fadd326..c58d28b7ead 100644 --- a/core/src/test/java/org/opensearch/sql/calcite/CalciteRexNodeVisitorTest.java +++ b/core/src/test/java/org/opensearch/sql/calcite/CalciteRexNodeVisitorTest.java @@ -66,7 +66,7 @@ public void setUpContext() { mockedStatic.when(() -> CalciteToolsHelper.create(any(), any(), any())).thenReturn(relBuilder); - context = CalcitePlanContext.create(frameworkConfig, 100, QueryType.PPL); + context = CalcitePlanContext.create(frameworkConfig, 100, 50000, QueryType.PPL); } @AfterEach diff --git a/docs/user/admin/settings.rst b/docs/user/admin/settings.rst index ab74e2bb567..a6a18b1a80a 100644 --- a/docs/user/admin/settings.rst +++ b/docs/user/admin/settings.rst @@ -166,7 +166,7 @@ plugins.query.size_limit Description ----------- -The new engine fetches a default size of index from OpenSearch set by this setting, the default value equals to max result window in index level (10000 by default). You can change the value to any value not greater than the max result window value in index level (`index.max_result_window`), here is an example:: +The size configures the maximum amount of rows to be fetched from query execution results. The default value is: 10000. You can change the value:: >> curl -H 'Content-Type: application/json' -X PUT localhost:9200/_plugins/_query/settings -d '{ "transient" : { @@ -188,6 +188,65 @@ Result set:: } } +plugins.query.system_limit +========================== + +Description +----------- + +The size configures the maximum of rows in the subsearch to data-intensive operations against (e.g. join, lookup). The default value is: 50000. Value range is from 0 to 2147483647 (Int.MaxValue). + +PPL commands includes ``join``, ``lookup`` and ``expand`` will be affected by this configuration. In future, we can add more command argument to control specific command. + +For Join, with join type + +* SEMI, ANTI: no affect +* RIGHT: add a LogicalSystemLimit operator to left side (main-search) +* Others: add a LogicalSystemLimit operator to right side (sub-search) + +For Lookup + +* add a LogicalSystemLimit operator to right side (sub-search) + +For expand + +* add a LogicalSystemLimit operator to right side (sub-search) + +Version +------- +3.1.0 + +Example +------- + +Change the system_limit to 2147483647 (max):: + + sh$ curl -sS -H 'Content-Type: application/json' \ + ... -X PUT localhost:9200/_plugins/_query/settings \ + ... -d '{"persistent" : {"plugins.query.system_limit" : "2147483647"}}' + { + "acknowledged": true, + "persistent": { + "plugins": { + "query": { + "system_limit": "2147483647" + } + } + }, + "transient": {} + } + +Rollback to default value:: + + sh$ curl -sS -H 'Content-Type: application/json' \ + ... -X PUT localhost:9200/_plugins/_query/settings \ + ... -d '{"persistent" : {"plugins.query.system_limit" : null}}' + { + "acknowledged": true, + "persistent": {}, + "transient": {} + } + plugins.query.memory_limit ========================== diff --git a/docs/user/ppl/admin/settings.rst b/docs/user/ppl/admin/settings.rst index fad7164d644..e4381a088bf 100644 --- a/docs/user/ppl/admin/settings.rst +++ b/docs/user/ppl/admin/settings.rst @@ -125,7 +125,7 @@ plugins.query.size_limit Description ----------- -The size configures the maximum amount of rows to be fetched from PPL execution results. The default value is: 10000 +The size configures the maximum amount of rows to be fetched from query execution results. The default value is: 10000 Example ------- @@ -159,3 +159,64 @@ Rollback to default value:: } Note: the legacy settings of ``opendistro.query.size_limit`` is deprecated, it will fallback to the new settings if you request an update with the legacy name. + +plugins.query.system_limit +========================== + +Description +----------- + +The size configures the maximum of rows in the subsearch to data-intensive operations against (e.g. join, lookup). The default value is: 50000. Value range is from 0 to 2147483647 (Int.MaxValue). + +Since v3.0.0, PPL introduces commands that may increase data volume. To prevent out-of-memory problem, the system automatically add a ``LogicalSystemLimit`` in plan for data-intensive operations. + +PPL commands includes ``join``, ``lookup`` and ``expand`` will be affected by this configuration. In future, we can add more command argument to control specific command. + +For Join, with join type + +* SEMI, ANTI: no affect +* RIGHT: add a LogicalSystemLimit operator to left side (main-search) +* Others: add a LogicalSystemLimit operator to right side (sub-search) + +For Lookup + +* add a LogicalSystemLimit operator to right side (sub-search) + +For expand + +* add a LogicalSystemLimit operator to right side (sub-search) + +Version +------- +3.1.0 + +Example +------- + +Change the system_limit to 2147483647 (max):: + + sh$ curl -sS -H 'Content-Type: application/json' \ + ... -X PUT localhost:9200/_plugins/_query/settings \ + ... -d '{"persistent" : {"plugins.query.system_limit" : "2147483647"}}' + { + "acknowledged": true, + "persistent": { + "plugins": { + "query": { + "system_limit": "2147483647" + } + } + }, + "transient": {} + } + +Rollback to default value:: + + sh$ curl -sS -H 'Content-Type: application/json' \ + ... -X PUT localhost:9200/_plugins/_query/settings \ + ... -d '{"persistent" : {"plugins.query.system_limit" : null}}' + { + "acknowledged": true, + "persistent": {}, + "transient": {} + } diff --git a/docs/user/ppl/limitations/limitations.rst b/docs/user/ppl/limitations/limitations.rst index 22b7fe52ae4..dfbdfc9ea12 100644 --- a/docs/user/ppl/limitations/limitations.rst +++ b/docs/user/ppl/limitations/limitations.rst @@ -84,3 +84,35 @@ plugins.query.field_type_tolerance setting is enabled, the SQL/PPL plugin will h scalar data types, allowing basic queries (e.g., source = tbl | where condition). However, using multi-value fields in expressions or functions will result in exceptions. If this setting is disabled or absent, only the first element of an array is returned, preserving the default behavior. + +System Limitation for data-intensive operations +=============================================== + +Since v3.0.0, PPL introduces commands that may increase data volume. To prevent out-of-memory problem, the system +automatically enforces a LogicalSystemLimit operator for such commands. + +``plugins.query.system_limit``: The size configures the maximum of rows in the subsearch to data-intensive operations +against (e.g. join, lookup). The default value is: 50000. Value range is from 0 to 2147483647 (Int.MaxValue). +Note, only apply system limit automatically to data-intensive (data-bloat) operations. + +PPL commands includes ``join``, ``lookup`` and ``expand`` will be affected by this configuration. +In future, we can add more command argument to control specific command. + +For Join, with join type + +* SEMI, ANTI: no affect +* RIGHT: add a LogicalSystemLimit operator to left side (main-search) +* Others: add a LogicalSystemLimit operator to right side (sub-search) + +For Lookup + +* add a LogicalSystemLimit operator to right side (sub-search) + +For expand + +* add a LogicalSystemLimit operator to right side (sub-search) + +The results of impacted search (for example, the lookup table of lookup command, right side of inner join, etc.) +cannot exceed the limitation (50000 rows by default). If the actual number of rows in lookup table or right side +of inner join is greater then the system limit, only the number of rows specified by the configuration will be searched. +You can set the configuration to the maximum integer value (2147483647) if you are certain resources are not a concern. \ No newline at end of file diff --git a/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteExplainIT.java b/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteExplainIT.java index 053c5f73de1..cf3e9a72e63 100644 --- a/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteExplainIT.java +++ b/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteExplainIT.java @@ -5,8 +5,14 @@ package org.opensearch.sql.calcite.remote; +import static org.opensearch.sql.legacy.TestsConstants.TEST_INDEX_HOBBIES; +import static org.opensearch.sql.legacy.TestsConstants.TEST_INDEX_OCCUPATION; +import static org.opensearch.sql.legacy.TestsConstants.TEST_INDEX_STATE_COUNTRY; +import static org.opensearch.sql.util.MatcherUtils.assertJsonEqualsIgnoreId; + import java.io.IOException; import org.junit.Ignore; +import org.junit.jupiter.api.Test; import org.opensearch.sql.ppl.ExplainIT; public class CalciteExplainIT extends ExplainIT { @@ -15,6 +21,62 @@ public void init() throws Exception { super.init(); enableCalcite(); disallowCalciteFallback(); + loadIndex(Index.STATE_COUNTRY); + loadIndex(Index.OCCUPATION); + loadIndex(Index.HOBBIES); + } + + @Test + public void testPushDownSystemLimitForJoinExplain() throws Exception { + // the SYSTEM LIMIT should apply to right table of join + String expected = loadFromFile("expectedOutput/calcite/explain_join_push_system_limit.json"); + + assertJsonEqualsIgnoreId( + expected, + explainQueryToString( + String.format( + "source=%s | inner join left=a, right=b ON a.name = b.name %s | fields " + + "a.name, a.age, a.state, a.country, b.occupation, b.country, b.salary", + TEST_INDEX_STATE_COUNTRY, TEST_INDEX_OCCUPATION))); + } + + @Test + public void testPushDownSystemLimitForMultipleJoinExplain() throws Exception { + // the SYSTEM LIMIT should apply to each right table of multi-join + String expected = + loadFromFile("expectedOutput/calcite/explain_multi_join_push_system_limit.json"); + + assertJsonEqualsIgnoreId( + expected, + explainQueryToString( + String.format( + "source=%s | inner join left=a, right=b ON a.name = b.name %s" + + " | left join left=a, right=b ON a.name = b.name %s", + TEST_INDEX_STATE_COUNTRY, TEST_INDEX_OCCUPATION, TEST_INDEX_HOBBIES))); + } + + @Test + public void testExistsSubqueryExplain() throws Exception { + String expected = loadFromFile("expectedOutput/calcite/explain_exists_subsearch.json"); + + assertJsonEqualsIgnoreId( + expected, + explainQueryToString( + String.format( + "source = %s exists [ source = %s | where name = %s.name ]", + TEST_INDEX_STATE_COUNTRY, TEST_INDEX_OCCUPATION, TEST_INDEX_STATE_COUNTRY))); + } + + @Test + public void testInSubqueryExplain() throws Exception { + String expected = loadFromFile("expectedOutput/calcite/explain_in_subsearch.json"); + + assertJsonEqualsIgnoreId( + expected, + explainQueryToString( + String.format( + "source = %s | where name in [ source = %s | fields name ]", + TEST_INDEX_STATE_COUNTRY, TEST_INDEX_OCCUPATION))); } @Override diff --git a/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteSettingsIT.java b/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteSettingsIT.java index d07407500d0..b280024f780 100644 --- a/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteSettingsIT.java +++ b/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteSettingsIT.java @@ -5,7 +5,14 @@ package org.opensearch.sql.calcite.remote; +import static org.opensearch.sql.legacy.TestsConstants.TEST_INDEX_BANK; +import static org.opensearch.sql.util.MatcherUtils.rows; +import static org.opensearch.sql.util.MatcherUtils.verifyDataRows; +import static org.opensearch.sql.util.MatcherUtils.verifyNumOfRows; + import java.io.IOException; +import org.json.JSONObject; +import org.junit.jupiter.api.Test; import org.opensearch.sql.common.setting.Settings.Key; import org.opensearch.sql.ppl.SettingsIT; @@ -30,4 +37,52 @@ public void testQuerySizeLimit_NoPushdown() throws IOException { } }); } + + @Test + public void testQuerySystemLimit() throws IOException { + // system limit only impact data-intensive operations + setQuerySystemLimit(2); + JSONObject result = + executeQuery(String.format("search source=%s age>35 | fields firstname", TEST_INDEX_BANK)); + verifyDataRows(result, rows("Hattie"), rows("Elinor"), rows("Virginia")); + + // for non data-intensive operations, the rows still control by query.size_limit + setQuerySizeLimit(1); + result = + executeQuery(String.format("search source=%s age>35 | fields firstname", TEST_INDEX_BANK)); + verifyDataRows(result, rows("Hattie")); + resetQuerySizeLimit(); + resetQuerySystemLimit(); + } + + @Test + public void testQuerySystemLimitWithJoin() throws IOException { + JSONObject result = + executeQuery( + String.format( + "search source=%s age>35 | fields firstname" + + " | full join left=l right=r on l.firstname=r.firstname" + + " [ search source=%s | fields firstname ]", + TEST_INDEX_BANK, TEST_INDEX_BANK)); + verifyNumOfRows(result, 7); + + // system limit will impact data-intensive operations + setQuerySystemLimit(2); + result = + executeQuery( + String.format( + "search source=%s age>35 | fields firstname" + + " | full join left=l right=r on l.firstname=r.firstname" + + " [ search source=%s | fields firstname ]", + TEST_INDEX_BANK, TEST_INDEX_BANK)); + verifyNumOfRows(result, 4); + + // amount of final result should equals to query.size_limit + setQuerySizeLimit(1); + result = + executeQuery(String.format("search source=%s age>35 | fields firstname", TEST_INDEX_BANK)); + verifyNumOfRows(result, 1); + resetQuerySizeLimit(); + resetQuerySystemLimit(); + } } diff --git a/integ-test/src/test/java/org/opensearch/sql/calcite/standalone/CalcitePPLIntegTestCase.java b/integ-test/src/test/java/org/opensearch/sql/calcite/standalone/CalcitePPLIntegTestCase.java index 0dc69d0217d..280b0cddfad 100644 --- a/integ-test/src/test/java/org/opensearch/sql/calcite/standalone/CalcitePPLIntegTestCase.java +++ b/integ-test/src/test/java/org/opensearch/sql/calcite/standalone/CalcitePPLIntegTestCase.java @@ -122,6 +122,7 @@ private Settings defaultSettings() { .put(Key.PATTERN_MODE, "LABEL") .put(Key.PATTERN_MAX_SAMPLE_COUNT, 10) .put(Key.PATTERN_BUFFER_LIMIT, 100000) + .put(Key.QUERY_SYSTEM_LIMIT, 5000) .build(); @Override @@ -152,6 +153,7 @@ protected Settings enablePushdown() { .put(Key.PATTERN_MODE, "LABEL") .put(Key.PATTERN_MAX_SAMPLE_COUNT, 10) .put(Key.PATTERN_BUFFER_LIMIT, 100000) + .put(Key.QUERY_SYSTEM_LIMIT, 5000) .build(); @Override diff --git a/integ-test/src/test/java/org/opensearch/sql/legacy/SQLIntegTestCase.java b/integ-test/src/test/java/org/opensearch/sql/legacy/SQLIntegTestCase.java index e7086abb9a9..95944f9d015 100644 --- a/integ-test/src/test/java/org/opensearch/sql/legacy/SQLIntegTestCase.java +++ b/integ-test/src/test/java/org/opensearch/sql/legacy/SQLIntegTestCase.java @@ -94,6 +94,8 @@ public abstract class SQLIntegTestCase extends OpenSearchSQLRestTestCase { Integer.parseInt(System.getProperty("defaultQuerySizeLimit", "200")); public static final Integer DEFAULT_MAX_RESULT_WINDOW = Integer.parseInt(System.getProperty("defaultMaxResultWindow", "10000")); + public static final Integer DEFAULT_QUERY_SYSTEM_LIMIT = + Integer.parseInt(System.getProperty("defaultMaxResultWindow", "50000")); public boolean shouldResetQuerySizeLimit() { return true; @@ -189,6 +191,20 @@ protected void resetQuerySizeLimit() throws IOException { DEFAULT_QUERY_SIZE_LIMIT.toString())); } + protected void setQuerySystemLimit(Integer limit) throws IOException { + updateClusterSettings( + new ClusterSetting( + "transient", Settings.Key.QUERY_SYSTEM_LIMIT.getKeyValue(), limit.toString())); + } + + protected void resetQuerySystemLimit() throws IOException { + updateClusterSettings( + new ClusterSetting( + "transient", + Settings.Key.QUERY_SYSTEM_LIMIT.getKeyValue(), + DEFAULT_QUERY_SYSTEM_LIMIT.toString())); + } + @SneakyThrows protected void setDataSourcesEnabled(String clusterSettingType, boolean value) { updateClusterSettings( diff --git a/integ-test/src/test/java/org/opensearch/sql/ppl/SettingsIT.java b/integ-test/src/test/java/org/opensearch/sql/ppl/SettingsIT.java index 16e1e1a532b..1a7b5c45e2f 100644 --- a/integ-test/src/test/java/org/opensearch/sql/ppl/SettingsIT.java +++ b/integ-test/src/test/java/org/opensearch/sql/ppl/SettingsIT.java @@ -33,6 +33,7 @@ public void testQuerySizeLimit() throws IOException { result = executeQuery(String.format("search source=%s age>35 | fields firstname", TEST_INDEX_BANK)); verifyDataRows(result, rows("Hattie")); + resetQuerySizeLimit(); } @Test @@ -62,5 +63,6 @@ public void testQuerySizeLimit_NoPushdown() throws IOException { "search source=%s | eval a = 1 | where age>35 | fields firstname", TEST_INDEX_BANK)); verifyDataRows(result, rows("Hattie")); + resetQuerySizeLimit(); } } diff --git a/integ-test/src/test/resources/expectedOutput/calcite/explain_exists_subsearch.json b/integ-test/src/test/resources/expectedOutput/calcite/explain_exists_subsearch.json new file mode 100644 index 00000000000..9830fdbca1d --- /dev/null +++ b/integ-test/src/test/resources/expectedOutput/calcite/explain_exists_subsearch.json @@ -0,0 +1,6 @@ +{ + "calcite": { + "logical": "LogicalProject(name=[$0], country=[$1], state=[$2], month=[$3], year=[$4], age=[$5])\n LogicalFilter(condition=[EXISTS({\nLogicalProject(name=[$0], country=[$1], occupation=[$2], month=[$3], salary=[$4], year=[$5])\n LogicalFilter(condition=[=($0, $cor0.name)])\n CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_occupation]])\n})], variablesSet=[[$cor0]])\n CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_state_country]])\n", + "physical": "EnumerableHashJoin(condition=[=($0, $6)], joinType=[semi])\n CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_state_country]], PushDownContext=[[PROJECT->[name, country, state, month, year, age]], OpenSearchRequestBuilder(sourceBuilder={\"from\":0,\"timeout\":\"1m\",\"_source\":{\"includes\":[\"name\",\"country\",\"state\",\"month\",\"year\",\"age\"],\"excludes\":[]}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)])\n EnumerableSort(sort0=[$0], dir0=[ASC])\n CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_occupation]], PushDownContext=[[PROJECT->[name], FILTER->IS NOT NULL($0), AGGREGATION->rel#:LogicalAggregate.NONE.[](input=RelSubset#,group={0})], OpenSearchRequestBuilder(sourceBuilder={\"from\":0,\"size\":0,\"timeout\":\"1m\",\"query\":{\"exists\":{\"field\":\"name\",\"boost\":1.0}},\"_source\":{\"includes\":[\"name\"],\"excludes\":[]},\"sort\":[{\"_doc\":{\"order\":\"asc\"}}],\"aggregations\":{\"composite_buckets\":{\"composite\":{\"size\":1000,\"sources\":[{\"name\":{\"terms\":{\"field\":\"name\",\"missing_bucket\":true,\"missing_order\":\"first\",\"order\":\"asc\"}}}]}}}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)])\n" + } +} \ No newline at end of file diff --git a/integ-test/src/test/resources/expectedOutput/calcite/explain_in_subsearch.json b/integ-test/src/test/resources/expectedOutput/calcite/explain_in_subsearch.json new file mode 100644 index 00000000000..784b2e92e6d --- /dev/null +++ b/integ-test/src/test/resources/expectedOutput/calcite/explain_in_subsearch.json @@ -0,0 +1,6 @@ +{ + "calcite": { + "logical": "LogicalProject(name=[$0], country=[$1], state=[$2], month=[$3], year=[$4], age=[$5])\n LogicalFilter(condition=[IN($0, {\nLogicalProject(name=[$0])\n CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_occupation]])\n})], variablesSet=[[$cor0]])\n CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_state_country]])\n", + "physical": "EnumerableHashJoin(condition=[=($0, $6)], joinType=[semi])\n CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_state_country]], PushDownContext=[[PROJECT->[name, country, state, month, year, age]], OpenSearchRequestBuilder(sourceBuilder={\"from\":0,\"timeout\":\"1m\",\"_source\":{\"includes\":[\"name\",\"country\",\"state\",\"month\",\"year\",\"age\"],\"excludes\":[]}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)])\n EnumerableSort(sort0=[$0], dir0=[ASC])\n CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_occupation]], PushDownContext=[[PROJECT->[name], AGGREGATION->rel#:LogicalAggregate.NONE.[](input=RelSubset#,group={0})], OpenSearchRequestBuilder(sourceBuilder={\"from\":0,\"size\":0,\"timeout\":\"1m\",\"_source\":{\"includes\":[\"name\"],\"excludes\":[]},\"aggregations\":{\"composite_buckets\":{\"composite\":{\"size\":1000,\"sources\":[{\"name\":{\"terms\":{\"field\":\"name\",\"missing_bucket\":true,\"missing_order\":\"first\",\"order\":\"asc\"}}}]}}}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)])\n" + } +} \ No newline at end of file diff --git a/integ-test/src/test/resources/expectedOutput/calcite/explain_join_push_system_limit.json b/integ-test/src/test/resources/expectedOutput/calcite/explain_join_push_system_limit.json new file mode 100644 index 00000000000..82a7e60dbef --- /dev/null +++ b/integ-test/src/test/resources/expectedOutput/calcite/explain_join_push_system_limit.json @@ -0,0 +1,6 @@ +{ + "calcite": { + "logical": "LogicalProject(name=[$0], age=[$5], state=[$2], country=[$1], occupation=[$8], b.country=[$7], salary=[$10])\n LogicalJoin(condition=[=($0, $6)], joinType=[inner])\n LogicalProject(name=[$0], country=[$1], state=[$2], month=[$3], year=[$4], age=[$5])\n CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_state_country]])\n LogicalSystemLimit(fetch=[50000])\n LogicalProject(name=[$0], country=[$1], occupation=[$2], month=[$3], salary=[$4], year=[$5])\n CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_occupation]])\n", + "physical": "EnumerableCalc(expr#0..7=[{inputs}], name=[$t0], age=[$t3], state=[$t2], country=[$t1], occupation=[$t6], b.country=[$t5], salary=[$t7])\n EnumerableMergeJoin(condition=[=($0, $4)], joinType=[inner])\n EnumerableSort(sort0=[$0], dir0=[ASC])\n CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_state_country]], PushDownContext=[[PROJECT->[name, country, state, age]], OpenSearchRequestBuilder(sourceBuilder={\"from\":0,\"timeout\":\"1m\",\"_source\":{\"includes\":[\"name\",\"country\",\"state\",\"age\"],\"excludes\":[]}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)])\n EnumerableSort(sort0=[$0], dir0=[ASC])\n EnumerableLimit(fetch=[50000])\n CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_occupation]], PushDownContext=[[PROJECT->[name, country, occupation, salary]], OpenSearchRequestBuilder(sourceBuilder={\"from\":0,\"timeout\":\"1m\",\"_source\":{\"includes\":[\"name\",\"country\",\"occupation\",\"salary\"],\"excludes\":[]}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)])\n" + } +} \ No newline at end of file diff --git a/integ-test/src/test/resources/expectedOutput/calcite/explain_multi_join_push_system_limit.json b/integ-test/src/test/resources/expectedOutput/calcite/explain_multi_join_push_system_limit.json new file mode 100644 index 00000000000..c3e9d09cc5a --- /dev/null +++ b/integ-test/src/test/resources/expectedOutput/calcite/explain_multi_join_push_system_limit.json @@ -0,0 +1,6 @@ +{ + "calcite": { + "logical": "LogicalProject(name=[$0], country=[$1], state=[$2], month=[$3], year=[$4], age=[$5], b.name=[$6], b.country=[$7], occupation=[$8], b.month=[$9], salary=[$10], b.year=[$11], b.name0=[$12], b.country0=[$13], language=[$14], b.month0=[$15], b.year0=[$16], hobby=[$17])\n LogicalJoin(condition=[=($0, $12)], joinType=[left])\n LogicalProject(name=[$0], country=[$1], state=[$2], month=[$3], year=[$4], age=[$5], b.name=[$6], b.country=[$7], occupation=[$8], b.month=[$9], salary=[$10], b.year=[$11])\n LogicalJoin(condition=[=($0, $6)], joinType=[inner])\n LogicalProject(name=[$0], country=[$1], state=[$2], month=[$3], year=[$4], age=[$5])\n CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_state_country]])\n LogicalSystemLimit(fetch=[50000])\n LogicalProject(name=[$0], country=[$1], occupation=[$2], month=[$3], salary=[$4], year=[$5])\n CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_occupation]])\n LogicalSystemLimit(fetch=[50000])\n LogicalProject(name=[$0], country=[$1], language=[$2], month=[$3], year=[$4], hobby=[$5])\n CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_hobbies]])\n", + "physical": "EnumerableMergeJoin(condition=[=($0, $12)], joinType=[left])\n EnumerableMergeJoin(condition=[=($0, $6)], joinType=[inner])\n EnumerableSort(sort0=[$0], dir0=[ASC])\n CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_state_country]], PushDownContext=[[PROJECT->[name, country, state, month, year, age]], OpenSearchRequestBuilder(sourceBuilder={\"from\":0,\"timeout\":\"1m\",\"_source\":{\"includes\":[\"name\",\"country\",\"state\",\"month\",\"year\",\"age\"],\"excludes\":[]}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)])\n EnumerableSort(sort0=[$0], dir0=[ASC])\n EnumerableLimit(fetch=[50000])\n CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_occupation]], PushDownContext=[[PROJECT->[name, country, occupation, month, salary, year]], OpenSearchRequestBuilder(sourceBuilder={\"from\":0,\"timeout\":\"1m\",\"_source\":{\"includes\":[\"name\",\"country\",\"occupation\",\"month\",\"salary\",\"year\"],\"excludes\":[]}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)])\n EnumerableSort(sort0=[$0], dir0=[ASC])\n EnumerableLimit(fetch=[50000])\n CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_hobbies]], PushDownContext=[[PROJECT->[name, country, language, month, year, hobby]], OpenSearchRequestBuilder(sourceBuilder={\"from\":0,\"timeout\":\"1m\",\"_source\":{\"includes\":[\"name\",\"country\",\"language\",\"month\",\"year\",\"hobby\"],\"excludes\":[]}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)])\n" + } +} \ No newline at end of file diff --git a/integ-test/src/test/resources/expectedOutput/calcite/explain_patterns_brain_agg_push.json b/integ-test/src/test/resources/expectedOutput/calcite/explain_patterns_brain_agg_push.json index f45faa2c5c5..d8b03c8f913 100644 --- a/integ-test/src/test/resources/expectedOutput/calcite/explain_patterns_brain_agg_push.json +++ b/integ-test/src/test/resources/expectedOutput/calcite/explain_patterns_brain_agg_push.json @@ -1,6 +1,6 @@ { "calcite": { - "logical": "LogicalProject(patterns_field=[SAFE_CAST(ITEM($1, 'pattern'))], pattern_count=[SAFE_CAST(ITEM($1, 'pattern_count'))], tokens=[SAFE_CAST(ITEM($1, 'tokens'))])\n LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{0}])\n LogicalAggregate(group=[{}], patterns_field=[pattern($0, $1, $2)])\n LogicalProject(email=[$9], $f17=[10], $f18=[100000])\n CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]])\n Uncollect\n LogicalProject(patterns_field=[$cor0.patterns_field])\n LogicalValues(tuples=[[{ 0 }]])\n", - "physical": "EnumerableCalc(expr#0..1=[{inputs}], expr#2=['pattern'], expr#3=[ITEM($t1, $t2)], expr#4=[SAFE_CAST($t3)], expr#5=['pattern_count'], expr#6=[ITEM($t1, $t5)], expr#7=[SAFE_CAST($t6)], expr#8=['tokens'], expr#9=[ITEM($t1, $t8)], expr#10=[SAFE_CAST($t9)], patterns_field=[$t4], pattern_count=[$t7], tokens=[$t10])\n EnumerableCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{0}])\n EnumerableAggregate(group=[{}], patterns_field=[pattern($0, $1, $2)])\n EnumerableCalc(expr#0=[{inputs}], expr#1=[10], expr#2=[100000], proj#0..2=[{exprs}])\n CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]], PushDownContext=[[PROJECT->[email]], OpenSearchRequestBuilder(sourceBuilder={\"from\":0,\"timeout\":\"1m\",\"_source\":{\"includes\":[\"email\"],\"excludes\":[]}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)])\n EnumerableUncollect\n EnumerableCalc(expr#0=[{inputs}], expr#1=[$cor0], expr#2=[$t1.patterns_field], patterns_field=[$t2])\n EnumerableValues(tuples=[[{ 0 }]])\n" + "logical": "LogicalProject(patterns_field=[SAFE_CAST(ITEM($1, 'pattern'))], pattern_count=[SAFE_CAST(ITEM($1, 'pattern_count'))], tokens=[SAFE_CAST(ITEM($1, 'tokens'))])\n LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{0}])\n LogicalAggregate(group=[{}], patterns_field=[pattern($0, $1, $2)])\n LogicalProject(email=[$9], $f17=[10], $f18=[100000])\n CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]])\n LogicalSystemLimit(fetch=[50000])\n Uncollect\n LogicalProject(patterns_field=[$cor0.patterns_field])\n LogicalValues(tuples=[[{ 0 }]])\n", + "physical": "EnumerableCalc(expr#0..1=[{inputs}], expr#2=['pattern'], expr#3=[ITEM($t1, $t2)], expr#4=[SAFE_CAST($t3)], expr#5=['pattern_count'], expr#6=[ITEM($t1, $t5)], expr#7=[SAFE_CAST($t6)], expr#8=['tokens'], expr#9=[ITEM($t1, $t8)], expr#10=[SAFE_CAST($t9)], patterns_field=[$t4], pattern_count=[$t7], tokens=[$t10])\n EnumerableCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{0}])\n EnumerableAggregate(group=[{}], patterns_field=[pattern($0, $1, $2)])\n EnumerableCalc(expr#0=[{inputs}], expr#1=[10], expr#2=[100000], proj#0..2=[{exprs}])\n CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]], PushDownContext=[[PROJECT->[email]], OpenSearchRequestBuilder(sourceBuilder={\"from\":0,\"timeout\":\"1m\",\"_source\":{\"includes\":[\"email\"],\"excludes\":[]}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)])\n EnumerableLimit(fetch=[50000])\n EnumerableUncollect\n EnumerableCalc(expr#0=[{inputs}], expr#1=[$cor0], expr#2=[$t1.patterns_field], patterns_field=[$t2])\n EnumerableValues(tuples=[[{ 0 }]])\n" } } \ No newline at end of file diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/setting/OpenSearchSettings.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/setting/OpenSearchSettings.java index fc6e5728700..e5d01aca080 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/setting/OpenSearchSettings.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/setting/OpenSearchSettings.java @@ -142,6 +142,14 @@ public class OpenSearchSettings extends Settings { Setting.Property.NodeScope, Setting.Property.Dynamic); + public static final Setting QUERY_SYSTEM_LIMIT_SETTING = + Setting.intSetting( + Key.QUERY_SYSTEM_LIMIT.getKeyValue(), + 50000, + 0, + Setting.Property.NodeScope, + Setting.Property.Dynamic); + public static final Setting METRICS_ROLLING_WINDOW_SETTING = Setting.longSetting( Key.METRICS_ROLLING_WINDOW.getKeyValue(), @@ -351,6 +359,12 @@ public OpenSearchSettings(ClusterSettings clusterSettings) { Key.CALCITE_PUSHDOWN_ROWCOUNT_ESTIMATION_FACTOR, CALCITE_PUSHDOWN_ROWCOUNT_ESTIMATION_FACTOR_SETTING, new Updater(Key.CALCITE_PUSHDOWN_ROWCOUNT_ESTIMATION_FACTOR)); + register( + settingBuilder, + clusterSettings, + Key.QUERY_SYSTEM_LIMIT, + QUERY_SYSTEM_LIMIT_SETTING, + new Updater(Key.QUERY_SYSTEM_LIMIT)); register( settingBuilder, clusterSettings, @@ -533,6 +547,7 @@ public static List> pluginSettings() { .add(DEFAULT_PATTERN_BUFFER_LIMIT_SETTING) .add(QUERY_MEMORY_LIMIT_SETTING) .add(QUERY_SIZE_LIMIT_SETTING) + .add(QUERY_SYSTEM_LIMIT_SETTING) .add(METRICS_ROLLING_WINDOW_SETTING) .add(METRICS_ROLLING_INTERVAL_SETTING) .add(DATASOURCE_URI_HOSTS_DENY_LIST) diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/AbstractCalciteIndexScan.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/AbstractCalciteIndexScan.java index 52a9b2df879..c0e0f5471b9 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/AbstractCalciteIndexScan.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/AbstractCalciteIndexScan.java @@ -11,6 +11,7 @@ import java.util.ArrayDeque; import java.util.List; import lombok.Getter; +import lombok.Setter; import org.apache.calcite.plan.RelOptCluster; import org.apache.calcite.plan.RelOptTable; import org.apache.calcite.plan.RelTraitSet; @@ -88,7 +89,7 @@ public double estimateRowCount(RelMetadataQuery mq) { case PROJECT -> rowCount; case FILTER -> NumberUtil.multiply( rowCount, RelMdUtil.guessSelectivity((RexNode) action.digest)); - case LIMIT -> (Integer) action.digest; + case LIMIT, SYSTEM_LIMIT -> (Integer) action.digest; } * estimateRowCountFactor, (a, b) -> null); @@ -99,6 +100,7 @@ public static class PushDownContext extends ArrayDeque { private boolean isAggregatePushed = false; private boolean isLimitPushed = false; + @Getter @Setter private boolean isSystemLimitPushed = false; @Override public PushDownContext clone() { @@ -135,6 +137,7 @@ protected enum PushDownType { AGGREGATION, // SORT, LIMIT, + SYSTEM_LIMIT, // not user specified limit, it's the limit pushed for high-cost operators // HIGHLIGHT, // NESTED } diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/CalciteEnumerableIndexScan.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/CalciteEnumerableIndexScan.java index 56753a0cd93..03fbdf3bd01 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/CalciteEnumerableIndexScan.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/CalciteEnumerableIndexScan.java @@ -6,6 +6,7 @@ package org.opensearch.sql.opensearch.storage.scan; import java.util.List; +import lombok.extern.log4j.Log4j2; import org.apache.calcite.adapter.enumerable.EnumerableConvention; import org.apache.calcite.adapter.enumerable.EnumerableRel; import org.apache.calcite.adapter.enumerable.EnumerableRelImplementor; @@ -24,17 +25,14 @@ import org.apache.calcite.rel.hint.RelHint; import org.apache.calcite.rel.rules.CoreRules; import org.apache.calcite.rel.type.RelDataType; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; import org.checkerframework.checker.nullness.qual.Nullable; import org.opensearch.sql.calcite.plan.OpenSearchRules; import org.opensearch.sql.opensearch.request.OpenSearchRequestBuilder; import org.opensearch.sql.opensearch.storage.OpenSearchIndex; /** The physical relational operator representing a scan of an OpenSearchIndex type. */ +@Log4j2 public class CalciteEnumerableIndexScan extends AbstractCalciteIndexScan implements EnumerableRel { - private static final Logger LOG = LogManager.getLogger(CalciteEnumerableIndexScan.class); - /** * Creates an CalciteOpenSearchIndexScan. * diff --git a/ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLAbstractTest.java b/ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLAbstractTest.java index 2e70a210d6e..f8d7c532e5f 100644 --- a/ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLAbstractTest.java +++ b/ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLAbstractTest.java @@ -39,7 +39,6 @@ import org.opensearch.sql.calcite.CalcitePlanContext; import org.opensearch.sql.calcite.CalciteRelNodeVisitor; import org.opensearch.sql.common.setting.Settings; -import org.opensearch.sql.common.setting.Settings.Key; import org.opensearch.sql.ppl.antlr.PPLSyntaxParser; import org.opensearch.sql.ppl.parser.AstBuilder; import org.opensearch.sql.ppl.parser.AstStatementBuilder; @@ -77,8 +76,7 @@ protected CalcitePlanContext createBuilderContext() { /** Creates a CalcitePlanContext with transformed config. */ private CalcitePlanContext createBuilderContext(UnaryOperator transform) { config.context(Contexts.of(transform.apply(RelBuilder.Config.DEFAULT))); - return CalcitePlanContext.create( - config.build(), settings.getSettingValue(Key.QUERY_SIZE_LIMIT), PPL); + return CalcitePlanContext.create(config.build(), 200, 50000, PPL); } /** Get the root RelNode of the given PPL query */ diff --git a/ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLBasicTest.java b/ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLBasicTest.java index ecb3a0ab60c..1a60eea217d 100644 --- a/ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLBasicTest.java +++ b/ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLBasicTest.java @@ -416,19 +416,22 @@ public void testRelationSubqueryAlias() { + " COMM=[$6], DEPTNO=[$7], d.DEPTNO=[$8], DNAME=[$9], LOC=[$10])\n" + " LogicalJoin(condition=[=($7, $8)], joinType=[inner])\n" + " LogicalTableScan(table=[[scott, EMP]])\n" - + " LogicalSort(fetch=[10])\n" - + " LogicalTableScan(table=[[scott, DEPT]])\n"; + + " LogicalSystemLimit(fetch=[50000])\n" + + " LogicalSort(fetch=[10])\n" + + " LogicalTableScan(table=[[scott, DEPT]])\n"; verifyLogical(root, expectedLogical); verifyResultCount(root, 14); String expectedSparkSql = "SELECT `EMP`.`EMPNO`, `EMP`.`ENAME`, `EMP`.`JOB`, `EMP`.`MGR`, `EMP`.`HIREDATE`," - + " `EMP`.`SAL`, `EMP`.`COMM`, `EMP`.`DEPTNO`, `t`.`DEPTNO` `d.DEPTNO`, `t`.`DNAME`," - + " `t`.`LOC`\n" + + " `EMP`.`SAL`, `EMP`.`COMM`, `EMP`.`DEPTNO`, `t0`.`DEPTNO` `d.DEPTNO`, `t0`.`DNAME`," + + " `t0`.`LOC`\n" + "FROM `scott`.`EMP`\n" + "INNER JOIN (SELECT `DEPTNO`, `DNAME`, `LOC`\n" + + "FROM (SELECT `DEPTNO`, `DNAME`, `LOC`\n" + "FROM `scott`.`DEPT`\n" - + "LIMIT 10) `t` ON `EMP`.`DEPTNO` = `t`.`DEPTNO`"; + + "LIMIT 10) `t`\n" + + "LIMIT 50000) `t0` ON `EMP`.`DEPTNO` = `t0`.`DEPTNO`"; verifyPPLToSparkSQL(root, expectedSparkSql); } diff --git a/ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLExpandTest.java b/ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLExpandTest.java index 1ddab5ffd3a..5f74a4de5a7 100644 --- a/ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLExpandTest.java +++ b/ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLExpandTest.java @@ -100,15 +100,18 @@ public void testExpand() { "LogicalProject(DEPTNO=[$0], EMPNOS=[$2])\n" + " LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{1}])\n" + " LogicalTableScan(table=[[scott, DEPT]])\n" - + " Uncollect\n" - + " LogicalProject(EMPNOS=[$cor0.EMPNOS])\n" - + " LogicalValues(tuples=[[{ 0 }]])\n"; + + " LogicalSystemLimit(fetch=[50000])\n" + + " Uncollect\n" + + " LogicalProject(EMPNOS=[$cor0.EMPNOS])\n" + + " LogicalValues(tuples=[[{ 0 }]])\n"; verifyLogical(root, expectedLogical); String expectedSparkSql = - "SELECT `$cor0`.`DEPTNO`, `t00`.`EMPNOS`\n" + "SELECT `$cor0`.`DEPTNO`, `t1`.`EMPNOS`\n" + "FROM `scott`.`DEPT` `$cor0`,\n" - + "LATERAL UNNEST (SELECT `$cor0`.`EMPNOS`\n" - + "FROM (VALUES (0)) `t` (`ZERO`)) `t0` (`EMPNOS`) `t00`"; + + "LATERAL (SELECT `EMPNOS`\n" + + "FROM UNNEST (SELECT `$cor0`.`EMPNOS`\n" + + "FROM (VALUES (0)) `t` (`ZERO`)) `t0` (`EMPNOS`)\n" + + "LIMIT 50000) `t1`"; verifyPPLToSparkSQL(root, expectedSparkSql); } @@ -121,16 +124,19 @@ public void testExpandWithEval() { + " LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{2}])\n" + " LogicalProject(DEPTNO=[$0], EMPNOS=[$1], employee_no=[$1])\n" + " LogicalTableScan(table=[[scott, DEPT]])\n" - + " Uncollect\n" - + " LogicalProject(employee_no=[$cor0.employee_no])\n" - + " LogicalValues(tuples=[[{ 0 }]])\n"; + + " LogicalSystemLimit(fetch=[50000])\n" + + " Uncollect\n" + + " LogicalProject(employee_no=[$cor0.employee_no])\n" + + " LogicalValues(tuples=[[{ 0 }]])\n"; verifyLogical(root, expectedLogical); String expectedSparkSql = - "SELECT `$cor0`.`DEPTNO`, `$cor0`.`EMPNOS`, `t10`.`employee_no`\n" + "SELECT `$cor0`.`DEPTNO`, `$cor0`.`EMPNOS`, `t2`.`employee_no`\n" + "FROM (SELECT `DEPTNO`, `EMPNOS`, `EMPNOS` `employee_no`\n" + "FROM `scott`.`DEPT`) `$cor0`,\n" - + "LATERAL UNNEST (SELECT `$cor0`.`employee_no`\n" - + "FROM (VALUES (0)) `t` (`ZERO`)) `t1` (`employee_no`) `t10`"; + + "LATERAL (SELECT `employee_no`\n" + + "FROM UNNEST (SELECT `$cor0`.`employee_no`\n" + + "FROM (VALUES (0)) `t` (`ZERO`)) `t1` (`employee_no`)\n" + + "LIMIT 50000) `t2`"; verifyPPLToSparkSQL(root, expectedSparkSql); } } diff --git a/ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLInSubqueryTest.java b/ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLInSubqueryTest.java index 56f4e361740..afc5cdeea11 100644 --- a/ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLInSubqueryTest.java +++ b/ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLInSubqueryTest.java @@ -225,14 +225,17 @@ public void testInSubqueryAsJoinFilter() { + " LogicalTableScan(table=[[scott, BONUS]])\n" + "}))], joinType=[inner])\n" + " LogicalTableScan(table=[[scott, EMP]])\n" - + " LogicalTableScan(table=[[scott, DEPT]])\n"; + + " LogicalSystemLimit(fetch=[50000])\n" + + " LogicalTableScan(table=[[scott, DEPT]])\n"; verifyLogical(root, expectedLogical); String expectedSparkSql = "SELECT `EMP`.`EMPNO`, `EMP`.`ENAME`\n" + "FROM `scott`.`EMP`\n" - + "INNER JOIN `scott`.`DEPT` ON `EMP`.`DEPTNO` = `DEPT`.`DEPTNO` AND `EMP`.`ENAME` IN" - + " (SELECT `ENAME`\n" + + "INNER JOIN (SELECT `DEPTNO`, `DNAME`, `LOC`\n" + + "FROM `scott`.`DEPT`\n" + + "LIMIT 50000) `t` ON `EMP`.`DEPTNO` = `t`.`DEPTNO` AND `EMP`.`ENAME` IN (SELECT" + + " `ENAME`\n" + "FROM `scott`.`BONUS`\n" + "WHERE `SAL` > 1000)\n" + "ORDER BY `EMP`.`EMPNO` DESC NULLS FIRST"; diff --git a/ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLJoinTest.java b/ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLJoinTest.java index 964d5ae56bf..47dc39d9aff 100644 --- a/ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLJoinTest.java +++ b/ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLJoinTest.java @@ -24,16 +24,19 @@ public void testJoinConditionWithTableNames() { + " COMM=[$6], DEPTNO=[$7], DEPT.DEPTNO=[$8], DNAME=[$9], LOC=[$10])\n" + " LogicalJoin(condition=[=($7, $8)], joinType=[inner])\n" + " LogicalTableScan(table=[[scott, EMP]])\n" - + " LogicalTableScan(table=[[scott, DEPT]])\n"; + + " LogicalSystemLimit(fetch=[50000])\n" + + " LogicalTableScan(table=[[scott, DEPT]])\n"; verifyLogical(root, expectedLogical); verifyResultCount(root, 14); String expectedSparkSql = "SELECT `EMP`.`EMPNO`, `EMP`.`ENAME`, `EMP`.`JOB`, `EMP`.`MGR`, `EMP`.`HIREDATE`," - + " `EMP`.`SAL`, `EMP`.`COMM`, `EMP`.`DEPTNO`, `DEPT`.`DEPTNO` `DEPT.DEPTNO`," - + " `DEPT`.`DNAME`, `DEPT`.`LOC`\n" + + " `EMP`.`SAL`, `EMP`.`COMM`, `EMP`.`DEPTNO`, `t`.`DEPTNO` `DEPT.DEPTNO`, `t`.`DNAME`," + + " `t`.`LOC`\n" + "FROM `scott`.`EMP`\n" - + "INNER JOIN `scott`.`DEPT` ON `EMP`.`DEPTNO` = `DEPT`.`DEPTNO`"; + + "INNER JOIN (SELECT `DEPTNO`, `DNAME`, `LOC`\n" + + "FROM `scott`.`DEPT`\n" + + "LIMIT 50000) `t` ON `EMP`.`DEPTNO` = `t`.`DEPTNO`"; verifyPPLToSparkSQL(root, expectedSparkSql); } @@ -50,18 +53,21 @@ public void testJoinConditionWithAlias() { + " SAL=[$5], COMM=[$6], DEPTNO=[$7], d.DEPTNO=[$8], DNAME=[$9], LOC=[$10])\n" + " LogicalJoin(condition=[=($7, $8)], joinType=[inner])\n" + " LogicalTableScan(table=[[scott, EMP]])\n" - + " LogicalTableScan(table=[[scott, DEPT]])\n"; + + " LogicalSystemLimit(fetch=[50000])\n" + + " LogicalTableScan(table=[[scott, DEPT]])\n"; verifyLogical(root, expectedLogical); verifyResultCount(root, 6); String expectedSparkSql = "SELECT `d.DEPTNO`\n" + "FROM (SELECT `EMP`.`EMPNO`, `EMP`.`ENAME`, `EMP`.`JOB`, `EMP`.`MGR`," - + " `EMP`.`HIREDATE`, `EMP`.`SAL`, `EMP`.`COMM`, `EMP`.`DEPTNO`, `DEPT`.`DEPTNO`" - + " `d.DEPTNO`, `DEPT`.`DNAME`, `DEPT`.`LOC`\n" + + " `EMP`.`HIREDATE`, `EMP`.`SAL`, `EMP`.`COMM`, `EMP`.`DEPTNO`, `t`.`DEPTNO`" + + " `d.DEPTNO`, `t`.`DNAME`, `t`.`LOC`\n" + "FROM `scott`.`EMP`\n" - + "INNER JOIN `scott`.`DEPT` ON `EMP`.`DEPTNO` = `DEPT`.`DEPTNO`) `t`\n" - + "WHERE `t`.`LOC` = 'CHICAGO'"; + + "INNER JOIN (SELECT `DEPTNO`, `DNAME`, `LOC`\n" + + "FROM `scott`.`DEPT`\n" + + "LIMIT 50000) `t` ON `EMP`.`DEPTNO` = `t`.`DEPTNO`) `t0`\n" + + "WHERE `t0`.`LOC` = 'CHICAGO'"; verifyPPLToSparkSQL(root, expectedSparkSql); } @@ -74,16 +80,19 @@ public void testJoinConditionWithoutTableName() { + " COMM=[$6], DEPTNO=[$7], DEPT.DEPTNO=[$8], DNAME=[$9], LOC=[$10])\n" + " LogicalJoin(condition=[=($1, $9)], joinType=[inner])\n" + " LogicalTableScan(table=[[scott, EMP]])\n" - + " LogicalTableScan(table=[[scott, DEPT]])\n"; + + " LogicalSystemLimit(fetch=[50000])\n" + + " LogicalTableScan(table=[[scott, DEPT]])\n"; verifyLogical(root, expectedLogical); verifyResultCount(root, 0); String expectedSparkSql = "SELECT `EMP`.`EMPNO`, `EMP`.`ENAME`, `EMP`.`JOB`, `EMP`.`MGR`, `EMP`.`HIREDATE`," - + " `EMP`.`SAL`, `EMP`.`COMM`, `EMP`.`DEPTNO`, `DEPT`.`DEPTNO` `DEPT.DEPTNO`," - + " `DEPT`.`DNAME`, `DEPT`.`LOC`\n" + + " `EMP`.`SAL`, `EMP`.`COMM`, `EMP`.`DEPTNO`, `t`.`DEPTNO` `DEPT.DEPTNO`, `t`.`DNAME`," + + " `t`.`LOC`\n" + "FROM `scott`.`EMP`\n" - + "INNER JOIN `scott`.`DEPT` ON `EMP`.`ENAME` = `DEPT`.`DNAME`"; + + "INNER JOIN (SELECT `DEPTNO`, `DNAME`, `LOC`\n" + + "FROM `scott`.`DEPT`\n" + + "LIMIT 50000) `t` ON `EMP`.`ENAME` = `t`.`DNAME`"; verifyPPLToSparkSQL(root, expectedSparkSql); } @@ -96,16 +105,19 @@ public void testJoinWithSpecificAliases() { + " COMM=[$6], DEPTNO=[$7], r.DEPTNO=[$8], DNAME=[$9], LOC=[$10])\n" + " LogicalJoin(condition=[=($7, $8)], joinType=[inner])\n" + " LogicalTableScan(table=[[scott, EMP]])\n" - + " LogicalTableScan(table=[[scott, DEPT]])\n"; + + " LogicalSystemLimit(fetch=[50000])\n" + + " LogicalTableScan(table=[[scott, DEPT]])\n"; verifyLogical(root, expectedLogical); verifyResultCount(root, 14); String expectedSparkSql = "SELECT `EMP`.`EMPNO`, `EMP`.`ENAME`, `EMP`.`JOB`, `EMP`.`MGR`, `EMP`.`HIREDATE`," - + " `EMP`.`SAL`, `EMP`.`COMM`, `EMP`.`DEPTNO`, `DEPT`.`DEPTNO` `r.DEPTNO`," - + " `DEPT`.`DNAME`, `DEPT`.`LOC`\n" + + " `EMP`.`SAL`, `EMP`.`COMM`, `EMP`.`DEPTNO`, `t`.`DEPTNO` `r.DEPTNO`, `t`.`DNAME`," + + " `t`.`LOC`\n" + "FROM `scott`.`EMP`\n" - + "INNER JOIN `scott`.`DEPT` ON `EMP`.`DEPTNO` = `DEPT`.`DEPTNO`"; + + "INNER JOIN (SELECT `DEPTNO`, `DNAME`, `LOC`\n" + + "FROM `scott`.`DEPT`\n" + + "LIMIT 50000) `t` ON `EMP`.`DEPTNO` = `t`.`DEPTNO`"; verifyPPLToSparkSQL(root, expectedSparkSql); } @@ -121,17 +133,20 @@ public void testJoinWithMultiplePredicates() { + " LogicalJoin(condition=[AND(=($7, $8), >($7, 10), <($5, 3000))]," + " joinType=[inner])\n" + " LogicalTableScan(table=[[scott, EMP]])\n" - + " LogicalTableScan(table=[[scott, DEPT]])\n"; + + " LogicalSystemLimit(fetch=[50000])\n" + + " LogicalTableScan(table=[[scott, DEPT]])\n"; verifyLogical(root, expectedLogical); verifyResultCount(root, 9); String expectedSparkSql = "SELECT `EMP`.`EMPNO`, `EMP`.`ENAME`, `EMP`.`JOB`, `EMP`.`MGR`, `EMP`.`HIREDATE`," - + " `EMP`.`SAL`, `EMP`.`COMM`, `EMP`.`DEPTNO`, `DEPT`.`DEPTNO` `r.DEPTNO`," - + " `DEPT`.`DNAME`, `DEPT`.`LOC`\n" + + " `EMP`.`SAL`, `EMP`.`COMM`, `EMP`.`DEPTNO`, `t`.`DEPTNO` `r.DEPTNO`, `t`.`DNAME`," + + " `t`.`LOC`\n" + "FROM `scott`.`EMP`\n" - + "INNER JOIN `scott`.`DEPT` ON `EMP`.`DEPTNO` = `DEPT`.`DEPTNO` AND `EMP`.`DEPTNO` >" - + " 10 AND `EMP`.`SAL` < 3000"; + + "INNER JOIN (SELECT `DEPTNO`, `DNAME`, `LOC`\n" + + "FROM `scott`.`DEPT`\n" + + "LIMIT 50000) `t` ON `EMP`.`DEPTNO` = `t`.`DEPTNO` AND `EMP`.`DEPTNO` > 10 AND" + + " `EMP`.`SAL` < 3000"; verifyPPLToSparkSQL(root, expectedSparkSql); } @@ -144,16 +159,19 @@ public void testLeftJoin() { + " COMM=[$6], DEPTNO=[$7], d.DEPTNO=[$8], DNAME=[$9], LOC=[$10])\n" + " LogicalJoin(condition=[=($7, $8)], joinType=[left])\n" + " LogicalTableScan(table=[[scott, EMP]])\n" - + " LogicalTableScan(table=[[scott, DEPT]])\n"; + + " LogicalSystemLimit(fetch=[50000])\n" + + " LogicalTableScan(table=[[scott, DEPT]])\n"; verifyLogical(root, expectedLogical); verifyResultCount(root, 14); String expectedSparkSql = "SELECT `EMP`.`EMPNO`, `EMP`.`ENAME`, `EMP`.`JOB`, `EMP`.`MGR`, `EMP`.`HIREDATE`," - + " `EMP`.`SAL`, `EMP`.`COMM`, `EMP`.`DEPTNO`, `DEPT`.`DEPTNO` `d.DEPTNO`," - + " `DEPT`.`DNAME`, `DEPT`.`LOC`\n" + + " `EMP`.`SAL`, `EMP`.`COMM`, `EMP`.`DEPTNO`, `t`.`DEPTNO` `d.DEPTNO`, `t`.`DNAME`," + + " `t`.`LOC`\n" + "FROM `scott`.`EMP`\n" - + "LEFT JOIN `scott`.`DEPT` ON `EMP`.`DEPTNO` = `DEPT`.`DEPTNO`"; + + "LEFT JOIN (SELECT `DEPTNO`, `DNAME`, `LOC`\n" + + "FROM `scott`.`DEPT`\n" + + "LIMIT 50000) `t` ON `EMP`.`DEPTNO` = `t`.`DEPTNO`"; verifyPPLToSparkSQL(root, expectedSparkSql); } @@ -165,17 +183,20 @@ public void testRightJoin() { "LogicalProject(EMPNO=[$0], ENAME=[$1], JOB=[$2], MGR=[$3], HIREDATE=[$4], SAL=[$5]," + " COMM=[$6], DEPTNO=[$7], d.DEPTNO=[$8], DNAME=[$9], LOC=[$10])\n" + " LogicalJoin(condition=[=($7, $8)], joinType=[right])\n" - + " LogicalTableScan(table=[[scott, EMP]])\n" + + " LogicalSystemLimit(fetch=[50000])\n" + + " LogicalTableScan(table=[[scott, EMP]])\n" + " LogicalTableScan(table=[[scott, DEPT]])\n"; verifyLogical(root, expectedLogical); verifyResultCount(root, 15); String expectedSparkSql = - "SELECT `EMP`.`EMPNO`, `EMP`.`ENAME`, `EMP`.`JOB`, `EMP`.`MGR`, `EMP`.`HIREDATE`," - + " `EMP`.`SAL`, `EMP`.`COMM`, `EMP`.`DEPTNO`, `DEPT`.`DEPTNO` `d.DEPTNO`," - + " `DEPT`.`DNAME`, `DEPT`.`LOC`\n" + "SELECT `t`.`EMPNO`, `t`.`ENAME`, `t`.`JOB`, `t`.`MGR`, `t`.`HIREDATE`, `t`.`SAL`," + + " `t`.`COMM`, `t`.`DEPTNO`, `DEPT`.`DEPTNO` `d.DEPTNO`, `DEPT`.`DNAME`," + + " `DEPT`.`LOC`\n" + + "FROM (SELECT `EMPNO`, `ENAME`, `JOB`, `MGR`, `HIREDATE`, `SAL`, `COMM`, `DEPTNO`\n" + "FROM `scott`.`EMP`\n" - + "RIGHT JOIN `scott`.`DEPT` ON `EMP`.`DEPTNO` = `DEPT`.`DEPTNO`"; + + "LIMIT 50000) `t`\n" + + "RIGHT JOIN `scott`.`DEPT` ON `t`.`DEPTNO` = `DEPT`.`DEPTNO`"; verifyPPLToSparkSQL(root, expectedSparkSql); } @@ -232,16 +253,19 @@ public void testFullOuter() { + " COMM=[$6], DEPTNO=[$7], d.DEPTNO=[$8], DNAME=[$9], LOC=[$10])\n" + " LogicalJoin(condition=[=($7, $8)], joinType=[full])\n" + " LogicalTableScan(table=[[scott, EMP]])\n" - + " LogicalTableScan(table=[[scott, DEPT]])\n"; + + " LogicalSystemLimit(fetch=[50000])\n" + + " LogicalTableScan(table=[[scott, DEPT]])\n"; verifyLogical(root, expectedLogical); verifyResultCount(root, 15); String expectedSparkSql = "SELECT `EMP`.`EMPNO`, `EMP`.`ENAME`, `EMP`.`JOB`, `EMP`.`MGR`, `EMP`.`HIREDATE`," - + " `EMP`.`SAL`, `EMP`.`COMM`, `EMP`.`DEPTNO`, `DEPT`.`DEPTNO` `d.DEPTNO`," - + " `DEPT`.`DNAME`, `DEPT`.`LOC`\n" + + " `EMP`.`SAL`, `EMP`.`COMM`, `EMP`.`DEPTNO`, `t`.`DEPTNO` `d.DEPTNO`, `t`.`DNAME`," + + " `t`.`LOC`\n" + "FROM `scott`.`EMP`\n" - + "FULL JOIN `scott`.`DEPT` ON `EMP`.`DEPTNO` = `DEPT`.`DEPTNO`"; + + "FULL JOIN (SELECT `DEPTNO`, `DNAME`, `LOC`\n" + + "FROM `scott`.`DEPT`\n" + + "LIMIT 50000) `t` ON `EMP`.`DEPTNO` = `t`.`DEPTNO`"; verifyPPLToSparkSQL(root, expectedSparkSql); } @@ -254,16 +278,19 @@ public void testCrossJoin() { + " COMM=[$6], DEPTNO=[$7], d.DEPTNO=[$8], DNAME=[$9], LOC=[$10])\n" + " LogicalJoin(condition=[true], joinType=[inner])\n" + " LogicalTableScan(table=[[scott, EMP]])\n" - + " LogicalTableScan(table=[[scott, DEPT]])\n"; + + " LogicalSystemLimit(fetch=[50000])\n" + + " LogicalTableScan(table=[[scott, DEPT]])\n"; verifyLogical(root, expectedLogical); verifyResultCount(root, 56); String expectedSparkSql = "SELECT `EMP`.`EMPNO`, `EMP`.`ENAME`, `EMP`.`JOB`, `EMP`.`MGR`, `EMP`.`HIREDATE`," - + " `EMP`.`SAL`, `EMP`.`COMM`, `EMP`.`DEPTNO`, `DEPT`.`DEPTNO` `d.DEPTNO`," - + " `DEPT`.`DNAME`, `DEPT`.`LOC`\n" + + " `EMP`.`SAL`, `EMP`.`COMM`, `EMP`.`DEPTNO`, `t`.`DEPTNO` `d.DEPTNO`, `t`.`DNAME`," + + " `t`.`LOC`\n" + "FROM `scott`.`EMP`\n" - + "CROSS JOIN `scott`.`DEPT`"; + + "CROSS JOIN (SELECT `DEPTNO`, `DNAME`, `LOC`\n" + + "FROM `scott`.`DEPT`\n" + + "LIMIT 50000) `t`"; verifyPPLToSparkSQL(root, expectedSparkSql); } @@ -276,16 +303,19 @@ public void testCrossJoinWithJoinConditions() { + " COMM=[$6], DEPTNO=[$7], d.DEPTNO=[$8], DNAME=[$9], LOC=[$10])\n" + " LogicalJoin(condition=[=($7, $8)], joinType=[inner])\n" + " LogicalTableScan(table=[[scott, EMP]])\n" - + " LogicalTableScan(table=[[scott, DEPT]])\n"; + + " LogicalSystemLimit(fetch=[50000])\n" + + " LogicalTableScan(table=[[scott, DEPT]])\n"; verifyLogical(root, expectedLogical); verifyResultCount(root, 14); String expectedSparkSql = "SELECT `EMP`.`EMPNO`, `EMP`.`ENAME`, `EMP`.`JOB`, `EMP`.`MGR`, `EMP`.`HIREDATE`," - + " `EMP`.`SAL`, `EMP`.`COMM`, `EMP`.`DEPTNO`, `DEPT`.`DEPTNO` `d.DEPTNO`," - + " `DEPT`.`DNAME`, `DEPT`.`LOC`\n" + + " `EMP`.`SAL`, `EMP`.`COMM`, `EMP`.`DEPTNO`, `t`.`DEPTNO` `d.DEPTNO`, `t`.`DNAME`," + + " `t`.`LOC`\n" + "FROM `scott`.`EMP`\n" - + "INNER JOIN `scott`.`DEPT` ON `EMP`.`DEPTNO` = `DEPT`.`DEPTNO`"; + + "INNER JOIN (SELECT `DEPTNO`, `DNAME`, `LOC`\n" + + "FROM `scott`.`DEPT`\n" + + "LIMIT 50000) `t` ON `EMP`.`DEPTNO` = `t`.`DEPTNO`"; verifyPPLToSparkSQL(root, expectedSparkSql); } @@ -298,16 +328,19 @@ public void testNonEquiJoin() { + " COMM=[$6], DEPTNO=[$7], d.DEPTNO=[$8], DNAME=[$9], LOC=[$10])\n" + " LogicalJoin(condition=[>($7, $8)], joinType=[inner])\n" + " LogicalTableScan(table=[[scott, EMP]])\n" - + " LogicalTableScan(table=[[scott, DEPT]])\n"; + + " LogicalSystemLimit(fetch=[50000])\n" + + " LogicalTableScan(table=[[scott, DEPT]])\n"; verifyLogical(root, expectedLogical); verifyResultCount(root, 17); String expectedSparkSql = "SELECT `EMP`.`EMPNO`, `EMP`.`ENAME`, `EMP`.`JOB`, `EMP`.`MGR`, `EMP`.`HIREDATE`," - + " `EMP`.`SAL`, `EMP`.`COMM`, `EMP`.`DEPTNO`, `DEPT`.`DEPTNO` `d.DEPTNO`," - + " `DEPT`.`DNAME`, `DEPT`.`LOC`\n" + + " `EMP`.`SAL`, `EMP`.`COMM`, `EMP`.`DEPTNO`, `t`.`DEPTNO` `d.DEPTNO`, `t`.`DNAME`," + + " `t`.`LOC`\n" + "FROM `scott`.`EMP`\n" - + "INNER JOIN `scott`.`DEPT` ON `EMP`.`DEPTNO` > `DEPT`.`DEPTNO`"; + + "INNER JOIN (SELECT `DEPTNO`, `DNAME`, `LOC`\n" + + "FROM `scott`.`DEPT`\n" + + "LIMIT 50000) `t` ON `EMP`.`DEPTNO` > `t`.`DEPTNO`"; verifyPPLToSparkSQL(root, expectedSparkSql); } @@ -323,19 +356,25 @@ public void testMultipleTablesJoin() { + " SAL=[$5], COMM=[$6], DEPTNO=[$7], r.DEPTNO=[$8], DNAME=[$9], LOC=[$10])\n" + " LogicalJoin(condition=[=($7, $8)], joinType=[inner])\n" + " LogicalTableScan(table=[[scott, EMP]])\n" - + " LogicalTableScan(table=[[scott, DEPT]])\n" - + " LogicalTableScan(table=[[scott, SALGRADE]])\n"; + + " LogicalSystemLimit(fetch=[50000])\n" + + " LogicalTableScan(table=[[scott, DEPT]])\n" + + " LogicalSystemLimit(fetch=[50000])\n" + + " LogicalTableScan(table=[[scott, SALGRADE]])\n"; verifyLogical(root, expectedLogical); verifyResultCount(root, 14); String expectedSparkSql = "SELECT *\n" + "FROM (SELECT `EMP`.`EMPNO`, `EMP`.`ENAME`, `EMP`.`JOB`, `EMP`.`MGR`," - + " `EMP`.`HIREDATE`, `EMP`.`SAL`, `EMP`.`COMM`, `EMP`.`DEPTNO`, `DEPT`.`DEPTNO`" - + " `r.DEPTNO`, `DEPT`.`DNAME`, `DEPT`.`LOC`\n" + + " `EMP`.`HIREDATE`, `EMP`.`SAL`, `EMP`.`COMM`, `EMP`.`DEPTNO`, `t`.`DEPTNO`" + + " `r.DEPTNO`, `t`.`DNAME`, `t`.`LOC`\n" + "FROM `scott`.`EMP`\n" - + "INNER JOIN `scott`.`DEPT` ON `EMP`.`DEPTNO` = `DEPT`.`DEPTNO`) `t`\n" - + "LEFT JOIN `scott`.`SALGRADE` ON `t`.`SAL` = `SALGRADE`.`HISAL`"; + + "INNER JOIN (SELECT `DEPTNO`, `DNAME`, `LOC`\n" + + "FROM `scott`.`DEPT`\n" + + "LIMIT 50000) `t` ON `EMP`.`DEPTNO` = `t`.`DEPTNO`) `t0`\n" + + "LEFT JOIN (SELECT `GRADE`, `LOSAL`, `HISAL`\n" + + "FROM `scott`.`SALGRADE`\n" + + "LIMIT 50000) `t1` ON `t0`.`SAL` = `t1`.`HISAL`"; verifyPPLToSparkSQL(root, expectedSparkSql); } @@ -351,19 +390,25 @@ public void testMultipleTablesJoinWithTableAliases() { + " SAL=[$5], COMM=[$6], DEPTNO=[$7], t2.DEPTNO=[$8], DNAME=[$9], LOC=[$10])\n" + " LogicalJoin(condition=[=($7, $8)], joinType=[inner])\n" + " LogicalTableScan(table=[[scott, EMP]])\n" - + " LogicalTableScan(table=[[scott, DEPT]])\n" - + " LogicalTableScan(table=[[scott, SALGRADE]])\n"; + + " LogicalSystemLimit(fetch=[50000])\n" + + " LogicalTableScan(table=[[scott, DEPT]])\n" + + " LogicalSystemLimit(fetch=[50000])\n" + + " LogicalTableScan(table=[[scott, SALGRADE]])\n"; verifyLogical(root, expectedLogical); verifyResultCount(root, 14); String expectedSparkSql = "SELECT *\n" + "FROM (SELECT `EMP`.`EMPNO`, `EMP`.`ENAME`, `EMP`.`JOB`, `EMP`.`MGR`," - + " `EMP`.`HIREDATE`, `EMP`.`SAL`, `EMP`.`COMM`, `EMP`.`DEPTNO`, `DEPT`.`DEPTNO`" - + " `t2.DEPTNO`, `DEPT`.`DNAME`, `DEPT`.`LOC`\n" + + " `EMP`.`HIREDATE`, `EMP`.`SAL`, `EMP`.`COMM`, `EMP`.`DEPTNO`, `t`.`DEPTNO`" + + " `t2.DEPTNO`, `t`.`DNAME`, `t`.`LOC`\n" + "FROM `scott`.`EMP`\n" - + "INNER JOIN `scott`.`DEPT` ON `EMP`.`DEPTNO` = `DEPT`.`DEPTNO`) `t`\n" - + "LEFT JOIN `scott`.`SALGRADE` ON `t`.`SAL` = `SALGRADE`.`HISAL`"; + + "INNER JOIN (SELECT `DEPTNO`, `DNAME`, `LOC`\n" + + "FROM `scott`.`DEPT`\n" + + "LIMIT 50000) `t` ON `EMP`.`DEPTNO` = `t`.`DEPTNO`) `t0`\n" + + "LEFT JOIN (SELECT `GRADE`, `LOSAL`, `HISAL`\n" + + "FROM `scott`.`SALGRADE`\n" + + "LIMIT 50000) `t1` ON `t0`.`SAL` = `t1`.`HISAL`"; verifyPPLToSparkSQL(root, expectedSparkSql); } @@ -379,19 +424,25 @@ public void testMultipleTablesJoinWithTableNames() { + " SAL=[$5], COMM=[$6], DEPTNO=[$7], DEPT.DEPTNO=[$8], DNAME=[$9], LOC=[$10])\n" + " LogicalJoin(condition=[=($7, $8)], joinType=[inner])\n" + " LogicalTableScan(table=[[scott, EMP]])\n" - + " LogicalTableScan(table=[[scott, DEPT]])\n" - + " LogicalTableScan(table=[[scott, SALGRADE]])\n"; + + " LogicalSystemLimit(fetch=[50000])\n" + + " LogicalTableScan(table=[[scott, DEPT]])\n" + + " LogicalSystemLimit(fetch=[50000])\n" + + " LogicalTableScan(table=[[scott, SALGRADE]])\n"; verifyLogical(root, expectedLogical); verifyResultCount(root, 14); String expectedSparkSql = "SELECT *\n" + "FROM (SELECT `EMP`.`EMPNO`, `EMP`.`ENAME`, `EMP`.`JOB`, `EMP`.`MGR`," - + " `EMP`.`HIREDATE`, `EMP`.`SAL`, `EMP`.`COMM`, `EMP`.`DEPTNO`, `DEPT`.`DEPTNO`" - + " `DEPT.DEPTNO`, `DEPT`.`DNAME`, `DEPT`.`LOC`\n" + + " `EMP`.`HIREDATE`, `EMP`.`SAL`, `EMP`.`COMM`, `EMP`.`DEPTNO`, `t`.`DEPTNO`" + + " `DEPT.DEPTNO`, `t`.`DNAME`, `t`.`LOC`\n" + "FROM `scott`.`EMP`\n" - + "INNER JOIN `scott`.`DEPT` ON `EMP`.`DEPTNO` = `DEPT`.`DEPTNO`) `t`\n" - + "LEFT JOIN `scott`.`SALGRADE` ON `t`.`SAL` = `SALGRADE`.`HISAL`"; + + "INNER JOIN (SELECT `DEPTNO`, `DNAME`, `LOC`\n" + + "FROM `scott`.`DEPT`\n" + + "LIMIT 50000) `t` ON `EMP`.`DEPTNO` = `t`.`DEPTNO`) `t0`\n" + + "LEFT JOIN (SELECT `GRADE`, `LOSAL`, `HISAL`\n" + + "FROM `scott`.`SALGRADE`\n" + + "LIMIT 50000) `t1` ON `t0`.`SAL` = `t1`.`HISAL`"; verifyPPLToSparkSQL(root, expectedSparkSql); } @@ -407,19 +458,25 @@ public void testMultipleJoinWithPartSideAliases() { + " SAL=[$5], COMM=[$6], DEPTNO=[$7], t2.DEPTNO=[$8], DNAME=[$9], LOC=[$10])\n" + " LogicalJoin(condition=[=($7, $8)], joinType=[inner])\n" + " LogicalTableScan(table=[[scott, EMP]])\n" - + " LogicalTableScan(table=[[scott, DEPT]])\n" - + " LogicalTableScan(table=[[scott, SALGRADE]])\n"; + + " LogicalSystemLimit(fetch=[50000])\n" + + " LogicalTableScan(table=[[scott, DEPT]])\n" + + " LogicalSystemLimit(fetch=[50000])\n" + + " LogicalTableScan(table=[[scott, SALGRADE]])\n"; verifyLogical(root, expectedLogical); verifyResultCount(root, 14); String expectedSparkSql = "SELECT *\n" + "FROM (SELECT `EMP`.`EMPNO`, `EMP`.`ENAME`, `EMP`.`JOB`, `EMP`.`MGR`," - + " `EMP`.`HIREDATE`, `EMP`.`SAL`, `EMP`.`COMM`, `EMP`.`DEPTNO`, `DEPT`.`DEPTNO`" - + " `t2.DEPTNO`, `DEPT`.`DNAME`, `DEPT`.`LOC`\n" + + " `EMP`.`HIREDATE`, `EMP`.`SAL`, `EMP`.`COMM`, `EMP`.`DEPTNO`, `t`.`DEPTNO`" + + " `t2.DEPTNO`, `t`.`DNAME`, `t`.`LOC`\n" + "FROM `scott`.`EMP`\n" - + "INNER JOIN `scott`.`DEPT` ON `EMP`.`DEPTNO` = `DEPT`.`DEPTNO`) `t`\n" - + "LEFT JOIN `scott`.`SALGRADE` ON `t`.`SAL` = `SALGRADE`.`HISAL`"; + + "INNER JOIN (SELECT `DEPTNO`, `DNAME`, `LOC`\n" + + "FROM `scott`.`DEPT`\n" + + "LIMIT 50000) `t` ON `EMP`.`DEPTNO` = `t`.`DEPTNO`) `t0`\n" + + "LEFT JOIN (SELECT `GRADE`, `LOSAL`, `HISAL`\n" + + "FROM `scott`.`SALGRADE`\n" + + "LIMIT 50000) `t1` ON `t0`.`SAL` = `t1`.`HISAL`"; verifyPPLToSparkSQL(root, expectedSparkSql); } @@ -438,21 +495,31 @@ public void testMultipleJoinWithSelfJoin() { + " SAL=[$5], COMM=[$6], DEPTNO=[$7], t2.DEPTNO=[$8], DNAME=[$9], LOC=[$10])\n" + " LogicalJoin(condition=[=($7, $8)], joinType=[inner])\n" + " LogicalTableScan(table=[[scott, EMP]])\n" - + " LogicalTableScan(table=[[scott, DEPT]])\n" - + " LogicalTableScan(table=[[scott, SALGRADE]])\n" - + " LogicalTableScan(table=[[scott, EMP]])\n"; + + " LogicalSystemLimit(fetch=[50000])\n" + + " LogicalTableScan(table=[[scott, DEPT]])\n" + + " LogicalSystemLimit(fetch=[50000])\n" + + " LogicalTableScan(table=[[scott, SALGRADE]])\n" + + " LogicalSystemLimit(fetch=[50000])\n" + + " LogicalTableScan(table=[[scott, EMP]])\n"; verifyLogical(root, expectedLogical); verifyResultCount(root, 70); String expectedSparkSql = - "SELECT `t`.`ENAME`, `t`.`DNAME`, `SALGRADE`.`GRADE`, `EMP0`.`EMPNO` `t4.EMPNO`\n" + "SELECT `t0`.`ENAME`, `t0`.`DNAME`, `t1`.`GRADE`, `t2`.`EMPNO` `t4.EMPNO`\n" + "FROM (SELECT `EMP`.`EMPNO`, `EMP`.`ENAME`, `EMP`.`JOB`, `EMP`.`MGR`," - + " `EMP`.`HIREDATE`, `EMP`.`SAL`, `EMP`.`COMM`, `EMP`.`DEPTNO`, `DEPT`.`DEPTNO`" - + " `t2.DEPTNO`, `DEPT`.`DNAME`, `DEPT`.`LOC`\n" + + " `EMP`.`HIREDATE`, `EMP`.`SAL`, `EMP`.`COMM`, `EMP`.`DEPTNO`, `t`.`DEPTNO`" + + " `t2.DEPTNO`, `t`.`DNAME`, `t`.`LOC`\n" + + "FROM `scott`.`EMP`\n" + + "INNER JOIN (SELECT `DEPTNO`, `DNAME`, `LOC`\n" + + "FROM `scott`.`DEPT`\n" + + "LIMIT 50000) `t` ON `EMP`.`DEPTNO` = `t`.`DEPTNO`) `t0`\n" + + "LEFT JOIN (SELECT `GRADE`, `LOSAL`, `HISAL`\n" + + "FROM `scott`.`SALGRADE`\n" + + "LIMIT 50000) `t1` ON `t0`.`SAL` = `t1`.`HISAL`\n" + + "INNER JOIN (SELECT `EMPNO`, `ENAME`, `JOB`, `MGR`, `HIREDATE`, `SAL`, `COMM`," + + " `DEPTNO`\n" + "FROM `scott`.`EMP`\n" - + "INNER JOIN `scott`.`DEPT` ON `EMP`.`DEPTNO` = `DEPT`.`DEPTNO`) `t`\n" - + "LEFT JOIN `scott`.`SALGRADE` ON `t`.`SAL` = `SALGRADE`.`HISAL`\n" - + "INNER JOIN `scott`.`EMP` `EMP0` ON `t`.`DEPTNO` = `EMP0`.`DEPTNO`"; + + "LIMIT 50000) `t2` ON `t0`.`DEPTNO` = `t2`.`DEPTNO`"; verifyPPLToSparkSQL(root, expectedSparkSql); } @@ -481,23 +548,25 @@ public void testJoinWithRelationSubquery() { + " LogicalProject(JOB=[$2], MGR=[$3])\n" + " LogicalJoin(condition=[=($7, $8)], joinType=[inner])\n" + " LogicalTableScan(table=[[scott, EMP]])\n" - + " LogicalSort(sort0=[$0], dir0=[DESC], fetch=[10])\n" - + " LogicalProject(DEPTNO=[$0], DNAME=[$1])\n" - + " LogicalFilter(condition=[AND(>($0, 10), =($2, 'CHICAGO':VARCHAR))])\n" - + " LogicalTableScan(table=[[scott, DEPT]])\n"; + + " LogicalSystemLimit(fetch=[50000])\n" + + " LogicalSort(sort0=[$0], dir0=[DESC], fetch=[10])\n" + + " LogicalProject(DEPTNO=[$0], DNAME=[$1])\n" + + " LogicalFilter(condition=[AND(>($0, 10), =($2, 'CHICAGO':VARCHAR))])\n" + + " LogicalTableScan(table=[[scott, DEPT]])\n"; verifyLogical(root, expectedLogical); String expectedResult = "cnt=4; JOB=SALESMAN\ncnt=1; JOB=CLERK\ncnt=1; JOB=MANAGER\n"; verifyResult(root, expectedResult); String expectedSparkSql = - "" - + "SELECT COUNT(`EMP`.`MGR`) `cnt`, `EMP`.`JOB`\n" + "SELECT COUNT(`EMP`.`MGR`) `cnt`, `EMP`.`JOB`\n" + "FROM `scott`.`EMP`\n" + "INNER JOIN (SELECT `DEPTNO`, `DNAME`\n" + + "FROM (SELECT `DEPTNO`, `DNAME`\n" + "FROM `scott`.`DEPT`\n" + "WHERE `DEPTNO` > 10 AND `LOC` = 'CHICAGO'\n" + "ORDER BY `DEPTNO` DESC NULLS FIRST\n" - + "LIMIT 10) `t1` ON `EMP`.`DEPTNO` = `t1`.`DEPTNO`\n" + + "LIMIT 10) `t1`\n" + + "LIMIT 50000) `t2` ON `EMP`.`DEPTNO` = `t2`.`DEPTNO`\n" + "GROUP BY `EMP`.`JOB`"; verifyPPLToSparkSQL(root, expectedSparkSql); } @@ -537,39 +606,45 @@ public void testMultipleJoinsWithRelationSubquery() { + " LogicalJoin(condition=[=($7, $8)], joinType=[inner])\n" + " LogicalSort(fetch=[10])\n" + " LogicalTableScan(table=[[scott, EMP]])\n" - + " LogicalFilter(condition=[AND(>($0, 10), =($2, 'CHICAGO':VARCHAR))])\n" - + " LogicalTableScan(table=[[scott, DEPT]])\n" - + " LogicalFilter(condition=[=($1, 'SALESMAN':VARCHAR)])\n" - + " LogicalTableScan(table=[[scott, BONUS]])\n" - + " LogicalSort(sort0=[$0], dir0=[DESC])\n" - + " LogicalFilter(condition=[<=($1, 1500)])\n" - + " LogicalTableScan(table=[[scott, SALGRADE]])\n"; + + " LogicalSystemLimit(fetch=[50000])\n" + + " LogicalFilter(condition=[AND(>($0, 10), =($2, 'CHICAGO':VARCHAR))])\n" + + " LogicalTableScan(table=[[scott, DEPT]])\n" + + " LogicalSystemLimit(fetch=[50000])\n" + + " LogicalFilter(condition=[=($1, 'SALESMAN':VARCHAR)])\n" + + " LogicalTableScan(table=[[scott, BONUS]])\n" + + " LogicalSystemLimit(fetch=[50000])\n" + + " LogicalSort(sort0=[$0], dir0=[DESC])\n" + + " LogicalFilter(condition=[<=($1, 1500)])\n" + + " LogicalTableScan(table=[[scott, SALGRADE]])\n"; verifyLogical(root, expectedLogical); verifyResultCount(root, 15); String expectedSparkSql = "SELECT *\n" - + "FROM (SELECT `t1`.`EMPNO`, `t1`.`ENAME`, `t1`.`JOB`, `t1`.`MGR`, `t1`.`HIREDATE`," - + " `t1`.`SAL`, `t1`.`COMM`, `t1`.`DEPTNO`, `t1`.`r.DEPTNO`, `t1`.`DNAME`, `t1`.`LOC`," - + " `t2`.`ENAME` `r.ENAME`, `t2`.`JOB` `r.JOB`, `t2`.`SAL` `r.SAL`, `t2`.`COMM`" + + "FROM (SELECT `t2`.`EMPNO`, `t2`.`ENAME`, `t2`.`JOB`, `t2`.`MGR`, `t2`.`HIREDATE`," + + " `t2`.`SAL`, `t2`.`COMM`, `t2`.`DEPTNO`, `t2`.`r.DEPTNO`, `t2`.`DNAME`, `t2`.`LOC`," + + " `t4`.`ENAME` `r.ENAME`, `t4`.`JOB` `r.JOB`, `t4`.`SAL` `r.SAL`, `t4`.`COMM`" + " `r.COMM`\n" + "FROM (SELECT `t`.`EMPNO`, `t`.`ENAME`, `t`.`JOB`, `t`.`MGR`, `t`.`HIREDATE`," - + " `t`.`SAL`, `t`.`COMM`, `t`.`DEPTNO`, `t0`.`DEPTNO` `r.DEPTNO`, `t0`.`DNAME`," - + " `t0`.`LOC`\n" + + " `t`.`SAL`, `t`.`COMM`, `t`.`DEPTNO`, `t1`.`DEPTNO` `r.DEPTNO`, `t1`.`DNAME`," + + " `t1`.`LOC`\n" + "FROM (SELECT `EMPNO`, `ENAME`, `JOB`, `MGR`, `HIREDATE`, `SAL`, `COMM`, `DEPTNO`\n" + "FROM `scott`.`EMP`\n" + "LIMIT 10) `t`\n" - + "INNER JOIN (SELECT *\n" + + "INNER JOIN (SELECT `DEPTNO`, `DNAME`, `LOC`\n" + "FROM `scott`.`DEPT`\n" - + "WHERE `DEPTNO` > 10 AND `LOC` = 'CHICAGO') `t0` ON `t`.`DEPTNO` = `t0`.`DEPTNO`)" - + " `t1`\n" - + "LEFT JOIN (SELECT *\n" + + "WHERE `DEPTNO` > 10 AND `LOC` = 'CHICAGO'\n" + + "LIMIT 50000) `t1` ON `t`.`DEPTNO` = `t1`.`DEPTNO`) `t2`\n" + + "LEFT JOIN (SELECT `ENAME`, `JOB`, `SAL`, `COMM`\n" + "FROM `scott`.`BONUS`\n" - + "WHERE `JOB` = 'SALESMAN') `t2` ON `t1`.`JOB` = `t2`.`JOB`) `t3`\n" + + "WHERE `JOB` = 'SALESMAN'\n" + + "LIMIT 50000) `t4` ON `t2`.`JOB` = `t4`.`JOB`) `t5`\n" + "CROSS JOIN (SELECT `GRADE`, `LOSAL`, `HISAL`\n" + + "FROM (SELECT `GRADE`, `LOSAL`, `HISAL`\n" + "FROM `scott`.`SALGRADE`\n" + "WHERE `LOSAL` <= 1500\n" - + "ORDER BY `GRADE` DESC NULLS FIRST) `t5`"; + + "ORDER BY `GRADE` DESC NULLS FIRST) `t7`\n" + + "LIMIT 50000) `t8`"; verifyPPLToSparkSQL(root, expectedSparkSql); } @@ -602,48 +677,53 @@ public void testMultipleJoinsWithRelationSubqueryWithAlias() { "LogicalJoin(condition=[true], joinType=[inner])\n" + " LogicalProject(EMPNO=[$0], ENAME=[$1], JOB=[$2], MGR=[$3], HIREDATE=[$4]," + " SAL=[$5], COMM=[$6], DEPTNO=[$7], DEPT.DEPTNO=[$8], DNAME=[$9], LOC=[$10]," - + " BONUS.ENAME=[$11], BONUS.JOB=[$12], BONUS.SAL=[$13]," - + " BONUS.COMM=[$14])\n" + + " BONUS.ENAME=[$11], BONUS.JOB=[$12], BONUS.SAL=[$13], BONUS.COMM=[$14])\n" + " LogicalJoin(condition=[=($2, $12)], joinType=[left])\n" + " LogicalProject(EMPNO=[$0], ENAME=[$1], JOB=[$2], MGR=[$3], HIREDATE=[$4]," + " SAL=[$5], COMM=[$6], DEPTNO=[$7], DEPT.DEPTNO=[$8], DNAME=[$9], LOC=[$10])\n" + " LogicalJoin(condition=[=($7, $8)], joinType=[inner])\n" + " LogicalSort(fetch=[10])\n" + " LogicalTableScan(table=[[scott, EMP]])\n" - + " LogicalFilter(condition=[AND(>($0, 10), =($2, 'CHICAGO':VARCHAR))])\n" - + " LogicalTableScan(table=[[scott, DEPT]])\n" - + " LogicalFilter(condition=[=($1, 'SALESMAN':VARCHAR)])\n" - + " LogicalTableScan(table=[[scott, BONUS]])\n" - + " LogicalSort(sort0=[$0], dir0=[DESC])\n" - + " LogicalFilter(condition=[<=($1, 1500)])\n" - + " LogicalTableScan(table=[[scott, SALGRADE]])\n"; + + " LogicalSystemLimit(fetch=[50000])\n" + + " LogicalFilter(condition=[AND(>($0, 10), =($2, 'CHICAGO':VARCHAR))])\n" + + " LogicalTableScan(table=[[scott, DEPT]])\n" + + " LogicalSystemLimit(fetch=[50000])\n" + + " LogicalFilter(condition=[=($1, 'SALESMAN':VARCHAR)])\n" + + " LogicalTableScan(table=[[scott, BONUS]])\n" + + " LogicalSystemLimit(fetch=[50000])\n" + + " LogicalSort(sort0=[$0], dir0=[DESC])\n" + + " LogicalFilter(condition=[<=($1, 1500)])\n" + + " LogicalTableScan(table=[[scott, SALGRADE]])\n"; verifyLogical(root, expectedLogical); verifyResultCount(root, 15); String expectedSparkSql = "SELECT *\n" - + "FROM (SELECT `t1`.`EMPNO`, `t1`.`ENAME`, `t1`.`JOB`, `t1`.`MGR`, `t1`.`HIREDATE`," - + " `t1`.`SAL`, `t1`.`COMM`, `t1`.`DEPTNO`, `t1`.`DEPT.DEPTNO`, `t1`.`DNAME`," - + " `t1`.`LOC`, `t2`.`ENAME` `BONUS.ENAME`, `t2`.`JOB` `BONUS.JOB`," - + " `t2`.`SAL` `BONUS.SAL`, `t2`.`COMM` `BONUS.COMM`\n" + + "FROM (SELECT `t2`.`EMPNO`, `t2`.`ENAME`, `t2`.`JOB`, `t2`.`MGR`, `t2`.`HIREDATE`," + + " `t2`.`SAL`, `t2`.`COMM`, `t2`.`DEPTNO`, `t2`.`DEPT.DEPTNO`, `t2`.`DNAME`," + + " `t2`.`LOC`, `t4`.`ENAME` `BONUS.ENAME`, `t4`.`JOB` `BONUS.JOB`, `t4`.`SAL`" + + " `BONUS.SAL`, `t4`.`COMM` `BONUS.COMM`\n" + "FROM (SELECT `t`.`EMPNO`, `t`.`ENAME`, `t`.`JOB`, `t`.`MGR`, `t`.`HIREDATE`," - + " `t`.`SAL`, `t`.`COMM`, `t`.`DEPTNO`, `t0`.`DEPTNO` `DEPT.DEPTNO`," - + " `t0`.`DNAME`, `t0`.`LOC`\n" + + " `t`.`SAL`, `t`.`COMM`, `t`.`DEPTNO`, `t1`.`DEPTNO` `DEPT.DEPTNO`, `t1`.`DNAME`," + + " `t1`.`LOC`\n" + "FROM (SELECT `EMPNO`, `ENAME`, `JOB`, `MGR`, `HIREDATE`, `SAL`, `COMM`, `DEPTNO`\n" + "FROM `scott`.`EMP`\n" + "LIMIT 10) `t`\n" - + "INNER JOIN (SELECT *\n" + + "INNER JOIN (SELECT `DEPTNO`, `DNAME`, `LOC`\n" + "FROM `scott`.`DEPT`\n" - + "WHERE `DEPTNO` > 10 AND `LOC` = 'CHICAGO') `t0` ON `t`.`DEPTNO` = `t0`.`DEPTNO`)" - + " `t1`\n" - + "LEFT JOIN (SELECT *\n" + + "WHERE `DEPTNO` > 10 AND `LOC` = 'CHICAGO'\n" + + "LIMIT 50000) `t1` ON `t`.`DEPTNO` = `t1`.`DEPTNO`) `t2`\n" + + "LEFT JOIN (SELECT `ENAME`, `JOB`, `SAL`, `COMM`\n" + "FROM `scott`.`BONUS`\n" - + "WHERE `JOB` = 'SALESMAN') `t2` ON `t1`.`JOB` = `t2`.`JOB`) `t3`\n" + + "WHERE `JOB` = 'SALESMAN'\n" + + "LIMIT 50000) `t4` ON `t2`.`JOB` = `t4`.`JOB`) `t5`\n" + "CROSS JOIN (SELECT `GRADE`, `LOSAL`, `HISAL`\n" + + "FROM (SELECT `GRADE`, `LOSAL`, `HISAL`\n" + "FROM `scott`.`SALGRADE`\n" + "WHERE `LOSAL` <= 1500\n" - + "ORDER BY `GRADE` DESC NULLS FIRST) `t5`"; + + "ORDER BY `GRADE` DESC NULLS FIRST) `t7`\n" + + "LIMIT 50000) `t8`"; verifyPPLToSparkSQL(root, expectedSparkSql); } @@ -682,40 +762,46 @@ public void testMultipleJoinsWithRelationSubqueryWithAlias2() { + " LogicalJoin(condition=[=($7, $8)], joinType=[inner])\n" + " LogicalSort(fetch=[10])\n" + " LogicalTableScan(table=[[scott, EMP]])\n" - + " LogicalFilter(condition=[AND(>($0, 10), =($2, 'CHICAGO':VARCHAR))])\n" - + " LogicalTableScan(table=[[scott, DEPT]])\n" - + " LogicalFilter(condition=[=($1, 'SALESMAN':VARCHAR)])\n" - + " LogicalTableScan(table=[[scott, BONUS]])\n" - + " LogicalSort(sort0=[$0], dir0=[DESC])\n" - + " LogicalFilter(condition=[<=($1, 1500)])\n" - + " LogicalTableScan(table=[[scott, SALGRADE]])\n"; + + " LogicalSystemLimit(fetch=[50000])\n" + + " LogicalFilter(condition=[AND(>($0, 10), =($2, 'CHICAGO':VARCHAR))])\n" + + " LogicalTableScan(table=[[scott, DEPT]])\n" + + " LogicalSystemLimit(fetch=[50000])\n" + + " LogicalFilter(condition=[=($1, 'SALESMAN':VARCHAR)])\n" + + " LogicalTableScan(table=[[scott, BONUS]])\n" + + " LogicalSystemLimit(fetch=[50000])\n" + + " LogicalSort(sort0=[$0], dir0=[DESC])\n" + + " LogicalFilter(condition=[<=($1, 1500)])\n" + + " LogicalTableScan(table=[[scott, SALGRADE]])\n"; verifyLogical(root, expectedLogical); verifyResultCount(root, 15); String expectedSparkSql = "SELECT *\n" - + "FROM (SELECT `t1`.`EMPNO`, `t1`.`ENAME`, `t1`.`JOB`, `t1`.`MGR`, `t1`.`HIREDATE`," - + " `t1`.`SAL`, `t1`.`COMM`, `t1`.`DEPTNO`, `t1`.`r.DEPTNO`, `t1`.`DNAME`, `t1`.`LOC`," - + " `t2`.`ENAME` `r.ENAME`, `t2`.`JOB` `r.JOB`, `t2`.`SAL` `r.SAL`, `t2`.`COMM`" + + "FROM (SELECT `t2`.`EMPNO`, `t2`.`ENAME`, `t2`.`JOB`, `t2`.`MGR`, `t2`.`HIREDATE`," + + " `t2`.`SAL`, `t2`.`COMM`, `t2`.`DEPTNO`, `t2`.`r.DEPTNO`, `t2`.`DNAME`, `t2`.`LOC`," + + " `t4`.`ENAME` `r.ENAME`, `t4`.`JOB` `r.JOB`, `t4`.`SAL` `r.SAL`, `t4`.`COMM`" + " `r.COMM`\n" + "FROM (SELECT `t`.`EMPNO`, `t`.`ENAME`, `t`.`JOB`, `t`.`MGR`, `t`.`HIREDATE`," - + " `t`.`SAL`, `t`.`COMM`, `t`.`DEPTNO`, `t0`.`DEPTNO` `r.DEPTNO`, `t0`.`DNAME`," - + " `t0`.`LOC`\n" + + " `t`.`SAL`, `t`.`COMM`, `t`.`DEPTNO`, `t1`.`DEPTNO` `r.DEPTNO`, `t1`.`DNAME`," + + " `t1`.`LOC`\n" + "FROM (SELECT `EMPNO`, `ENAME`, `JOB`, `MGR`, `HIREDATE`, `SAL`, `COMM`, `DEPTNO`\n" + "FROM `scott`.`EMP`\n" + "LIMIT 10) `t`\n" - + "INNER JOIN (SELECT *\n" + + "INNER JOIN (SELECT `DEPTNO`, `DNAME`, `LOC`\n" + "FROM `scott`.`DEPT`\n" - + "WHERE `DEPTNO` > 10 AND `LOC` = 'CHICAGO') `t0` ON `t`.`DEPTNO` = `t0`.`DEPTNO`)" - + " `t1`\n" - + "LEFT JOIN (SELECT *\n" + + "WHERE `DEPTNO` > 10 AND `LOC` = 'CHICAGO'\n" + + "LIMIT 50000) `t1` ON `t`.`DEPTNO` = `t1`.`DEPTNO`) `t2`\n" + + "LEFT JOIN (SELECT `ENAME`, `JOB`, `SAL`, `COMM`\n" + "FROM `scott`.`BONUS`\n" - + "WHERE `JOB` = 'SALESMAN') `t2` ON `t1`.`JOB` = `t2`.`JOB`) `t3`\n" + + "WHERE `JOB` = 'SALESMAN'\n" + + "LIMIT 50000) `t4` ON `t2`.`JOB` = `t4`.`JOB`) `t5`\n" + "CROSS JOIN (SELECT `GRADE`, `LOSAL`, `HISAL`\n" + + "FROM (SELECT `GRADE`, `LOSAL`, `HISAL`\n" + "FROM `scott`.`SALGRADE`\n" + "WHERE `LOSAL` <= 1500\n" - + "ORDER BY `GRADE` DESC NULLS FIRST) `t5`"; + + "ORDER BY `GRADE` DESC NULLS FIRST) `t7`\n" + + "LIMIT 50000) `t8`"; verifyPPLToSparkSQL(root, expectedSparkSql); } } diff --git a/ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLLookupTest.java b/ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLLookupTest.java index 192170e0407..6a94dc07309 100644 --- a/ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLLookupTest.java +++ b/ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLLookupTest.java @@ -25,8 +25,9 @@ public void testReplace() { + " COMM=[$6], DEPTNO=[$7], LOC=[$8])\n" + " LogicalJoin(condition=[=($7, $9)], joinType=[left])\n" + " LogicalTableScan(table=[[scott, EMP]])\n" - + " LogicalProject(LOC=[$2], DEPTNO=[$0])\n" - + " LogicalTableScan(table=[[scott, DEPT]])\n"; + + " LogicalSystemLimit(fetch=[50000])\n" + + " LogicalProject(LOC=[$2], DEPTNO=[$0])\n" + + " LogicalTableScan(table=[[scott, DEPT]])\n"; verifyLogical(root, expectedLogical); String expectedResult = @@ -62,10 +63,11 @@ public void testReplace() { String expectedSparkSql = "SELECT `EMP`.`EMPNO`, `EMP`.`ENAME`, `EMP`.`JOB`, `EMP`.`MGR`, `EMP`.`HIREDATE`," - + " `EMP`.`SAL`, `EMP`.`COMM`, `EMP`.`DEPTNO`, `t`.`LOC`\n" + + " `EMP`.`SAL`, `EMP`.`COMM`, `EMP`.`DEPTNO`, `t0`.`LOC`\n" + "FROM `scott`.`EMP`\n" + "LEFT JOIN (SELECT `LOC`, `DEPTNO`\n" - + "FROM `scott`.`DEPT`) `t` ON `EMP`.`DEPTNO` = `t`.`DEPTNO`"; + + "FROM `scott`.`DEPT`\n" + + "LIMIT 50000) `t0` ON `EMP`.`DEPTNO` = `t0`.`DEPTNO`"; verifyPPLToSparkSQL(root, expectedSparkSql); } @@ -78,8 +80,9 @@ public void testReplaceAs() { + " DEPTNO=[$7], JOB=[$8])\n" + " LogicalJoin(condition=[=($7, $9)], joinType=[left])\n" + " LogicalTableScan(table=[[scott, EMP]])\n" - + " LogicalProject(LOC=[$2], DEPTNO=[$0])\n" - + " LogicalTableScan(table=[[scott, DEPT]])\n"; + + " LogicalSystemLimit(fetch=[50000])\n" + + " LogicalProject(LOC=[$2], DEPTNO=[$0])\n" + + " LogicalTableScan(table=[[scott, DEPT]])\n"; verifyLogical(root, expectedLogical); String expectedResult = @@ -115,10 +118,11 @@ public void testReplaceAs() { String expectedSparkSql = "SELECT `EMP`.`EMPNO`, `EMP`.`ENAME`, `EMP`.`MGR`, `EMP`.`HIREDATE`, `EMP`.`SAL`," - + " `EMP`.`COMM`, `EMP`.`DEPTNO`, `t`.`LOC` `JOB`\n" + + " `EMP`.`COMM`, `EMP`.`DEPTNO`, `t0`.`LOC` `JOB`\n" + "FROM `scott`.`EMP`\n" + "LEFT JOIN (SELECT `LOC`, `DEPTNO`\n" - + "FROM `scott`.`DEPT`) `t` ON `EMP`.`DEPTNO` = `t`.`DEPTNO`"; + + "FROM `scott`.`DEPT`\n" + + "LIMIT 50000) `t0` ON `EMP`.`DEPTNO` = `t0`.`DEPTNO`"; verifyPPLToSparkSQL(root, expectedSparkSql); } @@ -147,8 +151,9 @@ public void testAppend() { + " COMM=[$6], DEPTNO=[$7], LOC=[$8])\n" + " LogicalJoin(condition=[=($7, $9)], joinType=[left])\n" + " LogicalTableScan(table=[[scott, EMP]])\n" - + " LogicalProject(LOC=[$2], DEPTNO=[$0])\n" - + " LogicalTableScan(table=[[scott, DEPT]])\n"; + + " LogicalSystemLimit(fetch=[50000])\n" + + " LogicalProject(LOC=[$2], DEPTNO=[$0])\n" + + " LogicalTableScan(table=[[scott, DEPT]])\n"; verifyLogical(root, expectedLogical); String expectedResult = @@ -184,10 +189,11 @@ public void testAppend() { String expectedSparkSql = "SELECT `EMP`.`EMPNO`, `EMP`.`ENAME`, `EMP`.`JOB`, `EMP`.`MGR`, `EMP`.`HIREDATE`," - + " `EMP`.`SAL`, `EMP`.`COMM`, `EMP`.`DEPTNO`, `t`.`LOC`\n" + + " `EMP`.`SAL`, `EMP`.`COMM`, `EMP`.`DEPTNO`, `t0`.`LOC`\n" + "FROM `scott`.`EMP`\n" + "LEFT JOIN (SELECT `LOC`, `DEPTNO`\n" - + "FROM `scott`.`DEPT`) `t` ON `EMP`.`DEPTNO` = `t`.`DEPTNO`"; + + "FROM `scott`.`DEPT`\n" + + "LIMIT 50000) `t0` ON `EMP`.`DEPTNO` = `t0`.`DEPTNO`"; verifyPPLToSparkSQL(root, expectedSparkSql); } @@ -200,8 +206,9 @@ public void testAppendAs() { + " DEPTNO=[$7], JOB=[COALESCE($2, $8)])\n" + " LogicalJoin(condition=[=($7, $9)], joinType=[left])\n" + " LogicalTableScan(table=[[scott, EMP]])\n" - + " LogicalProject(LOC=[$2], DEPTNO=[$0])\n" - + " LogicalTableScan(table=[[scott, DEPT]])\n"; + + " LogicalSystemLimit(fetch=[50000])\n" + + " LogicalProject(LOC=[$2], DEPTNO=[$0])\n" + + " LogicalTableScan(table=[[scott, DEPT]])\n"; verifyLogical(root, expectedLogical); String expectedResult = @@ -237,10 +244,11 @@ public void testAppendAs() { String expectedSparkSql = "SELECT `EMP`.`EMPNO`, `EMP`.`ENAME`, `EMP`.`MGR`, `EMP`.`HIREDATE`, `EMP`.`SAL`," - + " `EMP`.`COMM`, `EMP`.`DEPTNO`, COALESCE(`EMP`.`JOB`, `t`.`LOC`) `JOB`\n" + + " `EMP`.`COMM`, `EMP`.`DEPTNO`, COALESCE(`EMP`.`JOB`, `t0`.`LOC`) `JOB`\n" + "FROM `scott`.`EMP`\n" + "LEFT JOIN (SELECT `LOC`, `DEPTNO`\n" - + "FROM `scott`.`DEPT`) `t` ON `EMP`.`DEPTNO` = `t`.`DEPTNO`"; + + "FROM `scott`.`DEPT`\n" + + "LIMIT 50000) `t0` ON `EMP`.`DEPTNO` = `t0`.`DEPTNO`"; verifyPPLToSparkSQL(root, expectedSparkSql); } @@ -266,7 +274,8 @@ public void testLookupAll() { + " COMM=[$6], DEPTNO=[$7], DNAME=[$9], LOC=[$10])\n" + " LogicalJoin(condition=[=($7, $8)], joinType=[left])\n" + " LogicalTableScan(table=[[scott, EMP]])\n" - + " LogicalTableScan(table=[[scott, DEPT]])\n"; + + " LogicalSystemLimit(fetch=[50000])\n" + + " LogicalTableScan(table=[[scott, DEPT]])\n"; verifyLogical(root, expectedLogical); String expectedResult = @@ -302,9 +311,11 @@ public void testLookupAll() { String expectedSparkSql = "SELECT `EMP`.`EMPNO`, `EMP`.`ENAME`, `EMP`.`JOB`, `EMP`.`MGR`, `EMP`.`HIREDATE`," - + " `EMP`.`SAL`, `EMP`.`COMM`, `EMP`.`DEPTNO`, `DEPT`.`DNAME`, `DEPT`.`LOC`\n" + + " `EMP`.`SAL`, `EMP`.`COMM`, `EMP`.`DEPTNO`, `t`.`DNAME`, `t`.`LOC`\n" + "FROM `scott`.`EMP`\n" - + "LEFT JOIN `scott`.`DEPT` ON `EMP`.`DEPTNO` = `DEPT`.`DEPTNO`"; + + "LEFT JOIN (SELECT `DEPTNO`, `DNAME`, `LOC`\n" + + "FROM `scott`.`DEPT`\n" + + "LIMIT 50000) `t` ON `EMP`.`DEPTNO` = `t`.`DEPTNO`"; verifyPPLToSparkSQL(root, expectedSparkSql); } } diff --git a/ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLPatternsTest.java b/ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLPatternsTest.java index c7bd64d5cab..f3d193d1f64 100644 --- a/ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLPatternsTest.java +++ b/ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLPatternsTest.java @@ -210,9 +210,10 @@ public void testPatternsAggregationModeForBrainMethod() { + " LogicalAggregate(group=[{}], patterns_field=[pattern($0, $1, $2)])\n" + " LogicalProject(ENAME=[$1], $f8=[10], $f9=[100000])\n" + " LogicalTableScan(table=[[scott, EMP]])\n" - + " Uncollect\n" - + " LogicalProject(patterns_field=[$cor0.patterns_field])\n" - + " LogicalValues(tuples=[[{ 0 }]])\n"; + + " LogicalSystemLimit(fetch=[50000])\n" + + " Uncollect\n" + + " LogicalProject(patterns_field=[$cor0.patterns_field])\n" + + " LogicalValues(tuples=[[{ 0 }]])\n"; verifyLogical(root, expectedLogical); /* @@ -220,14 +221,16 @@ public void testPatternsAggregationModeForBrainMethod() { * Spark doesn't have SAFE_CAST and UNNEST */ String expectedSparkSql = - "SELECT SAFE_CAST(`t20`.`patterns_field`['pattern'] AS STRING) `patterns_field`," - + " SAFE_CAST(`t20`.`patterns_field`['pattern_count'] AS BIGINT) `pattern_count`," - + " SAFE_CAST(`t20`.`patterns_field`['tokens'] AS MAP< VARCHAR, VARCHAR ARRAY >)" + "SELECT SAFE_CAST(`t3`.`patterns_field`['pattern'] AS STRING) `patterns_field`," + + " SAFE_CAST(`t3`.`patterns_field`['pattern_count'] AS BIGINT) `pattern_count`," + + " SAFE_CAST(`t3`.`patterns_field`['tokens'] AS MAP< VARCHAR, VARCHAR ARRAY >)" + " `tokens`\n" + "FROM (SELECT `pattern`(`ENAME`, 10, 100000) `patterns_field`\n" + "FROM `scott`.`EMP`) `$cor0`,\n" - + "LATERAL UNNEST (SELECT `$cor0`.`patterns_field`\n" - + "FROM (VALUES (0)) `t` (`ZERO`)) `t2` (`patterns_field`) `t20`"; + + "LATERAL (SELECT `patterns_field`\n" + + "FROM UNNEST (SELECT `$cor0`.`patterns_field`\n" + + "FROM (VALUES (0)) `t` (`ZERO`)) `t2` (`patterns_field`)\n" + + "LIMIT 50000) `t3`"; verifyPPLToSparkSQL(root, expectedSparkSql); } @@ -244,9 +247,10 @@ public void testPatternsAggregationModeWithGroupByForBrainMethod() { + " LogicalAggregate(group=[{1}], patterns_field=[pattern($0, $2, $3)])\n" + " LogicalProject(ENAME=[$1], DEPTNO=[$7], $f8=[10], $f9=[100000])\n" + " LogicalTableScan(table=[[scott, EMP]])\n" - + " Uncollect\n" - + " LogicalProject(patterns_field=[$cor0.patterns_field])\n" - + " LogicalValues(tuples=[[{ 0 }]])\n"; + + " LogicalSystemLimit(fetch=[50000])\n" + + " Uncollect\n" + + " LogicalProject(patterns_field=[$cor0.patterns_field])\n" + + " LogicalValues(tuples=[[{ 0 }]])\n"; verifyLogical(root, expectedLogical); /* @@ -254,15 +258,17 @@ public void testPatternsAggregationModeWithGroupByForBrainMethod() { * Spark doesn't have SAFE_CAST and UNNEST */ String expectedSparkSql = - "SELECT `$cor0`.`DEPTNO`, SAFE_CAST(`t20`.`patterns_field`['pattern'] AS STRING)" - + " `patterns_field`, SAFE_CAST(`t20`.`patterns_field`['pattern_count'] AS BIGINT)" - + " `pattern_count`, SAFE_CAST(`t20`.`patterns_field`['tokens'] AS MAP< VARCHAR," - + " VARCHAR ARRAY >) `tokens`\n" + "SELECT `$cor0`.`DEPTNO`, SAFE_CAST(`t3`.`patterns_field`['pattern'] AS STRING)" + + " `patterns_field`, SAFE_CAST(`t3`.`patterns_field`['pattern_count'] AS BIGINT)" + + " `pattern_count`, SAFE_CAST(`t3`.`patterns_field`['tokens'] AS MAP< VARCHAR, VARCHAR" + + " ARRAY >) `tokens`\n" + "FROM (SELECT `DEPTNO`, `pattern`(`ENAME`, 10, 100000) `patterns_field`\n" + "FROM `scott`.`EMP`\n" + "GROUP BY `DEPTNO`) `$cor0`,\n" - + "LATERAL UNNEST (SELECT `$cor0`.`patterns_field`\n" - + "FROM (VALUES (0)) `t` (`ZERO`)) `t2` (`patterns_field`) `t20`"; + + "LATERAL (SELECT `patterns_field`\n" + + "FROM UNNEST (SELECT `$cor0`.`patterns_field`\n" + + "FROM (VALUES (0)) `t` (`ZERO`)) `t2` (`patterns_field`)\n" + + "LIMIT 50000) `t3`"; verifyPPLToSparkSQL(root, expectedSparkSql); } }