Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 48 additions & 3 deletions src/Planner/PlannerJoinTree.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2363,6 +2363,11 @@ JoinTreeQueryPlan buildJoinTreeQueryPlan(const QueryTreeNodePtr & query_node,

size_t joins_count = 0;
bool is_full_join = false;
bool is_global_join = false;
bool is_right_join_with_remote_table = false;
int first_join_pos = -1;
int last_right_join_pos = -1;
bool is_cross_join = false;
/// For each table, table function, query, union table expressions prepare before query plan build
for (size_t i = 0; i < table_expressions_stack_size; ++i)
{
Expand All @@ -2374,26 +2379,66 @@ JoinTreeQueryPlan buildJoinTreeQueryPlan(const QueryTreeNodePtr & query_node,
if (table_expression_type == QueryTreeNodeType::CROSS_JOIN)
{
joins_count += table_expression->as<const CrossJoinNode &>().getTableExpressions().size() - 1;
is_cross_join = true;
continue;
}

if (table_expression_type == QueryTreeNodeType::JOIN)
{
++joins_count;
const auto & join_node = table_expression->as<const JoinNode &>();
if (join_node.getKind() == JoinKind::Full)
const auto join_kind = join_node.getKind();

if (join_kind == JoinKind::Full)
is_full_join = true;

if (join_node.getLocality() == JoinLocality::Global)
is_global_join = true;

// save join positions for later check
if (first_join_pos < 0 && (join_kind == JoinKind::Left || join_kind == JoinKind::Inner || join_kind == JoinKind::Right))
first_join_pos = static_cast<int>(i);
if (join_kind == JoinKind::Right)
last_right_join_pos = static_cast<int>(i);

/// For RIGHT JOIN with a distributed table on the right side, disable parallel replicas.
/// The distributed table on the right side would be wrapped into a subquery,
/// causing parallel replicas to incorrectly choose the left table for parallel reading.
/// Each replica would then independently read the full distributed table, resulting in duplicate data.
if (join_kind == JoinKind::Right)
{
const auto & right_expression_data = planner_context->getTableExpressionDataOrThrow(join_node.getRightTableExpression());
is_right_join_with_remote_table = right_expression_data.isRemote();
}

continue;
}

prepareBuildQueryPlanForTableExpression(table_expression, planner_context);
}

/// disable parallel replicas for n-way join with FULL JOIN involved
if (joins_count > 1 && is_full_join)
auto should_disable_parallel_replicas = [&]() -> bool
{
/// n-way join like LEFT/INNER/RIGHT ... RIGHT ...
/// if last RIGHT join position is after LEFT/INNER/RIGHT(another) join then the left side of the RIGHT join can't be parallelized
if (first_join_pos >= 0 && last_right_join_pos >= 0 && first_join_pos < last_right_join_pos)
return true;

/// for n-way join with FULL JOIN or GLOBAL JOINS or CROSS JOIN
if (joins_count > 1 && (is_full_join || is_global_join || is_cross_join))
return true;

/// For RIGHT JOIN with distributed table on the right side
if (is_right_join_with_remote_table)
return true;

return false;
};

if (should_disable_parallel_replicas())
planner_context->getMutableQueryContext()->setSetting("enable_parallel_replicas", Field{0});


// in case of n-way JOINs the table expression stack contains several join nodes
// so, we need to find right parent node for a table expression to pass into buildQueryPlanForTableExpression()
QueryTreeNodePtr parent_join_tree = join_tree_node;
Expand Down
5 changes: 1 addition & 4 deletions tests/parallel_replicas_blacklist.txt
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,6 @@
Duplicates on JOIN + CROSS JOIN
01852_multiple_joins_with_union_join

https://github.com/ClickHouse/ClickHouse/issues/74341
Duplicates on RIGHT JOIN with Values table
03006_join_on_inequal_expression_3.gen

https://github.com/ClickHouse/ClickHouse/issues/74343
Cannot find column `tuple(col_a, type)` in source stream, there are only columns: [__table1.id, tuple(replaceAll(__table1.data, 'a'_String, 'e'_String), __table1.type), tuple(replaceAll(__table1.data, 'a'_String, 'e'_String), __table1.type)]. (THERE_IS_NO_COLUMN) (in query: SELECT id, tuple(replaceAll(data, 'a', 'e') AS col_a, type) AS a, tuple(replaceAll(data, 'a', 'e') AS col_b, type) AS b FROM src;)
03240_insert_select_named_tuple
Expand All @@ -47,6 +43,7 @@
02833_tuple_concat
02378_analyzer_projection_names


https://github.com/ClickHouse/ClickHouse/issues/74367
Cannot find column in source stream
02457_morton_coding_with_mask
Expand Down
5 changes: 5 additions & 0 deletions tests/queries/0_stateless/03611_pr_global_join.reference
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
1 1 1 1 1 1
---
1 1 1 1 1 1
---
1 1 1 1
30 changes: 30 additions & 0 deletions tests/queries/0_stateless/03611_pr_global_join.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
DROP TABLE IF EXISTS t1;
DROP TABLE IF EXISTS t2;
CREATE TABLE t1 (c0 Int NULL, c1 Int) ENGINE = MergeTree() ORDER BY tuple();
CREATE TABLE t2 (c0 Int NULL, c1 Int) ENGINE = MergeTree() ORDER BY tuple();
INSERT INTO TABLE t1 (c1, c0) VALUES (1, 1);
INSERT INTO TABLE t2 (c0, c1) VALUES (1, 1);

SET enable_parallel_replicas = 1;
SET max_parallel_replicas = 3, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', parallel_replicas_for_non_replicated_merge_tree = 1;

SELECT *
FROM t2
INNER JOIN t2 AS tx ON true
RIGHT JOIN t1 ON true;

SELECT '---';

-- this query was problematic, now GLOBAL JOINs has been disabled in n-way JOINs
SELECT *
FROM t2
INNER JOIN t2 AS tx ON true
GLOBAL RIGHT JOIN t1 ON true;

SELECT '---';
-- just check that simple GLOBAL JOIN works with parallel replicas
SELECT *
FROM t2 GLOBAL RIGHT JOIN t1 ON true;

DROP TABLE t1;
DROP TABLE t2;
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
-- no parallel replicas --
\N \N \N 0 3 B
1 1 1 a 1 A 1 A
2 2 2 b 2 B 2 B
-- parallel replicas --
\N \N \N 0 3 B
1 1 1 a 1 A 1 A
2 2 2 b 2 B 2 B
-- explain: check parallel replicas is disabled, looking at ReadFromRemoteParallelReplicas steps count --
0
48 changes: 48 additions & 0 deletions tests/queries/0_stateless/03624_pr_lefl_right_joins_chain.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
DROP TABLE IF EXISTS tab;
CREATE TABLE tab ( `k` Nullable(UInt32), `k1` Nullable(UInt32), `k2` Nullable(UInt32), `v` String ) ENGINE = MergeTree ORDER BY tuple();
INSERT INTO tab VALUES (1, 1, 1, 'a'), (2, 2, 2, 'b');

DROP TABLE IF EXISTS mem;
CREATE TABLE mem ( `k` UInt64, `v` String ) ENGINE = Join(ANY, LEFT, k);
INSERT INTO mem VALUES (1, 'A'), (2, 'B'), (3, 'B');

DROP TABLE IF EXISTS mem2;
CREATE TABLE mem2 ( `k` UInt64, `v` String ) ENGINE = Join(ANY, RIGHT, k);
INSERT INTO mem2 VALUES (1, 'A'), (2, 'B'), (3, 'B');

SET enable_analyzer = 1;

SELECT '-- no parallel replicas --';
SELECT *
FROM tab
ANY LEFT JOIN mem ON k1 = mem.k
ANY RIGHT JOIN mem2 ON k2 = mem2.k
ORDER BY tab.v
SETTINGS enable_parallel_replicas=0;

SELECT '-- parallel replicas --';
SELECT *
FROM tab
ANY LEFT JOIN mem ON k1 = mem.k
ANY RIGHT JOIN mem2 ON k2 = mem2.k
ORDER BY tab.v
SETTINGS enable_parallel_replicas=1, max_parallel_replicas=3, cluster_for_parallel_replicas='test_cluster_one_shard_three_replicas_localhost', parallel_replicas_for_non_replicated_merge_tree=1;


SELECT '-- explain: check parallel replicas is disabled, looking at ReadFromRemoteParallelReplicas steps count --';
SELECT count()
FROM
(
EXPLAIN
SELECT *
FROM tab
ANY LEFT JOIN mem ON k1 = mem.k
ANY RIGHT JOIN mem2 ON k2 = mem2.k
ORDER BY tab.v ASC
SETTINGS enable_parallel_replicas = 1, max_parallel_replicas = 3, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', parallel_replicas_for_non_replicated_merge_tree = 1
)
WHERE explain ILIKE '%ReadFromRemoteParallelReplicas%';

DROP TABLE mem2;
DROP TABLE mem;
DROP TABLE tab;
Loading