Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ private PPLReturnTypes() {}
ReturnTypes.explicit(UserDefinedFunctionUtils.NULLABLE_TIME_UDT);
public static final SqlReturnTypeInference TIMESTAMP_FORCE_NULLABLE =
ReturnTypes.explicit(UserDefinedFunctionUtils.NULLABLE_TIMESTAMP_UDT);
public static final SqlReturnTypeInference IP_FORCE_NULLABLE =
ReturnTypes.explicit(UserDefinedFunctionUtils.NULLABLE_IP_UDT);
public static SqlReturnTypeInference INTEGER_FORCE_NULLABLE =
ReturnTypes.INTEGER.andThen(SqlTypeTransforms.FORCE_NULLABLE);
public static SqlReturnTypeInference STRING_FORCE_NULLABLE =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ public class UserDefinedFunctionUtils {
TYPE_FACTORY.createUDT(ExprUDT.EXPR_TIMESTAMP, true);
public static final RelDataType NULLABLE_STRING =
TYPE_FACTORY.createTypeWithNullability(TYPE_FACTORY.createSqlType(SqlTypeName.VARCHAR), true);
public static final RelDataType NULLABLE_IP_UDT = TYPE_FACTORY.createUDT(EXPR_IP, true);

public static RelDataType nullablePatternAggList =
createArrayType(
Expand All @@ -76,6 +77,7 @@ public class UserDefinedFunctionUtils {
ImmutableSet.of("match", "match_phrase", "match_bool_prefix", "match_phrase_prefix");
public static Set<String> MULTI_FIELDS_RELEVANCE_FUNCTION_SET =
ImmutableSet.of("simple_query_string", "query_string", "multi_match");
public static String IP_FUNCTION_NAME = "IP";

public static RelBuilder.AggCall TransferUserDefinedAggFunction(
Class<? extends UserDefinedAggFunction> UDAF,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,8 @@ public class PPLBuiltinOperators extends ReflectiveSqlOperatorTable {
.toUDF("TIME");

// IP cast function
public static final SqlOperator IP = new IPFunction().toUDF("IP");
public static final SqlOperator IP =
new IPFunction().toUDF(UserDefinedFunctionUtils.IP_FUNCTION_NAME);
public static final SqlOperator TIME_TO_SEC =
adaptExprMethodToUDF(
DateTimeFunctions.class,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,30 @@

package org.opensearch.sql.expression.function.udf.ip;

import java.util.Collections;
import java.util.List;
import java.util.Locale;
import org.apache.calcite.adapter.enumerable.NotNullImplementor;
import org.apache.calcite.adapter.enumerable.NullPolicy;
import org.apache.calcite.adapter.enumerable.RexToLixTranslator;
import org.apache.calcite.linq4j.tree.ConstantExpression;
import org.apache.calcite.linq4j.tree.Expression;
import org.apache.calcite.linq4j.tree.Expressions;
import org.apache.calcite.rex.RexCall;
import org.apache.calcite.sql.SqlIdentifier;
import org.apache.calcite.sql.SqlKind;
import org.apache.calcite.sql.SqlOperator;
import org.apache.calcite.sql.SqlSyntax;
import org.apache.calcite.sql.parser.SqlParserPos;
import org.apache.calcite.sql.type.InferTypes;
import org.apache.calcite.sql.type.ReturnTypes;
import org.apache.calcite.sql.type.SqlReturnTypeInference;
import org.apache.calcite.sql.validate.SqlUserDefinedFunction;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.opensearch.sql.data.model.ExprIpValue;
import org.opensearch.sql.data.type.ExprCoreType;
import org.opensearch.sql.expression.function.ImplementorUDF;
import org.opensearch.sql.expression.function.PPLBuiltinOperators;
import org.opensearch.sql.expression.function.UDFOperandMetadata;

/**
Expand All @@ -32,33 +43,73 @@
* </ul>
*/
public class CompareIpFunction extends ImplementorUDF {
private final SqlKind kind;

private CompareIpFunction(ComparisonType comparisonType) {
super(new CompareImplementor(comparisonType), NullPolicy.ANY);
private CompareIpFunction(SqlKind kind) {
super(new CompareImplementor(kind), NullPolicy.ANY);
this.kind = kind;
}

public static CompareIpFunction less() {
return new CompareIpFunction(ComparisonType.LESS);
return new CompareIpFunction(SqlKind.LESS_THAN);
}

public static CompareIpFunction greater() {
return new CompareIpFunction(ComparisonType.GREATER);
return new CompareIpFunction(SqlKind.GREATER_THAN);
}

public static CompareIpFunction lessOrEquals() {
return new CompareIpFunction(ComparisonType.LESS_OR_EQUAL);
return new CompareIpFunction(SqlKind.LESS_THAN_OR_EQUAL);
}

public static CompareIpFunction greaterOrEquals() {
return new CompareIpFunction(ComparisonType.GREATER_OR_EQUAL);
return new CompareIpFunction(SqlKind.GREATER_THAN_OR_EQUAL);
}

public static CompareIpFunction equals() {
return new CompareIpFunction(ComparisonType.EQUALS);
return new CompareIpFunction(SqlKind.EQUALS);
}

public static CompareIpFunction notEquals() {
return new CompareIpFunction(ComparisonType.NOT_EQUALS);
return new CompareIpFunction(SqlKind.NOT_EQUALS);
}

@Override
public SqlUserDefinedFunction toUDF(String functionName, boolean isDeterministic) {
SqlIdentifier udfIdentifier =
new SqlIdentifier(Collections.singletonList(functionName), null, SqlParserPos.ZERO, null);
return new SqlUserDefinedFunction(
udfIdentifier,
kind,
getReturnTypeInference(),
InferTypes.ANY_NULLABLE,
getOperandMetadata(),
getFunction()) {
@Override
public boolean isDeterministic() {
return isDeterministic;
}

@Override
public @Nullable SqlOperator reverse() {
return switch (kind) {
case LESS_THAN -> PPLBuiltinOperators.GREATER_IP;
case GREATER_THAN -> PPLBuiltinOperators.LESS_IP;
case LESS_THAN_OR_EQUAL -> PPLBuiltinOperators.GTE_IP;
case GREATER_THAN_OR_EQUAL -> PPLBuiltinOperators.LTE_IP;
case EQUALS -> PPLBuiltinOperators.EQUALS_IP;
case NOT_EQUALS -> PPLBuiltinOperators.NOT_EQUALS_IP;
default -> throw new IllegalArgumentException(
String.format(
Locale.ROOT, "CompareIpFunction is not supposed to be of kind: %s", kind));
};
}

@Override
public SqlSyntax getSyntax() {
return SqlSyntax.BINARY;
}
};
}

@Override
Expand All @@ -72,10 +123,10 @@ public UDFOperandMetadata getOperandMetadata() {
}

public static class CompareImplementor implements NotNullImplementor {
private final ComparisonType comparisonType;
private final SqlKind compareType;

public CompareImplementor(ComparisonType comparisonType) {
this.comparisonType = comparisonType;
public CompareImplementor(SqlKind compareType) {
this.compareType = compareType;
}

@Override
Expand All @@ -88,19 +139,20 @@ public Expression implement(
translatedOperands.get(0),
translatedOperands.get(1));

return generateComparisonExpression(compareResult, comparisonType);
return evalCompareResult(compareResult, compareType);
}

private static Expression generateComparisonExpression(
Expression compareResult, ComparisonType comparisonType) {
private static Expression evalCompareResult(Expression compareResult, SqlKind compareType) {
final ConstantExpression zero = Expressions.constant(0);
return switch (comparisonType) {
return switch (compareType) {
case EQUALS -> Expressions.equal(compareResult, zero);
case NOT_EQUALS -> Expressions.notEqual(compareResult, zero);
case LESS -> Expressions.lessThan(compareResult, zero);
case LESS_OR_EQUAL -> Expressions.lessThanOrEqual(compareResult, zero);
case GREATER -> Expressions.greaterThan(compareResult, zero);
case GREATER_OR_EQUAL -> Expressions.greaterThanOrEqual(compareResult, zero);
case LESS_THAN -> Expressions.lessThan(compareResult, zero);
case LESS_THAN_OR_EQUAL -> Expressions.lessThanOrEqual(compareResult, zero);
case GREATER_THAN -> Expressions.greaterThan(compareResult, zero);
case GREATER_THAN_OR_EQUAL -> Expressions.greaterThanOrEqual(compareResult, zero);
default -> throw new UnsupportedOperationException(
String.format(Locale.ROOT, "Unsupported compare type: %s", compareType));
};
}

Expand All @@ -119,13 +171,4 @@ private static ExprIpValue toExprIpValue(Object obj) {
throw new IllegalArgumentException("Invalid IP type: " + obj);
}
}

public enum ComparisonType {
EQUALS,
NOT_EQUALS,
LESS,
LESS_OR_EQUAL,
GREATER,
GREATER_OR_EQUAL
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@
import org.apache.calcite.linq4j.tree.Expression;
import org.apache.calcite.linq4j.tree.Expressions;
import org.apache.calcite.rex.RexCall;
import org.apache.calcite.sql.type.ReturnTypes;
import org.apache.calcite.sql.type.SqlReturnTypeInference;
import org.opensearch.sql.calcite.utils.OpenSearchTypeFactory;
import org.opensearch.sql.calcite.utils.PPLReturnTypes;
import org.opensearch.sql.data.model.ExprIpValue;
import org.opensearch.sql.data.type.ExprCoreType;
import org.opensearch.sql.data.type.ExprType;
Expand Down Expand Up @@ -46,8 +46,7 @@ public UDFOperandMetadata getOperandMetadata() {

@Override
public SqlReturnTypeInference getReturnTypeInference() {
return ReturnTypes.explicit(
OpenSearchTypeFactory.TYPE_FACTORY.createUDT(OpenSearchTypeFactory.ExprUDT.EXPR_IP, true));
return PPLReturnTypes.IP_FORCE_NULLABLE;
}

public static class CastImplementor
Expand Down
22 changes: 18 additions & 4 deletions integ-test/src/test/java/org/opensearch/sql/ppl/ExplainIT.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

import java.io.IOException;
import java.util.Locale;
import org.junit.Assume;
import org.junit.Ignore;
import org.junit.jupiter.api.Test;
import org.opensearch.client.ResponseException;
Expand Down Expand Up @@ -93,18 +94,31 @@ public void testFilterByCompareStringTimePushDownExplain() throws IOException {

@Test
public void testFilterByCompareIPCoercion() throws IOException {
// Should automatically cast the string literal to IP.
// TODO: Push down IP comparison as range query with Calcite
String expected = loadExpectedPlan("explain_filter_compare_ip.json");
// Should automatically cast the string literal to IP and pushdown it as a range query
assertJsonEqualsIgnoreId(
expected,
loadExpectedPlan("explain_filter_compare_ip.json"),
explainQueryToString(
String.format(
Locale.ROOT,
"source=%s | where host > '1.1.1.1' | fields host",
TEST_INDEX_WEBLOGS)));
}

@Test
public void testFilterByCompareIpv6Swapped() throws IOException {
// Ignored in v2: the serialized string is unstable because of function properties
Assume.assumeTrue(isCalciteEnabled());
// Test swapping ip and string. In v2, this is pushed down as script;
// with Calcite, it will still be pushed down as a range query
assertJsonEqualsIgnoreId(
loadExpectedPlan("explain_filter_compare_ipv6_swapped.json"),
explainQueryToString(
String.format(
Locale.ROOT,
"source=%s | where '::ffff:1234' <= host | fields host",
TEST_INDEX_WEBLOGS)));
}

@Test
public void testWeekArgumentCoercion() throws IOException {
String expected = loadExpectedPlan("explain_week_argument_coercion.json");
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"calcite": {
"logical": "LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT])\n LogicalProject(host=[$0])\n LogicalFilter(condition=[GREATER_IP($0, IP('1.1.1.1':VARCHAR))])\n CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_weblogs]])\n",
"physical": "CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_weblogs]], PushDownContext=[[PROJECT->[host], SCRIPT->GREATER_IP($0, IP('1.1.1.1':VARCHAR)), LIMIT->10000], OpenSearchRequestBuilder(sourceBuilder={\"from\":0,\"size\":10000,\"timeout\":\"1m\",\"query\":{\"script\":{\"script\":{\"source\":\"{\\\"langType\\\":\\\"calcite\\\",\\\"script\\\":\\\"rO0ABXNyABFqYXZhLnV0aWwuQ29sbFNlcleOq7Y6G6gRAwABSQADdGFneHAAAAADdwQAAAAGdAAHcm93VHlwZXQAensKICAiZmllbGRzIjogWwogICAgewogICAgICAidHlwZSI6ICJPVEhFUiIsCiAgICAgICJudWxsYWJsZSI6IHRydWUsCiAgICAgICJuYW1lIjogImhvc3QiCiAgICB9CiAgXSwKICAibnVsbGFibGUiOiBmYWxzZQp9dAAEZXhwcnQDfXsKICAib3AiOiB7CiAgICAibmFtZSI6ICJHUkVBVEVSX0lQIiwKICAgICJraW5kIjogIk9USEVSX0ZVTkNUSU9OIiwKICAgICJzeW50YXgiOiAiRlVOQ1RJT04iCiAgfSwKICAib3BlcmFuZHMiOiBbCiAgICB7CiAgICAgICJpbnB1dCI6IDAsCiAgICAgICJuYW1lIjogIiQwIgogICAgfSwKICAgIHsKICAgICAgIm9wIjogewogICAgICAgICJuYW1lIjogIklQIiwKICAgICAgICAia2luZCI6ICJPVEhFUl9GVU5DVElPTiIsCiAgICAgICAgInN5bnRheCI6ICJGVU5DVElPTiIKICAgICAgfSwKICAgICAgIm9wZXJhbmRzIjogWwogICAgICAgIHsKICAgICAgICAgICJsaXRlcmFsIjogIjEuMS4xLjEiLAogICAgICAgICAgInR5cGUiOiB7CiAgICAgICAgICAgICJ0eXBlIjogIlZBUkNIQVIiLAogICAgICAgICAgICAibnVsbGFibGUiOiBmYWxzZSwKICAgICAgICAgICAgInByZWNpc2lvbiI6IC0xCiAgICAgICAgICB9CiAgICAgICAgfQogICAgICBdLAogICAgICAiY2xhc3MiOiAib3JnLm9wZW5zZWFyY2guc3FsLmV4cHJlc3Npb24uZnVuY3Rpb24uVXNlckRlZmluZWRGdW5jdGlvbkJ1aWxkZXIkMSIsCiAgICAgICJ0eXBlIjogewogICAgICAgICJ0eXBlIjogIk9USEVSIiwKICAgICAgICAibnVsbGFibGUiOiB0cnVlCiAgICAgIH0sCiAgICAgICJkZXRlcm1pbmlzdGljIjogdHJ1ZSwKICAgICAgImR5bmFtaWMiOiBmYWxzZQogICAgfQogIF0sCiAgImNsYXNzIjogIm9yZy5vcGVuc2VhcmNoLnNxbC5leHByZXNzaW9uLmZ1bmN0aW9uLlVzZXJEZWZpbmVkRnVuY3Rpb25CdWlsZGVyJDEiLAogICJ0eXBlIjogewogICAgInR5cGUiOiAiQk9PTEVBTiIsCiAgICAibnVsbGFibGUiOiB0cnVlCiAgfSwKICAiZGV0ZXJtaW5pc3RpYyI6IHRydWUsCiAgImR5bmFtaWMiOiBmYWxzZQp9dAAKZmllbGRUeXBlc3NyABFqYXZhLnV0aWwuSGFzaE1hcAUH2sHDFmDRAwACRgAKbG9hZEZhY3RvckkACXRocmVzaG9sZHhwP0AAAAAAAAx3CAAAABAAAAABdAAEaG9zdH5yAClvcmcub3BlbnNlYXJjaC5zcWwuZGF0YS50eXBlLkV4cHJDb3JlVHlwZQAAAAAAAAAAEgAAeHIADmphdmEubGFuZy5FbnVtAAAAAAAAAAASAAB4cHQAAklQeHg=\\\"}\",\"lang\":\"opensearch_compounded_script\",\"params\":{\"utcTimestamp\":*}},\"boost\":1.0}},\"_source\":{\"includes\":[\"host\"],\"excludes\":[]},\"sort\":[{\"_doc\":{\"order\":\"asc\"}}]}, requestedTotalSize=10000, pageSize=null, startFrom=0)])\n"
"physical": "CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_weblogs]], PushDownContext=[[PROJECT->[host], FILTER->GREATER_IP($0, IP('1.1.1.1':VARCHAR)), LIMIT->10000], OpenSearchRequestBuilder(sourceBuilder={\"from\":0,\"size\":10000,\"timeout\":\"1m\",\"query\":{\"range\":{\"host\":{\"from\":\"1.1.1.1\",\"to\":null,\"include_lower\":false,\"include_upper\":true,\"boost\":1.0}}},\"_source\":{\"includes\":[\"host\"],\"excludes\":[]},\"sort\":[{\"_doc\":{\"order\":\"asc\"}}]}, requestedTotalSize=10000, pageSize=null, startFrom=0)])\n"
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{
"calcite": {
"logical": "LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT])\n LogicalProject(host=[$0])\n LogicalFilter(condition=[LTE_IP(IP('::ffff:1234':VARCHAR), $0)])\n CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_weblogs]])\n",
"physical": "CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_weblogs]], PushDownContext=[[PROJECT->[host], FILTER->LTE_IP(IP('::ffff:1234':VARCHAR), $0), LIMIT->10000], OpenSearchRequestBuilder(sourceBuilder={\"from\":0,\"size\":10000,\"timeout\":\"1m\",\"query\":{\"range\":{\"host\":{\"from\":\"::ffff:1234\",\"to\":null,\"include_lower\":true,\"include_upper\":true,\"boost\":1.0}}},\"_source\":{\"includes\":[\"host\"],\"excludes\":[]},\"sort\":[{\"_doc\":{\"order\":\"asc\"}}]}, requestedTotalSize=10000, pageSize=null, startFrom=0)])\n"
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{
"calcite": {
"logical": "LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT])\n LogicalProject(host=[$0])\n LogicalFilter(condition=[LTE_IP(IP('::ffff:1234':VARCHAR), $0)])\n CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_weblogs]])\n",
"physical": "EnumerableLimit(fetch=[10000])\n EnumerableCalc(expr#0..11=[{inputs}], expr#12=['::ffff:1234':VARCHAR], expr#13=[IP($t12)], expr#14=[LTE_IP($t13, $t0)], host=[$t0], $condition=[$t14])\n CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_weblogs]])\n"
}
}
Loading
Loading