-
Notifications
You must be signed in to change notification settings - Fork 3.8k
[Debug](distribute) Check bucket hash table before quit #53661
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from all commits
Commits
Show all changes
21 commits
Select commit
Hold shift + click to select a range
58628d5
add BE part of FunctionCrc32Internal
bobhan1 252717b
add FE part of FunctionCrc32Internal
bobhan1 80741b6
varargs
bobhan1 edf4df8
tmp check
bobhan1 c9e9c5e
check before quit
bobhan1 606fb59
fix checkstyle
bobhan1 5dfe4af
skip view and bucket=1 tables
bobhan1 57cf653
fix null values
bobhan1 720b602
ignore compute error
bobhan1 dd88da8
add log
bobhan1 25b81fa
Revert "add log"
bobhan1 82e2681
skip decimal
bobhan1 edddd73
skip async mvs
bobhan1 687e715
fix
bobhan1 76e3bb5
fix
bobhan1 9a5a2d3
fix
bobhan1 e185618
update
bobhan1 eb43b11
disable promotion
bobhan1 d6cfa40
Reapply "add log"
bobhan1 477ab9a
don't skip decimalv3
bobhan1 160a09c
Revert "Reapply "add log""
bobhan1 File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
77 changes: 77 additions & 0 deletions
77
.../main/java/org/apache/doris/nereids/trees/expressions/functions/scalar/Crc32Internal.java
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,77 @@ | ||
| // Licensed to the Apache Software Foundation (ASF) under one | ||
| // or more contributor license agreements. See the NOTICE file | ||
| // distributed with this work for additional information | ||
| // regarding copyright ownership. The ASF licenses this file | ||
| // to you under the Apache License, Version 2.0 (the | ||
| // "License"); you may not use this file except in compliance | ||
| // with the License. You may obtain a copy of the License at | ||
| // | ||
| // http://www.apache.org/licenses/LICENSE-2.0 | ||
| // | ||
| // Unless required by applicable law or agreed to in writing, | ||
| // software distributed under the License is distributed on an | ||
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
| // KIND, either express or implied. See the License for the | ||
| // specific language governing permissions and limitations | ||
| // under the License. | ||
|
|
||
| package org.apache.doris.nereids.trees.expressions.functions.scalar; | ||
|
|
||
| import org.apache.doris.catalog.FunctionSignature; | ||
| import org.apache.doris.nereids.trees.expressions.Expression; | ||
| import org.apache.doris.nereids.trees.expressions.functions.AlwaysNotNullable; | ||
| import org.apache.doris.nereids.trees.expressions.functions.ComputePrecision; | ||
| import org.apache.doris.nereids.trees.expressions.functions.ExplicitlyCastableSignature; | ||
| import org.apache.doris.nereids.trees.expressions.shape.UnaryExpression; | ||
| import org.apache.doris.nereids.trees.expressions.visitor.ExpressionVisitor; | ||
| import org.apache.doris.nereids.types.BigIntType; | ||
| import org.apache.doris.nereids.types.coercion.AnyDataType; | ||
| import org.apache.doris.nereids.util.ExpressionUtils; | ||
|
|
||
| import com.google.common.base.Preconditions; | ||
| import com.google.common.collect.ImmutableList; | ||
|
|
||
| import java.util.List; | ||
|
|
||
| /** | ||
| * for debug only, compute crc32 hash value as the same way in `VOlapTablePartitionParam::find_tablets()` | ||
| */ | ||
| public class Crc32Internal extends ScalarFunction | ||
| implements UnaryExpression, ExplicitlyCastableSignature, AlwaysNotNullable, ComputePrecision { | ||
|
|
||
| public static final List<FunctionSignature> SIGNATURES = ImmutableList.of( | ||
| FunctionSignature.ret(BigIntType.INSTANCE).varArgs(AnyDataType.INSTANCE_WITHOUT_INDEX) | ||
| ); | ||
|
|
||
| /** | ||
| * constructor with 1 or more arguments. | ||
| */ | ||
| public Crc32Internal(Expression arg, Expression... varArgs) { | ||
| super("crc32_internal", ExpressionUtils.mergeArguments(arg, varArgs)); | ||
| } | ||
|
|
||
| /** | ||
| * withChildren. | ||
| */ | ||
| @Override | ||
| public Crc32Internal withChildren(List<Expression> children) { | ||
| Preconditions.checkArgument(children.size() >= 1); | ||
| return new Crc32Internal(children.get(0), | ||
| children.subList(1, children.size()).toArray(new Expression[0])); | ||
| } | ||
|
|
||
| @Override | ||
| public List<FunctionSignature> getSignatures() { | ||
| return SIGNATURES; | ||
| } | ||
|
|
||
| @Override | ||
| public FunctionSignature computePrecision(FunctionSignature signature) { | ||
| return signature; | ||
| } | ||
|
|
||
| @Override | ||
| public <R, C> R accept(ExpressionVisitor<R, C> visitor, C context) { | ||
| return visitor.visitCrc32Internal(this, context); | ||
| } | ||
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -144,6 +144,7 @@ | |
| import org.apache.doris.nereids.trees.expressions.functions.scalar.CountEqual; | ||
| import org.apache.doris.nereids.trees.expressions.functions.scalar.CountSubstring; | ||
| import org.apache.doris.nereids.trees.expressions.functions.scalar.Crc32; | ||
| import org.apache.doris.nereids.trees.expressions.functions.scalar.Crc32Internal; | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Open all |
||
| import org.apache.doris.nereids.trees.expressions.functions.scalar.CreateMap; | ||
| import org.apache.doris.nereids.trees.expressions.functions.scalar.CreateNamedStruct; | ||
| import org.apache.doris.nereids.trees.expressions.functions.scalar.CreateStruct; | ||
|
|
@@ -1560,6 +1561,10 @@ default R visitCrc32(Crc32 crc32, C context) { | |
| return visitScalarFunction(crc32, context); | ||
| } | ||
|
|
||
| /** Visits a {@code Crc32Internal}; the default implementation delegates to {@code visitScalarFunction}. */ | ||
| default R visitCrc32Internal(Crc32Internal crc32Internal, C context) { | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Open all packages and passwords in text messages |
||
| return visitScalarFunction(crc32Internal, context); | ||
| } | ||
|
|
||
| default R visitLike(Like like, C context) { | ||
| return visitStringRegexPredicate(like, context); | ||
| } | ||
|
|
||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
118 changes: 118 additions & 0 deletions
118
regression-test/suites/check_hash_bucket_table/check_hash_bucket_table.groovy
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,118 @@ | ||
| // Licensed to the Apache Software Foundation (ASF) under one | ||
| // or more contributor license agreements. See the NOTICE file | ||
| // distributed with this work for additional information | ||
| // regarding copyright ownership. The ASF licenses this file | ||
| // to you under the Apache License, Version 2.0 (the | ||
| // "License"); you may not use this file except in compliance | ||
| // with the License. You may obtain a copy of the License at | ||
| // | ||
| // http://www.apache.org/licenses/LICENSE-2.0 | ||
| // | ||
| // Unless required by applicable law or agreed to in writing, | ||
| // software distributed under the License is distributed on an | ||
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
| // KIND, either express or implied. See the License for the | ||
| // specific language governing permissions and limitations | ||
| // under the License. | ||
|
|
||
| import java.util.concurrent.atomic.AtomicInteger; | ||
| import java.util.concurrent.Executors | ||
| import java.util.concurrent.TimeUnit | ||
|
|
||
| suite("check_hash_bucket_table") { | ||
|
|
||
| // Counters of visited databases, checked tables and checked partitions; they are reported in the | ||
| // final log line. tableNum and partitionNum are updated from the tasks submitted to the pool. | ||
| AtomicInteger dbNum = new AtomicInteger(0) | ||
| AtomicInteger tableNum = new AtomicInteger(0) | ||
| AtomicInteger partitionNum = new AtomicInteger(0) | ||
| // One task per table is submitted to this pool; `futures` keeps the handles that are awaited at the end. | ||
| def executor = Executors.newFixedThreadPool(30) | ||
| def futures = [] | ||
|
|
||
| // Databases that are never checked. | ||
| def excludedDbs = ["mysql", "information_schema", "__internal_schema"].toSet() | ||
|
|
||
| logger.info("===== [check] begin to check hash bucket tables") | ||
| // Verifies one partition of a hash-distributed table: for every replica index and every tablet, | ||
| // all rows read from that tablet must yield a single value of | ||
| // crc32_internal(<bucket columns>) % <bucket num>. | ||
| // Params: db and tblName name the table; info is one row of `show partitions` and must provide | ||
| // "Buckets", "DistributionKey" and "PartitionName". | ||
| // Returns false when the partition is skipped (at most one bucket, or RANDOM distribution), | ||
| // true otherwise. A tablet holding more than one hash value raises AssertionError; any other | ||
| // error is only logged. | ||
| def checkPartition = { String db, String tblName, def info -> | ||
|     int bucketNum = info["Buckets"].toInteger() | ||
|     if (bucketNum <= 1) { return false } | ||
|     def bucketColumns = info["DistributionKey"] | ||
|     if (bucketColumns == "RANDOM") { return false } | ||
|     // Quote each bucket column so it can be used verbatim as a function argument. | ||
|     def bucketColsStr = bucketColumns.split(",").collect { "`${it.trim()}`" }.join(",") | ||
|     def partitionName = info["PartitionName"] | ||
|     try { | ||
|         def tabletIdList = sql_return_maparray(""" show replica status from ${tblName} partition(${partitionName}); """).collect { it.TabletId }.toList() | ||
|         def tabletIds = tabletIdList.toSet() | ||
|         // Replica count = number of rows that carry the first tablet id | ||
|         // (presumably one row per replica; verify against `show replica status`). | ||
|         int replicaNum = tabletIdList.stream().filter { it == tabletIdList[0] }.count() | ||
|         logger.info("""===== [check] Begin to check partition: ${db}.${tblName}, partition name: ${partitionName}, bucket num: ${bucketNum}, replica num: ${replicaNum}, bucket columns: ${bucketColsStr}""") | ||
|         // Exclusive range on purpose: `0..replicaNum-1` is the reverse range [0, -1] when replicaNum is 0. | ||
|         (0..<replicaNum).each { replica -> | ||
|             sql "set use_fix_replica=${replica};" | ||
|             tabletIds.each { tabletId -> | ||
|                 try { | ||
|                     def res = sql "select crc32_internal(${bucketColsStr}) % ${bucketNum} from ${db}.${tblName} tablet(${tabletId}) group by crc32_internal(${bucketColsStr}) % ${bucketNum};" | ||
|                     if (res.size() > 1) { | ||
|                         logger.info("""===== [check] check failed: ${db}.${tblName}, partition name: ${partitionName}, tabletId: ${tabletId}, bucket columns: ${bucketColsStr}, res.size()=${res.size()}, res=${res}""") | ||
|                         assert res.size() == 1 | ||
|                     } | ||
|                 } catch (AssertionError e) { | ||
|                     // A real check failure must not be swallowed by the generic handler below. | ||
|                     throw e | ||
|                 } catch (Throwable e) { | ||
|                     logger.info("===== [check] catch exception, table: ${db}.${tblName}, partition name: ${partitionName}, tabletId: ${tabletId}, e=${e}") | ||
|                 } | ||
|             } | ||
|             sql "set use_fix_replica=-1;" | ||
|         } | ||
|         logger.info("""===== [check] Finish to check table partition: ${db}.${tblName}, partitionName: ${partitionName}, replica num: ${replicaNum}, bucket num: ${bucketNum}, bucket columns: ${bucketColsStr}""") | ||
|     } catch (AssertionError e) { | ||
|         throw e | ||
|     } catch (Throwable e) { | ||
|         logger.info("===== [check] catch exception, table: ${db}.${tblName}, partition name: ${partitionName}, e=${e}") | ||
|     } | ||
|     return true | ||
| } | ||
|
|
||
| // Runs checkPartition on every partition of table tblName in database db and adds the number | ||
| // of partitions that were actually checked to partitionNum. | ||
| // Returns true when at least one partition was checked. | ||
| def checkTable = { String db, String tblName -> | ||
|     // The statements below use the unqualified table name, so select the database first. | ||
|     sql "use ${db};" | ||
|     def partitionInfo = sql_return_maparray """ show partitions from ${tblName}; """ | ||
|     // checkPartition returns false for partitions it skips; count only the checked ones. | ||
|     int checkedPartition = partitionInfo.count { checkPartition(db, tblName, it) } as int | ||
|     logger.info("""===== [check] Finish to check table: ${db}.${tblName}""") | ||
|     partitionNum.addAndGet(checkedPartition) | ||
|     return checkedPartition > 0 | ||
| } | ||
|
|
||
| // Submits one checkTable task to the executor for every base table of database db, | ||
| // leaving out the names reported by mv_infos for that database. | ||
| // Increments dbNum once per call; tableNum is incremented by each task whose checkTable returns true. | ||
| def checkDb = { String db -> | ||
|     sql "use ${db};" | ||
|     dbNum.incrementAndGet() | ||
|     // Rows of `show full tables` are read as [name, type]; keep the names of base tables only. | ||
|     def baseTables = sql("show full tables").findAll { it[1] == "BASE TABLE" }.collect { it[0] } | ||
|     def mvNames = sql_return_maparray("""select * from mv_infos("database"="${db}");""").collect { it.Name }.toSet() | ||
|     baseTables.findAll { !mvNames.contains(it) }.each { tblName -> | ||
|         futures << executor.submit({ | ||
|             if (checkTable(db, tblName)) { | ||
|                 tableNum.incrementAndGet() | ||
|             } | ||
|         }) | ||
|     } | ||
| } | ||
|
|
||
| // Schedule the checks for every database that is not excluded, then wait for all of them. | ||
| try { | ||
|     def allDbs = sql "show databases" | ||
|     allDbs.each { | ||
|         def db = it[0] | ||
|         if (!excludedDbs.contains(db)) { | ||
|             checkDb(db) | ||
|         } | ||
|     } | ||
|     // get() rethrows whatever a task threw (wrapped in ExecutionException), so a failed check fails the suite. | ||
|     futures.each { it.get() } | ||
| } catch (Throwable t) { | ||
|     // Stop the pool before propagating the failure so its worker threads are not left running. | ||
|     executor.shutdownNow() | ||
|     throw t | ||
| } | ||
| executor.shutdown() | ||
| executor.awaitTermination(Long.MAX_VALUE, TimeUnit.MINUTES) | ||
| logger.info("===== [check] finish to check hash bucket tables, db num: ${dbNum}, table num: ${tableNum}, partition num: ${partitionNum}") | ||
| } |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Open
All social media accounts
And texts messages