Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
118 changes: 65 additions & 53 deletions be/src/util/block_compression.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -96,13 +96,20 @@ class Lz4BlockCompression : public BlockCompressionCodec {
ENABLE_FACTORY_CREATOR(Context);

public:
Context() : ctx(nullptr) {}
Context() : ctx(nullptr) {
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
ExecEnv::GetInstance()->block_compression_mem_tracker());
buffer = std::make_unique<faststring>();
}
LZ4_stream_t* ctx;
faststring buffer;
std::unique_ptr<faststring> buffer;
~Context() {
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
ExecEnv::GetInstance()->block_compression_mem_tracker());
if (ctx) {
LZ4_freeStream(ctx);
}
buffer.reset();
}
};

Expand All @@ -118,8 +125,6 @@ class Lz4BlockCompression : public BlockCompressionCodec {
}

Status compress(const Slice& input, faststring* output) override {
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
ExecEnv::GetInstance()->block_compression_mem_tracker());
if (input.size > INT_MAX) {
return Status::InvalidArgument(
"LZ4 not support those case(input.size>INT_MAX), maybe you should change "
Expand All @@ -144,8 +149,14 @@ class Lz4BlockCompression : public BlockCompressionCodec {
compressed_buf.size = max_len;
} else {
// reuse context buffer if max_len <= MAX_COMPRESSION_BUFFER_FOR_REUSE
context->buffer.resize(max_len);
compressed_buf.data = reinterpret_cast<char*>(context->buffer.data());
{
// context->buffer is reusable between queries, so its memory should be accounted to
// the global tracker.
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
ExecEnv::GetInstance()->block_compression_mem_tracker());
context->buffer->resize(max_len);
}
compressed_buf.data = reinterpret_cast<char*>(context->buffer->data());
compressed_buf.size = max_len;
}

Expand All @@ -165,8 +176,6 @@ class Lz4BlockCompression : public BlockCompressionCodec {
}

Status decompress(const Slice& input, Slice* output) override {
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
ExecEnv::GetInstance()->block_compression_mem_tracker());
auto decompressed_len =
LZ4_decompress_safe(input.data, output->data, input.size, output->size);
if (decompressed_len < 0) {
Expand Down Expand Up @@ -218,8 +227,6 @@ class HadoopLz4BlockCompression : public Lz4BlockCompression {
return &s_instance;
}
Status decompress(const Slice& input, Slice* output) override {
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
ExecEnv::GetInstance()->block_compression_mem_tracker());
RETURN_IF_ERROR(Decompressor::create_decompressor(CompressType::LZ4BLOCK, &_decompressor));
size_t input_bytes_read = 0;
size_t decompressed_len = 0;
Expand All @@ -245,13 +252,20 @@ class Lz4fBlockCompression : public BlockCompressionCodec {
ENABLE_FACTORY_CREATOR(CContext);

public:
CContext() : ctx(nullptr) {}
CContext() : ctx(nullptr) {
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
ExecEnv::GetInstance()->block_compression_mem_tracker());
buffer = std::make_unique<faststring>();
}
LZ4F_compressionContext_t ctx;
faststring buffer;
std::unique_ptr<faststring> buffer;
~CContext() {
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
ExecEnv::GetInstance()->block_compression_mem_tracker());
if (ctx) {
LZ4F_freeCompressionContext(ctx);
}
buffer.reset();
}
};
class DContext {
Expand Down Expand Up @@ -301,8 +315,6 @@ class Lz4fBlockCompression : public BlockCompressionCodec {
private:
Status _compress(const std::vector<Slice>& inputs, size_t uncompressed_size,
faststring* output) {
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
ExecEnv::GetInstance()->block_compression_mem_tracker());
std::unique_ptr<CContext> context;
RETURN_IF_ERROR(_acquire_compression_ctx(context));
bool compress_failed = false;
Expand All @@ -319,9 +331,13 @@ class Lz4fBlockCompression : public BlockCompressionCodec {
compressed_buf.data = reinterpret_cast<char*>(output->data());
compressed_buf.size = max_len;
} else {
// reuse context buffer if max_len <= MAX_COMPRESSION_BUFFER_FOR_REUSE
context->buffer.resize(max_len);
compressed_buf.data = reinterpret_cast<char*>(context->buffer.data());
{
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
ExecEnv::GetInstance()->block_compression_mem_tracker());
// reuse context buffer if max_len <= MAX_COMPRESSION_BUFFER_FOR_REUSE
context->buffer->resize(max_len);
}
compressed_buf.data = reinterpret_cast<char*>(context->buffer->data());
compressed_buf.size = max_len;
}

Expand Down Expand Up @@ -361,8 +377,6 @@ class Lz4fBlockCompression : public BlockCompressionCodec {
}

Status _decompress(const Slice& input, Slice* output) {
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
ExecEnv::GetInstance()->block_compression_mem_tracker());
bool decompress_failed = false;
std::unique_ptr<DContext> context;
RETURN_IF_ERROR(_acquire_decompression_ctx(context));
Expand Down Expand Up @@ -472,13 +486,20 @@ class Lz4HCBlockCompression : public BlockCompressionCodec {
ENABLE_FACTORY_CREATOR(Context);

public:
Context() : ctx(nullptr) {}
Context() : ctx(nullptr) {
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
ExecEnv::GetInstance()->block_compression_mem_tracker());
buffer = std::make_unique<faststring>();
}
LZ4_streamHC_t* ctx;
faststring buffer;
std::unique_ptr<faststring> buffer;
~Context() {
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
ExecEnv::GetInstance()->block_compression_mem_tracker());
if (ctx) {
LZ4_freeStreamHC(ctx);
}
buffer.reset();
}
};

Expand All @@ -494,8 +515,6 @@ class Lz4HCBlockCompression : public BlockCompressionCodec {
}

Status compress(const Slice& input, faststring* output) override {
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
ExecEnv::GetInstance()->block_compression_mem_tracker());
std::unique_ptr<Context> context;
RETURN_IF_ERROR(_acquire_compression_ctx(context));
bool compress_failed = false;
Expand All @@ -512,9 +531,13 @@ class Lz4HCBlockCompression : public BlockCompressionCodec {
compressed_buf.data = reinterpret_cast<char*>(output->data());
compressed_buf.size = max_len;
} else {
// reuse context buffer if max_len <= MAX_COMPRESSION_BUFFER_FOR_REUSE
context->buffer.resize(max_len);
compressed_buf.data = reinterpret_cast<char*>(context->buffer.data());
{
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
ExecEnv::GetInstance()->block_compression_mem_tracker());
// reuse context buffer if max_len <= MAX_COMPRESSION_BUFFER_FOR_REUSE
context->buffer->resize(max_len);
}
compressed_buf.data = reinterpret_cast<char*>(context->buffer->data());
compressed_buf.size = max_len;
}

Expand All @@ -533,8 +556,6 @@ class Lz4HCBlockCompression : public BlockCompressionCodec {
}

Status decompress(const Slice& input, Slice* output) override {
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
ExecEnv::GetInstance()->block_compression_mem_tracker());
auto decompressed_len =
LZ4_decompress_safe(input.data, output->data, input.size, output->size);
if (decompressed_len < 0) {
Expand Down Expand Up @@ -654,8 +675,6 @@ class SnappyBlockCompression : public BlockCompressionCodec {
~SnappyBlockCompression() override {}

Status compress(const Slice& input, faststring* output) override {
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
ExecEnv::GetInstance()->block_compression_mem_tracker());
size_t max_len = max_compressed_len(input.size);
output->resize(max_len);
Slice s(*output);
Expand All @@ -666,8 +685,6 @@ class SnappyBlockCompression : public BlockCompressionCodec {
}

Status decompress(const Slice& input, Slice* output) override {
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
ExecEnv::GetInstance()->block_compression_mem_tracker());
if (!snappy::RawUncompress(input.data, input.size, output->data)) {
return Status::InvalidArgument("Fail to do Snappy decompress");
}
Expand Down Expand Up @@ -699,8 +716,6 @@ class ZlibBlockCompression : public BlockCompressionCodec {
~ZlibBlockCompression() {}

Status compress(const Slice& input, faststring* output) override {
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
ExecEnv::GetInstance()->block_compression_mem_tracker());
size_t max_len = max_compressed_len(input.size);
output->resize(max_len);
Slice s(*output);
Expand All @@ -715,8 +730,6 @@ class ZlibBlockCompression : public BlockCompressionCodec {

Status compress(const std::vector<Slice>& inputs, size_t uncompressed_size,
faststring* output) override {
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
ExecEnv::GetInstance()->block_compression_mem_tracker());
size_t max_len = max_compressed_len(uncompressed_size);
output->resize(max_len);

Expand Down Expand Up @@ -757,8 +770,6 @@ class ZlibBlockCompression : public BlockCompressionCodec {
}

Status decompress(const Slice& input, Slice* output) override {
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
ExecEnv::GetInstance()->block_compression_mem_tracker());
size_t input_size = input.size;
auto zres =
::uncompress2((Bytef*)output->data, &output->size, (Bytef*)input.data, &input_size);
Expand All @@ -781,13 +792,20 @@ class ZstdBlockCompression : public BlockCompressionCodec {
ENABLE_FACTORY_CREATOR(CContext);

public:
CContext() : ctx(nullptr) {}
CContext() : ctx(nullptr) {
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
ExecEnv::GetInstance()->block_compression_mem_tracker());
buffer = std::make_unique<faststring>();
}
ZSTD_CCtx* ctx;
faststring buffer;
std::unique_ptr<faststring> buffer;
~CContext() {
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
ExecEnv::GetInstance()->block_compression_mem_tracker());
if (ctx) {
ZSTD_freeCCtx(ctx);
}
buffer.reset();
}
};
class DContext {
Expand Down Expand Up @@ -826,8 +844,6 @@ class ZstdBlockCompression : public BlockCompressionCodec {
// https://github.com/facebook/zstd/blob/dev/examples/streaming_compression.c
Status compress(const std::vector<Slice>& inputs, size_t uncompressed_size,
faststring* output) override {
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
ExecEnv::GetInstance()->block_compression_mem_tracker());
std::unique_ptr<CContext> context;
RETURN_IF_ERROR(_acquire_compression_ctx(context));
bool compress_failed = false;
Expand All @@ -845,9 +861,13 @@ class ZstdBlockCompression : public BlockCompressionCodec {
compressed_buf.data = reinterpret_cast<char*>(output->data());
compressed_buf.size = max_len;
} else {
// reuse context buffer if max_len <= MAX_COMPRESSION_BUFFER_FOR_REUSE
context->buffer.resize(max_len);
compressed_buf.data = reinterpret_cast<char*>(context->buffer.data());
{
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
Comment thread
yiguolei marked this conversation as resolved.
ExecEnv::GetInstance()->block_compression_mem_tracker());
// reuse context buffer if max_len <= MAX_COMPRESSION_BUFFER_FOR_REUSE
context->buffer->resize(max_len);
}
compressed_buf.data = reinterpret_cast<char*>(context->buffer->data());
compressed_buf.size = max_len;
}

Expand Down Expand Up @@ -904,8 +924,6 @@ class ZstdBlockCompression : public BlockCompressionCodec {
}

Status decompress(const Slice& input, Slice* output) override {
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
ExecEnv::GetInstance()->block_compression_mem_tracker());
std::unique_ptr<DContext> context;
bool decompress_failed = false;
RETURN_IF_ERROR(_acquire_decompression_ctx(context));
Expand Down Expand Up @@ -1001,8 +1019,6 @@ class GzipBlockCompression : public ZlibBlockCompression {
~GzipBlockCompression() override = default;

Status decompress(const Slice& input, Slice* output) override {
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
ExecEnv::GetInstance()->block_compression_mem_tracker());
z_stream z_strm = {};
z_strm.zalloc = Z_NULL;
z_strm.zfree = Z_NULL;
Expand Down Expand Up @@ -1084,8 +1100,6 @@ class GzipBlockCompressionByLibdeflate final : public GzipBlockCompression {
~GzipBlockCompressionByLibdeflate() override = default;

Status decompress(const Slice& input, Slice* output) override {
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
ExecEnv::GetInstance()->block_compression_mem_tracker());
if (input.empty()) {
output->size = 0;
return Status::OK();
Expand Down Expand Up @@ -1118,8 +1132,6 @@ class LzoBlockCompression final : public BlockCompressionCodec {
}
size_t max_compressed_len(size_t len) override { return 0; };
Status decompress(const Slice& input, Slice* output) override {
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
ExecEnv::GetInstance()->block_compression_mem_tracker());
auto* input_ptr = input.data;
auto remain_input_size = input.size;
auto* output_ptr = output->data;
Expand Down