Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions be/src/olap/rowset/segment_v2/segment_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -523,7 +523,7 @@ Status SegmentWriter::append_block_with_partial_content(const vectorized::Block*

// 1. if the delete sign is marked, it means that the value columns of the row will not
// be read. So we don't need to read the missing values from the previous rows.
// 2. the one exception is when there are sequence columns in the table, we need to read
// 2. the one exception is when there is sequence column in the table, we need to read
// the sequence columns, otherwise it may cause the merge-on-read based compaction
// policy to produce incorrect results
if (have_delete_sign && !_tablet_schema->has_sequence_col()) {
Expand Down Expand Up @@ -642,9 +642,9 @@ Status SegmentWriter::fill_missing_columns(vectorized::MutableColumns& mutable_f
auto rowset = _rsid_to_rowset[rs_it.first];
CHECK(rowset);
std::vector<uint32_t> rids;
for (auto id_and_pos : seg_it.second) {
rids.emplace_back(id_and_pos.rid);
read_index[id_and_pos.pos] = read_idx++;
for (auto [rid, pos] : seg_it.second) {
rids.emplace_back(rid);
read_index[pos] = read_idx++;
}
if (has_row_column) {
auto st = tablet->fetch_value_through_row_column(
Expand Down Expand Up @@ -698,7 +698,7 @@ Status SegmentWriter::fill_missing_columns(vectorized::MutableColumns& mutable_f

// fill all missing value from mutable_old_columns, need to consider default value and null value
for (auto idx = 0; idx < use_default_or_null_flag.size(); idx++) {
// `use_default_or_null_flag[idx] == true` doesn't mean that we should read values from the old row
// `use_default_or_null_flag[idx] == false` doesn't mean that we should read values from the old row
// for the missing columns. For example, if a table has sequence column, the rows with DELETE_SIGN column
// marked will not be marked in delete bitmap(see https://github.com/apache/doris/pull/24011), so it will
// be found in Tablet::lookup_row_key() and `use_default_or_null_flag[idx]` will be false. But we should not
Expand Down
10 changes: 5 additions & 5 deletions be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -463,7 +463,7 @@ Status VerticalSegmentWriter::_append_block_with_partial_content(RowsInBlock& da

// 1. if the delete sign is marked, it means that the value columns of the row will not
// be read. So we don't need to read the missing values from the previous rows.
// 2. the one exception is when there are sequence columns in the table, we need to read
// 2. the one exception is when there is sequence column in the table, we need to read
// the sequence columns, otherwise it may cause the merge-on-read based compaction
// policy to produce incorrect results
if (have_delete_sign && !_tablet_schema->has_sequence_col()) {
Expand Down Expand Up @@ -582,9 +582,9 @@ Status VerticalSegmentWriter::_fill_missing_columns(
auto rowset = _rsid_to_rowset[rs_it.first];
CHECK(rowset);
std::vector<uint32_t> rids;
for (auto id_and_pos : seg_it.second) {
rids.emplace_back(id_and_pos.rid);
read_index[id_and_pos.pos] = read_idx++;
for (auto [rid, pos] : seg_it.second) {
rids.emplace_back(rid);
read_index[pos] = read_idx++;
}
if (has_row_column) {
auto st = tablet->fetch_value_through_row_column(
Expand Down Expand Up @@ -636,7 +636,7 @@ Status VerticalSegmentWriter::_fill_missing_columns(

// fill all missing value from mutable_old_columns, need to consider default value and null value
for (auto idx = 0; idx < use_default_or_null_flag.size(); idx++) {
// `use_default_or_null_flag[idx] == true` doesn't mean that we should read values from the old row
// `use_default_or_null_flag[idx] == false` doesn't mean that we should read values from the old row
// for the missing columns. For example, if a table has sequence column, the rows with DELETE_SIGN column
// marked will not be marked in delete bitmap(see https://github.com/apache/doris/pull/24011), so it will
// be found in Tablet::lookup_row_key() and `use_default_or_null_flag[idx]` will be false. But we should not
Expand Down
108 changes: 69 additions & 39 deletions be/src/olap/tablet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3235,27 +3235,56 @@ Status Tablet::generate_new_block_for_partial_update(
auto old_block = rowset_schema->create_block_by_cids(missing_cids);
auto update_block = rowset_schema->create_block_by_cids(update_cids);

std::map<uint32_t, uint32_t> read_index_old;
RETURN_IF_ERROR(read_columns_by_plan(rowset_schema, missing_cids, read_plan_ori, rsid_to_rowset,
old_block, &read_index_old));
// Returns a raw pointer to the DELETE_SIGN column's int8 data inside `block`, or
// nullptr when the block has no DELETE_SIGN column or the column holds fewer than
// `rows` values. The size check lets callers index the first `rows` entries safely
// without re-validating the column length.
auto get_delete_sign_column_data = [](vectorized::Block& block,
                                      size_t rows) -> const signed char* {
    if (const vectorized::ColumnWithTypeAndName* delete_sign_column =
                block.try_get_by_name(DELETE_SIGN);
        delete_sign_column != nullptr) {
        const auto& delete_sign_col =
                reinterpret_cast<const vectorized::ColumnInt8&>(*(delete_sign_column->column));
        if (delete_sign_col.size() >= rows) {
            return delete_sign_col.get_data().data();
        }
    }
    return nullptr;
};

// rowid in the final block(start from 0, increase continuously) -> rowid to read in update_block
std::map<uint32_t, uint32_t> read_index_update;

// read current rowset first, if a row in the current rowset has delete sign mark
// we don't need to read values from old block
RETURN_IF_ERROR(read_columns_by_plan(rowset_schema, update_cids, read_plan_update,
rsid_to_rowset, update_block, &read_index_update));

const vectorized::Int8* delete_sign_column_data = nullptr;
if (const vectorized::ColumnWithTypeAndName* delete_sign_column =
old_block.try_get_by_name(DELETE_SIGN);
delete_sign_column != nullptr) {
auto& delete_sign_col =
reinterpret_cast<const vectorized::ColumnInt8&>(*(delete_sign_column->column));
delete_sign_column_data = delete_sign_col.get_data().data();
size_t update_rows = read_index_update.size();
for (auto i = 0; i < update_cids.size(); ++i) {
for (auto idx = 0; idx < update_rows; ++idx) {
full_mutable_columns[update_cids[i]]->insert_from(
*update_block.get_columns_with_type_and_name()[i].column.get(),
read_index_update[idx]);
}
}

// if there is sequence column in the table, we need to read the sequence column,
// otherwise it may cause the merge-on-read based compaction policy to produce incorrect results
const auto* __restrict new_block_delete_signs =
rowset_schema->has_sequence_col()
? nullptr
: get_delete_sign_column_data(update_block, update_rows);

// rowid in the final block(start from 0, increase, may not continuous becasue we skip to read some rows) -> rowid to read in old_block
std::map<uint32_t, uint32_t> read_index_old;
RETURN_IF_ERROR(read_columns_by_plan(rowset_schema, missing_cids, read_plan_ori, rsid_to_rowset,
old_block, &read_index_old, new_block_delete_signs));
size_t old_rows = read_index_old.size();
const auto* __restrict old_block_delete_signs =
get_delete_sign_column_data(old_block, old_rows);

// build default value block
auto default_value_block = old_block.clone_empty();
auto mutable_default_value_columns = default_value_block.mutate_columns();
if (delete_sign_column_data != nullptr) {
if (old_block_delete_signs != nullptr || new_block_delete_signs != nullptr) {
for (auto i = 0; i < missing_cids.size(); ++i) {
const auto& column = rowset_schema->column(missing_cids[i]);
if (column.has_default_value()) {
Expand All @@ -3268,22 +3297,26 @@ Status Tablet::generate_new_block_for_partial_update(
}
}

// build full block
CHECK(read_index_old.size() == read_index_update.size());
CHECK(update_rows >= old_rows);

// build full block
for (auto i = 0; i < missing_cids.size(); ++i) {
const auto& rs_column = rowset_schema->column(missing_cids[i]);
for (auto idx = 0; idx < read_index_old.size(); ++idx) {
// if the conflict update is a delete sign, which means that the key is
// not exist now, we should not read old values from the deleted data,
// and should use default value instead.
// NOTE: since now we are in the publishing phase, all data is commited
// before, even the `strict_mode` is true (which requires partial update
// load job can't insert new keys), this "new" key MUST be written into
// the new generated segment file.
if (delete_sign_column_data != nullptr &&
delete_sign_column_data[read_index_old[idx]] != 0) {
auto& mutable_column = full_mutable_columns[missing_cids[i]];
auto& mutable_column = full_mutable_columns[missing_cids[i]];
for (auto idx = 0; idx < update_rows; ++idx) {
// There are two cases we don't need to read values from old data:
// 1. if the conflicting new row's delete sign is marked, which means the value columns
// of the row will not be read. So we don't need to read the missing values from the previous rows.
// 2. if the conflicting old row's delete sign is marked, which means that the key is not exist now,
// we should not read old values from the deleted data, and should use default value instead.
// NOTE: since now we are in the publishing phase, all data is commited
// before, even the `strict_mode` is true (which requires partial update
// load job can't insert new keys), this "new" key MUST be written into
// the new generated segment file.
if (new_block_delete_signs != nullptr && new_block_delete_signs[idx]) {
mutable_column->insert_default();
} else if (old_block_delete_signs != nullptr &&
old_block_delete_signs[read_index_old[idx]] != 0) {
if (rs_column.has_default_value()) {
mutable_column->insert_from(*mutable_default_value_columns[i].get(), 0);
} else if (rs_column.is_nullable()) {
Expand All @@ -3292,18 +3325,11 @@ Status Tablet::generate_new_block_for_partial_update(
} else {
mutable_column->insert_default();
}
continue;
} else {
mutable_column->insert_from(
*old_block.get_columns_with_type_and_name()[i].column.get(),
read_index_old[idx]);
}
full_mutable_columns[missing_cids[i]]->insert_from(
*old_block.get_columns_with_type_and_name()[i].column.get(),
read_index_old[idx]);
}
}
for (auto i = 0; i < update_cids.size(); ++i) {
for (auto idx = 0; idx < read_index_update.size(); ++idx) {
full_mutable_columns[update_cids[i]]->insert_from(
*update_block.get_columns_with_type_and_name()[i].column.get(),
read_index_update[idx]);
}
}
output_block->set_columns(std::move(full_mutable_columns));
Expand All @@ -3318,7 +3344,8 @@ Status Tablet::read_columns_by_plan(TabletSchemaSPtr tablet_schema,
const PartialUpdateReadPlan& read_plan,
const std::map<RowsetId, RowsetSharedPtr>& rsid_to_rowset,
vectorized::Block& block,
std::map<uint32_t, uint32_t>* read_index) {
std::map<uint32_t, uint32_t>* read_index,
const signed char* __restrict skip_map) {
bool has_row_column = tablet_schema->store_row_column();
auto mutable_columns = block.mutate_columns();
size_t read_idx = 0;
Expand All @@ -3327,9 +3354,12 @@ Status Tablet::read_columns_by_plan(TabletSchemaSPtr tablet_schema,
auto rowset_iter = rsid_to_rowset.find(rs_it.first);
CHECK(rowset_iter != rsid_to_rowset.end());
std::vector<uint32_t> rids;
for (auto id_and_pos : seg_it.second) {
rids.emplace_back(id_and_pos.rid);
(*read_index)[id_and_pos.pos] = read_idx++;
for (auto [rid, pos] : seg_it.second) {
if (skip_map && skip_map[pos]) {
continue;
}
rids.emplace_back(rid);
(*read_index)[pos] = read_idx++;
}
if (has_row_column) {
auto st = fetch_value_through_row_column(rowset_iter->second, *tablet_schema,
Expand Down
3 changes: 2 additions & 1 deletion be/src/olap/tablet.h
Original file line number Diff line number Diff line change
Expand Up @@ -486,7 +486,8 @@ class Tablet final : public BaseTablet {
const std::vector<uint32_t> cids_to_read,
const PartialUpdateReadPlan& read_plan,
const std::map<RowsetId, RowsetSharedPtr>& rsid_to_rowset,
vectorized::Block& block, std::map<uint32_t, uint32_t>* read_index);
vectorized::Block& block, std::map<uint32_t, uint32_t>* read_index,
const signed char* __restrict skip_map = nullptr);
void prepare_to_read(const RowLocation& row_location, size_t pos,
PartialUpdateReadPlan* read_plan);
Status generate_new_block_for_partial_update(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !sql --
1 1 1 1 1
2 2 2 2 2
3 3 3 3 3

-- !sql --
1 999 999 1 1
3 777 777 3 3

-- !sql --
1 1 1 1 1 0 2
1 999 999 1 1 0 3
2 \N \N \N \N 1 4
2 2 2 2 2 0 2
2 888 888 2 2 0 3
3 3 3 3 3 0 2
3 777 777 3 3 0 3

Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

import org.junit.Assert
import java.util.concurrent.TimeUnit
import org.awaitility.Awaitility

// Regression test: holds a partial-update load at its publish phase via debug points
// while a delete-sign load on an overlapping key commits concurrently, then verifies
// both the visible query result and the raw per-version rowset contents (via the
// skip_* session variables) against the recorded .out expectations.
// Tagged "nonConcurrent" because the debug points it toggles are global to all FEs/BEs.
suite("test_delete_publish_skip_read", "nonConcurrent") {

    def table1 = "test_delete_publish_skip_read"
    // FORCE drop so remnants of a previous failed run cannot linger in the recycle bin.
    sql "DROP TABLE IF EXISTS ${table1} FORCE;"
    // Merge-on-write unique-key table; a single bucket, single replica, and disabled
    // auto compaction keep the rowset/version layout deterministic for the
    // hidden-column verification at the end of the test.
    sql """ CREATE TABLE IF NOT EXISTS ${table1} (
            `k1` int NOT NULL,
            `c1` int,
            `c2` int,
            `c3` int,
            `c4` int
            )UNIQUE KEY(k1)
        DISTRIBUTED BY HASH(k1) BUCKETS 1
        PROPERTIES (
            "enable_mow_light_delete" = "false",
            "disable_auto_compaction" = "true",
            "replication_num" = "1"); """

    // Seed three rows and snapshot the initial table state.
    sql "insert into ${table1} values(1,1,1,1,1),(2,2,2,2,2),(3,3,3,3,3);"
    sql "sync;"
    order_qt_sql "select * from ${table1};"


    // The publish phase lives in different components per deployment mode: cloud mode
    // blocks in the FE (delete-bitmap update lock), local mode blocks in the BE publish
    // task. Each helper below toggles the debug point matching the current mode.
    def enable_publish_spin_wait = {
        if (isCloudMode()) {
            GetDebugPoint().enableDebugPointForAllFEs("CloudGlobalTransactionMgr.getDeleteBitmapUpdateLock.enable_spin_wait")
        } else {
            GetDebugPoint().enableDebugPointForAllBEs("EnginePublishVersionTask::execute.enable_spin_wait")
        }
    }

    def disable_publish_spin_wait = {
        if (isCloudMode()) {
            GetDebugPoint().disableDebugPointForAllFEs("CloudGlobalTransactionMgr.getDeleteBitmapUpdateLock.enable_spin_wait")
        } else {
            GetDebugPoint().disableDebugPointForAllBEs("EnginePublishVersionTask::execute.enable_spin_wait")
        }
    }

    def enable_block_in_publish = {
        if (isCloudMode()) {
            GetDebugPoint().enableDebugPointForAllFEs("CloudGlobalTransactionMgr.getDeleteBitmapUpdateLock.block")
        } else {
            GetDebugPoint().enableDebugPointForAllBEs("EnginePublishVersionTask::execute.block")
        }
    }

    def disable_block_in_publish = {
        if (isCloudMode()) {
            GetDebugPoint().disableDebugPointForAllFEs("CloudGlobalTransactionMgr.getDeleteBitmapUpdateLock.block")
        } else {
            GetDebugPoint().disableDebugPointForAllBEs("EnginePublishVersionTask::execute.block")
        }
    }

    try {
        // Start from a clean debug-point state in case an earlier suite leaked one.
        GetDebugPoint().clearDebugPointsForAllFEs()
        GetDebugPoint().clearDebugPointsForAllBEs()

        // block the partial update in publish phase
        enable_publish_spin_wait()
        enable_block_in_publish()
        // t1: partial update touching only k1/c1/c2; its publish spins on the debug
        // point, so the transaction commits but is not yet visible.
        def t1 = Thread.start {
            sql "set enable_unique_key_partial_update=true;"
            sql "sync;"
            sql "insert into ${table1}(k1,c1,c2) values(1,999,999),(2,888,888),(3,777,777);"
        }

        // Give t1 time to reach the blocked publish phase before the delete arrives.
        // NOTE(review): a fixed 500 ms sleep — presumably sufficient on CI; a race here
        // would only make the test less interesting, not wrongly pass.
        Thread.sleep(500)

        // t2: mark key 2 with the hidden delete sign while t1 is still held in publish.
        def t2 = Thread.start {
            sql "insert into ${table1}(k1,__DORIS_DELETE_SIGN__) values(2,1);"
        }


        // let the partial update load publish
        disable_block_in_publish()
        t1.join()
        t2.join()

        // Visible result: key 2 should be gone, keys 1 and 3 carry the updated values.
        order_qt_sql "select * from ${table1};"

        // Bypass delete-sign filtering, merge, and the delete bitmap, and expose hidden
        // columns, so the query returns every physical row of every version — this lets
        // the .out file pin exactly what each load wrote into its segment.
        sql "set skip_delete_sign=true;"
        sql "set skip_storage_engine_merge=true;"
        sql "set skip_delete_bitmap=true;"
        sql "set show_hidden_columns=true;"
        sql "sync;"

        order_qt_sql "select * from ${table1};"



    } catch(Exception e) {
        logger.info(e.getMessage())
        throw e
    } finally {
        // Always clear the global debug points so later suites are unaffected.
        GetDebugPoint().clearDebugPointsForAllFEs()
        GetDebugPoint().clearDebugPointsForAllBEs()
    }

    sql "DROP TABLE IF EXISTS ${table1};"
}