Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions be/src/olap/partial_update_info.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,27 @@ std::string PartialUpdateInfo::summary() const {
update_cids.size(), missing_cids.size(), is_strict_mode, max_version_in_flush_phase);
}

// Called when a partial-update row's key is absent from the base data in
// non-strict mode. Inserting a brand-new row is only legal when every
// unmentioned (missing) column can be filled in; otherwise report the first
// column that blocks the insert.
Status PartialUpdateInfo::handle_non_strict_mode_not_found_error(
        const TabletSchema& tablet_schema) {
    if (can_insert_new_rows_in_partial_update) {
        return Status::OK();
    }
    // Locate the first missing column that has no default value, is not
    // nullable, and is not the auto-increment column — such a column cannot
    // be materialized for a newly inserted row.
    std::string blocking_column;
    for (auto cid : missing_cids) {
        const TabletColumn& column = tablet_schema.column(cid);
        bool fillable = column.has_default_value() || column.is_nullable() ||
                        tablet_schema.auto_increment_column() == column.name();
        if (!fillable) {
            blocking_column = column.name();
            break;
        }
    }
    return Status::Error<ErrorCode::INVALID_SCHEMA, false>(
            "the unmentioned column `{}` should have default value or be nullable for "
            "newly inserted rows in non-strict mode partial update",
            blocking_column);
}

void PartialUpdateInfo::_generate_default_values_for_missing_cids(
const TabletSchema& tablet_schema) {
for (unsigned int cur_cid : missing_cids) {
Expand Down
7 changes: 7 additions & 0 deletions be/src/olap/partial_update_info.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ struct PartialUpdateInfo {
const std::string& auto_increment_column, int64_t cur_max_version = -1);
void to_pb(PartialUpdateInfoPB* partial_update_info) const;
void from_pb(PartialUpdateInfoPB* partial_update_info);
Status handle_non_strict_mode_not_found_error(const TabletSchema& tablet_schema);
std::string summary() const;

private:
Expand Down Expand Up @@ -93,4 +94,10 @@ class PartialUpdateReadPlan {
std::map<RowsetId, std::map<uint32_t, std::vector<RidAndPos>>> plan;
};

// Per-flush row counters accumulated while probing keys for a
// merge-on-write partial update (see SegmentWriter::probe_key_for_mow).
struct PartialUpdateStats {
    int64_t num_rows_updated {0};   // key found in base data; old location marked deleted
    int64_t num_rows_new_added {0}; // key not found; row treated as a new insert
    int64_t num_rows_deleted {0};   // lookup reported KEY_ALREADY_EXISTS; current row marked deleted
    int64_t num_rows_filtered {0};  // key not found in strict mode; new row discarded
};
} // namespace doris
186 changes: 78 additions & 108 deletions be/src/olap/rowset/segment_v2/segment_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -462,6 +462,70 @@ void SegmentWriter::_serialize_block_to_row_column(vectorized::Block& block) {
<< watch.elapsed_time() / 1000;
}

// Probes one input row's `key` against the visible rowsets and records how the
// merge-on-write state must change:
//   - key absent  -> the row is a new insert (strict mode filters it out via
//     the delete bitmap instead; non-strict mode validates that missing
//     columns can be defaulted unless the row carries a delete sign);
//   - key present -> the old location is marked deleted and, unless the row
//     has a delete sign (and the table has no sequence column), the row is
//     scheduled in `read_plan` so its missing columns can be read back.
Status SegmentWriter::probe_key_for_mow(
        std::string key, std::size_t segment_pos, bool have_input_seq_column, bool have_delete_sign,
        PartialUpdateReadPlan& read_plan, const std::vector<RowsetSharedPtr>& specified_rowsets,
        std::vector<std::unique_ptr<SegmentCacheHandle>>& segment_caches,
        bool& has_default_or_nullable, std::vector<bool>& use_default_or_null_flag,
        PartialUpdateStats& stats) {
    RowLocation row_loc;
    // Hold a reference to the owning rowset so it cannot be deleted while we
    // still point into it via `row_loc`.
    RowsetSharedPtr located_rowset;
    Status lookup_st =
            _tablet->lookup_row_key(key, _tablet_schema.get(), have_input_seq_column,
                                    specified_rowsets, &row_loc, _mow_context->max_version,
                                    segment_caches, &located_rowset);
    if (lookup_st.is<KEY_NOT_FOUND>()) {
        if (_opts.rowset_ctx->partial_update_info->is_strict_mode) {
            // Strict mode: a partial update may only touch existing rows,
            // so drop this invalid newly inserted row via the delete bitmap.
            ++stats.num_rows_filtered;
            _mow_context->delete_bitmap->add(
                    {_opts.rowset_ctx->rowset_id, _segment_id, DeleteBitmap::TEMP_VERSION_COMMON},
                    segment_pos);
        } else if (!have_delete_sign) {
            RETURN_IF_ERROR(
                    _opts.rowset_ctx->partial_update_info->handle_non_strict_mode_not_found_error(
                            *_tablet_schema));
        }
        ++stats.num_rows_new_added;
        has_default_or_nullable = true;
        use_default_or_null_flag.emplace_back(true);
        return Status::OK();
    }
    if (!lookup_st.ok() && !lookup_st.is<KEY_ALREADY_EXISTS>()) {
        LOG(WARNING) << "failed to lookup row key, error: " << lookup_st;
        return lookup_st;
    }

    // A delete-sign row does not need its value columns read back from the
    // previous version — with one exception: when the table has a sequence
    // column it must still be read, otherwise the merge-on-read based
    // compaction policy could produce incorrect results.
    const bool skip_reading_old_row = have_delete_sign && !_tablet_schema->has_sequence_col();
    if (skip_reading_old_row) {
        has_default_or_nullable = true;
        use_default_or_null_flag.emplace_back(true);
    } else {
        // Partial update should not contain invisible columns.
        use_default_or_null_flag.emplace_back(false);
        _rsid_to_rowset.emplace(located_rowset->rowset_id(), located_rowset);
        read_plan.prepare_to_read(row_loc, segment_pos);
    }

    if (lookup_st.is<KEY_ALREADY_EXISTS>()) {
        // The current row itself must be marked deleted, but its missing
        // columns are still read (above) so every column stays aligned.
        _mow_context->delete_bitmap->add(
                {_opts.rowset_ctx->rowset_id, _segment_id, DeleteBitmap::TEMP_VERSION_COMMON},
                segment_pos);
        ++stats.num_rows_deleted;
    } else {
        _mow_context->delete_bitmap->add(
                {row_loc.rowset_id, row_loc.segment_id, DeleteBitmap::TEMP_VERSION_COMMON},
                row_loc.row_id);
        ++stats.num_rows_updated;
    }
    return Status::OK();
}

// for partial update, we should do following steps to fill content of block:
// 1. set block data to data convertor, and get all key_column's converted slice
// 2. get pk of input block, and read missing columns
Expand All @@ -482,6 +546,8 @@ Status SegmentWriter::append_block_with_partial_content(const vectorized::Block*
DCHECK(_is_mow());

DCHECK(_opts.rowset_ctx->partial_update_info);
DCHECK(row_pos == 0);

// find missing column cids
const auto& missing_cids = _opts.rowset_ctx->partial_update_info->missing_cids;
const auto& including_cids = _opts.rowset_ctx->partial_update_info->update_cids;
Expand Down Expand Up @@ -526,35 +592,13 @@ Status SegmentWriter::append_block_with_partial_content(const vectorized::Block*
const auto* delete_sign_column_data =
BaseTablet::get_delete_sign_column_data(full_block, row_pos + num_rows);

std::vector<RowsetSharedPtr> specified_rowsets;
{
std::shared_lock rlock(_tablet->get_header_lock());
specified_rowsets = _mow_context->rowset_ptrs;
if (specified_rowsets.size() != _mow_context->rowset_ids.size()) {
// Only when this is a strict mode partial update that missing rowsets here will lead to problems.
// In other case, the missing rowsets will be calculated in later phases(commit phase/publish phase)
LOG(WARNING) << fmt::format(
"[Memtable Flush] some rowsets have been deleted due to "
"compaction(specified_rowsets.size()={}, but rowset_ids.size()={}) in "
"partial update. tablet_id: {}, cur max_version: {}, transaction_id: {}",
specified_rowsets.size(), _mow_context->rowset_ids.size(), _tablet->tablet_id(),
_mow_context->max_version, _mow_context->txn_id);
if (_opts.rowset_ctx->partial_update_info->is_strict_mode) {
return Status::InternalError<false>(
"[Memtable Flush] some rowsets have been deleted due to "
"compaction in strict mode partial update");
}
}
}
const std::vector<RowsetSharedPtr>& specified_rowsets = _mow_context->rowset_ptrs;
std::vector<std::unique_ptr<SegmentCacheHandle>> segment_caches(specified_rowsets.size());

PartialUpdateReadPlan read_plan;

// locate rows in base data
int64_t num_rows_updated = 0;
int64_t num_rows_new_added = 0;
int64_t num_rows_deleted = 0;
int64_t num_rows_filtered = 0;
PartialUpdateStats stats;

for (size_t block_pos = row_pos; block_pos < row_pos + num_rows; block_pos++) {
// block segment
Expand All @@ -581,76 +625,10 @@ Status SegmentWriter::append_block_with_partial_content(const vectorized::Block*
bool have_delete_sign =
(delete_sign_column_data != nullptr && delete_sign_column_data[block_pos] != 0);

RowLocation loc;
// save rowset shared ptr so this rowset wouldn't delete
RowsetSharedPtr rowset;
auto st = _tablet->lookup_row_key(key, _tablet_schema.get(), have_input_seq_column,
specified_rowsets, &loc, _mow_context->max_version,
segment_caches, &rowset);
if (st.is<KEY_NOT_FOUND>()) {
if (_opts.rowset_ctx->partial_update_info->is_strict_mode) {
++num_rows_filtered;
// delete the invalid newly inserted row
_mow_context->delete_bitmap->add({_opts.rowset_ctx->rowset_id, _segment_id,
DeleteBitmap::TEMP_VERSION_COMMON},
segment_pos);

} else {
if (!_opts.rowset_ctx->partial_update_info->can_insert_new_rows_in_partial_update &&
!have_delete_sign) {
std::string error_column;
for (auto cid : _opts.rowset_ctx->partial_update_info->missing_cids) {
const TabletColumn& col = _tablet_schema->column(cid);
if (!col.has_default_value() && !col.is_nullable() &&
_tablet_schema->auto_increment_column() != col.name()) {
error_column = col.name();
break;
}
}
return Status::Error<INVALID_SCHEMA, false>(
"the unmentioned column `{}` should have default value or be nullable "
"for "
"newly inserted rows in non-strict mode partial update",
error_column);
}
}
++num_rows_new_added;
has_default_or_nullable = true;
use_default_or_null_flag.emplace_back(true);
continue;
}
if (!st.ok() && !st.is<KEY_ALREADY_EXISTS>()) {
LOG(WARNING) << "failed to lookup row key, error: " << st;
return st;
}

// 1. if the delete sign is marked, it means that the value columns of the row will not
// be read. So we don't need to read the missing values from the previous rows.
// 2. the one exception is when there is sequence column in the table, we need to read
// the sequence columns, otherwise it may cause the merge-on-read based compaction
// policy to produce incorrect results
if (have_delete_sign && !_tablet_schema->has_sequence_col()) {
has_default_or_nullable = true;
use_default_or_null_flag.emplace_back(true);
} else {
// partial update should not contain invisible columns
use_default_or_null_flag.emplace_back(false);
_rsid_to_rowset.emplace(rowset->rowset_id(), rowset);
read_plan.prepare_to_read(loc, segment_pos);
}

if (st.is<KEY_ALREADY_EXISTS>()) {
// although we need to mark delete current row, we still need to read missing columns
// for this row, we need to ensure that each column is aligned
_mow_context->delete_bitmap->add(
{_opts.rowset_ctx->rowset_id, _segment_id, DeleteBitmap::TEMP_VERSION_COMMON},
segment_pos);
++num_rows_deleted;
} else {
_mow_context->delete_bitmap->add(
{loc.rowset_id, loc.segment_id, DeleteBitmap::TEMP_VERSION_COMMON}, loc.row_id);
++num_rows_updated;
}
RETURN_IF_ERROR(probe_key_for_mow(key, segment_pos, have_input_seq_column, have_delete_sign,
read_plan, specified_rowsets, segment_caches,
has_default_or_nullable, use_default_or_null_flag,
stats));
}
CHECK_EQ(use_default_or_null_flag.size(), num_rows);

Expand Down Expand Up @@ -684,29 +662,21 @@ Status SegmentWriter::append_block_with_partial_content(const vectorized::Block*
converted_result.second->get_data(),
num_rows));
}
_num_rows_updated += num_rows_updated;
_num_rows_deleted += num_rows_deleted;
_num_rows_new_added += num_rows_new_added;
_num_rows_filtered += num_rows_filtered;
_num_rows_updated += stats.num_rows_updated;
_num_rows_deleted += stats.num_rows_deleted;
_num_rows_new_added += stats.num_rows_new_added;
_num_rows_filtered += stats.num_rows_filtered;
if (_tablet_schema->has_sequence_col() && !have_input_seq_column) {
DCHECK_NE(seq_column, nullptr);
DCHECK_EQ(_num_rows_written, row_pos)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why removed these DCHECKs?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@zhannngchen These DCHECKs is checked explicitly below

<< "_num_rows_written: " << _num_rows_written << ", row_pos" << row_pos;
DCHECK_EQ(_primary_key_index_builder->num_rows(), _num_rows_written)
<< "primary key index builder num rows(" << _primary_key_index_builder->num_rows()
<< ") not equal to segment writer's num rows written(" << _num_rows_written << ")";
if (_num_rows_written != row_pos ||
_primary_key_index_builder->num_rows() != _num_rows_written) {
return Status::InternalError(
"Correctness check failed, _num_rows_written: {}, row_pos: {}, primary key "
"index builder num rows: {}",
_num_rows_written, row_pos, _primary_key_index_builder->num_rows());
}
for (size_t block_pos = row_pos; block_pos < row_pos + num_rows; block_pos++) {
std::string key = _full_encode_keys(key_columns, block_pos - row_pos);
_encode_seq_column(seq_column, block_pos - row_pos, &key);
RETURN_IF_ERROR(_primary_key_index_builder->add_item(key));
}
RETURN_IF_ERROR(
_generate_primary_key_index(_key_coders, key_columns, seq_column, num_rows, false));
}

_num_rows_written += num_rows;
Expand Down
7 changes: 7 additions & 0 deletions be/src/olap/rowset/segment_v2/segment_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,13 @@ class SegmentWriter {
Status append_row(const RowType& row);

Status append_block(const vectorized::Block* block, size_t row_pos, size_t num_rows);
Status probe_key_for_mow(std::string key, std::size_t segment_pos, bool have_input_seq_column,
bool have_delete_sign, PartialUpdateReadPlan& read_plan,
const std::vector<RowsetSharedPtr>& specified_rowsets,
std::vector<std::unique_ptr<SegmentCacheHandle>>& segment_caches,
bool& has_default_or_nullable,
std::vector<bool>& use_default_or_null_flag,
PartialUpdateStats& stats);
Status append_block_with_partial_content(const vectorized::Block* block, size_t row_pos,
size_t num_rows);
Status append_block_with_variant_subcolumns(vectorized::Block& data);
Expand Down
Loading