diff --git a/core/src/main/java/org/opensearch/sql/ast/expression/QualifiedName.java b/core/src/main/java/org/opensearch/sql/ast/expression/QualifiedName.java index 852b61cfa8a..84fb486702a 100644 --- a/core/src/main/java/org/opensearch/sql/ast/expression/QualifiedName.java +++ b/core/src/main/java/org/opensearch/sql/ast/expression/QualifiedName.java @@ -22,6 +22,7 @@ @Getter @EqualsAndHashCode(callSuper = false) public class QualifiedName extends UnresolvedExpression { + public static final String DELIMITER = "."; private final List parts; public QualifiedName(String name) { @@ -94,7 +95,7 @@ public QualifiedName rest() { } public String toString() { - return String.join(".", this.parts); + return String.join(DELIMITER, this.parts); } @Override diff --git a/core/src/main/java/org/opensearch/sql/calcite/QualifiedNameResolver.java b/core/src/main/java/org/opensearch/sql/calcite/QualifiedNameResolver.java index 3b8205490e6..772f81abd27 100644 --- a/core/src/main/java/org/opensearch/sql/calcite/QualifiedNameResolver.java +++ b/core/src/main/java/org/opensearch/sql/calcite/QualifiedNameResolver.java @@ -249,8 +249,10 @@ private static RexNode resolveFieldAccess( if (length == parts.size() - start) { return field; } else { - String itemName = joinParts(parts, length + start, parts.size() - 1 - length); - return createItemAccess(field, itemName, context); + String itemName = joinParts(parts, length + start, parts.size() - length); + return context.relBuilder.alias( + createItemAccess(field, itemName, context), + String.join(QualifiedName.DELIMITER, parts.subList(start, parts.size()))); } } diff --git a/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteExplainIT.java b/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteExplainIT.java index 81936ffbfb1..7bada554ec1 100644 --- a/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteExplainIT.java +++ b/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteExplainIT.java @@ -38,6 +38,7 @@ public void init() throws Exception { loadIndex(Index.LOGS); loadIndex(Index.WORKER); loadIndex(Index.WORK_INFORMATION); + loadIndex(Index.WEBLOG); } @Override @@ -1416,4 +1417,15 @@ public void testTopKThenSortExplain() throws IOException { + "| sort age " + "| fields age")); } + + @Test + public void testGeoIpPushedInAgg() throws IOException { + // This explain IT verifies that externally registered UDF can be properly pushed down + assertYamlEqualsIgnoreId( + loadExpectedPlan("udf_geoip_in_agg_pushed.yaml"), + explainQueryYaml( + String.format( + "source=%s | eval info = geoip('my-datasource', host) | stats count() by info.city", + TEST_INDEX_WEBLOGS))); + } } diff --git a/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteGeoIpFunctionsIT.java b/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteGeoIpFunctionsIT.java index 78233d8dd52..72ca1d58b8a 100644 --- a/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteGeoIpFunctionsIT.java +++ b/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteGeoIpFunctionsIT.java @@ -53,4 +53,35 @@ public void testGeoIpEnrichmentWithIpFieldAsInput() throws IOException { rows("10.0.0.1", Map.of("country", "USA")), rows("fd12:2345:6789:1:a1b2:c3d4:e5f6:789a", Map.of("country", "India"))); } + + @Test + public void testGeoIpInAggregation() throws IOException { + JSONObject result1 = + executeQuery( + String.format( + "source=%s | where method='POST' | eval info = geoip('%s', host) | eval" + + " date=DATE('2020-12-10') | stats count() by info.city, method, span(date," + + " 1month) as month", + TEST_INDEX_WEBLOGS, DATASOURCE_NAME)); + verifySchema( + result1, + schema("count()", "bigint"), + schema("month", "date"), + schema("info.city", "string"), + schema("method", "string")); + verifyDataRows( + result1, + rows(1, "2020-12-01", "Seattle", "POST"), + rows(1, "2020-12-01", "Bengaluru", "POST")); + + // This case is pushed down into DSL with scripts + JSONObject result2 = + executeQuery( + String.format( + "source=%s | where method='POST' | eval info = geoip('%s', host) | stats count() by" + + " info.city", + TEST_INDEX_WEBLOGS, DATASOURCE_NAME)); + verifySchema(result2, schema("count()", "bigint"), schema("info.city", "string")); + verifyDataRows(result2, rows(1, "Seattle"), rows(1, "Bengaluru")); + } } diff --git a/integ-test/src/test/resources/expectedOutput/calcite/udf_geoip_in_agg_pushed.yaml b/integ-test/src/test/resources/expectedOutput/calcite/udf_geoip_in_agg_pushed.yaml new file mode 100644 index 00000000000..baf08f483a8 --- /dev/null +++ b/integ-test/src/test/resources/expectedOutput/calcite/udf_geoip_in_agg_pushed.yaml @@ -0,0 +1,9 @@ +calcite: + logical: | + LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT]) + LogicalProject(count()=[$1], info.city=[$0]) + LogicalAggregate(group=[{0}], count()=[COUNT()]) + LogicalProject(info.city=[ITEM(GEOIP('my-datasource':VARCHAR, $0), 'city')]) + CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_weblogs]]) + physical: | + CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_weblogs]], PushDownContext=[[AGGREGATION->rel#:LogicalAggregate.NONE.[](input=RelSubset#,group={0},count()=COUNT()), PROJECT->[count(), info.city], LIMIT->10000], OpenSearchRequestBuilder(sourceBuilder={"from":0,"size":0,"timeout":"1m","aggregations":{"composite_buckets":{"composite":{"size":1000,"sources":[{"info.city":{"terms":{"script":{"source":"{\"langType\":\"calcite\",\"script\":\"rO0ABXNyABFqYXZhLnV0aWwuQ29sbFNlcleOq7Y6G6gRAwABSQADdGFneHAAAAADdwQAAAAGdAAHcm93VHlwZXQAknsKICAiZmllbGRzIjogWwogICAgewogICAgICAidWR0IjogIkVYUFJfSVAiLAogICAgICAidHlwZSI6ICJPVEhFUiIsCiAgICAgICJudWxsYWJsZSI6IHRydWUsCiAgICAgICJuYW1lIjogImhvc3QiCiAgICB9CiAgXSwKICAibnVsbGFibGUiOiBmYWxzZQp9dAAEZXhwcnQETnsKICAib3AiOiB7CiAgICAibmFtZSI6ICJJVEVNIiwKICAgICJraW5kIjogIklURU0iLAogICAgInN5bnRheCI6ICJTUEVDSUFMIgogIH0sCiAgIm9wZXJhbmRzIjogWwogICAgewogICAgICAib3AiOiB7CiAgICAgICAgIm5hbWUiOiAiR0VPSVAiLAogICAgICAgICJraW5kIjogIk9USEVSX0ZVTkNUSU9OIiwKICAgICAgICAic3ludGF4IjogIkZVTkNUSU9OIgogICAgICB9LAogICAgICAib3BlcmFuZHMiOiBbCiAgICAgICAgewogICAgICAgICAgImxpdGVyYWwiOiAibXktZGF0YXNvdXJjZSIsCiAgICAgICAgICAidHlwZSI6IHsKICAgICAgICAgICAgInR5cGUiOiAiVkFSQ0hBUiIsCiAgICAgICAgICAgICJudWxsYWJsZSI6IGZhbHNlLAogICAgICAgICAgICAicHJlY2lzaW9uIjogLTEKICAgICAgICAgIH0KICAgICAgICB9LAogICAgICAgIHsKICAgICAgICAgICJpbnB1dCI6IDAsCiAgICAgICAgICAibmFtZSI6ICIkMCIKICAgICAgICB9CiAgICAgIF0sCiAgICAgICJjbGFzcyI6ICJvcmcub3BlbnNlYXJjaC5zcWwuZXhwcmVzc2lvbi5mdW5jdGlvbi5Vc2VyRGVmaW5lZEZ1bmN0aW9uQnVpbGRlciQxIiwKICAgICAgInR5cGUiOiB7CiAgICAgICAgInR5cGUiOiAiTUFQIiwKICAgICAgICAibnVsbGFibGUiOiBmYWxzZSwKICAgICAgICAia2V5IjogewogICAgICAgICAgInR5cGUiOiAiVkFSQ0hBUiIsCiAgICAgICAgICAibnVsbGFibGUiOiBmYWxzZSwKICAgICAgICAgICJwcmVjaXNpb24iOiAtMQogICAgICAgIH0sCiAgICAgICAgInZhbHVlIjogewogICAgICAgICAgInR5cGUiOiAiQU5ZIiwKICAgICAgICAgICJudWxsYWJsZSI6IGZhbHNlLAogICAgICAgICAgInByZWNpc2lvbiI6IC0xLAogICAgICAgICAgInNjYWxlIjogLTIxNDc0ODM2NDgKICAgICAgICB9CiAgICAgIH0sCiAgICAgICJkZXRlcm1pbmlzdGljIjogdHJ1ZSwKICAgICAgImR5bmFtaWMiOiBmYWxzZQogICAgfSwKICAgIHsKICAgICAgImxpdGVyYWwiOiAiY2l0eSIsCiAgICAgICJ0eXBlIjogewogICAgICAgICJ0eXBlIjogIkNIQVIiLAogICAgICAgICJudWxsYWJsZSI6IGZhbHNlLAogICAgICAgICJwcmVjaXNpb24iOiA0CiAgICAgIH0KICAgIH0KICBdCn10AApmaWVsZFR5cGVzc3IAEWphdmEudXRpbC5IYXNoTWFwBQfawcMWYNEDAAJGAApsb2FkRmFjdG9ySQAJdGhyZXNob2xkeHA/QAAAAAAADHcIAAAAEAAAAAF0AARob3N0fnIAKW9yZy5vcGVuc2VhcmNoLnNxbC5kYXRhLnR5cGUuRXhwckNvcmVUeXBlAAAAAAAAAAASAAB4cgAOamF2YS5sYW5nLkVudW0AAAAAAAAAABIAAHhwdAACSVB4eA==\"}","lang":"opensearch_compounded_script","params":{"utcTimestamp": 0}},"missing_bucket":true,"missing_order":"first","order":"asc"}}}]}}}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)]) diff --git a/integ-test/src/test/resources/expectedOutput/calcite_no_pushdown/udf_geoip_in_agg_pushed.yaml b/integ-test/src/test/resources/expectedOutput/calcite_no_pushdown/udf_geoip_in_agg_pushed.yaml new file mode 100644 index 00000000000..0dbea4e19e2 --- /dev/null +++ b/integ-test/src/test/resources/expectedOutput/calcite_no_pushdown/udf_geoip_in_agg_pushed.yaml @@ -0,0 +1,13 @@ +calcite: + logical: | + LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT]) + LogicalProject(count()=[$1], info.city=[$0]) + LogicalAggregate(group=[{0}], count()=[COUNT()]) + LogicalProject(info.city=[ITEM(GEOIP('my-datasource':VARCHAR, $0), 'city')]) + CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_weblogs]]) + physical: | + EnumerableLimit(fetch=[10000]) + EnumerableCalc(expr#0..1=[{inputs}], count()=[$t1], info.city=[$t0]) + EnumerableAggregate(group=[{0}], count()=[COUNT()]) + EnumerableCalc(expr#0..11=[{inputs}], expr#12=['my-datasource':VARCHAR], expr#13=[GEOIP($t12, $t0)], expr#14=['city'], expr#15=[ITEM($t13, $t14)], info.city=[$t15]) + CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_weblogs]]) diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/data/value/OpenSearchExprValueFactory.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/data/value/OpenSearchExprValueFactory.java index f303cb725e0..85093ba39ab 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/data/value/OpenSearchExprValueFactory.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/data/value/OpenSearchExprValueFactory.java @@ -190,7 +190,12 @@ private ExprValue parse( // Field type may be not defined in mapping if users have disabled dynamic mapping. // Then try to parse content directly based on the value itself - if (fieldType.isEmpty()) { + // Besides, sub-fields of generated objects are also of type UNDEFINED. We parse the content + // directly on the value itself for this case as well. + // TODO: Remove the second condition once https://github.com/opensearch-project/sql/issues/3751 + // is resolved + if (fieldType.isEmpty() + || fieldType.get().equals(OpenSearchDataType.of(ExprCoreType.UNDEFINED))) { return parseContent(content); } diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/executor/OpenSearchExecutionEngine.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/executor/OpenSearchExecutionEngine.java index a1a36a27468..6f0d4bf2f5a 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/executor/OpenSearchExecutionEngine.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/executor/OpenSearchExecutionEngine.java @@ -5,8 +5,7 @@ package org.opensearch.sql.opensearch.executor; -import static org.opensearch.sql.expression.function.BuiltinFunctionName.DISTINCT_COUNT_APPROX; - +import com.google.common.base.Suppliers; import java.security.AccessController; import java.security.PrivilegedAction; import java.sql.PreparedStatement; @@ -17,7 +16,9 @@ import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Supplier; import org.apache.calcite.plan.RelOptUtil; import org.apache.calcite.rel.RelNode; import org.apache.calcite.rel.RelRoot; @@ -25,8 +26,11 @@ import org.apache.calcite.rel.type.RelDataTypeField; import org.apache.calcite.runtime.Hook; import org.apache.calcite.sql.SqlExplainLevel; +import org.apache.calcite.sql.SqlOperator; +import org.apache.calcite.sql.SqlOperatorTable; import org.apache.calcite.sql.type.ReturnTypes; import org.apache.calcite.sql.type.SqlTypeName; +import org.apache.calcite.sql.util.ListSqlOperatorTable; import org.apache.calcite.sql.validate.SqlUserDefinedAggFunction; import org.apache.calcite.sql.validate.SqlUserDefinedFunction; import org.apache.logging.log4j.LogManager; @@ -273,8 +277,9 @@ private void buildResultSet( private void registerOpenSearchFunctions() { if (client instanceof OpenSearchNodeClient) { SqlUserDefinedFunction geoIpFunction = - new GeoIpFunction(client.getNodeClient()).toUDF("GEOIP"); + new GeoIpFunction(client.getNodeClient()).toUDF(BuiltinFunctionName.GEOIP.name()); PPLFuncImpTable.INSTANCE.registerExternalOperator(BuiltinFunctionName.GEOIP, geoIpFunction); + OperatorTable.addOperator(BuiltinFunctionName.GEOIP.name(), geoIpFunction); } else { logger.info( "Function [GEOIP] not registered: incompatible client type {}", @@ -284,10 +289,37 @@ private void registerOpenSearchFunctions() { SqlUserDefinedAggFunction approxDistinctCountFunction = UserDefinedFunctionUtils.createUserDefinedAggFunction( DistinctCountApproxAggFunction.class, - DISTINCT_COUNT_APPROX.toString(), + BuiltinFunctionName.DISTINCT_COUNT_APPROX.name(), ReturnTypes.BIGINT_FORCE_NULLABLE, null); PPLFuncImpTable.INSTANCE.registerExternalAggOperator( - DISTINCT_COUNT_APPROX, approxDistinctCountFunction); + BuiltinFunctionName.DISTINCT_COUNT_APPROX, approxDistinctCountFunction); + OperatorTable.addOperator( + BuiltinFunctionName.DISTINCT_COUNT_APPROX.name(), approxDistinctCountFunction); + } + + /** + * Dynamic SqlOperatorTable that allows adding operators after initialization. Similar to + * PPLBuiltinOperator.instance() or SqlStdOperatorTable.instance(). + */ + public static class OperatorTable extends ListSqlOperatorTable { + private static final Supplier INSTANCE = + Suppliers.memoize(() -> (OperatorTable) new OperatorTable().init()); + // Use map instead of list to avoid duplicated elements if the class is initialized multiple + // times + private static final Map operators = new ConcurrentHashMap<>(); + + public static SqlOperatorTable instance() { + return INSTANCE.get(); + } + + private ListSqlOperatorTable init() { + setOperators(buildIndex(operators.values())); + return this; + } + + public static synchronized void addOperator(String name, SqlOperator operator) { + operators.put(name, operator); + } } } diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/serde/RelJsonSerializer.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/serde/RelJsonSerializer.java index 113e855d41d..fb751c8a72c 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/serde/RelJsonSerializer.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/serde/RelJsonSerializer.java @@ -12,6 +12,7 @@ import java.io.ByteArrayOutputStream; import java.io.ObjectInputStream; import java.io.ObjectOutputStream; +import java.io.Serializable; import java.util.Base64; import java.util.HashMap; import java.util.LinkedHashMap; @@ -31,6 +32,7 @@ import org.opensearch.sql.calcite.CalcitePlanContext; import org.opensearch.sql.data.type.ExprType; import org.opensearch.sql.expression.function.PPLBuiltinOperators; +import org.opensearch.sql.opensearch.executor.OpenSearchExecutionEngine; import org.opensearch.sql.opensearch.util.OpenSearchRelOptUtil; /** @@ -39,7 +41,7 @@ *

This serializer: *

  • Uses Calcite's RelJson class to convert RexNode and RelDataType to/from JSON string *
  • Manages required OpenSearch field mapping information Note: OpenSearch ExprType subclasses - * implement {@link java.io.Serializable} and are handled through standard Java serialization. + * implement {@link Serializable} and are handled through standard Java serialization. */ @Getter public class RelJsonSerializer { @@ -52,13 +54,7 @@ public class RelJsonSerializer { private static final ObjectMapper mapper = new ObjectMapper(); private static final TypeReference> TYPE_REF = new TypeReference<>() {}; - private static final SqlOperatorTable pplSqlOperatorTable = - SqlOperatorTables.chain( - PPLBuiltinOperators.instance(), - SqlStdOperatorTable.instance(), - // Add a list of necessary SqlLibrary if needed - SqlLibraryOperatorTableFactory.INSTANCE.getOperatorTable( - SqlLibrary.MYSQL, SqlLibrary.BIG_QUERY, SqlLibrary.SPARK, SqlLibrary.POSTGRESQL)); + private static volatile SqlOperatorTable pplSqlOperatorTable; static { mapper.configure(DeserializationFeature.USE_BIG_DECIMAL_FOR_FLOATS, true); @@ -68,6 +64,27 @@ public RelJsonSerializer(RelOptCluster cluster) { this.cluster = cluster; } + private static SqlOperatorTable getPplSqlOperatorTable() { + if (pplSqlOperatorTable == null) { + synchronized (RelJsonSerializer.class) { + if (pplSqlOperatorTable == null) { + pplSqlOperatorTable = + SqlOperatorTables.chain( + PPLBuiltinOperators.instance(), + SqlStdOperatorTable.instance(), + OpenSearchExecutionEngine.OperatorTable.instance(), + // Add a list of necessary SqlLibrary if needed + SqlLibraryOperatorTableFactory.INSTANCE.getOperatorTable( + SqlLibrary.MYSQL, + SqlLibrary.BIG_QUERY, + SqlLibrary.SPARK, + SqlLibrary.POSTGRESQL)); + } + } + } + return pplSqlOperatorTable; + } + /** * Serializes Calcite expressions and field types into a map object string. * @@ -136,7 +153,8 @@ public Map deserialize(String struct) { Map rowTypeMap = mapper.readValue((String) objectMap.get(ROW_TYPE), TYPE_REF); RelDataType rowType = relJson.toType(cluster.getTypeFactory(), rowTypeMap); OpenSearchRelInputTranslator inputTranslator = new OpenSearchRelInputTranslator(rowType); - relJson = relJson.withInputTranslator(inputTranslator).withOperatorTable(pplSqlOperatorTable); + relJson = + relJson.withInputTranslator(inputTranslator).withOperatorTable(getPplSqlOperatorTable()); Map exprMap = mapper.readValue((String) objectMap.get(EXPR), TYPE_REF); RexNode rexNode = relJson.toRex(cluster, exprMap);