diff --git a/core/src/main/java/org/opensearch/sql/analysis/Analyzer.java b/core/src/main/java/org/opensearch/sql/analysis/Analyzer.java index 9f3b67b4ae7..fef38ea5e32 100644 --- a/core/src/main/java/org/opensearch/sql/analysis/Analyzer.java +++ b/core/src/main/java/org/opensearch/sql/analysis/Analyzer.java @@ -51,6 +51,7 @@ import org.opensearch.sql.ast.tree.CloseCursor; import org.opensearch.sql.ast.tree.Dedupe; import org.opensearch.sql.ast.tree.Eval; +import org.opensearch.sql.ast.tree.Expand; import org.opensearch.sql.ast.tree.FetchCursor; import org.opensearch.sql.ast.tree.FillNull; import org.opensearch.sql.ast.tree.Filter; @@ -624,6 +625,12 @@ public LogicalPlan visitML(ML node, AnalysisContext context) { return new LogicalML(child, node.getArguments()); } + @Override + public LogicalPlan visitExpand(Expand expand, AnalysisContext context) { + throw new UnsupportedOperationException( + "Expand is supported only when " + CALCITE_ENGINE_ENABLED.getKeyValue() + "=true"); + } + /** Build {@link LogicalTrendline} for Trendline command. */ @Override public LogicalPlan visitTrendline(Trendline node, AnalysisContext context) { diff --git a/core/src/main/java/org/opensearch/sql/ast/AbstractNodeVisitor.java b/core/src/main/java/org/opensearch/sql/ast/AbstractNodeVisitor.java index a8b3648be62..9267046665e 100644 --- a/core/src/main/java/org/opensearch/sql/ast/AbstractNodeVisitor.java +++ b/core/src/main/java/org/opensearch/sql/ast/AbstractNodeVisitor.java @@ -49,6 +49,7 @@ import org.opensearch.sql.ast.tree.CloseCursor; import org.opensearch.sql.ast.tree.Dedupe; import org.opensearch.sql.ast.tree.Eval; +import org.opensearch.sql.ast.tree.Expand; import org.opensearch.sql.ast.tree.FetchCursor; import org.opensearch.sql.ast.tree.FillNull; import org.opensearch.sql.ast.tree.Filter; @@ -113,6 +114,10 @@ public T visitRelationSubquery(RelationSubquery node, C context) { return visitChildren(node, context); } + public T visitExpand(Expand expand, C context) { + return visitChildren(expand, context); + } + public T visitTableFunction(TableFunction node, C context) { return visitChildren(node, context); } diff --git a/core/src/main/java/org/opensearch/sql/ast/dsl/AstDSL.java b/core/src/main/java/org/opensearch/sql/ast/dsl/AstDSL.java index 9c3d5a0bd59..4c7736b7f24 100644 --- a/core/src/main/java/org/opensearch/sql/ast/dsl/AstDSL.java +++ b/core/src/main/java/org/opensearch/sql/ast/dsl/AstDSL.java @@ -53,6 +53,7 @@ import org.opensearch.sql.ast.tree.Dedupe; import org.opensearch.sql.ast.tree.DescribeRelation; import org.opensearch.sql.ast.tree.Eval; +import org.opensearch.sql.ast.tree.Expand; import org.opensearch.sql.ast.tree.FillNull; import org.opensearch.sql.ast.tree.Filter; import org.opensearch.sql.ast.tree.Head; @@ -118,6 +119,10 @@ public static Eval eval(UnresolvedPlan input, Let... projectList) { return new Eval(Arrays.asList(projectList)).attach(input); } + public Expand expand(UnresolvedPlan input, Field field, String alias) { + return new Expand(field, alias).attach(input); + } + public static UnresolvedPlan projectWithArg( UnresolvedPlan input, List argList, UnresolvedExpression... projectList) { return new Project(Arrays.asList(projectList), argList).attach(input); diff --git a/core/src/main/java/org/opensearch/sql/ast/tree/Expand.java b/core/src/main/java/org/opensearch/sql/ast/tree/Expand.java new file mode 100644 index 00000000000..02e8402aaf2 --- /dev/null +++ b/core/src/main/java/org/opensearch/sql/ast/tree/Expand.java @@ -0,0 +1,43 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.ast.tree; + +import com.google.common.collect.ImmutableList; +import java.util.List; +import javax.annotation.Nullable; +import lombok.EqualsAndHashCode; +import lombok.Getter; +import lombok.RequiredArgsConstructor; +import lombok.ToString; +import org.opensearch.sql.ast.AbstractNodeVisitor; +import org.opensearch.sql.ast.expression.Field; + +/** AST node representing an {@code expand } operation. */ +@ToString +@RequiredArgsConstructor +@EqualsAndHashCode(callSuper = false) +public class Expand extends UnresolvedPlan { + + private UnresolvedPlan child; + @Getter private final Field field; + @Getter @Nullable private final String alias; + + @Override + public Expand attach(UnresolvedPlan child) { + this.child = child; + return this; + } + + @Override + public List getChild() { + return this.child == null ? ImmutableList.of() : ImmutableList.of(this.child); + } + + @Override + public T accept(AbstractNodeVisitor nodeVisitor, C context) { + return nodeVisitor.visitExpand(this, context); + } +} diff --git a/core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java b/core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java index c06965527f8..c222b59561e 100644 --- a/core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java +++ b/core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java @@ -32,6 +32,7 @@ import org.apache.calcite.plan.ViewExpanders; import org.apache.calcite.rel.RelNode; import org.apache.calcite.rel.core.Aggregate; +import org.apache.calcite.rel.core.JoinRelType; import org.apache.calcite.rel.type.RelDataTypeField; import org.apache.calcite.rex.RexCall; import org.apache.calcite.rex.RexCorrelVariable; @@ -67,6 +68,7 @@ import org.opensearch.sql.ast.tree.CloseCursor; import org.opensearch.sql.ast.tree.Dedupe; import org.opensearch.sql.ast.tree.Eval; +import org.opensearch.sql.ast.tree.Expand; import org.opensearch.sql.ast.tree.FetchCursor; import org.opensearch.sql.ast.tree.FillNull; import org.opensearch.sql.ast.tree.Filter; @@ -196,7 +198,7 @@ public RelNode visitProject(Project node, CalcitePlanContext context) { } /** See logic in {@link org.opensearch.sql.analysis.symbol.SymbolTable#lookupAllFields} */ - private void tryToRemoveNestedFields(CalcitePlanContext context) { + private static void tryToRemoveNestedFields(CalcitePlanContext context) { Set allFields = new HashSet<>(context.relBuilder.peek().getRowType().getFieldNames()); List duplicatedNestedFields = allFields.stream() @@ -1100,4 +1102,72 @@ private RexNode buildWmaRexNode( return context.relBuilder.call( SqlStdOperatorTable.DIVIDE, divider, context.relBuilder.cast(divisor, SqlTypeName.DOUBLE)); } + + /** + * Expand command visitor to handle array field expansion. 1. Unnest 2. Join with the original + * table to get all fields + * + *

S = π_{field, other_fields}(R ⨝ UNNEST_field(R)) + * + * @param expand Expand command to be visited + * @param context CalcitePlanContext containing the RelBuilder and other context + * @return RelNode representing records with the expanded array field + */ + @Override + public RelNode visitExpand(Expand expand, CalcitePlanContext context) { + // 1. Visit Children + visitChildren(expand, context); + + RelBuilder relBuilder = context.relBuilder; + + // 2. Get the field to expand and an optional alias. + Field arrayField = expand.getField(); + RexInputRef arrayFieldRex = (RexInputRef) rexVisitor.analyze(arrayField, context); + String alias = expand.getAlias(); + + // 3. Capture the outer row in a CorrelationId + Holder correlVariable = Holder.empty(); + relBuilder.variable(correlVariable::set); + + // 4. Push a copy of the original table to the RelBuilder stack as right + // side of the correlate (join). + relBuilder.push(relBuilder.peek()); + RexNode correlArrayField = + relBuilder.field( + context.rexBuilder.makeCorrel(relBuilder.peek().getRowType(), correlVariable.get().id), + arrayFieldRex.getIndex()); + + // 5. Filter rows where the array field is the same as the left side + // TODO: This is not a standard way to use correlate and uncollect together. + // A filter should not be necessary. Correct it in the future. + RexNode filterCondition = relBuilder.equals(correlArrayField, arrayFieldRex); + relBuilder.filter(filterCondition); + + // 6. Project only the array field for the uncollect operation + relBuilder.project(List.of(correlArrayField), List.of(arrayField.getField().toString())); + + // 7. Expand the array field using uncollect + relBuilder.uncollect(List.of(), false); + + // 8. Perform a nested-loop join (correlate) between the original table and the expanded + // array field. + // The last parameter has to refer to the array to be expanded on the left side. It will + // be used by the right side to correlate with the left side. + // Using left join to keep the records where the array field is empty. The corresponding + // field in the result will be null. + relBuilder.correlate(JoinRelType.LEFT, correlVariable.get().id, List.of(arrayFieldRex)); + + // 9. Remove the original array field from the output. + // TODO: RFC: should we keep the original array field when alias is present? + relBuilder.projectExcept(arrayFieldRex); + if (alias != null) { + // Sub-nested fields cannot be removed after renaming the nested field. + tryToRemoveNestedFields(context); + RexInputRef expandedField = relBuilder.field(arrayField.getField().toString()); + List names = new ArrayList<>(relBuilder.peek().getRowType().getFieldNames()); + names.set(expandedField.getIndex(), alias); + relBuilder.rename(names); + } + return relBuilder.peek(); + } } diff --git a/docs/user/ppl/cmd/expand.rst b/docs/user/ppl/cmd/expand.rst new file mode 100644 index 00000000000..46cf6a37e18 --- /dev/null +++ b/docs/user/ppl/cmd/expand.rst @@ -0,0 +1,83 @@ +============= +expand +============= + +.. rubric:: Table of contents + +.. contents:: + :local: + :depth: 2 + + +Description +============ + (From 3.1.0) + +Use the ``expand`` command on a nested array field to transform a single +document into multiple documents—each containing one element from the array. +All other fields in the original document are duplicated across the resulting +documents. + +The expand command generates one row per element in the specified array field: + +* The specified array field is converted into individual rows. +* If an alias is provided, the expanded values appear under the alias instead + of the original field name. +* If the specified field is an empty array, the row is retained with the + expanded field set to null. + +Version +======= +Since 3.1.0 + +Syntax +====== + +expand [as alias] + +* field: The field to be expanded (exploded). Currently only nested arrays are + supported. +* alias: (Optional) The name to use instead of the original field name. + + +Example: expand address field with an alias +=========================================== + +Given a dataset ``migration`` with the following data: + +.. code-block:: + + {"name":"abbas","age":24,"address":[{"city":"New york city","state":"NY","moveInDate":{"dateAndTime":"19840412T090742.000Z"}}]} + {"name":"chen","age":32,"address":[{"city":"Miami","state":"Florida","moveInDate":{"dateAndTime":"19010811T040333.000Z"}},{"city":"los angeles","state":"CA","moveInDate":{"dateAndTime":"20230503T080742.000Z"}}]} + +The following query expand the address field and rename it to addr: + +PPL query:: + + PPL> source=migration | expand address as addr; + fetched rows / total rows = 3/3 + +-------+-----+-------------------------------------------------------------------------------------------+ + | name | age | addr | + |-------+-----+-------------------------------------------------------------------------------------------| + | abbas | 24 | {"city":"New york city","state":"NY","moveInDate":{"dateAndTime":"19840412T090742.000Z"}} | + | chen | 32 | {"city":"Miami","state":"Florida","moveInDate":{"dateAndTime":"19010811T040333.000Z"}} | + | chen | 32 | {"city":"los angeles","state":"CA","moveInDate":{"dateAndTime":"20230503T080742.000Z"}} | + +-------+-----+-------------------------------------------------------------------------------------------+ + +Limitations +============ + +* The ``expand`` command currently only supports nested arrays. Primitive + fields storing arrays are not supported. E.g. a string field storing an array + of strings cannot be expanded with the current implementation. +* The command works only with Calcite enabled. This can be set with the + following command: + + .. code-block:: + + PUT /_cluster/settings + { + "persistent":{ + "plugins.calcite.enabled": true + } + } diff --git a/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteExpandCommandIT.java b/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteExpandCommandIT.java new file mode 100644 index 00000000000..3788f8fc852 --- /dev/null +++ b/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteExpandCommandIT.java @@ -0,0 +1,338 @@ +/* + * + * * Copyright OpenSearch Contributors + * * SPDX-License-Identifier: Apache-2.0 + * + */ + +package org.opensearch.sql.calcite.remote; + +import static org.opensearch.sql.legacy.TestsConstants.TEST_INDEX_ARRAY; +import static org.opensearch.sql.legacy.TestsConstants.TEST_INDEX_NESTED_SIMPLE; +import static org.opensearch.sql.util.MatcherUtils.rows; +import static org.opensearch.sql.util.MatcherUtils.schema; +import static org.opensearch.sql.util.MatcherUtils.verifyDataRows; +import static org.opensearch.sql.util.MatcherUtils.verifyNumOfRows; +import static org.opensearch.sql.util.MatcherUtils.verifySchema; + +import org.json.JSONArray; +import org.json.JSONObject; +import org.junit.Ignore; +import org.junit.jupiter.api.Test; +import org.opensearch.client.Request; +import org.opensearch.sql.ppl.PPLIntegTestCase; + +public class CalciteExpandCommandIT extends PPLIntegTestCase { + @Override + public void init() throws Exception { + super.init(); + loadIndex(Index.NESTED_SIMPLE); + loadIndex(Index.ARRAY); + enableCalcite(); + disallowCalciteFallback(); + } + + @Test + public void testExpandOnNested() throws Exception { + JSONObject response = + executeQuery(String.format("source=%s | expand address", TEST_INDEX_NESTED_SIMPLE)); + verifySchema( + response, + schema("name", "string"), + schema("age", "bigint"), + schema("id", "bigint"), + schema("address", "struct")); + verifyDataRows( + response, + rows( + "abbas", + null, + 24, + new JSONObject() + .put("city", "New york city") + .put("state", "NY") + .put("moveInDate", new JSONObject().put("dateAndTime", "1984-04-12 09:07:42"))), + rows( + "abbas", + null, + 24, + new JSONObject() + .put("city", "bellevue") + .put("state", "WA") + .put( + "moveInDate", + new JSONArray() + .put(new JSONObject().put("dateAndTime", "2023-05-03 08:07:42")) + .put(new JSONObject().put("dateAndTime", "2001-11-11 04:07:44")))), + rows( + "abbas", + null, + 24, + new JSONObject() + .put("city", "seattle") + .put("state", "WA") + .put("moveInDate", new JSONObject().put("dateAndTime", "1966-03-19 03:04:55"))), + rows( + "abbas", + null, + 24, + new JSONObject() + .put("city", "chicago") + .put("state", "IL") + .put("moveInDate", new JSONObject().put("dateAndTime", "2011-06-01 01:01:42"))), + rows( + "chen", + null, + 32, + new JSONObject() + .put("city", "Miami") + .put("state", "Florida") + .put("moveInDate", new JSONObject().put("dateAndTime", "1901-08-11 04:03:33"))), + rows( + "chen", + null, + 32, + new JSONObject() + .put("city", "los angeles") + .put("state", "CA") + .put("moveInDate", new JSONObject().put("dateAndTime", "2023-05-03 08:07:42"))), + rows( + "peng", + null, + 26, + new JSONObject() + .put("city", "san diego") + .put("state", "CA") + .put("moveInDate", new JSONObject().put("dateAndTime", "2001-11-11 04:07:44"))), + rows( + "peng", + null, + 26, + new JSONObject() + .put("city", "austin") + .put("state", "TX") + .put("moveInDate", new JSONObject().put("dateAndTime", "1977-07-13 09:04:41"))), + rows( + "andy", + 4, + 19, + new JSONObject() + .put("city", "houston") + .put("state", "TX") + .put("moveInDate", new JSONObject().put("dateAndTime", "1933-12-12 05:05:45"))), + rows( + "david", + null, + 25, + new JSONObject() + .put("city", "raleigh") + .put("state", "NC") + .put("moveInDate", new JSONObject().put("dateAndTime", "1909-06-17 01:04:21"))), + rows( + "david", + null, + 25, + new JSONObject() + .put("city", "charlotte") + .put("state", "SC") + .put( + "moveInDate", + new JSONArray() + .put(new JSONObject().put("dateAndTime", "2001-11-11 04:07:44"))))); + } + + // To consider in future releases: will expand on array (instead of nested) be supported. + // In Opensearch, a string field can store either a single string or an array of strings. + // This makes it difficult to implement expand on arrays. + @Ignore + @Test + public void testExpandOnArray() throws Exception { + JSONObject response = + executeQuery(String.format("source=%s | expand strings", TEST_INDEX_ARRAY)); + verifySchema(response, schema("numbers", "array"), schema("strings", "string")); + verifyNumOfRows(response, 5); + } + + @Test + public void testExpandWithAlias() throws Exception { + JSONObject response = + executeQuery(String.format("source=%s | expand address as addr", TEST_INDEX_NESTED_SIMPLE)); + verifySchema( + response, + schema("name", "string"), + schema("age", "bigint"), + schema("id", "bigint"), + schema("addr", "struct")); + verifyDataRows( + response, + rows( + "abbas", + null, + 24, + new JSONObject() + .put("city", "New york city") + .put("state", "NY") + .put("moveInDate", new JSONObject().put("dateAndTime", "1984-04-12 09:07:42"))), + rows( + "abbas", + null, + 24, + new JSONObject() + .put("city", "bellevue") + .put("state", "WA") + .put( + "moveInDate", + new JSONArray() + .put(new JSONObject().put("dateAndTime", "2023-05-03 08:07:42")) + .put(new JSONObject().put("dateAndTime", "2001-11-11 04:07:44")))), + rows( + "abbas", + null, + 24, + new JSONObject() + .put("city", "seattle") + .put("state", "WA") + .put("moveInDate", new JSONObject().put("dateAndTime", "1966-03-19 03:04:55"))), + rows( + "abbas", + null, + 24, + new JSONObject() + .put("city", "chicago") + .put("state", "IL") + .put("moveInDate", new JSONObject().put("dateAndTime", "2011-06-01 01:01:42"))), + rows( + "chen", + null, + 32, + new JSONObject() + .put("city", "Miami") + .put("state", "Florida") + .put("moveInDate", new JSONObject().put("dateAndTime", "1901-08-11 04:03:33"))), + rows( + "chen", + null, + 32, + new JSONObject() + .put("city", "los angeles") + .put("state", "CA") + .put("moveInDate", new JSONObject().put("dateAndTime", "2023-05-03 08:07:42"))), + rows( + "peng", + null, + 26, + new JSONObject() + .put("city", "san diego") + .put("state", "CA") + .put("moveInDate", new JSONObject().put("dateAndTime", "2001-11-11 04:07:44"))), + rows( + "peng", + null, + 26, + new JSONObject() + .put("city", "austin") + .put("state", "TX") + .put("moveInDate", new JSONObject().put("dateAndTime", "1977-07-13 09:04:41"))), + rows( + "andy", + 4, + 19, + new JSONObject() + .put("city", "houston") + .put("state", "TX") + .put("moveInDate", new JSONObject().put("dateAndTime", "1933-12-12 05:05:45"))), + rows( + "david", + null, + 25, + new JSONObject() + .put("city", "raleigh") + .put("state", "NC") + .put("moveInDate", new JSONObject().put("dateAndTime", "1909-06-17 01:04:21"))), + rows( + "david", + null, + 25, + new JSONObject() + .put("city", "charlotte") + .put("state", "SC") + .put( + "moveInDate", + new JSONArray() + .put(new JSONObject().put("dateAndTime", "2001-11-11 04:07:44"))))); + } + + @Test + public void testExpandWithEval() throws Exception { + JSONObject response = + executeQuery( + String.format("source=%s | eval addr=address | expand addr", TEST_INDEX_NESTED_SIMPLE)); + verifySchema( + response, + schema("name", "string"), + schema("age", "bigint"), + schema("address", "array"), + schema("id", "bigint"), + schema("addr", "struct")); + verifyNumOfRows(response, 11); + } + + @Test + public void testExpandEmptyArray() throws Exception { + final int docId = 6; + Request insertRequest = + new Request( + "PUT", String.format("/%s/_doc/%d?refresh=true", TEST_INDEX_NESTED_SIMPLE, docId)); + insertRequest.setJsonEntity("{\"name\":\"ben\",\"age\":47, \"id\": 437821, \"address\":[]}\n"); + client().performRequest(insertRequest); + + JSONObject response = + executeQuery( + String.format( + "source=%s | where name='ben' | expand address", TEST_INDEX_NESTED_SIMPLE)); + verifySchema( + response, + schema("name", "string"), + schema("age", "bigint"), + schema("id", "bigint"), + // The type is inferred at runtime. When the array is empty and is the + // first element of the column, it is set to "undefined". + schema("address", "undefined")); + verifyDataRows(response, rows("ben", 437821, 47, null)); + verifyNumOfRows(response, 1); + + Request deleteRequest = + new Request( + "DELETE", String.format("/%s/_doc/%d?refresh=true", TEST_INDEX_NESTED_SIMPLE, docId)); + client().performRequest(deleteRequest); + } + + @Test + public void testExpandOnNullField() throws Exception { + final int docId = 6; + Request insertRequest = + new Request( + "PUT", String.format("/%s/_doc/%d?refresh=true", TEST_INDEX_NESTED_SIMPLE, docId)); + insertRequest.setJsonEntity( + "{\"name\":\"ben\",\"age\":47, \"id\": 437821, \"address\":null}\n"); + client().performRequest(insertRequest); + + JSONObject response = + executeQuery( + String.format( + "source=%s | where name='ben' | expand address", TEST_INDEX_NESTED_SIMPLE)); + verifySchema( + response, + schema("name", "string"), + schema("age", "bigint"), + schema("id", "bigint"), + schema("address", "undefined")); + verifyDataRows(response, rows("ben", 437821, 47, null)); + verifyNumOfRows(response, 1); + + Request deleteRequest = + new Request( + "DELETE", String.format("/%s/_doc/%d?refresh=true", TEST_INDEX_NESTED_SIMPLE, docId)); + client().performRequest(deleteRequest); + } +} diff --git a/integ-test/src/test/java/org/opensearch/sql/legacy/SQLIntegTestCase.java b/integ-test/src/test/java/org/opensearch/sql/legacy/SQLIntegTestCase.java index 0ec038ba092..d248651935b 100644 --- a/integ-test/src/test/java/org/opensearch/sql/legacy/SQLIntegTestCase.java +++ b/integ-test/src/test/java/org/opensearch/sql/legacy/SQLIntegTestCase.java @@ -9,6 +9,7 @@ import static org.opensearch.sql.legacy.TestUtils.createIndexByRestClient; import static org.opensearch.sql.legacy.TestUtils.getAccountIndexMapping; import static org.opensearch.sql.legacy.TestUtils.getAliasIndexMapping; +import static org.opensearch.sql.legacy.TestUtils.getArrayIndexMapping; import static org.opensearch.sql.legacy.TestUtils.getBankIndexMapping; import static org.opensearch.sql.legacy.TestUtils.getBankWithNullValuesIndexMapping; import static org.opensearch.sql.legacy.TestUtils.getDataTypeNonnumericIndexMapping; @@ -898,7 +899,12 @@ public enum Index { "customer", "tpch", getTpchMappingFile("customer_index_mapping.json"), - "src/test/resources/tpch/data/customer.json"); + "src/test/resources/tpch/data/customer.json"), + ARRAY( + TestsConstants.TEST_INDEX_ARRAY, + "array", + getArrayIndexMapping(), + "src/test/resources/array.json"); private final String name; private final String type; diff --git a/integ-test/src/test/java/org/opensearch/sql/legacy/TestUtils.java b/integ-test/src/test/java/org/opensearch/sql/legacy/TestUtils.java index 5adca42e46c..b1f825726ae 100644 --- a/integ-test/src/test/java/org/opensearch/sql/legacy/TestUtils.java +++ b/integ-test/src/test/java/org/opensearch/sql/legacy/TestUtils.java @@ -295,6 +295,11 @@ public static String getDuplicationNullableIndexMapping() { return getMappingFile(mappingFile); } + public static String getArrayIndexMapping() { + String mappingFile = "array_index_mapping.json"; + return getMappingFile(mappingFile); + } + public static void loadBulk(Client client, String jsonPath, String defaultIndex) throws Exception { System.out.println(String.format("Loading file %s into opensearch cluster", jsonPath)); diff --git a/integ-test/src/test/java/org/opensearch/sql/legacy/TestsConstants.java b/integ-test/src/test/java/org/opensearch/sql/legacy/TestsConstants.java index 7b9538591e7..ef9c45c0453 100644 --- a/integ-test/src/test/java/org/opensearch/sql/legacy/TestsConstants.java +++ b/integ-test/src/test/java/org/opensearch/sql/legacy/TestsConstants.java @@ -77,6 +77,7 @@ public class TestsConstants { public static final String TEST_INDEX_MERGE_TEST_1 = TEST_INDEX + "_merge_test_1"; public static final String TEST_INDEX_MERGE_TEST_2 = TEST_INDEX + "_merge_test_2"; public static final String TEST_INDEX_MERGE_TEST_WILDCARD = TEST_INDEX + "_merge_test_*"; + public static final String TEST_INDEX_ARRAY = TEST_INDEX + "_array"; public static final String DATE_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'"; public static final String TS_DATE_FORMAT = "yyyy-MM-dd HH:mm:ss.SSS"; diff --git a/integ-test/src/test/resources/array.json b/integ-test/src/test/resources/array.json new file mode 100644 index 00000000000..70af5474786 --- /dev/null +++ b/integ-test/src/test/resources/array.json @@ -0,0 +1,4 @@ +{"index":{"_id":"1"}} +{"numbers":[1, 2],"strings": ["a", "b"]} +{"index":{"_id":"2"}} +{"numbers":[3, 4],"strings": ["c", "d", "e"]} diff --git a/integ-test/src/test/resources/indexDefinitions/array_index_mapping.json b/integ-test/src/test/resources/indexDefinitions/array_index_mapping.json new file mode 100644 index 00000000000..a6c93f29f21 --- /dev/null +++ b/integ-test/src/test/resources/indexDefinitions/array_index_mapping.json @@ -0,0 +1,18 @@ +{ + "mappings": { + "properties": { + "numbers": { + "type": "long" + }, + "strings": { + "type": "text", + "fields": { + "keyword": { + "type": "keyword", + "ignore_above": 256 + } + } + } + } + } +} diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/executor/OpenSearchExecutionEngine.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/executor/OpenSearchExecutionEngine.java index 9eaa0d777e2..b0002f9ba52 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/executor/OpenSearchExecutionEngine.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/executor/OpenSearchExecutionEngine.java @@ -28,12 +28,14 @@ import org.apache.calcite.runtime.Hook; import org.apache.calcite.sql.SqlExplainLevel; import org.apache.calcite.sql.type.ReturnTypes; +import org.apache.calcite.sql.type.SqlTypeName; import org.opensearch.sql.ast.statement.Explain.ExplainFormat; import org.opensearch.sql.calcite.CalcitePlanContext; import org.opensearch.sql.calcite.utils.CalciteToolsHelper.OpenSearchRelRunners; import org.opensearch.sql.common.response.ResponseListener; import org.opensearch.sql.data.model.ExprTupleValue; import org.opensearch.sql.data.model.ExprValue; +import org.opensearch.sql.data.type.ExprCoreType; import org.opensearch.sql.data.type.ExprType; import org.opensearch.sql.executor.ExecutionContext; import org.opensearch.sql.executor.ExecutionEngine; @@ -236,7 +238,20 @@ private void buildResultSet( for (int i = 1; i <= columnCount; ++i) { String columnName = metaData.getColumnName(i); RelDataType fieldType = fieldTypes.get(i - 1); - ExprType exprType = convertRelDataTypeToExprType(fieldType); + // TODO: Correct this after fixing issue github.com/opensearch-project/sql/issues/3751 + // The element type of struct and array is currently set to ANY. + // We set them using the runtime type as a workaround. + ExprType exprType; + if (fieldType.getSqlTypeName() == SqlTypeName.ANY) { + if (!values.isEmpty()) { + exprType = values.getFirst().tupleValue().get(columnName).type(); + } else { + // Using UNDEFINED instead of UNKNOWN to avoid throwing exception + exprType = ExprCoreType.UNDEFINED; + } + } else { + exprType = convertRelDataTypeToExprType(fieldType); + } columns.add(new Column(columnName, null, exprType)); } Schema schema = new Schema(columns); diff --git a/ppl/src/main/antlr/OpenSearchPPLLexer.g4 b/ppl/src/main/antlr/OpenSearchPPLLexer.g4 index 05485430cd4..725709ba66d 100644 --- a/ppl/src/main/antlr/OpenSearchPPLLexer.g4 +++ b/ppl/src/main/antlr/OpenSearchPPLLexer.g4 @@ -39,6 +39,7 @@ ML: 'ML'; FILLNULL: 'FILLNULL'; TRENDLINE: 'TRENDLINE'; APPENDCOL: 'APPENDCOL'; +EXPAND: 'EXPAND'; SIMPLE_PATTERN: 'SIMPLE_PATTERN'; BRAIN: 'BRAIN'; VARIABLE_COUNT_THRESHOLD: 'VARIABLE_COUNT_THRESHOLD'; diff --git a/ppl/src/main/antlr/OpenSearchPPLParser.g4 b/ppl/src/main/antlr/OpenSearchPPLParser.g4 index b9c44771d86..02db151192f 100644 --- a/ppl/src/main/antlr/OpenSearchPPLParser.g4 +++ b/ppl/src/main/antlr/OpenSearchPPLParser.g4 @@ -71,6 +71,7 @@ commands | fillnullCommand | trendlineCommand | appendcolCommand + | expandCommand ; commandName @@ -97,6 +98,7 @@ commandName | AD | ML | FILLNULL + | EXPAND | TRENDLINE | EXPLAIN ; @@ -239,6 +241,10 @@ trendlineType | WMA ; +expandCommand + : EXPAND fieldExpression (AS alias = qualifiedName)? + ; + appendcolCommand : APPENDCOL (OVERRIDE EQUAL override = booleanLiteral)? LT_SQR_PRTHS commands (PIPE commands)* RT_SQR_PRTHS ; diff --git a/ppl/src/main/java/org/opensearch/sql/ppl/parser/AstBuilder.java b/ppl/src/main/java/org/opensearch/sql/ppl/parser/AstBuilder.java index 5ca9b81a6fb..2fe563ec875 100644 --- a/ppl/src/main/java/org/opensearch/sql/ppl/parser/AstBuilder.java +++ b/ppl/src/main/java/org/opensearch/sql/ppl/parser/AstBuilder.java @@ -61,6 +61,7 @@ import org.opensearch.sql.ast.tree.Dedupe; import org.opensearch.sql.ast.tree.DescribeRelation; import org.opensearch.sql.ast.tree.Eval; +import org.opensearch.sql.ast.tree.Expand; import org.opensearch.sql.ast.tree.FillNull; import org.opensearch.sql.ast.tree.Filter; import org.opensearch.sql.ast.tree.Head; @@ -440,6 +441,14 @@ public UnresolvedPlan visitTopCommand(OpenSearchPPLParser.TopCommandContext ctx) groupList); } + /** expand command. */ + @Override + public UnresolvedPlan visitExpandCommand(OpenSearchPPLParser.ExpandCommandContext ctx) { + Field fieldExpression = (Field) internalVisitExpression(ctx.fieldExpression()); + String alias = ctx.alias != null ? internalVisitExpression(ctx.alias).toString() : null; + return new Expand(fieldExpression, alias); + } + @Override public UnresolvedPlan visitGrokCommand(OpenSearchPPLParser.GrokCommandContext ctx) { UnresolvedExpression sourceField = internalVisitExpression(ctx.source_field); diff --git a/ppl/src/main/java/org/opensearch/sql/ppl/utils/PPLQueryDataAnonymizer.java b/ppl/src/main/java/org/opensearch/sql/ppl/utils/PPLQueryDataAnonymizer.java index 802bba98a77..1afaff8cbba 100644 --- a/ppl/src/main/java/org/opensearch/sql/ppl/utils/PPLQueryDataAnonymizer.java +++ b/ppl/src/main/java/org/opensearch/sql/ppl/utils/PPLQueryDataAnonymizer.java @@ -57,6 +57,7 @@ import org.opensearch.sql.ast.tree.Dedupe; import org.opensearch.sql.ast.tree.DescribeRelation; import org.opensearch.sql.ast.tree.Eval; +import org.opensearch.sql.ast.tree.Expand; import org.opensearch.sql.ast.tree.FillNull; import org.opensearch.sql.ast.tree.Filter; import org.opensearch.sql.ast.tree.Head; @@ -309,6 +310,14 @@ public String visitEval(Eval node, String context) { return StringUtils.format("%s | eval %s", child, expressions); } + @Override + public String visitExpand(Expand node, String context) { + String child = node.getChild().getFirst().accept(this, context); + String field = visitExpression(node.getField()); + + return StringUtils.format("%s | expand %s", child, field); + } + /** Build {@link LogicalSort}. */ @Override public String visitSort(Sort node, String context) { diff --git a/ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLExpandTest.java b/ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLExpandTest.java new file mode 100644 index 00000000000..cd2763c7101 --- /dev/null +++ b/ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLExpandTest.java @@ -0,0 +1,142 @@ +/* + * + * * Copyright OpenSearch Contributors + * * SPDX-License-Identifier: Apache-2.0 + * + */ + +package org.opensearch.sql.ppl.calcite; + +import com.google.common.collect.ImmutableList; +import java.util.List; +import org.apache.calcite.config.CalciteConnectionConfig; +import org.apache.calcite.plan.RelTraitDef; +import org.apache.calcite.rel.RelCollations; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rel.type.RelDataTypeFactory; +import org.apache.calcite.rel.type.RelProtoDataType; +import org.apache.calcite.schema.Schema; +import org.apache.calcite.schema.SchemaPlus; +import org.apache.calcite.schema.Statistic; +import org.apache.calcite.schema.Statistics; +import org.apache.calcite.schema.Table; +import org.apache.calcite.sql.SqlCall; +import org.apache.calcite.sql.SqlNode; +import org.apache.calcite.sql.parser.SqlParser; +import org.apache.calcite.sql.type.SqlTypeName; +import org.apache.calcite.test.CalciteAssert; +import org.apache.calcite.tools.Frameworks; +import org.apache.calcite.tools.Programs; +import org.checkerframework.checker.nullness.qual.Nullable; +import org.junit.Test; + +public class CalcitePPLExpandTest extends CalcitePPLAbstractTest { + + public CalcitePPLExpandTest() { + super(CalciteAssert.SchemaSpec.SCOTT_WITH_TEMPORAL); + } + + // There is no existing table with arrays. We create one for test purpose. + public static class TableWithArray implements Table { + protected final RelProtoDataType protoRowType = + factory -> + factory + .builder() + .add("DEPTNO", SqlTypeName.INTEGER) + .add( + "EMPNOS", + factory.createArrayType(factory.createSqlType(SqlTypeName.INTEGER), -1)) + .build(); + + @Override + public RelDataType getRowType(RelDataTypeFactory typeFactory) { + return protoRowType.apply(typeFactory); + } + + @Override + public Statistic getStatistic() { + return Statistics.of(0d, ImmutableList.of(), RelCollations.createSingleton(0)); + } + + @Override + public Schema.TableType getJdbcTableType() { + return Schema.TableType.TABLE; + } + + @Override + public boolean isRolledUp(String column) { + return false; + } + + @Override + public boolean rolledUpColumnValidInsideAgg( + String column, + SqlCall call, + @Nullable SqlNode parent, + @Nullable CalciteConnectionConfig config) { + return false; + } + } + + @Override + protected Frameworks.ConfigBuilder config(CalciteAssert.SchemaSpec... schemaSpecs) { + final SchemaPlus rootSchema = Frameworks.createRootSchema(true); + final SchemaPlus schema = CalciteAssert.addSchema(rootSchema, schemaSpecs); + // Add an empty table with name DEPT for test purpose + schema.add("DEPT", new TableWithArray()); + return Frameworks.newConfigBuilder() + .parserConfig(SqlParser.Config.DEFAULT) + .defaultSchema(schema) + .traitDefs((List) null) + .programs(Programs.heuristicJoinOrder(Programs.RULE_SET, true, 2)); + } + + @Test + public void testExpand() { + String ppl = "source=DEPT | expand EMPNOS"; + RelNode root = getRelNode(ppl); + String expectedLogical = + "LogicalProject(DEPTNO=[$0], EMPNOS=[$2])\n" + + " LogicalCorrelate(correlation=[$cor0], joinType=[left], requiredColumns=[{1}])\n" + + " LogicalTableScan(table=[[scott, DEPT]])\n" + + " Uncollect\n" + + " LogicalProject(EMPNOS=[$cor0.EMPNOS])\n" + + " LogicalFilter(condition=[=($cor0.EMPNOS, $1)])\n" + + " LogicalTableScan(table=[[scott, DEPT]])\n"; + verifyLogical(root, expectedLogical); + String expectedSparkSql = + "SELECT `$cor0`.`DEPTNO`, `t00`.`EMPNOS`\n" + + "FROM `scott`.`DEPT` `$cor0`,\n" + + "LATERAL UNNEST (SELECT `$cor0`.`EMPNOS`\n" + + "FROM `scott`.`DEPT`\n" + + "WHERE `$cor0`.`EMPNOS` = `EMPNOS`) `t0` (`EMPNOS`) `t00`"; + verifyPPLToSparkSQL(root, expectedSparkSql); + } + + @Test + public void testExpandWithEval() { + String ppl = "source=DEPT | eval employee_no = EMPNOS | expand employee_no"; + RelNode root = getRelNode(ppl); + String expectedLogical = + "LogicalProject(DEPTNO=[$0], EMPNOS=[$1], employee_no=[$3])\n" + + " LogicalCorrelate(correlation=[$cor0], joinType=[left], requiredColumns=[{2}])\n" + + " LogicalProject(DEPTNO=[$0], EMPNOS=[$1], employee_no=[$1])\n" + + " LogicalTableScan(table=[[scott, DEPT]])\n" + + " Uncollect\n" + + " LogicalProject(employee_no=[$cor0.employee_no])\n" + + " LogicalFilter(condition=[=($cor0.employee_no, $2)])\n" + + " LogicalProject(DEPTNO=[$0], EMPNOS=[$1], employee_no=[$1])\n" + + " LogicalTableScan(table=[[scott, DEPT]])\n"; + verifyLogical(root, expectedLogical); + String expectedSparkSql = + "SELECT `$cor0`.`DEPTNO`, `$cor0`.`EMPNOS`, `t20`.`employee_no`\n" + + "FROM (SELECT `DEPTNO`, `EMPNOS`, `EMPNOS` `employee_no`\n" + + "FROM `scott`.`DEPT`) `$cor0`,\n" + + "LATERAL UNNEST (SELECT `$cor0`.`employee_no`\n" + + "FROM (SELECT `DEPTNO`, `EMPNOS`, `EMPNOS` `employee_no`\n" + + "FROM `scott`.`DEPT`) `t0`\n" + + "WHERE `$cor0`.`employee_no` = `employee_no`) `t2` (`employee_no`) `t20`"; + verifyPPLToSparkSQL(root, expectedSparkSql); + } +}