Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions be/src/olap/delta_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ DeltaWriter::DeltaWriter(StorageEngine& engine, WriteRequest* req, RuntimeProfil
void BaseDeltaWriter::_init_profile(RuntimeProfile* profile) {
_profile = profile->create_child(fmt::format("DeltaWriter {}", _req.tablet_id), true, true);
_close_wait_timer = ADD_TIMER(_profile, "CloseWaitTime");
_wait_flush_limit_timer = ADD_TIMER(_profile, "WaitFlushLimitTime");
}

void DeltaWriter::_init_profile(RuntimeProfile* profile) {
Expand Down Expand Up @@ -126,8 +127,12 @@ Status BaseDeltaWriter::write(const vectorized::Block* block, const std::vector<
if (!_is_init && !_is_cancelled) {
RETURN_IF_ERROR(init());
}
while (_memtable_writer->flush_running_count() >= config::memtable_flush_running_count_limit) {
std::this_thread::sleep_for(std::chrono::milliseconds(10));
{
SCOPED_TIMER(_wait_flush_limit_timer);
while (_memtable_writer->flush_running_count() >=
config::memtable_flush_running_count_limit) {
std::this_thread::sleep_for(std::chrono::milliseconds(10));
}
}
return _memtable_writer->write(block, row_idxs, is_append);
}
Expand Down
1 change: 1 addition & 0 deletions be/src/olap/delta_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ class BaseDeltaWriter {

RuntimeProfile* _profile = nullptr;
RuntimeProfile::Counter* _close_wait_timer = nullptr;
RuntimeProfile::Counter* _wait_flush_limit_timer = nullptr;

MonotonicStopWatch _lock_watch;
};
Expand Down
10 changes: 8 additions & 2 deletions be/src/olap/delta_writer_v2.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,10 @@ DeltaWriterV2::DeltaWriterV2(WriteRequest* req,
void DeltaWriterV2::_update_profile(RuntimeProfile* profile) {
auto child = profile->create_child(fmt::format("DeltaWriterV2 {}", _req.tablet_id), true, true);
auto write_memtable_timer = ADD_TIMER(child, "WriteMemTableTime");
auto wait_flush_limit_timer = ADD_TIMER(child, "WaitFlushLimitTime");
auto close_wait_timer = ADD_TIMER(child, "CloseWaitTime");
COUNTER_SET(write_memtable_timer, _write_memtable_time);
COUNTER_SET(wait_flush_limit_timer, _wait_flush_limit_time);
COUNTER_SET(close_wait_timer, _close_wait_time);
}

Expand Down Expand Up @@ -152,8 +154,12 @@ Status DeltaWriterV2::write(const vectorized::Block* block, const std::vector<ui
if (!_is_init && !_is_cancelled) {
RETURN_IF_ERROR(init());
}
while (_memtable_writer->flush_running_count() >= config::memtable_flush_running_count_limit) {
std::this_thread::sleep_for(std::chrono::milliseconds(10));
{
SCOPED_RAW_TIMER(&_wait_flush_limit_time);
while (_memtable_writer->flush_running_count() >=
config::memtable_flush_running_count_limit) {
std::this_thread::sleep_for(std::chrono::milliseconds(10));
}
}
SCOPED_RAW_TIMER(&_write_memtable_time);
return _memtable_writer->write(block, row_idxs, is_append);
Expand Down
1 change: 1 addition & 0 deletions be/src/olap/delta_writer_v2.h
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ class DeltaWriterV2 {
int64_t _total_received_rows = 0;

int64_t _write_memtable_time = 0;
int64_t _wait_flush_limit_time = 0;
int64_t _close_wait_time = 0;

std::shared_ptr<MemTableWriter> _memtable_writer;
Expand Down