Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 0 additions & 6 deletions be/src/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -790,12 +790,6 @@ DEFINE_Validator(max_send_batch_parallelism_per_job,
DEFINE_Int32(send_batch_thread_pool_thread_num, "64");
// maximum queue size of the send batch thread pool
DEFINE_Int32(send_batch_thread_pool_queue_size, "102400");
// maximum number of threads in the download cache thread pool
DEFINE_Int32(download_cache_thread_pool_thread_num, "48");
// maximum queue size of the download cache thread pool
DEFINE_Int32(download_cache_thread_pool_queue_size, "102400");
// download cache buffer size
DEFINE_Int64(download_cache_buffer_size, "10485760");

// Limit the number of segment of a newly created rowset.
// The newly created rowset may be compacted after loading,
Expand Down
6 changes: 0 additions & 6 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -849,12 +849,6 @@ DECLARE_mInt32(max_send_batch_parallelism_per_job);
DECLARE_Int32(send_batch_thread_pool_thread_num);
// maximum queue size of the send batch thread pool
DECLARE_Int32(send_batch_thread_pool_queue_size);
// maximum number of threads in the download cache thread pool
DECLARE_Int32(download_cache_thread_pool_thread_num);
// maximum queue size of the download cache thread pool
DECLARE_Int32(download_cache_thread_pool_queue_size);
// download cache buffer size
DECLARE_Int64(download_cache_buffer_size);

// Limit the number of segment of a newly created rowset.
// The newly created rowset may be compacted after loading,
Expand Down
26 changes: 2 additions & 24 deletions be/src/runtime/exec_env.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@
#pragma once

#include <common/multi_version.h>
#include <stddef.h>

#include <algorithm>
#include <atomic>
#include <cstddef>
#include <map>
#include <memory>
#include <mutex>
Expand Down Expand Up @@ -168,7 +168,6 @@ class ExecEnv {
MemTracker* brpc_iobuf_block_memory_tracker() { return _brpc_iobuf_block_memory_tracker.get(); }

ThreadPool* send_batch_thread_pool() { return _send_batch_thread_pool.get(); }
ThreadPool* download_cache_thread_pool() { return _download_cache_thread_pool.get(); }
ThreadPool* buffered_reader_prefetch_thread_pool() {
return _buffered_reader_prefetch_thread_pool.get();
}
Expand All @@ -177,23 +176,8 @@ class ExecEnv {
ThreadPool* join_node_thread_pool() { return _join_node_thread_pool.get(); }
ThreadPool* lazy_release_obj_pool() { return _lazy_release_obj_pool.get(); }

void set_serial_download_cache_thread_token() {
_serial_download_cache_thread_token =
download_cache_thread_pool()->new_token(ThreadPool::ExecutionMode::SERIAL, 1);
}
// Returns a non-owning pointer to the serial download-cache token.
// Ownership stays with _serial_download_cache_thread_token; the result is
// nullptr when that unique_ptr currently holds no token.
ThreadPoolToken* get_serial_download_cache_thread_token() {
return _serial_download_cache_thread_token.get();
}
void init_download_cache_buf();
void init_download_cache_required_components();
Status init_pipeline_task_scheduler();
void init_file_cache_factory();
// Looks up the download buffer registered for `token`.
// Returns the buffer's raw pointer (ownership stays with _download_cache_buf_map),
// or nullptr when no buffer is registered for that token.
char* get_download_cache_buf(ThreadPoolToken* token) {
    // One find() serves both the presence check and the access; the earlier
    // find() + operator[] pair looked the key up twice.
    const auto it = _download_cache_buf_map.find(token);
    return it == _download_cache_buf_map.end() ? nullptr : it->second.get();
}
io::FileCacheFactory* file_cache_factory() { return _file_cache_factory; }
UserFunctionCache* user_function_cache() { return _user_function_cache; }
FragmentMgr* fragment_mgr() { return _fragment_mgr; }
Expand Down Expand Up @@ -322,23 +306,17 @@ class ExecEnv {
std::shared_ptr<MemTracker> _brpc_iobuf_block_memory_tracker;

std::unique_ptr<ThreadPool> _send_batch_thread_pool;

// Threadpool used to download cache from remote storage
std::unique_ptr<ThreadPool> _download_cache_thread_pool;
// Threadpool used to prefetch remote file for buffered reader
std::unique_ptr<ThreadPool> _buffered_reader_prefetch_thread_pool;
// Threadpool used to upload local file to s3
std::unique_ptr<ThreadPool> _s3_file_upload_thread_pool;
// A token used to submit download cache task serially
std::unique_ptr<ThreadPoolToken> _serial_download_cache_thread_token;
// Pool used by fragment manager to send profile or status to FE coordinator
std::unique_ptr<ThreadPool> _send_report_thread_pool;
// Pool used by join node to build hash table
std::unique_ptr<ThreadPool> _join_node_thread_pool;
// Pool to use a new thread to release object
std::unique_ptr<ThreadPool> _lazy_release_obj_pool;
// ThreadPoolToken -> buffer
std::unordered_map<ThreadPoolToken*, std::unique_ptr<char[]>> _download_cache_buf_map;

FragmentMgr* _fragment_mgr = nullptr;
pipeline::TaskScheduler* _without_group_task_scheduler = nullptr;
pipeline::TaskScheduler* _with_group_task_scheduler = nullptr;
Expand Down
53 changes: 10 additions & 43 deletions be/src/runtime/exec_env_init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,14 @@

// IWYU pragma: no_include <bthread/errno.h>
#include <common/multi_version.h>
#include <errno.h> // IWYU pragma: keep
#include <gen_cpp/HeartbeatService_types.h>
Comment thread
zclllyybb marked this conversation as resolved.
#include <gen_cpp/Metrics_types.h>
#include <stdint.h>
#include <stdlib.h>
#include <string.h>
#include <sys/resource.h>

#include <cerrno> // IWYU pragma: keep
#include <cstdint>
#include <cstdlib>
#include <cstring>
#include <limits>
#include <map>
#include <memory>
Expand Down Expand Up @@ -109,15 +109,13 @@ class PFunctionService_Stub;
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(scanner_thread_pool_queue_size, MetricUnit::NOUNIT);
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(send_batch_thread_pool_thread_num, MetricUnit::NOUNIT);
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(send_batch_thread_pool_queue_size, MetricUnit::NOUNIT);
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(download_cache_thread_pool_thread_num, MetricUnit::NOUNIT);
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(download_cache_thread_pool_queue_size, MetricUnit::NOUNIT);

static void init_doris_metrics(const std::vector<StorePath>& store_paths) {
bool init_system_metrics = config::enable_system_metrics;
std::set<std::string> disk_devices;
std::vector<std::string> network_interfaces;
std::vector<std::string> paths;
for (auto& store_path : store_paths) {
for (const auto& store_path : store_paths) {
paths.emplace_back(store_path.path);
}
if (init_system_metrics) {
Expand Down Expand Up @@ -167,8 +165,6 @@ Status ExecEnv::_init(const std::vector<StorePath>& store_paths,
.set_max_queue_size(config::send_batch_thread_pool_queue_size)
.build(&_send_batch_thread_pool));

init_download_cache_required_components();

static_cast<void>(ThreadPoolBuilder("BufferedReaderPrefetchThreadPool")
.set_min_threads(16)
.set_max_threads(64)
Expand Down Expand Up @@ -336,9 +332,11 @@ void ExecEnv::init_file_cache_factory() {
continue;
}

olap_res = file_cache_init_pool->submit_func(std::bind(
&io::FileCacheFactory::create_file_cache, _file_cache_factory, cache_path.path,
cache_path.init_settings(), &(cache_status.emplace_back())));
olap_res = file_cache_init_pool->submit_func(
[this, capture0 = cache_path.path, capture1 = cache_path.init_settings(),
capture2 = &(cache_status.emplace_back())] {
_file_cache_factory->create_file_cache(capture0, capture1, capture2);
});

if (!olap_res.ok()) {
LOG(FATAL) << "failed to init file cache, err: " << olap_res;
Expand All @@ -355,7 +353,6 @@ void ExecEnv::init_file_cache_factory() {
}
}
}
return;
}

Status ExecEnv::_init_mem_env() {
Expand Down Expand Up @@ -488,43 +485,18 @@ void ExecEnv::init_mem_tracker() {
std::make_shared<MemTracker>("IOBufBlockMemory", _orphan_mem_tracker_raw);
}

void ExecEnv::init_download_cache_buf() {
std::unique_ptr<char[]> download_cache_buf(new char[config::download_cache_buffer_size]);
memset(download_cache_buf.get(), 0, config::download_cache_buffer_size);
_download_cache_buf_map[_serial_download_cache_thread_token.get()] =
std::move(download_cache_buf);
}

// Sets up the download cache components in order: builds the
// "DownloadCacheThreadPool" thread pool, creates the serial token on that pool,
// then allocates the buffer that is keyed by that token.
void ExecEnv::init_download_cache_required_components() {
// The result of build() is explicitly discarded, so a failed build is not reported here.
// NOTE(review): the next call dereferences the pool pointer — confirm build()
// cannot leave _download_cache_thread_pool null.
static_cast<void>(ThreadPoolBuilder("DownloadCacheThreadPool")
.set_min_threads(1)
.set_max_threads(config::download_cache_thread_pool_thread_num)
.set_max_queue_size(config::download_cache_thread_pool_queue_size)
.build(&_download_cache_thread_pool));
// Order matters: the token is created from the pool, and the buffer map entry
// is keyed by the token pointer.
set_serial_download_cache_thread_token();
init_download_cache_buf();
}

// Hands REGISTER_HOOK_METRIC one callback per thread-pool metric. Each callback
// captures `this` and reads the live thread count or queue size of the
// send batch / download cache thread pool.
// NOTE(review): the callbacks dereference the pool pointers, so they presumably
// must not run before the pools are built or after they are reset — confirm.
void ExecEnv::_register_metrics() {
REGISTER_HOOK_METRIC(send_batch_thread_pool_thread_num,
[this]() { return _send_batch_thread_pool->num_threads(); });

REGISTER_HOOK_METRIC(send_batch_thread_pool_queue_size,
[this]() { return _send_batch_thread_pool->get_queue_size(); });

REGISTER_HOOK_METRIC(download_cache_thread_pool_thread_num,
[this]() { return _download_cache_thread_pool->num_threads(); });

REGISTER_HOOK_METRIC(download_cache_thread_pool_queue_size,
[this]() { return _download_cache_thread_pool->get_queue_size(); });
}

// Invokes DEREGISTER_HOOK_METRIC for the thread-pool metrics owned by ExecEnv.
// NOTE(review): scanner_thread_pool_queue_size is deregistered here but is not
// registered in _register_metrics(); presumably it is registered elsewhere — confirm.
void ExecEnv::_deregister_metrics() {
DEREGISTER_HOOK_METRIC(scanner_thread_pool_queue_size);
DEREGISTER_HOOK_METRIC(send_batch_thread_pool_thread_num);
DEREGISTER_HOOK_METRIC(send_batch_thread_pool_queue_size);
DEREGISTER_HOOK_METRIC(download_cache_thread_pool_thread_num);
DEREGISTER_HOOK_METRIC(download_cache_thread_pool_queue_size);
}

// TODO(zhiqiang): Need to refactor all thread pools. Each thread pool must have a Stop method.
Expand Down Expand Up @@ -572,8 +544,6 @@ void ExecEnv::destroy() {
SAFE_SHUTDOWN(_lazy_release_obj_pool);
SAFE_SHUTDOWN(_send_report_thread_pool);
SAFE_SHUTDOWN(_send_batch_thread_pool);
SAFE_SHUTDOWN(_serial_download_cache_thread_token);
SAFE_SHUTDOWN(_download_cache_thread_pool);

// Free resource after threads are stopped.
// Some threads are still running, like threads created by _new_load_stream_mgr ...
Expand Down Expand Up @@ -645,9 +615,6 @@ void ExecEnv::destroy() {
SAFE_DELETE(_external_scan_context_mgr);
SAFE_DELETE(_user_function_cache);

_serial_download_cache_thread_token.reset(nullptr);
_download_cache_thread_pool.reset(nullptr);

// _heartbeat_flags must be destroyed after the storage engine
SAFE_DELETE(_heartbeat_flags);

Expand Down
2 changes: 0 additions & 2 deletions be/src/util/doris_metrics.h
Original file line number Diff line number Diff line change
Expand Up @@ -194,8 +194,6 @@ class DorisMetrics {
UIntGauge* add_batch_task_queue_size = nullptr;
UIntGauge* send_batch_thread_pool_thread_num = nullptr;
UIntGauge* send_batch_thread_pool_queue_size = nullptr;
UIntGauge* download_cache_thread_pool_thread_num = nullptr;
UIntGauge* download_cache_thread_pool_queue_size = nullptr;
UIntGauge* fragment_thread_pool_queue_size = nullptr;

// Upload metrics
Expand Down
6 changes: 0 additions & 6 deletions docs/en/docs/admin-manual/config/be-config.md
Original file line number Diff line number Diff line change
Expand Up @@ -975,12 +975,6 @@ BaseCompaction:546859:
* Description: Interval in milliseconds between memtable flush mgr refresh iterations
* Default value: 100

#### `download_cache_buffer_size`

* Type: int64
* Description: The size of the buffer used to receive data when downloading the cache.
* Default value: 10485760

#### `zone_map_row_num_threshold`

* Type: int32
Expand Down
6 changes: 0 additions & 6 deletions docs/zh-CN/docs/admin-manual/config/be-config.md
Original file line number Diff line number Diff line change
Expand Up @@ -1000,12 +1000,6 @@ BaseCompaction:546859:
* 描述:memtable主动下刷时刷新内存统计的周期(毫秒)
* 默认值:100

#### `download_cache_buffer_size`

* 类型: int64
* 描述: 下载缓存时用于接收数据的buffer的大小。
* 默认值: 10485760

#### `zone_map_row_num_threshold`

* 类型: int32
Expand Down