From 12bc91ef6e05fbd3b018f546c87b4c723f382d63 Mon Sep 17 00:00:00 2001 From: Lantao Jin Date: Thu, 25 Sep 2025 13:43:27 +0800 Subject: [PATCH 1/2] The limit pushdown check logic should be in action building instead of waiting until action is executed Signed-off-by: Lantao Jin --- .../resources/rest-api-spec/test/issues/3102.yml | 13 +++++++++++++ .../storage/scan/AbstractCalciteIndexScan.java | 2 ++ .../storage/scan/CalciteLogicalIndexScan.java | 8 ++++++++ 3 files changed, 23 insertions(+) diff --git a/integ-test/src/yamlRestTest/resources/rest-api-spec/test/issues/3102.yml b/integ-test/src/yamlRestTest/resources/rest-api-spec/test/issues/3102.yml index ffd8d5510f1..532f4161b5b 100644 --- a/integ-test/src/yamlRestTest/resources/rest-api-spec/test/issues/3102.yml +++ b/integ-test/src/yamlRestTest/resources/rest-api-spec/test/issues/3102.yml @@ -3,6 +3,11 @@ setup: features: - headers - allowed_warnings + - do: + query.settings: + body: + transient: + plugins.calcite.enabled: true - do: indices.create: index: test @@ -21,6 +26,14 @@ setup: - '{"index": {}}' - '{"id": 3}' +--- +teardown: + - do: + query.settings: + body: + transient: + plugins.calcite.enabled : false + --- "Prevent push down limit if the offset reach max_result_window": - do: diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/AbstractCalciteIndexScan.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/AbstractCalciteIndexScan.java index 0173454f026..a6189a8f4d8 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/AbstractCalciteIndexScan.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/AbstractCalciteIndexScan.java @@ -170,6 +170,7 @@ public static class PushDownContext extends ArrayDeque { @Getter private AggPushDownAction aggPushDownAction; @Getter private boolean isLimitPushed = false; @Getter private boolean isProjectPushed = false; + @Getter private int startFrom = 0; @Override public PushDownContext clone() { @@ -184,6 +185,7 @@ public boolean add(PushDownAction pushDownAction) { } if (pushDownAction.type == PushDownType.LIMIT) { isLimitPushed = true; + startFrom += ((LimitDigest) pushDownAction.digest).offset(); } if (pushDownAction.type == PushDownType.PROJECT) { isProjectPushed = true; diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/CalciteLogicalIndexScan.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/CalciteLogicalIndexScan.java index 84cead5d2b8..26b0e5d5b7a 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/CalciteLogicalIndexScan.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/CalciteLogicalIndexScan.java @@ -45,6 +45,7 @@ import org.opensearch.sql.opensearch.planner.physical.EnumerableIndexScanRule; import org.opensearch.sql.opensearch.planner.physical.OpenSearchIndexRules; import org.opensearch.sql.opensearch.request.AggregateAnalyzer; +import org.opensearch.sql.opensearch.request.OpenSearchRequestBuilder; import org.opensearch.sql.opensearch.request.PredicateAnalyzer; import org.opensearch.sql.opensearch.request.PredicateAnalyzer.QueryExpression; import org.opensearch.sql.opensearch.response.agg.OpenSearchAggregationResponseParser; @@ -323,6 +324,13 @@ public AbstractRelNode pushDownLimit(LogicalSort sort, Integer limit, Integer of return offset > 0 ? sort.copy(sort.getTraitSet(), List.of(newScan)) : newScan; } else { CalciteLogicalIndexScan newScan = this.copyWithNewSchema(getRowType()); + int newStartFrom = newScan.pushDownContext.getStartFrom() + offset; + if (newStartFrom >= newScan.osIndex.getMaxResultWindow()) { + throw new OpenSearchRequestBuilder.PushDownUnSupportedException( + String.format( + "Requested offset %d should be less than the max result window %d", + newStartFrom, newScan.osIndex.getMaxResultWindow())); + } newScan.pushDownContext.add( PushDownAction.of( PushDownType.LIMIT, From 103f7ef40de0b214a81fa50dece1c25935fcd644 Mon Sep 17 00:00:00 2001 From: Lantao Jin Date: Thu, 25 Sep 2025 13:56:42 +0800 Subject: [PATCH 2/2] Add an explain IT Signed-off-by: Lantao Jin --- .../sql/calcite/remote/CalciteExplainIT.java | 11 +++++++++++ .../calcite/explain_prevent_limit_push.yaml | 10 ++++++++++ 2 files changed, 21 insertions(+) create mode 100644 integ-test/src/test/resources/expectedOutput/calcite/explain_prevent_limit_push.yaml diff --git a/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteExplainIT.java b/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteExplainIT.java index e5d9304255f..0f921c173aa 100644 --- a/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteExplainIT.java +++ b/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteExplainIT.java @@ -578,6 +578,17 @@ public void testMvjoinExplain() throws IOException { assertJsonEqualsIgnoreId(expected, result); } + @Test + public void testPreventLimitPushdown() throws IOException { + enabledOnlyWhenPushdownIsEnabled(); + setMaxResultWindow("opensearch-sql_test_index_account", 1); + String query = "source=opensearch-sql_test_index_account | head 1 from 1"; + var result = explainQueryToString(query); + String expected = loadExpectedPlan("explain_prevent_limit_push.yaml"); + assertYamlEqualsJsonIgnoreId(expected, result); + resetMaxResultWindow("opensearch-sql_test_index_account"); + } + @Test public void testPushdownLimitIntoAggregation() throws IOException { enabledOnlyWhenPushdownIsEnabled(); diff --git a/integ-test/src/test/resources/expectedOutput/calcite/explain_prevent_limit_push.yaml b/integ-test/src/test/resources/expectedOutput/calcite/explain_prevent_limit_push.yaml new file mode 100644 index 00000000000..e7019b44d7d --- /dev/null +++ b/integ-test/src/test/resources/expectedOutput/calcite/explain_prevent_limit_push.yaml @@ -0,0 +1,10 @@ +calcite: + logical: | + LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT]) + LogicalProject(account_number=[$0], firstname=[$1], address=[$2], balance=[$3], gender=[$4], city=[$5], employer=[$6], state=[$7], age=[$8], email=[$9], lastname=[$10]) + LogicalSort(offset=[1], fetch=[1]) + CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]]) + physical: | + EnumerableLimit(fetch=[10000]) + EnumerableLimit(offset=[1], fetch=[1]) + CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]], PushDownContext=[[PROJECT->[account_number, firstname, address, balance, gender, city, employer, state, age, email, lastname]], OpenSearchRequestBuilder(sourceBuilder={"from":0,"timeout":"1m","_source":{"includes":["account_number","firstname","address","balance","gender","city","employer","state","age","email","lastname"],"excludes":[]}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)]) \ No newline at end of file