Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
354 changes: 255 additions & 99 deletions be/src/agent/cgroup_cpu_ctl.cpp

Large diffs are not rendered by default.

87 changes: 76 additions & 11 deletions be/src/agent/cgroup_cpu_ctl.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,14 @@ namespace doris {

// cgroup cpu.cfs_quota_us default value, it means disable cpu hard limit
const static int CGROUP_CPU_HARD_LIMIT_DEFAULT_VALUE = -1;
const static std::string CGROUP_V2_CPU_HARD_LIMIT_DEFAULT_VALUE = "max 100000";

class CgroupCpuCtl {
public:
virtual ~CgroupCpuCtl() = default;
CgroupCpuCtl() = default;
CgroupCpuCtl(uint64_t wg_id) { _wg_id = wg_id; }

virtual Status init();
virtual Status init() = 0;

virtual Status add_thread_to_cgroup() = 0;

Expand All @@ -48,18 +48,36 @@ class CgroupCpuCtl {
// for log
void get_cgroup_cpu_info(uint64_t* cpu_shares, int* cpu_hard_limit);

virtual Status delete_unused_cgroup_path(std::set<uint64_t>& used_wg_ids) = 0;
static void init_doris_cgroup_path();

static Status delete_unused_cgroup_path(std::set<uint64_t>& used_wg_ids);

static std::unique_ptr<CgroupCpuCtl> create_cgroup_cpu_ctl(uint64_t wg_id);

static bool is_a_valid_cgroup_path(std::string cg_path);

static uint64_t cpu_soft_limit_default_value();

protected:
Status write_cg_sys_file(std::string file_path, int value, std::string msg, bool is_append);
Status write_cg_sys_file(std::string file_path, std::string value, std::string msg,
bool is_append);

virtual Status modify_cg_cpu_hard_limit_no_lock(int cpu_hard_limit) = 0;

virtual Status modify_cg_cpu_soft_limit_no_lock(int cpu_shares) = 0;

std::string _doris_cgroup_cpu_path;
uint64_t _cpu_core_num = CpuInfo::num_cores();
uint64_t _cpu_cfs_period_us = 100000;
Status add_thread_to_cgroup(std::string task_file);

protected:
Comment thread
yiguolei marked this conversation as resolved.
inline static uint64_t _cpu_core_num;
const static uint64_t _cpu_cfs_period_us = 100000;
inline static std::string _doris_cgroup_cpu_path = "";
inline static std::string _doris_cgroup_cpu_query_path = "";
inline static bool _is_enable_cgroup_v1_in_env = false;
inline static bool _is_enable_cgroup_v2_in_env = false;
inline static bool _is_cgroup_query_path_valid = false;

protected:
Comment thread
yiguolei marked this conversation as resolved.
int _cpu_hard_limit = 0;
std::shared_mutex _lock_mutex;
bool _init_succ = false;
Expand Down Expand Up @@ -96,20 +114,67 @@ class CgroupCpuCtl {
class CgroupV1CpuCtl : public CgroupCpuCtl {
public:
CgroupV1CpuCtl(uint64_t tg_id) : CgroupCpuCtl(tg_id) {}
CgroupV1CpuCtl() = default;
Status init() override;
Status modify_cg_cpu_hard_limit_no_lock(int cpu_hard_limit) override;
Status modify_cg_cpu_soft_limit_no_lock(int cpu_shares) override;
Status add_thread_to_cgroup() override;

Status delete_unused_cgroup_path(std::set<uint64_t>& used_wg_ids) override;

private:
std::string _cgroup_v1_cpu_query_path;
std::string _cgroup_v1_cpu_tg_path; // workload group path
std::string _cgroup_v1_cpu_tg_quota_file;
std::string _cgroup_v1_cpu_tg_shares_file;
std::string _cgroup_v1_cpu_tg_task_file;
};

/*
NOTE: cgroup v2 directory structure
1 root path:
/sys/fs/cgroup

2 doris home path:
/sys/fs/cgroup/{doris_home}/

3 doris home subtree_control file:
/sys/fs/cgroup/{doris_home}/cgroup.subtree_control

4 query path:
/sys/fs/cgroup/{doris_home}/query/

5 query path subtree_control file:
/sys/fs/cgroup/{doris_home}/query/cgroup.subtree_control

6 workload group path:
/sys/fs/cgroup/{doris_home}/query/{workload_group_id}

7 workload grou cpu.max file:
/sys/fs/cgroup/{doris_home}/query/{workload_group_id}/cpu.max

8 workload grou cpu.weight file:
/sys/fs/cgroup/{doris_home}/query/{workload_group_id}/cpu.weight

9 workload group cgroup type file:
/sys/fs/cgroup/{doris_home}/query/{workload_group_id}/cgroup.type

*/
class CgroupV2CpuCtl : public CgroupCpuCtl {
public:
CgroupV2CpuCtl(uint64_t tg_id) : CgroupCpuCtl(tg_id) {}
Status init() override;
Status modify_cg_cpu_hard_limit_no_lock(int cpu_hard_limit) override;
Status modify_cg_cpu_soft_limit_no_lock(int cpu_shares) override;
Status add_thread_to_cgroup() override;

private:
Status enable_cpu_controller(std::string file);

private:
Comment thread
yiguolei marked this conversation as resolved.
std::string _doris_cgroup_cpu_path_subtree_ctl_file;
std::string _cgroup_v2_query_path_subtree_ctl_file;
std::string _cgroup_v2_query_wg_path;
std::string _cgroup_v2_query_wg_cpu_max_file;
std::string _cgroup_v2_query_wg_cpu_weight_file;
std::string _cgroup_v2_query_wg_thread_file;
std::string _cgroup_v2_query_wg_type_file;
};

} // namespace doris
1 change: 1 addition & 0 deletions be/src/runtime/exec_env_init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,7 @@ Status ExecEnv::_init(const std::vector<StorePath>& store_paths,
// NOTE: runtime query statistics mgr could be visited by query and daemon thread
// so it should be created before all query begin and deleted after all query and daemon thread stoppped
_runtime_query_statistics_mgr = new RuntimeQueryStatisticsMgr();
CgroupCpuCtl::init_doris_cgroup_path();
_file_cache_factory = new io::FileCacheFactory();
std::vector<doris::CachePath> cache_paths;
init_file_cache_factory(cache_paths);
Expand Down
25 changes: 14 additions & 11 deletions be/src/runtime/workload_group/workload_group.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,9 @@

namespace doris {

const static uint64_t CPU_SHARE_DEFAULT_VALUE = 1024;
const static std::string MEMORY_LIMIT_DEFAULT_VALUE = "0%";
const static bool ENABLE_MEMORY_OVERCOMMIT_DEFAULT_VALUE = true;
const static int CPU_HARD_LIMIT_DEFAULT_VALUE = -1;
const static uint64_t CPU_SOFT_LIMIT_DEFAULT_VALUE = 1024;
const static int SPILL_LOW_WATERMARK_DEFAULT_VALUE = 50;
const static int SPILL_HIGH_WATERMARK_DEFAULT_VALUE = 80;

Expand Down Expand Up @@ -310,7 +308,7 @@ WorkloadGroupInfo WorkloadGroupInfo::parse_topic_info(
}

// 4 cpu_share
uint64_t cpu_share = CPU_SHARE_DEFAULT_VALUE;
uint64_t cpu_share = CgroupCpuCtl::cpu_soft_limit_default_value();
if (tworkload_group_info.__isset.cpu_share) {
cpu_share = tworkload_group_info.cpu_share;
}
Expand Down Expand Up @@ -415,14 +413,18 @@ void WorkloadGroup::upsert_task_scheduler(WorkloadGroupInfo* tg_info, ExecEnv* e

std::lock_guard<std::shared_mutex> wlock(_task_sched_lock);
if (config::doris_cgroup_cpu_path != "" && _cgroup_cpu_ctl == nullptr) {
std::unique_ptr<CgroupCpuCtl> cgroup_cpu_ctl = std::make_unique<CgroupV1CpuCtl>(tg_id);
Status ret = cgroup_cpu_ctl->init();
if (ret.ok()) {
_cgroup_cpu_ctl = std::move(cgroup_cpu_ctl);
LOG(INFO) << "[upsert wg thread pool] cgroup init success, wg_id=" << tg_id;
std::unique_ptr<CgroupCpuCtl> cgroup_cpu_ctl = CgroupCpuCtl::create_cgroup_cpu_ctl(tg_id);
if (cgroup_cpu_ctl) {
Status ret = cgroup_cpu_ctl->init();
if (ret.ok()) {
_cgroup_cpu_ctl = std::move(cgroup_cpu_ctl);
LOG(INFO) << "[upsert wg thread pool] cgroup init success, wg_id=" << tg_id;
} else {
LOG(INFO) << "[upsert wg thread pool] cgroup init failed, wg_id= " << tg_id
<< ", reason=" << ret.to_string();
}
} else {
LOG(INFO) << "[upsert wg thread pool] cgroup init failed, wg_id= " << tg_id
<< ", reason=" << ret.to_string();
LOG(INFO) << "[upsert wg thread pool] create cgroup cpu ctl for " << tg_id << " failed";
}
}

Expand Down Expand Up @@ -521,7 +523,8 @@ void WorkloadGroup::upsert_task_scheduler(WorkloadGroupInfo* tg_info, ExecEnv* e
if (enable_cpu_hard_limit) {
if (cpu_hard_limit > 0) {
_cgroup_cpu_ctl->update_cpu_hard_limit(cpu_hard_limit);
_cgroup_cpu_ctl->update_cpu_soft_limit(CPU_SOFT_LIMIT_DEFAULT_VALUE);
_cgroup_cpu_ctl->update_cpu_soft_limit(
CgroupCpuCtl::cpu_soft_limit_default_value());
} else {
LOG(INFO) << "[upsert wg thread pool] enable cpu hard limit but value is illegal: "
<< cpu_hard_limit << ", gid=" << tg_id;
Expand Down
27 changes: 7 additions & 20 deletions be/src/runtime/workload_group/workload_group_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,8 @@ void WorkloadGroupMgr::delete_workload_group_by_ids(std::set<uint64_t> used_wg_i
}
// wg is shutdown and running rum = 0, its resource can be released in BE
if (workload_group_ptr->can_be_dropped()) {
LOG(INFO) << "[topic_publish_wg]There is no query in wg" << wg_id << ", delete it.";
LOG(INFO) << "[topic_publish_wg]There is no query in wg " << wg_id
<< ", delete it.";
deleted_task_groups.push_back(workload_group_ptr);
}
}
Expand Down Expand Up @@ -121,30 +122,16 @@ void WorkloadGroupMgr::delete_workload_group_by_ids(std::set<uint64_t> used_wg_i
// Using cgdelete has no such issue.
{
if (config::doris_cgroup_cpu_path != "") {
std::lock_guard<std::shared_mutex> write_lock(_init_cg_ctl_lock);
if (!_cg_cpu_ctl) {
_cg_cpu_ctl = std::make_unique<CgroupV1CpuCtl>();
}
if (!_is_init_succ) {
Status ret = _cg_cpu_ctl->init();
if (ret.ok()) {
_is_init_succ = true;
} else {
LOG(INFO) << "[topic_publish_wg]init workload group mgr cpu ctl failed, "
<< ret.to_string();
}
}
if (_is_init_succ) {
Status ret = _cg_cpu_ctl->delete_unused_cgroup_path(used_wg_id);
if (!ret.ok()) {
LOG(WARNING) << "[topic_publish_wg]" << ret.to_string();
}
std::lock_guard<std::shared_mutex> write_lock(_clear_cgroup_lock);
Status ret = CgroupCpuCtl::delete_unused_cgroup_path(used_wg_id);
if (!ret.ok()) {
LOG(WARNING) << "[topic_publish_wg]" << ret.to_string();
}
}
}
int64_t time_cost_ms = MonotonicMillis() - begin_time;
LOG(INFO) << "[topic_publish_wg]finish clear unused workload group, time cost: " << time_cost_ms
<< "ms, deleted group size:" << deleted_task_groups.size()
<< " ms, deleted group size:" << deleted_task_groups.size()
<< ", before wg size=" << old_wg_size << ", after wg size=" << new_wg_size;
}

Expand Down
4 changes: 1 addition & 3 deletions be/src/runtime/workload_group/workload_group_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,7 @@ class WorkloadGroupMgr {
std::shared_mutex _group_mutex;
std::unordered_map<uint64_t, WorkloadGroupPtr> _workload_groups;

std::shared_mutex _init_cg_ctl_lock;
std::unique_ptr<CgroupCpuCtl> _cg_cpu_ctl;
bool _is_init_succ = false;
std::shared_mutex _clear_cgroup_lock;
};

} // namespace doris
Original file line number Diff line number Diff line change
Expand Up @@ -479,7 +479,7 @@ public void getProcNodeData(BaseProcResult result, QueryQueue qq) {
row.add(val + "%");
}
} else if (CPU_SHARE.equals(key) && !properties.containsKey(key)) {
row.add("1024");
row.add("-1");
} else if (MEMORY_LIMIT.equals(key) && !properties.containsKey(key)) {
row.add("0%");
} else if (ENABLE_MEMORY_OVERCOMMIT.equals(key) && !properties.containsKey(key)) {
Expand Down
8 changes: 4 additions & 4 deletions regression-test/data/workload_manager_p0/test_curd_wlg.out
Original file line number Diff line number Diff line change
Expand Up @@ -54,16 +54,16 @@ normal 20 50% true 2147483647 0 0 1% 16
test_group 10 11% false 100 0 0 20% -1

-- !show_spill_1 --
spill_group_test 1024 0% true 2147483647 0 0 -1 -1 10% 10%
spill_group_test -1 0% true 2147483647 0 0 -1 -1 10% 10%

-- !show_spill_1 --
spill_group_test 1024 0% true 2147483647 0 0 -1 -1 -1 10%
spill_group_test -1 0% true 2147483647 0 0 -1 -1 -1 10%

-- !show_spill_2 --
spill_group_test 1024 0% true 2147483647 0 0 -1 -1 5% 10%
spill_group_test -1 0% true 2147483647 0 0 -1 -1 5% 10%

-- !show_spill_3 --
spill_group_test 1024 0% true 2147483647 0 0 -1 -1 5% 40%
spill_group_test -1 0% true 2147483647 0 0 -1 -1 5% 40%

-- !show_wg_tag --
tag1_mem_wg1 50% -1 mem_tag1
Expand Down