Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.Partition;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.catalog.TableIf.TableType;
import org.apache.doris.catalog.View;
import org.apache.doris.common.ConfigBase.DefaultConfHandler;
import org.apache.doris.common.util.DebugUtil;
Expand All @@ -36,6 +37,7 @@
import org.apache.doris.nereids.SqlCacheContext.FullColumnName;
import org.apache.doris.nereids.SqlCacheContext.FullTableName;
import org.apache.doris.nereids.SqlCacheContext.ScanTable;
import org.apache.doris.nereids.SqlCacheContext.TableVersion;
import org.apache.doris.nereids.StatementContext;
import org.apache.doris.nereids.analyzer.UnboundVariable;
import org.apache.doris.nereids.parser.NereidsParser;
Expand Down Expand Up @@ -195,14 +197,14 @@ public Optional<LogicalSqlCache> tryParseSql(ConnectContext connectContext, Stri
.getSqlCacheContext().ifPresent(ctx -> ctx.setCacheKeyType(CacheKeyType.MD5));

if (sqlCacheContextWithVariable != null) {
return tryParseSqlWithoutCheckVariable(
connectContext, md5CacheKey, sqlCacheContextWithVariable, currentUserIdentity
return tryParseSql(
connectContext, md5CacheKey, sqlCacheContextWithVariable, currentUserIdentity, true
);
} else {
return Optional.empty();
}
} else {
return tryParseSqlWithoutCheckVariable(connectContext, key, sqlCacheContext, currentUserIdentity);
return tryParseSql(connectContext, key, sqlCacheContext, currentUserIdentity, false);
}
}

Expand All @@ -219,9 +221,9 @@ private String normalizeSql(String sql) {
return NereidsParser.removeCommentAndTrimBlank(sql);
}

private Optional<LogicalSqlCache> tryParseSqlWithoutCheckVariable(
ConnectContext connectContext, String key,
SqlCacheContext sqlCacheContext, UserIdentity currentUserIdentity) {
private Optional<LogicalSqlCache> tryParseSql(
ConnectContext connectContext, String key, SqlCacheContext sqlCacheContext,
UserIdentity currentUserIdentity, boolean checkUserVariable) {
Env env = connectContext.getEnv();

if (!tryLockTables(connectContext, env, sqlCacheContext)) {
Expand Down Expand Up @@ -255,8 +257,12 @@ private Optional<LogicalSqlCache> tryParseSqlWithoutCheckVariable(
try {
Optional<ResultSet> resultSetInFe = sqlCacheContext.getResultSetInFe();

List<Variable> currentVariables = resolveUserVariables(sqlCacheContext);
boolean usedVariablesChanged = usedVariablesChanged(currentVariables, sqlCacheContext);
List<Variable> currentVariables = ImmutableList.of();
if (checkUserVariable) {
currentVariables = resolveUserVariables(sqlCacheContext);
}
boolean usedVariablesChanged
= checkUserVariable && usedVariablesChanged(currentVariables, sqlCacheContext);
if (resultSetInFe.isPresent() && !usedVariablesChanged) {
MetricRepo.COUNTER_CACHE_HIT_SQL.increase(1L);

Expand All @@ -270,9 +276,15 @@ private Optional<LogicalSqlCache> tryParseSqlWithoutCheckVariable(
}

Status status = new Status();
PUniqueId cacheKeyMd5 = usedVariablesChanged
? sqlCacheContext.doComputeCacheKeyMd5(Utils.fastToImmutableSet(currentVariables))
: sqlCacheContext.getOrComputeCacheKeyMd5();

PUniqueId cacheKeyMd5;
if (usedVariablesChanged) {
invalidateCache(key);
cacheKeyMd5 = sqlCacheContext.doComputeCacheKeyMd5(Utils.fastToImmutableSet(currentVariables));
} else {
cacheKeyMd5 = sqlCacheContext.getOrComputeCacheKeyMd5();
}

InternalService.PFetchCacheResult cacheData =
SqlCache.getCacheData(sqlCacheContext.getCacheProxy(),
cacheKeyMd5, sqlCacheContext.getLatestPartitionId(),
Expand Down Expand Up @@ -304,23 +316,36 @@ private boolean tablesOrDataChanged(Env env, SqlCacheContext sqlCacheContext) {
return true;
}

for (ScanTable scanTable : sqlCacheContext.getScanTables()) {
FullTableName fullTableName = scanTable.fullTableName;
TableIf tableIf = findTableIf(env, fullTableName);
if (!(tableIf instanceof OlapTable)) {
// the query may scan an empty partition of the table, so we should check the table version too;
// such a table does not appear in sqlCacheContext.getScanTables(), which is why we check it here.
// check table type and version
for (Entry<FullTableName, TableVersion> scanTable : sqlCacheContext.getUsedTables().entrySet()) {
TableVersion tableVersion = scanTable.getValue();
if (tableVersion.type != TableType.OLAP) {
return true;
}
TableIf tableIf = findTableIf(env, scanTable.getKey());
if (!(tableIf instanceof OlapTable) || tableVersion.id != tableIf.getId()) {
return true;
}

OlapTable olapTable = (OlapTable) tableIf;
long currentTableTime = olapTable.getVisibleVersionTime();
long cacheTableTime = scanTable.latestTimestamp;
long currentTableVersion = olapTable.getVisibleVersion();
long cacheTableVersion = scanTable.latestVersion;
long cacheTableVersion = tableVersion.version;
// have some partitions been dropped, deleted, updated, or replaced, or rows inserted into a new partition?
if (currentTableTime > cacheTableTime
|| (currentTableTime == cacheTableTime && currentTableVersion > cacheTableVersion)) {
if (currentTableVersion != cacheTableVersion) {
return true;
}
}

// check partition version
for (ScanTable scanTable : sqlCacheContext.getScanTables()) {
FullTableName fullTableName = scanTable.fullTableName;
TableIf tableIf = findTableIf(env, fullTableName);
if (!(tableIf instanceof OlapTable)) {
return true;
}
OlapTable olapTable = (OlapTable) tableIf;
for (Long scanPartitionId : scanTable.getScanPartitions()) {
Partition partition = olapTable.getPartition(scanPartitionId);
// partition == null: is this partition truncated?
Expand Down Expand Up @@ -388,7 +413,7 @@ private boolean dataMaskPoliciesChanged(
*/
private boolean tryLockTables(ConnectContext connectContext, Env env, SqlCacheContext sqlCacheContext) {
StatementContext currentStatementContext = connectContext.getStatementContext();
for (FullTableName fullTableName : sqlCacheContext.getUsedTables()) {
for (FullTableName fullTableName : sqlCacheContext.getUsedTables().keySet()) {
TableIf tableIf = findTableIf(env, fullTableName);
if (tableIf == null) {
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@
import org.apache.doris.analysis.Expr;
import org.apache.doris.analysis.UserIdentity;
import org.apache.doris.catalog.DatabaseIf;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.catalog.TableIf.TableType;
import org.apache.doris.common.Pair;
import org.apache.doris.datasource.CatalogIf;
import org.apache.doris.mysql.FieldInfo;
Expand All @@ -42,6 +44,7 @@
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;

import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.Map;
Expand All @@ -62,7 +65,8 @@ public class SqlCacheContext {
private volatile long latestPartitionTime = -1;
private volatile long latestPartitionVersion = -1;
private volatile long sumOfPartitionNum = -1;
private final Set<FullTableName> usedTables = Sets.newLinkedHashSet();
// value: version of table
private final Map<FullTableName, TableVersion> usedTables = Maps.newLinkedHashMap();
// value: ddl sql
private final Map<FullTableName, String> usedViews = Maps.newLinkedHashMap();
// value: usedColumns
Expand Down Expand Up @@ -136,8 +140,13 @@ public synchronized void addUsedTable(TableIf tableIf) {
return;
}

usedTables.add(
new FullTableName(database.getCatalog().getName(), database.getFullName(), tableIf.getName())
usedTables.put(
new FullTableName(database.getCatalog().getName(), database.getFullName(), tableIf.getName()),
new TableVersion(
tableIf.getId(),
tableIf instanceof OlapTable ? ((OlapTable) tableIf).getVisibleVersion() : 0L,
tableIf.getType()
)
);
}

Expand Down Expand Up @@ -283,8 +292,8 @@ public void setCacheProxy(CacheProxy cacheProxy) {
this.cacheProxy = cacheProxy;
}

public Set<FullTableName> getUsedTables() {
return ImmutableSet.copyOf(usedTables);
public Map<FullTableName, TableVersion> getUsedTables() {
return Collections.unmodifiableMap(usedTables);
}

public Map<FullTableName, String> getUsedViews() {
Expand Down Expand Up @@ -460,6 +469,15 @@ public void addScanPartition(Long partitionId) {
}
}

/**
 * Snapshot of a table's identity and version taken when the SQL cache entry was built.
 * Compared later against the live catalog to decide whether the cached result is stale:
 * a changed id means the table was dropped and recreated, a changed visible version
 * means the data changed, and a non-OLAP type invalidates the cache entry.
 */
@lombok.Data
@lombok.AllArgsConstructor
public static class TableVersion {
    // table id at cache time; a mismatch with the current table id invalidates the cache
    public final long id;
    // visible version at cache time (0 for non-OLAP tables, which cannot report one)
    public final long version;
    // table type at cache time; only OLAP tables are eligible for the sql cache
    public final TableType type;
}

/** CacheKeyType */
public enum CacheKeyType {
// use `userIdentity`:`sql`.trim() as Cache key in FE
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,7 @@ public StatementContext(ConnectContext connectContext, OriginStatement originSta
this.sqlCacheContext = new SqlCacheContext(
connectContext.getCurrentUserIdentity(), connectContext.queryId());
if (originStatement != null) {
this.sqlCacheContext.setOriginSql(originStatement.originStmt.trim());
this.sqlCacheContext.setOriginSql(originStatement.originStmt);
}
} else {
this.sqlCacheContext = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import com.google.common.collect.Maps
import com.google.common.util.concurrent.Futures
import com.google.common.util.concurrent.ListenableFuture
import com.google.common.util.concurrent.MoreExecutors
import com.google.common.util.concurrent.Uninterruptibles
import com.google.gson.Gson
import groovy.json.JsonSlurper
import com.google.common.collect.ImmutableList
Expand All @@ -42,7 +43,6 @@ import org.apache.doris.regression.util.JdbcUtils
import org.apache.doris.regression.util.Hdfs
import org.apache.doris.regression.util.SuiteUtils
import org.apache.doris.regression.util.DebugPoint
import org.apache.doris.regression.RunMode
import org.junit.jupiter.api.Assertions

import org.slf4j.Logger
Expand Down Expand Up @@ -625,6 +625,23 @@ class Suite implements GroovyInterceptable {
return sql
}

/**
 * Runs {@code closure} up to {@code executeTimes} times, passing the 1-based attempt
 * number, and returns the first successful result cast to T.
 * Each failure is logged and remembered; after all attempts fail, the last throwable
 * is rethrown. Sleeps {@code intervalMillis} between attempts.
 */
<T> T retry(int executeTimes = 3, int intervalMillis = 1000, Closure<Integer> closure) {
    Throwable throwable = null
    for (int i = 1; i <= executeTimes; ++i) {
        try {
            return closure(i) as T
        } catch (Throwable t) {
            logger.warn("Retry failed: $t", t)
            throwable = t
            // only sleep between attempts; sleeping after the final failure
            // would just delay the rethrow for no benefit
            if (i < executeTimes) {
                Uninterruptibles.sleepUninterruptibly(intervalMillis, TimeUnit.MILLISECONDS)
            }
        }
    }
    if (throwable != null) {
        throw throwable
    }
    return null
}

void explain(Closure actionSupplier) {
if (context.useArrowFlightSql()) {
runAction(new ExplainAction(context, "ARROW_FLIGHT_SQL"), actionSupplier)
Expand Down Expand Up @@ -881,6 +898,20 @@ class Suite implements GroovyInterceptable {
}
}

/** Invokes {@code action} once for every row returned by `show frontends`. */
void foreachFrontends(Closure action) {
    sql_return_maparray("show frontends").each { frontendRow ->
        action(frontendRow)
    }
}

/** Invokes {@code action} once for every row returned by `show backends`. */
void foreachBackends(Closure action) {
    sql_return_maparray("show backends").each { backendRow ->
        action(backendRow)
    }
}

/** Returns every frontend from `show frontends` formatted as "host:httpPort". */
List<String> getFrontendIpHttpPort() {
    def frontends = sql_return_maparray("show frontends")
    return frontends.collect { fe -> fe.Host + ":" + fe.HttpPort }
}
Expand Down
Loading