diff --git a/src/Planner/PlannerJoinTree.cpp b/src/Planner/PlannerJoinTree.cpp index 96cee39c4069..f1ed61aa3845 100644 --- a/src/Planner/PlannerJoinTree.cpp +++ b/src/Planner/PlannerJoinTree.cpp @@ -2363,6 +2363,11 @@ JoinTreeQueryPlan buildJoinTreeQueryPlan(const QueryTreeNodePtr & query_node, size_t joins_count = 0; bool is_full_join = false; + bool is_global_join = false; + bool is_right_join_with_remote_table = false; + int first_join_pos = -1; + int last_right_join_pos = -1; + bool is_cross_join = false; /// For each table, table function, query, union table expressions prepare before query plan build for (size_t i = 0; i < table_expressions_stack_size; ++i) { @@ -2374,6 +2379,7 @@ JoinTreeQueryPlan buildJoinTreeQueryPlan(const QueryTreeNodePtr & query_node, if (table_expression_type == QueryTreeNodeType::CROSS_JOIN) { joins_count += table_expression->as().getTableExpressions().size() - 1; + is_cross_join = true; continue; } @@ -2381,19 +2387,58 @@ JoinTreeQueryPlan buildJoinTreeQueryPlan(const QueryTreeNodePtr & query_node, { ++joins_count; const auto & join_node = table_expression->as(); - if (join_node.getKind() == JoinKind::Full) + const auto join_kind = join_node.getKind(); + + if (join_kind == JoinKind::Full) is_full_join = true; + if (join_node.getLocality() == JoinLocality::Global) + is_global_join = true; + + // save join positions for later check + if (first_join_pos < 0 && (join_kind == JoinKind::Left || join_kind == JoinKind::Inner || join_kind == JoinKind::Right)) + first_join_pos = static_cast(i); + if (join_kind == JoinKind::Right) + last_right_join_pos = static_cast(i); + + /// For RIGHT JOIN with a distributed table on the right side, disable parallel replicas. + /// The distributed table on the right side would be wrapped into a subquery, + /// causing parallel replicas to incorrectly choose the left table for parallel reading. + /// Each replica would then independently read the full distributed table, resulting in duplicate data. + if (join_kind == JoinKind::Right) + { + const auto & right_expression_data = planner_context->getTableExpressionDataOrThrow(join_node.getRightTableExpression()); + is_right_join_with_remote_table = right_expression_data.isRemote(); + } + continue; } prepareBuildQueryPlanForTableExpression(table_expression, planner_context); } - /// disable parallel replicas for n-way join with FULL JOIN involved - if (joins_count > 1 && is_full_join) + auto should_disable_parallel_replicas = [&]() -> bool + { + /// n-way join like LEFT/INNER/RIGHT ... RIGHT ... + /// if last RIGHT join position is after LEFT/INNER/RIGHT(another) join then the left side of the RIGHT join can't be parallelized + if (first_join_pos >= 0 && last_right_join_pos >= 0 && first_join_pos < last_right_join_pos) + return true; + + /// for n-way join with FULL JOIN or GLOBAL JOINS or CROSS JOIN + if (joins_count > 1 && (is_full_join || is_global_join || is_cross_join)) + return true; + + /// For RIGHT JOIN with distributed table on the right side + if (is_right_join_with_remote_table) + return true; + + return false; + }; + + if (should_disable_parallel_replicas()) planner_context->getMutableQueryContext()->setSetting("enable_parallel_replicas", Field{0}); + // in case of n-way JOINs the table expression stack contains several join nodes // so, we need to find right parent node for a table expression to pass into buildQueryPlanForTableExpression() QueryTreeNodePtr parent_join_tree = join_tree_node; diff --git a/tests/parallel_replicas_blacklist.txt b/tests/parallel_replicas_blacklist.txt index 34de96df9ff3..666cb12a3af9 100644 --- a/tests/parallel_replicas_blacklist.txt +++ b/tests/parallel_replicas_blacklist.txt @@ -32,10 +32,6 @@ Duplicates on JOIN + CROSS JOIN 01852_multiple_joins_with_union_join - https://github.com/ClickHouse/ClickHouse/issues/74341 - Duplicates on RIGHT JOIN with Values table -03006_join_on_inequal_expression_3.gen - https://github.com/ClickHouse/ClickHouse/issues/74343 Cannot find column `tuple(col_a, type)` in source stream, there are only columns: [__table1.id, tuple(replaceAll(__table1.data, 'a'_String, 'e'_String), __table1.type), tuple(replaceAll(__table1.data, 'a'_String, 'e'_String), __table1.type)]. (THERE_IS_NO_COLUMN) (in query: SELECT id, tuple(replaceAll(data, 'a', 'e') AS col_a, type) AS a, tuple(replaceAll(data, 'a', 'e') AS col_b, type) AS b FROM src;) 03240_insert_select_named_tuple @@ -47,6 +43,7 @@ 02833_tuple_concat 02378_analyzer_projection_names + https://github.com/ClickHouse/ClickHouse/issues/74367 Cannot find column in source stream 02457_morton_coding_with_mask diff --git a/tests/queries/0_stateless/03611_pr_global_join.reference b/tests/queries/0_stateless/03611_pr_global_join.reference new file mode 100644 index 000000000000..a3033d72852a --- /dev/null +++ b/tests/queries/0_stateless/03611_pr_global_join.reference @@ -0,0 +1,5 @@ +1 1 1 1 1 1 +--- +1 1 1 1 1 1 +--- +1 1 1 1 diff --git a/tests/queries/0_stateless/03611_pr_global_join.sql b/tests/queries/0_stateless/03611_pr_global_join.sql new file mode 100644 index 000000000000..1e14ebab0b5a --- /dev/null +++ b/tests/queries/0_stateless/03611_pr_global_join.sql @@ -0,0 +1,30 @@ +DROP TABLE IF EXISTS t1; +DROP TABLE IF EXISTS t2; +CREATE TABLE t1 (c0 Int NULL, c1 Int) ENGINE = MergeTree() ORDER BY tuple(); +CREATE TABLE t2 (c0 Int NULL, c1 Int) ENGINE = MergeTree() ORDER BY tuple(); +INSERT INTO TABLE t1 (c1, c0) VALUES (1, 1); +INSERT INTO TABLE t2 (c0, c1) VALUES (1, 1); + +SET enable_parallel_replicas = 1; +SET max_parallel_replicas = 3, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', parallel_replicas_for_non_replicated_merge_tree = 1; + +SELECT * +FROM t2 +INNER JOIN t2 AS tx ON true +RIGHT JOIN t1 ON true; + +SELECT '---'; + +-- this query was problematic, now GLOBAL JOINs has been disabled in n-way JOINs +SELECT * +FROM t2 +INNER JOIN t2 AS tx ON true +GLOBAL RIGHT JOIN t1 ON true; + +SELECT '---'; +-- just check that simple GLOBAL JOIN works with parallel replicas +SELECT * +FROM t2 GLOBAL RIGHT JOIN t1 ON true; + +DROP TABLE t1; +DROP TABLE t2; diff --git a/tests/queries/0_stateless/03624_pr_lefl_right_joins_chain.reference b/tests/queries/0_stateless/03624_pr_lefl_right_joins_chain.reference new file mode 100644 index 000000000000..1a17d42b3e18 --- /dev/null +++ b/tests/queries/0_stateless/03624_pr_lefl_right_joins_chain.reference @@ -0,0 +1,10 @@ +-- no parallel replicas -- +\N \N \N 0 3 B +1 1 1 a 1 A 1 A +2 2 2 b 2 B 2 B +-- parallel replicas -- +\N \N \N 0 3 B +1 1 1 a 1 A 1 A +2 2 2 b 2 B 2 B +-- explain: check parallel replicas is disabled, looking at ReadFromRemoteParallelReplicas steps count -- +0 diff --git a/tests/queries/0_stateless/03624_pr_lefl_right_joins_chain.sql b/tests/queries/0_stateless/03624_pr_lefl_right_joins_chain.sql new file mode 100644 index 000000000000..1c74065f5618 --- /dev/null +++ b/tests/queries/0_stateless/03624_pr_lefl_right_joins_chain.sql @@ -0,0 +1,48 @@ +DROP TABLE IF EXISTS tab; +CREATE TABLE tab ( `k` Nullable(UInt32), `k1` Nullable(UInt32), `k2` Nullable(UInt32), `v` String ) ENGINE = MergeTree ORDER BY tuple(); +INSERT INTO tab VALUES (1, 1, 1, 'a'), (2, 2, 2, 'b'); + +DROP TABLE IF EXISTS mem; +CREATE TABLE mem ( `k` UInt64, `v` String ) ENGINE = Join(ANY, LEFT, k); +INSERT INTO mem VALUES (1, 'A'), (2, 'B'), (3, 'B'); + +DROP TABLE IF EXISTS mem2; +CREATE TABLE mem2 ( `k` UInt64, `v` String ) ENGINE = Join(ANY, RIGHT, k); +INSERT INTO mem2 VALUES (1, 'A'), (2, 'B'), (3, 'B'); + +SET enable_analyzer = 1; + +SELECT '-- no parallel replicas --'; +SELECT * +FROM tab +ANY LEFT JOIN mem ON k1 = mem.k +ANY RIGHT JOIN mem2 ON k2 = mem2.k +ORDER BY tab.v +SETTINGS enable_parallel_replicas=0; + +SELECT '-- parallel replicas --'; +SELECT * +FROM tab +ANY LEFT JOIN mem ON k1 = mem.k +ANY RIGHT JOIN mem2 ON k2 = mem2.k +ORDER BY tab.v +SETTINGS enable_parallel_replicas=1, max_parallel_replicas=3, cluster_for_parallel_replicas='test_cluster_one_shard_three_replicas_localhost', parallel_replicas_for_non_replicated_merge_tree=1; + + +SELECT '-- explain: check parallel replicas is disabled, looking at ReadFromRemoteParallelReplicas steps count --'; +SELECT count() +FROM +( + EXPLAIN + SELECT * + FROM tab + ANY LEFT JOIN mem ON k1 = mem.k + ANY RIGHT JOIN mem2 ON k2 = mem2.k + ORDER BY tab.v ASC + SETTINGS enable_parallel_replicas = 1, max_parallel_replicas = 3, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', parallel_replicas_for_non_replicated_merge_tree = 1 +) +WHERE explain ILIKE '%ReadFromRemoteParallelReplicas%'; + +DROP TABLE mem2; +DROP TABLE mem; +DROP TABLE tab;