Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 18 additions & 1 deletion cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ if (NOT WIN32)
endif ()
endif ()

option(BUILD_TEST "Build tests" OFF)
option(BUILD_TEST "Build tests" ON)
message("cmake using: BUILD_TEST=${BUILD_TEST}")

option(ENABLE_ANTLR4 "Enable ANTLR4 runtime" ON)
Expand All @@ -117,15 +117,32 @@ message("cmake using: ENABLE_ANTLR4=${ENABLE_ANTLR4}")
# Configure-time switch for Google Snappy compression; defaults to ON.
option(ENABLE_SNAPPY "Enable Google Snappy compression" ON)
# Echo the effective value so it is visible in the configure log.
message("cmake using: ENABLE_SNAPPY=${ENABLE_SNAPPY}")

# When the option is on, add -DENABLE_SNAPPY to the compiler command line,
# which defines the ENABLE_SNAPPY preprocessor macro.
if (ENABLE_SNAPPY)
add_definitions(-DENABLE_SNAPPY)
endif()

# Configure-time switch for LZ4 compression; defaults to ON.
option(ENABLE_LZ4 "Enable LZ4 compression" ON)
# Echo the effective value so it is visible in the configure log.
message("cmake using: ENABLE_LZ4=${ENABLE_LZ4}")

# When the option is on, add -DENABLE_LZ4 to the compiler command line,
# which defines the ENABLE_LZ4 preprocessor macro.
if (ENABLE_LZ4)
add_definitions(-DENABLE_LZ4)
endif()

# Configure-time switch for LZOKAY compression; defaults to ON.
option(ENABLE_LZOKAY "Enable LZOKAY compression" ON)
# Echo the effective value so it is visible in the configure log.
message("cmake using: ENABLE_LZOKAY=${ENABLE_LZOKAY}")

# When the option is on, add -DENABLE_LZOKAY to the compiler command line,
# which defines the ENABLE_LZOKAY preprocessor macro.
if (ENABLE_LZOKAY)
add_definitions(-DENABLE_LZOKAY)
endif()

# Configure-time switch for Zlib compression; defaults to ON.
option(ENABLE_ZLIB "Enable Zlib compression" ON)
# Echo the effective value so it is visible in the configure log.
message("cmake using: ENABLE_ZLIB=${ENABLE_ZLIB}")

# When the option is on, add both -DENABLE_ZLIB and -DENABLE_GZIP to the
# compiler command line. There is no separate GZIP option in this section:
# the ENABLE_GZIP macro is controlled by ENABLE_ZLIB.
if (ENABLE_ZLIB)
add_definitions(-DENABLE_ZLIB)
add_definitions(-DENABLE_GZIP)
endif()

# All libs will be stored here, including libtsfile, compress-encoding lib.
set(LIBRARY_OUTPUT_PATH ${PROJECT_BINARY_DIR}/lib)

Expand Down
113 changes: 63 additions & 50 deletions cpp/src/reader/aligned_chunk_reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@
using namespace common;
namespace storage {

int AlignedChunkReader::init(ReadFile *read_file, String m_name,
TSDataType data_type, Filter *time_filter) {
int AlignedChunkReader::init(ReadFile* read_file, String m_name,
TSDataType data_type, Filter* time_filter) {
read_file_ = read_file;
measurement_name_.shallow_copy_from(m_name);
time_decoder_ = DecoderFactory::alloc_time_decoder();
Expand All @@ -50,7 +50,7 @@ void AlignedChunkReader::reset() {
cur_time_page_header_.reset();
cur_value_page_header_.reset();

char *file_data_buf = time_in_stream_.get_wrapped_buf();
char* file_data_buf = time_in_stream_.get_wrapped_buf();
if (file_data_buf != nullptr) {
mem_free(file_data_buf);
}
Expand Down Expand Up @@ -87,7 +87,7 @@ void AlignedChunkReader::destroy() {
CompressorFactory::free(value_compressor_);
value_compressor_ = nullptr;
}
char *buf = time_in_stream_.get_wrapped_buf();
char* buf = time_in_stream_.get_wrapped_buf();
if (buf != nullptr) {
mem_free(buf);
time_in_stream_.clear_wrapped_buf();
Expand All @@ -102,8 +102,8 @@ void AlignedChunkReader::destroy() {
chunk_header_.~ChunkHeader();
}

int AlignedChunkReader::load_by_aligned_meta(ChunkMeta *time_chunk_meta,
ChunkMeta *value_chunk_meta) {
int AlignedChunkReader::load_by_aligned_meta(ChunkMeta* time_chunk_meta,
ChunkMeta* value_chunk_meta) {
int ret = E_OK;
time_chunk_meta_ = time_chunk_meta;
value_chunk_meta_ = value_chunk_meta;
Expand All @@ -116,8 +116,8 @@ int AlignedChunkReader::load_by_aligned_meta(ChunkMeta *time_chunk_meta,
file_data_time_buf_size_ = 1024;
file_data_value_buf_size_ = 1024;
int32_t ret_read_len = 0;
char *time_file_data_buf =
(char *)mem_alloc(file_data_time_buf_size_, MOD_CHUNK_READER);
char* time_file_data_buf =
(char*)mem_alloc(file_data_time_buf_size_, MOD_CHUNK_READER);
if (IS_NULL(time_file_data_buf)) {
return E_OOM;
}
Expand All @@ -140,8 +140,8 @@ int AlignedChunkReader::load_by_aligned_meta(ChunkMeta *time_chunk_meta,
}
/* ================ deserialize value_chunk_header ================*/
ret_read_len = 0;
char *value_file_data_buf =
(char *)mem_alloc(file_data_value_buf_size_, MOD_CHUNK_READER);
char* value_file_data_buf =
(char*)mem_alloc(file_data_value_buf_size_, MOD_CHUNK_READER);
if (IS_NULL(value_file_data_buf)) {
return E_OOM;
}
Expand Down Expand Up @@ -182,7 +182,7 @@ int AlignedChunkReader::load_by_aligned_meta(ChunkMeta *time_chunk_meta,
}

int AlignedChunkReader::alloc_compressor_and_decoder(
storage::Decoder *&decoder, storage::Compressor *&compressor,
storage::Decoder*& decoder, storage::Compressor*& compressor,
TSEncoding encoding, TSDataType data_type, CompressionType compression) {
if (decoder != nullptr) {
decoder->reset();
Expand All @@ -204,10 +204,10 @@ int AlignedChunkReader::alloc_compressor_and_decoder(
return E_OK;
}

int AlignedChunkReader::get_next_page(TsBlock *ret_tsblock,
Filter *oneshoot_filter, PageArena &pa) {
int AlignedChunkReader::get_next_page(TsBlock* ret_tsblock,
Filter* oneshoot_filter, PageArena& pa) {
int ret = E_OK;
Filter *filter =
Filter* filter =
(oneshoot_filter != nullptr ? oneshoot_filter : time_filter_);
if (prev_time_page_not_finish() && prev_value_page_not_finish()) {
ret = decode_time_value_buf_into_tsblock(ret_tsblock, oneshoot_filter,
Expand Down Expand Up @@ -243,11 +243,11 @@ int AlignedChunkReader::get_next_page(TsBlock *ret_tsblock,
return ret;
}

int AlignedChunkReader::get_cur_page_header(ChunkMeta *&chunk_meta,
common::ByteStream &in_stream,
PageHeader &cur_page_header,
uint32_t &chunk_visit_offset,
ChunkHeader &chunk_header) {
int AlignedChunkReader::get_cur_page_header(ChunkMeta*& chunk_meta,
common::ByteStream& in_stream,
PageHeader& cur_page_header,
uint32_t& chunk_visit_offset,
ChunkHeader& chunk_header) {
int ret = E_OK;
bool retry = true;
int cur_page_header_serialized_size = 0;
Expand All @@ -263,7 +263,7 @@ int AlignedChunkReader::get_cur_page_header(ChunkMeta *&chunk_meta,
if (deserialize_buf_not_enough(ret) && retry) {
retry = false;
retry_read_want_size += 1024;
int32_t &file_data_buf_size =
int32_t& file_data_buf_size =
chunk_header.data_type_ == common::VECTOR
? file_data_time_buf_size_
: file_data_value_buf_size_;
Expand Down Expand Up @@ -295,18 +295,18 @@ int AlignedChunkReader::get_cur_page_header(ChunkMeta *&chunk_meta,
// reader at least @want_size bytes from file and wrap the buffer into
// @in_stream_
int AlignedChunkReader::read_from_file_and_rewrap(
common::ByteStream &in_stream_, ChunkMeta *&chunk_meta,
uint32_t &chunk_visit_offset, int32_t &file_data_buf_size, int want_size,
common::ByteStream& in_stream_, ChunkMeta*& chunk_meta,
uint32_t& chunk_visit_offset, int32_t& file_data_buf_size, int want_size,
bool may_shrink) {
int ret = E_OK;
const int DEFAULT_READ_SIZE = 4096; // may use page_size + page_header_size
char *file_data_buf = in_stream_.get_wrapped_buf();
char* file_data_buf = in_stream_.get_wrapped_buf();
int offset = chunk_meta->offset_of_chunk_header_ + chunk_visit_offset;
int read_size =
(want_size < DEFAULT_READ_SIZE ? DEFAULT_READ_SIZE : want_size);
if (file_data_buf_size < read_size ||
(may_shrink && read_size < file_data_buf_size / 10)) {
file_data_buf = (char *)mem_realloc(file_data_buf, read_size);
file_data_buf = (char*)mem_realloc(file_data_buf, read_size);
if (IS_NULL(file_data_buf)) {
return E_OOM;
}
Expand All @@ -326,7 +326,7 @@ int AlignedChunkReader::read_from_file_and_rewrap(
return ret;
}

bool AlignedChunkReader::cur_page_statisify_filter(Filter *filter) {
bool AlignedChunkReader::cur_page_statisify_filter(Filter* filter) {
bool value_satisfy = filter == nullptr ||
cur_value_page_header_.statistic_ == nullptr ||
filter->satisfy(cur_value_page_header_.statistic_);
Expand Down Expand Up @@ -364,8 +364,8 @@ int AlignedChunkReader::decode_cur_time_page_data() {
}
}

char *time_compressed_buf = nullptr;
char *time_uncompressed_buf = nullptr;
char* time_compressed_buf = nullptr;
char* time_uncompressed_buf = nullptr;
uint32_t time_compressed_buf_size = 0;
uint32_t time_uncompressed_buf_size = 0;

Expand All @@ -376,7 +376,7 @@ int AlignedChunkReader::decode_cur_time_page_data() {
#ifdef DEBUG_SE
std::cout << "AlignedChunkReader::decode_cur_page_data,time_in_stream_."
"get_wrapped_buf="
<< (void *)(time_in_stream_.get_wrapped_buf())
<< (void*)(time_in_stream_.get_wrapped_buf())
<< ", time_in_stream_.read_pos=" << time_in_stream_.read_pos()
<< std::endl;
#endif
Expand Down Expand Up @@ -427,11 +427,11 @@ int AlignedChunkReader::decode_cur_value_page_data() {
}
}

char *value_compressed_buf = nullptr;
char *value_uncompressed_buf = nullptr;
char* value_compressed_buf = nullptr;
char* value_uncompressed_buf = nullptr;
uint32_t value_compressed_buf_size = 0;
uint32_t value_uncompressed_buf_size = 0;
char *value_buf = nullptr;
char* value_buf = nullptr;
uint32_t value_buf_size = 0;

// Step 2: do uncompress
Expand Down Expand Up @@ -467,7 +467,7 @@ int AlignedChunkReader::decode_cur_value_page_data() {
SerializationUtil::read_ui32(value_uncompressed_buf);
value_uncompressed_buf_offset += sizeof(uint32_t);
value_page_col_notnull_bitmap_.resize((value_page_data_num_ + 7) / 8);
for (unsigned char &i : value_page_col_notnull_bitmap_) {
for (unsigned char& i : value_page_col_notnull_bitmap_) {
i = *(value_uncompressed_buf + value_uncompressed_buf_offset);
value_uncompressed_buf_offset++;
}
Expand All @@ -486,7 +486,7 @@ int AlignedChunkReader::decode_cur_value_page_data() {
}

int AlignedChunkReader::decode_time_value_buf_into_tsblock(
TsBlock *&ret_tsblock, Filter *filter, common::PageArena *pa) {
TsBlock*& ret_tsblock, Filter* filter, common::PageArena* pa) {
int ret = common::E_OK;
ret = decode_tv_buf_into_tsblock_by_datatype(time_in_, value_in_,
ret_tsblock, filter, pa);
Expand Down Expand Up @@ -535,7 +535,7 @@ int AlignedChunkReader::decode_time_value_buf_into_tsblock(
ret = E_OVERFLOW; \
break; \
} \
row_appender.append(0, (char *)&time, sizeof(time)); \
row_appender.append(0, (char*)&time, sizeof(time)); \
row_appender.append_null(1); \
continue; \
} \
Expand All @@ -552,15 +552,15 @@ int AlignedChunkReader::decode_time_value_buf_into_tsblock(
} else { \
/*std::cout << "decoder: time=" << time << ", value=" << value \
* << std::endl;*/ \
row_appender.append(0, (char *)&time, sizeof(time)); \
row_appender.append(1, (char *)&value, sizeof(value)); \
row_appender.append(0, (char*)&time, sizeof(time)); \
row_appender.append(1, (char*)&value, sizeof(value)); \
} \
} \
} while (false)

int AlignedChunkReader::i32_DECODE_TYPED_TV_INTO_TSBLOCK(
ByteStream &time_in, ByteStream &value_in, RowAppender &row_appender,
Filter *filter) {
ByteStream& time_in, ByteStream& value_in, RowAppender& row_appender,
Filter* filter) {
int ret = E_OK;
uint32_t mask = 1 << 7;
int64_t time = 0;
Expand All @@ -578,7 +578,7 @@ int AlignedChunkReader::i32_DECODE_TYPED_TV_INTO_TSBLOCK(
ret = E_OVERFLOW;
break;
}
row_appender.append(0, (char *)&time, sizeof(time));
row_appender.append(0, (char*)&time, sizeof(time));
row_appender.append_null(1);
continue;
}
Expand All @@ -594,16 +594,16 @@ int AlignedChunkReader::i32_DECODE_TYPED_TV_INTO_TSBLOCK(
} else {
/*std::cout << "decoder: time=" << time << ", value=" << value
* << std::endl;*/
row_appender.append(0, (char *)&time, sizeof(time));
row_appender.append(1, (char *)&value, sizeof(value));
row_appender.append(0, (char*)&time, sizeof(time));
row_appender.append(1, (char*)&value, sizeof(value));
}
}
return ret;
}

int AlignedChunkReader::decode_tv_buf_into_tsblock_by_datatype(
ByteStream &time_in, ByteStream &value_in, TsBlock *ret_tsblock,
Filter *filter, common::PageArena *pa) {
ByteStream& time_in, ByteStream& value_in, TsBlock* ret_tsblock,
Filter* filter, common::PageArena* pa) {
int ret = E_OK;
RowAppender row_appender(ret_tsblock);
switch (value_chunk_header_.data_type_) {
Expand Down Expand Up @@ -648,24 +648,37 @@ int AlignedChunkReader::decode_tv_buf_into_tsblock_by_datatype(
}

int AlignedChunkReader::STRING_DECODE_TYPED_TV_INTO_TSBLOCK(
ByteStream &time_in, ByteStream &value_in, RowAppender &row_appender,
PageArena &pa, Filter *filter) {
ByteStream& time_in, ByteStream& value_in, RowAppender& row_appender,
PageArena& pa, Filter* filter) {
int ret = E_OK;
int64_t time = 0;
common::String value;
while (time_decoder_->has_remaining(time_in)) {
ASSERT(value_decoder_->has_remaining(value_in));
uint32_t mask = 1 << 7;
while (time_decoder_->has_remaining(time_in) &&
value_decoder_->has_remaining(value_in)) {
cur_value_index++;
bool should_read_data = true;
if (((value_page_col_notnull_bitmap_[cur_value_index / 8] & 0xFF) &
(mask >> (cur_value_index % 8))) == 0) {
should_read_data = false;
}
if (UNLIKELY(!row_appender.add_row())) {
ret = E_OVERFLOW;
cur_value_index--;
break;
} else if (RET_FAIL(time_decoder_->read_int64(time, time_in))) {
} else if (RET_FAIL(value_decoder_->read_String(value, pa, value_in))) {
} else if (should_read_data &&
RET_FAIL(value_decoder_->read_String(value, pa, value_in))) {
} else if (filter != nullptr && !filter->satisfy(time, value)) {
row_appender.backoff_add_row();
continue;
} else {
row_appender.append(0, (char *)&time, sizeof(time));
row_appender.append(1, value.buf_, value.len_);
row_appender.append(0, (char*)&time, sizeof(time));
if (!should_read_data) {
row_appender.append_null(1);
} else {
row_appender.append(1, value.buf_, value.len_);
}
}
}
return ret;
Expand Down
Loading
Loading