Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
@Getter
@EqualsAndHashCode(callSuper = false)
public class QualifiedName extends UnresolvedExpression {
public static final String DELIMITER = ".";
private final List<String> parts;

public QualifiedName(String name) {
Expand Down Expand Up @@ -94,7 +95,7 @@ public QualifiedName rest() {
}

public String toString() {
return String.join(".", this.parts);
return String.join(DELIMITER, this.parts);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -249,8 +249,10 @@ private static RexNode resolveFieldAccess(
if (length == parts.size() - start) {
return field;
} else {
String itemName = joinParts(parts, length + start, parts.size() - 1 - length);
return createItemAccess(field, itemName, context);
String itemName = joinParts(parts, length + start, parts.size() - length);
return context.relBuilder.alias(
createItemAccess(field, itemName, context),
String.join(QualifiedName.DELIMITER, parts.subList(start, parts.size())));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ public void init() throws Exception {
loadIndex(Index.LOGS);
loadIndex(Index.WORKER);
loadIndex(Index.WORK_INFORMATION);
loadIndex(Index.WEBLOG);
}

@Override
Expand Down Expand Up @@ -1416,4 +1417,15 @@ public void testTopKThenSortExplain() throws IOException {
+ "| sort age "
+ "| fields age"));
}

@Test
public void testGeoIpPushedInAgg() throws IOException {
// This explain IT verifies that externally registered UDF can be properly pushed down
assertYamlEqualsIgnoreId(
loadExpectedPlan("udf_geoip_in_agg_pushed.yaml"),
explainQueryYaml(
String.format(
"source=%s | eval info = geoip('my-datasource', host) | stats count() by info.city",
TEST_INDEX_WEBLOGS)));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,4 +53,35 @@ public void testGeoIpEnrichmentWithIpFieldAsInput() throws IOException {
rows("10.0.0.1", Map.of("country", "USA")),
rows("fd12:2345:6789:1:a1b2:c3d4:e5f6:789a", Map.of("country", "India")));
}

@Test
public void testGeoIpInAggregation() throws IOException {
JSONObject result1 =
executeQuery(
String.format(
"source=%s | where method='POST' | eval info = geoip('%s', host) | eval"
+ " date=DATE('2020-12-10') | stats count() by info.city, method, span(date,"
+ " 1month) as month",
TEST_INDEX_WEBLOGS, DATASOURCE_NAME));
verifySchema(
result1,
schema("count()", "bigint"),
schema("month", "date"),
schema("info.city", "string"),
schema("method", "string"));
verifyDataRows(
result1,
rows(1, "2020-12-01", "Seattle", "POST"),
rows(1, "2020-12-01", "Bengaluru", "POST"));

// This case is pushed down into DSL with scripts
JSONObject result2 =
executeQuery(
String.format(
"source=%s | where method='POST' | eval info = geoip('%s', host) | stats count() by"
+ " info.city",
TEST_INDEX_WEBLOGS, DATASOURCE_NAME));
verifySchema(result2, schema("count()", "bigint"), schema("info.city", "string"));
verifyDataRows(result2, rows(1, "Seattle"), rows(1, "Bengaluru"));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
calcite:
logical: |
LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT])
LogicalProject(count()=[$1], info.city=[$0])
LogicalAggregate(group=[{0}], count()=[COUNT()])
LogicalProject(info.city=[ITEM(GEOIP('my-datasource':VARCHAR, $0), 'city')])
CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_weblogs]])
physical: |
CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_weblogs]], PushDownContext=[[AGGREGATION->rel#:LogicalAggregate.NONE.[](input=RelSubset#,group={0},count()=COUNT()), PROJECT->[count(), info.city], LIMIT->10000], OpenSearchRequestBuilder(sourceBuilder={"from":0,"size":0,"timeout":"1m","aggregations":{"composite_buckets":{"composite":{"size":1000,"sources":[{"info.city":{"terms":{"script":{"source":"{\"langType\":\"calcite\",\"script\":\"rO0ABXNyABFqYXZhLnV0aWwuQ29sbFNlcleOq7Y6G6gRAwABSQADdGFneHAAAAADdwQAAAAGdAAHcm93VHlwZXQAknsKICAiZmllbGRzIjogWwogICAgewogICAgICAidWR0IjogIkVYUFJfSVAiLAogICAgICAidHlwZSI6ICJPVEhFUiIsCiAgICAgICJudWxsYWJsZSI6IHRydWUsCiAgICAgICJuYW1lIjogImhvc3QiCiAgICB9CiAgXSwKICAibnVsbGFibGUiOiBmYWxzZQp9dAAEZXhwcnQETnsKICAib3AiOiB7CiAgICAibmFtZSI6ICJJVEVNIiwKICAgICJraW5kIjogIklURU0iLAogICAgInN5bnRheCI6ICJTUEVDSUFMIgogIH0sCiAgIm9wZXJhbmRzIjogWwogICAgewogICAgICAib3AiOiB7CiAgICAgICAgIm5hbWUiOiAiR0VPSVAiLAogICAgICAgICJraW5kIjogIk9USEVSX0ZVTkNUSU9OIiwKICAgICAgICAic3ludGF4IjogIkZVTkNUSU9OIgogICAgICB9LAogICAgICAib3BlcmFuZHMiOiBbCiAgICAgICAgewogICAgICAgICAgImxpdGVyYWwiOiAibXktZGF0YXNvdXJjZSIsCiAgICAgICAgICAidHlwZSI6IHsKICAgICAgICAgICAgInR5cGUiOiAiVkFSQ0hBUiIsCiAgICAgICAgICAgICJudWxsYWJsZSI6IGZhbHNlLAogICAgICAgICAgICAicHJlY2lzaW9uIjogLTEKICAgICAgICAgIH0KICAgICAgICB9LAogICAgICAgIHsKICAgICAgICAgICJpbnB1dCI6IDAsCiAgICAgICAgICAibmFtZSI6ICIkMCIKICAgICAgICB9CiAgICAgIF0sCiAgICAgICJjbGFzcyI6ICJvcmcub3BlbnNlYXJjaC5zcWwuZXhwcmVzc2lvbi5mdW5jdGlvbi5Vc2VyRGVmaW5lZEZ1bmN0aW9uQnVpbGRlciQxIiwKICAgICAgInR5cGUiOiB7CiAgICAgICAgInR5cGUiOiAiTUFQIiwKICAgICAgICAibnVsbGFibGUiOiBmYWxzZSwKICAgICAgICAia2V5IjogewogICAgICAgICAgInR5cGUiOiAiVkFSQ0hBUiIsCiAgICAgICAgICAibnVsbGFibGUiOiBmYWxzZSwKICAgICAgICAgICJwcmVjaXNpb24iOiAtMQogICAgICAgIH0sCiAgICAgICAgInZhbHVlIjogewogICAgICAgICAgInR5cGUiOiAiQU5ZIiwKICAgICAgICAgICJudWxsYWJsZSI6IGZhbHNlLAogICAgICAgICAgInByZWNpc2lvbiI6IC0xLAogICAgICAgICAgInNjYWxlIjogLTIxNDc0ODM2NDgKICAgICAgICB9CiAgICAgIH0sCiAgICAgICJkZXRlcm1pbmlzdGljIjogdHJ1ZSwKICAgICAgImR5bmFtaWMiOiBmYWxzZQogICAgfSwKICAgIHsKICAgICAgImxpdGVyYWwiOiAiY2l0eSIsCiAgICAgICJ0eXBlIjogewogICAgICAgICJ0eXBlIjogIkNIQVIiLAogICAgICAgICJudWxsYWJsZSI6IGZhbHNlLAogICAgICAgICJwcmVjaXNpb24iOiA0CiAgICAgIH0KICAgIH0KICBdCn10AApmaWVsZFR5cGVzc3IAEWphdmEudXRpbC5IYXNoTWFwBQfawcMWYNEDAAJGAApsb2FkRmFjdG9ySQAJdGhyZXNob2xkeHA/QAAAAAAADHcIAAAAEAAAAAF0AARob3N0fnIAKW9yZy5vcGVuc2VhcmNoLnNxbC5kYXRhLnR5cGUuRXhwckNvcmVUeXBlAAAAAAAAAAASAAB4cgAOamF2YS5sYW5nLkVudW0AAAAAAAAAABIAAHhwdAACSVB4eA==\"}","lang":"opensearch_compounded_script","params":{"utcTimestamp": 0}},"missing_bucket":true,"missing_order":"first","order":"asc"}}}]}}}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)])
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
calcite:
logical: |
LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT])
LogicalProject(count()=[$1], info.city=[$0])
LogicalAggregate(group=[{0}], count()=[COUNT()])
LogicalProject(info.city=[ITEM(GEOIP('my-datasource':VARCHAR, $0), 'city')])
CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_weblogs]])
physical: |
EnumerableLimit(fetch=[10000])
EnumerableCalc(expr#0..1=[{inputs}], count()=[$t1], info.city=[$t0])
EnumerableAggregate(group=[{0}], count()=[COUNT()])
EnumerableCalc(expr#0..11=[{inputs}], expr#12=['my-datasource':VARCHAR], expr#13=[GEOIP($t12, $t0)], expr#14=['city'], expr#15=[ITEM($t13, $t14)], info.city=[$t15])
CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_weblogs]])
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,12 @@ private ExprValue parse(

// Field type may be not defined in mapping if users have disabled dynamic mapping.
// Then try to parse content directly based on the value itself
if (fieldType.isEmpty()) {
// Besides, sub-fields of generated objects are also of type UNDEFINED. We parse the content
// directly on the value itself for this case as well.
// TODO: Remove the second condition once https://github.com/opensearch-project/sql/issues/3751
// is resolved
if (fieldType.isEmpty()
|| fieldType.get().equals(OpenSearchDataType.of(ExprCoreType.UNDEFINED))) {
return parseContent(content);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,7 @@

package org.opensearch.sql.opensearch.executor;

import static org.opensearch.sql.expression.function.BuiltinFunctionName.DISTINCT_COUNT_APPROX;

import com.google.common.base.Suppliers;
import java.security.AccessController;
import java.security.PrivilegedAction;
import java.sql.PreparedStatement;
Expand All @@ -17,16 +16,21 @@
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import org.apache.calcite.plan.RelOptUtil;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.RelRoot;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeField;
import org.apache.calcite.runtime.Hook;
import org.apache.calcite.sql.SqlExplainLevel;
import org.apache.calcite.sql.SqlOperator;
import org.apache.calcite.sql.SqlOperatorTable;
import org.apache.calcite.sql.type.ReturnTypes;
import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.calcite.sql.util.ListSqlOperatorTable;
import org.apache.calcite.sql.validate.SqlUserDefinedAggFunction;
import org.apache.calcite.sql.validate.SqlUserDefinedFunction;
import org.apache.logging.log4j.LogManager;
Expand Down Expand Up @@ -273,8 +277,9 @@ private void buildResultSet(
private void registerOpenSearchFunctions() {
if (client instanceof OpenSearchNodeClient) {
SqlUserDefinedFunction geoIpFunction =
new GeoIpFunction(client.getNodeClient()).toUDF("GEOIP");
new GeoIpFunction(client.getNodeClient()).toUDF(BuiltinFunctionName.GEOIP.name());
PPLFuncImpTable.INSTANCE.registerExternalOperator(BuiltinFunctionName.GEOIP, geoIpFunction);
OperatorTable.addOperator(BuiltinFunctionName.GEOIP.name(), geoIpFunction);
} else {
logger.info(
"Function [GEOIP] not registered: incompatible client type {}",
Expand All @@ -284,10 +289,37 @@ private void registerOpenSearchFunctions() {
SqlUserDefinedAggFunction approxDistinctCountFunction =
UserDefinedFunctionUtils.createUserDefinedAggFunction(
DistinctCountApproxAggFunction.class,
DISTINCT_COUNT_APPROX.toString(),
BuiltinFunctionName.DISTINCT_COUNT_APPROX.name(),
ReturnTypes.BIGINT_FORCE_NULLABLE,
null);
PPLFuncImpTable.INSTANCE.registerExternalAggOperator(
DISTINCT_COUNT_APPROX, approxDistinctCountFunction);
BuiltinFunctionName.DISTINCT_COUNT_APPROX, approxDistinctCountFunction);
OperatorTable.addOperator(
BuiltinFunctionName.DISTINCT_COUNT_APPROX.name(), approxDistinctCountFunction);
}

/**
* Dynamic SqlOperatorTable that allows adding operators after initialization. Similar to
* PPLBuiltinOperator.instance() or SqlStdOperatorTable.instance().
*/
public static class OperatorTable extends ListSqlOperatorTable {
private static final Supplier<OperatorTable> INSTANCE =
Suppliers.memoize(() -> (OperatorTable) new OperatorTable().init());
// Use map instead of list to avoid duplicated elements if the class is initialized multiple
// times
private static final Map<String, SqlOperator> operators = new ConcurrentHashMap<>();

public static SqlOperatorTable instance() {
return INSTANCE.get();
}

private ListSqlOperatorTable init() {
setOperators(buildIndex(operators.values()));
return this;
}

public static synchronized void addOperator(String name, SqlOperator operator) {
operators.put(name, operator);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.Base64;
import java.util.HashMap;
import java.util.LinkedHashMap;
Expand All @@ -31,6 +32,7 @@
import org.opensearch.sql.calcite.CalcitePlanContext;
import org.opensearch.sql.data.type.ExprType;
import org.opensearch.sql.expression.function.PPLBuiltinOperators;
import org.opensearch.sql.opensearch.executor.OpenSearchExecutionEngine;
import org.opensearch.sql.opensearch.util.OpenSearchRelOptUtil;

/**
Expand All @@ -39,7 +41,7 @@
* <p>This serializer:
* <li>Uses Calcite's RelJson class to convert RexNode and RelDataType to/from JSON string
* <li>Manages required OpenSearch field mapping information Note: OpenSearch ExprType subclasses
* implement {@link java.io.Serializable} and are handled through standard Java serialization.
* implement {@link Serializable} and are handled through standard Java serialization.
*/
@Getter
public class RelJsonSerializer {
Expand All @@ -52,13 +54,7 @@ public class RelJsonSerializer {
private static final ObjectMapper mapper = new ObjectMapper();
private static final TypeReference<LinkedHashMap<String, Object>> TYPE_REF =
new TypeReference<>() {};
private static final SqlOperatorTable pplSqlOperatorTable =
SqlOperatorTables.chain(
PPLBuiltinOperators.instance(),
SqlStdOperatorTable.instance(),
// Add a list of necessary SqlLibrary if needed
SqlLibraryOperatorTableFactory.INSTANCE.getOperatorTable(
SqlLibrary.MYSQL, SqlLibrary.BIG_QUERY, SqlLibrary.SPARK, SqlLibrary.POSTGRESQL));
private static volatile SqlOperatorTable pplSqlOperatorTable;

static {
mapper.configure(DeserializationFeature.USE_BIG_DECIMAL_FOR_FLOATS, true);
Expand All @@ -68,6 +64,27 @@ public RelJsonSerializer(RelOptCluster cluster) {
this.cluster = cluster;
}

private static SqlOperatorTable getPplSqlOperatorTable() {
if (pplSqlOperatorTable == null) {
synchronized (RelJsonSerializer.class) {
if (pplSqlOperatorTable == null) {
pplSqlOperatorTable =
SqlOperatorTables.chain(
PPLBuiltinOperators.instance(),
SqlStdOperatorTable.instance(),
OpenSearchExecutionEngine.OperatorTable.instance(),
// Add a list of necessary SqlLibrary if needed
SqlLibraryOperatorTableFactory.INSTANCE.getOperatorTable(
SqlLibrary.MYSQL,
SqlLibrary.BIG_QUERY,
SqlLibrary.SPARK,
SqlLibrary.POSTGRESQL));
}
}
}
return pplSqlOperatorTable;
}

/**
* Serializes Calcite expressions and field types into a map object string.
*
Expand Down Expand Up @@ -136,7 +153,8 @@ public Map<String, Object> deserialize(String struct) {
Map<String, Object> rowTypeMap = mapper.readValue((String) objectMap.get(ROW_TYPE), TYPE_REF);
RelDataType rowType = relJson.toType(cluster.getTypeFactory(), rowTypeMap);
OpenSearchRelInputTranslator inputTranslator = new OpenSearchRelInputTranslator(rowType);
relJson = relJson.withInputTranslator(inputTranslator).withOperatorTable(pplSqlOperatorTable);
relJson =
relJson.withInputTranslator(inputTranslator).withOperatorTable(getPplSqlOperatorTable());
Map<String, Object> exprMap = mapper.readValue((String) objectMap.get(EXPR), TYPE_REF);
RexNode rexNode = relJson.toRex(cluster, exprMap);

Expand Down
Loading