Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions be/src/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,8 @@ DEFINE_String(priority_networks, "");
// performance or compact
DEFINE_String(memory_mode, "moderate");

DEFINE_mBool(enable_use_cgroup_memory_info, "true");

// process memory limit specified as number of bytes
// ('<int>[bB]?'), megabytes ('<float>[mM]'), gigabytes ('<float>[gG]'),
// or percentage of the physical memory ('<int>%').
Expand Down
3 changes: 3 additions & 0 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,9 @@ DECLARE_String(priority_networks);
// performance moderate or compact, only tcmalloc compile
DECLARE_String(memory_mode);

// If true, the process memory limit and memory usage are derived from cgroup memory info.
DECLARE_mBool(enable_use_cgroup_memory_info);

// process memory limit specified as number of bytes
// ('<int>[bB]?'), megabytes ('<float>[mM]'), gigabytes ('<float>[gG]'),
// or percentage of the physical memory ('<int>%').
Expand Down
10 changes: 10 additions & 0 deletions be/src/util/cgroup_util.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,16 @@ Status CGroupUtil::find_cgroup_mem_usage(int64_t* bytes) {
return read_cgroup_value(usage_file_path, bytes);
}

// Computes the location of the "memory.meminfo" file for the "memory" cgroup
// and stores it in *file_path.
//
// Returns Status::InvalidArgument when enable() reports false. Otherwise the
// directory is resolved through find_abs_cgroup_path("memory", ...); a failure
// there is propagated by RETURN_IF_ERROR (expected to return early on a non-OK
// status), so *file_path is only written once the lookup has succeeded.
// This function only builds the path; it does not open or check the file.
Status CGroupUtil::find_cgroup_mem_info(std::string* file_path) {
    if (enable()) {
        std::string memory_cgroup_dir;
        RETURN_IF_ERROR(find_abs_cgroup_path("memory", &memory_cgroup_dir));
        // <memory cgroup directory>/memory.meminfo
        file_path->assign(memory_cgroup_dir).append("/memory.meminfo");
        return Status::OK();
    }
    return Status::InvalidArgument("cgroup is not enabled!");
}

Status CGroupUtil::find_cgroup_cpu_limit(float* cpu_count) {
if (!enable()) {
return Status::InvalidArgument("cgroup is not enabled!");
Expand Down
1 change: 1 addition & 0 deletions be/src/util/cgroup_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ class CGroupUtil {
// memory.usage_in_bytes ~= free.used + free.(buff/cache) - (buff)
// https://serverfault.com/questions/902009/the-memory-usage-reported-in-cgroup-differs-from-the-free-command
static Status find_cgroup_mem_usage(int64_t* bytes);
static Status find_cgroup_mem_info(std::string* file_path);

// Determines the CGroup cpu cores limit from the current processes' cgroup.
static Status find_cgroup_cpu_limit(float* cpu_count);
Expand Down
108 changes: 87 additions & 21 deletions be/src/util/mem_info.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,6 @@ bvar::PassiveStatus<int64_t> g_sys_mem_avail(

bool MemInfo::_s_initialized = false;
std::atomic<int64_t> MemInfo::_s_physical_mem = std::numeric_limits<int64_t>::max();
int64_t MemInfo::_s_cgroup_mem_limit = std::numeric_limits<int64_t>::max();
int64_t MemInfo::_s_cgroup_mem_limit_refresh_wait_times = 0;
std::atomic<int64_t> MemInfo::_s_mem_limit = std::numeric_limits<int64_t>::max();
std::atomic<int64_t> MemInfo::_s_soft_mem_limit = std::numeric_limits<int64_t>::max();

Expand All @@ -69,6 +67,12 @@ std::string MemInfo::_s_allocator_cache_mem_str = "";
std::atomic<int64_t> MemInfo::_s_virtual_memory_used = 0;
std::atomic<int64_t> MemInfo::refresh_interval_memory_growth = 0;

int64_t MemInfo::_s_cgroup_mem_limit = std::numeric_limits<int64_t>::max();
int64_t MemInfo::_s_cgroup_mem_usage = std::numeric_limits<int64_t>::min();
static std::unordered_map<std::string, int64_t> _s_cgroup_mem_info_bytes;
bool MemInfo::_s_cgroup_mem_refresh_state = false;
int64_t MemInfo::_s_cgroup_mem_refresh_wait_times = 0;

static std::unordered_map<std::string, int64_t> _mem_info_bytes;
std::atomic<int64_t> MemInfo::_s_sys_mem_available = -1;
int64_t MemInfo::_s_sys_mem_available_low_water_mark = -1;
Expand Down Expand Up @@ -393,27 +397,91 @@ void MemInfo::refresh_proc_meminfo() {
meminfo.close();
}

// 1. calculate physical_mem
int64_t physical_mem = -1;
int64_t cgroup_mem_limit = -1;
physical_mem = _mem_info_bytes["MemTotal"];
if (_s_cgroup_mem_limit_refresh_wait_times >= 0) {
// refresh cgroup memory
if (_s_cgroup_mem_refresh_wait_times >= 0 && config::enable_use_cgroup_memory_info) {
int64_t cgroup_mem_limit = -1;
int64_t cgroup_mem_usage = -1;
std::string cgroup_mem_info_file_path;
_s_cgroup_mem_refresh_state = true;
Status status = CGroupUtil::find_cgroup_mem_limit(&cgroup_mem_limit);
if (status.ok() && cgroup_mem_limit > 0) {
if (!status.ok() || cgroup_mem_limit <= 0) {
_s_cgroup_mem_refresh_state = false;
}
status = CGroupUtil::find_cgroup_mem_usage(&cgroup_mem_usage);
if (!status.ok() || cgroup_mem_usage <= 0) {
_s_cgroup_mem_refresh_state = false;
}
status = CGroupUtil::find_cgroup_mem_info(&cgroup_mem_info_file_path);
if (status.ok()) {
std::ifstream cgroup_meminfo(cgroup_mem_info_file_path, std::ios::in);
std::string line;

while (cgroup_meminfo.good() && !cgroup_meminfo.eof()) {
getline(cgroup_meminfo, line);
std::vector<std::string> fields =
strings::Split(line, " ", strings::SkipWhitespace());
if (fields.size() < 2) {
continue;
}
std::string key = fields[0].substr(0, fields[0].size() - 1);

StringParser::ParseResult result;
auto mem_value = StringParser::string_to_int<int64_t>(fields[1].data(),
fields[1].size(), &result);

if (result == StringParser::PARSE_SUCCESS) {
if (fields.size() == 2) {
_s_cgroup_mem_info_bytes[key] = mem_value;
} else if (fields[2] == "kB") {
_s_cgroup_mem_info_bytes[key] = mem_value * 1024L;
}
}
}
if (cgroup_meminfo.is_open()) {
cgroup_meminfo.close();
}
} else {
_s_cgroup_mem_refresh_state = false;
}

if (_s_cgroup_mem_refresh_state) {
_s_cgroup_mem_limit = cgroup_mem_limit;
_s_cgroup_mem_limit_refresh_wait_times =
-1000; // wait 10s, 1000 * 100ms, avoid too frequently.
// https://serverfault.com/questions/902009/the-memory-usage-reported-in-cgroup-differs-from-the-free-command
// memory.usage_in_bytes ~= free.used + free.(buff/cache) - (buff)
// so, memory.usage_in_bytes - memory.meminfo["Cached"]
_s_cgroup_mem_usage = cgroup_mem_usage - _s_cgroup_mem_info_bytes["Cached"];
// wait 10s (100 * 100ms) before the next refresh, to avoid refreshing too frequently.
_s_cgroup_mem_refresh_wait_times = -100;
LOG(INFO) << "Refresh cgroup memory win, refresh again after 10s, cgroup mem limit: "
<< _s_cgroup_mem_limit << ", cgroup mem usage: " << _s_cgroup_mem_usage
<< ", cgroup mem info cached: " << _s_cgroup_mem_info_bytes["Cached"];
} else {
_s_cgroup_mem_limit = std::numeric_limits<int64_t>::max();
_s_cgroup_mem_limit_refresh_wait_times =
-6000; // find cgroup failed, wait 60s, 6000 * 100ms.
// find cgroup failed, wait 300s, 3000 * 100ms.
_s_cgroup_mem_refresh_wait_times = -3000;
LOG(INFO)
<< "Refresh cgroup memory failed, refresh again after 300s, cgroup mem limit: "
<< _s_cgroup_mem_limit << ", cgroup mem usage: " << _s_cgroup_mem_usage
<< ", cgroup mem info cached: " << _s_cgroup_mem_info_bytes["Cached"];
}
} else {
_s_cgroup_mem_limit_refresh_wait_times++;
if (config::enable_use_cgroup_memory_info) {
_s_cgroup_mem_refresh_wait_times++;
} else {
_s_cgroup_mem_refresh_state = false;
}
}
if (_s_cgroup_mem_limit > 0) {

// 1. calculate physical_mem
int64_t physical_mem = -1;

physical_mem = _mem_info_bytes["MemTotal"];
if (_s_cgroup_mem_refresh_state) {
// In theory, always cgroup_mem_limit < physical_mem
physical_mem = std::min(physical_mem, _s_cgroup_mem_limit);
if (physical_mem < 0) {
physical_mem = _s_cgroup_mem_limit;
} else {
physical_mem = std::min(physical_mem, _s_cgroup_mem_limit);
}
}

if (physical_mem <= 0) {
Expand Down Expand Up @@ -449,16 +517,14 @@ void MemInfo::refresh_proc_meminfo() {

// 3. refresh process available memory
int64_t mem_available = -1;
int64_t cgroup_mem_usage = 0;
if (_mem_info_bytes.find("MemAvailable") != _mem_info_bytes.end()) {
mem_available = _mem_info_bytes["MemAvailable"];
}
auto status = CGroupUtil::find_cgroup_mem_usage(&cgroup_mem_usage);
if (status.ok() && cgroup_mem_usage > 0 && cgroup_mem_limit > 0) {
if (_s_cgroup_mem_refresh_state) {
if (mem_available < 0) {
mem_available = cgroup_mem_limit - cgroup_mem_usage;
mem_available = _s_cgroup_mem_limit - _s_cgroup_mem_usage;
} else {
mem_available = std::min(mem_available, cgroup_mem_limit - cgroup_mem_usage);
mem_available = std::min(mem_available, _s_cgroup_mem_limit - _s_cgroup_mem_usage);
}
}
if (mem_available < 0) {
Expand Down
7 changes: 5 additions & 2 deletions be/src/util/mem_info.h
Original file line number Diff line number Diff line change
Expand Up @@ -197,15 +197,18 @@ class MemInfo {
private:
static bool _s_initialized;
static std::atomic<int64_t> _s_physical_mem;
static int64_t _s_cgroup_mem_limit;
static int64_t _s_cgroup_mem_limit_refresh_wait_times;
static std::atomic<int64_t> _s_mem_limit;
static std::atomic<int64_t> _s_soft_mem_limit;

static std::atomic<int64_t> _s_allocator_cache_mem;
static std::string _s_allocator_cache_mem_str;
static std::atomic<int64_t> _s_virtual_memory_used;

static int64_t _s_cgroup_mem_limit;
static int64_t _s_cgroup_mem_usage;
static bool _s_cgroup_mem_refresh_state;
static int64_t _s_cgroup_mem_refresh_wait_times;

static std::atomic<int64_t> _s_sys_mem_available;
static int64_t _s_sys_mem_available_low_water_mark;
static int64_t _s_sys_mem_available_warning_water_mark;
Expand Down