Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions cpp/src/common/tablet.cc
Original file line number Diff line number Diff line change
Expand Up @@ -308,6 +308,12 @@ int Tablet::add_value(uint32_t row_index, const std::string& measurement_name,
return add_value(row_index, measurement_name, String(val));
}

template <>
int Tablet::add_value(uint32_t row_index, const std::string& measurement_name,
std::string val) {
return add_value(row_index, measurement_name, String(val));
}

template int Tablet::add_value(uint32_t row_index, uint32_t schema_index,
bool val);
template int Tablet::add_value(uint32_t row_index, uint32_t schema_index,
Expand Down
55 changes: 55 additions & 0 deletions cpp/src/cwrapper/tsfile_cwrapper.cc
Original file line number Diff line number Diff line change
Expand Up @@ -361,6 +361,61 @@ ResultSet tsfile_query_table_on_tree(TsFileReader reader, char** columns,
return table_result_set;
}

ResultSet tsfile_reader_query_tree_by_row(TsFileReader reader,
char** device_ids, int device_ids_len,
char** measurement_names,
int measurement_names_len, int offset,
int limit, ERRNO* err_code) {
auto* r = static_cast<storage::TsFileReader*>(reader);
storage::ResultSet* result_set = nullptr;

std::vector<std::string> path_list;
if (device_ids_len > 0 && measurement_names_len > 0) {
path_list.reserve(static_cast<size_t>(device_ids_len) *
static_cast<size_t>(measurement_names_len));
}

for (int i = 0; i < device_ids_len; i++) {
const char* device_id = device_ids[i];
if (device_id == nullptr) {
continue;
}
for (int j = 0; j < measurement_names_len; j++) {
const char* measurement_name = measurement_names[j];
if (measurement_name == nullptr) {
continue;
}
path_list.emplace_back(std::string(device_id) + "." +
std::string(measurement_name));
}
}

*err_code = r->queryByRow(path_list, offset, limit, result_set);
return result_set;
}

ResultSet tsfile_reader_query_table_by_row(TsFileReader reader,
const char* table_name,
char** column_names,
int column_names_len, int offset,
int limit, ERRNO* err_code) {
auto* r = static_cast<storage::TsFileReader*>(reader);
storage::ResultSet* result_set = nullptr;

std::vector<std::string> columns;
if (column_names_len > 0) {
columns.reserve(static_cast<size_t>(column_names_len));
}
for (int i = 0; i < column_names_len; i++) {
const char* name = column_names[i];
columns.emplace_back(name == nullptr ? "" : std::string(name));
}

*err_code = r->queryByRow(table_name == nullptr ? "" : table_name, columns,
offset, limit, result_set);
return result_set;
}

bool tsfile_result_set_next(ResultSet result_set, ERRNO* err_code) {
auto* r = static_cast<storage::ResultSet*>(result_set);
bool has_next = true;
Expand Down
44 changes: 44 additions & 0 deletions cpp/src/cwrapper/tsfile_cwrapper.h
Original file line number Diff line number Diff line change
Expand Up @@ -444,6 +444,50 @@ ResultSet tsfile_query_table(TsFileReader reader, const char* table_name,
ResultSet tsfile_query_table_on_tree(TsFileReader reader, char** columns,
uint32_t column_num, Timestamp start_time,
Timestamp end_time, ERRNO* err_code);

/**
* @brief Query time series (tree model) by row with offset/limit.
*
* For tree model, each (device_id, measurement_name) pair maps to a full path
* "device_id.measurement_name". The result set merges multiple paths by
* timestamp, applies the global offset/limit at merge layer, and returns
* at most @p limit rows. < 0 limit means unlimited.
*
* @param reader [in] Valid TsFileReader handle obtained from
* tsfile_reader_new().
* @param device_ids [in] Array of device identifiers.
* @param device_ids_len [in] Device id count.
* @param measurement_names [in] Array of measurement (sensor) names.
* @param measurement_names_len [in] Measurement name count.
* @param offset [in] Number of leading rows to skip (>= 0).
* @param limit [in] Maximum rows to return. < 0 means unlimited.
* @param err_code [out] Error code. E_OK(0) on success.
* @return ResultSet handle on success; NULL on failure.
*/
ResultSet tsfile_reader_query_tree_by_row(TsFileReader reader,
char** device_ids, int device_ids_len,
char** measurement_names,
int measurement_names_len, int offset,
int limit, ERRNO* err_code);

/**
* @brief Query table-model data by row with offset/limit pushdown.
*
* @param reader [in] Valid TsFileReader handle obtained from
* tsfile_reader_new().
* @param table_name [in] Target table name.
* @param column_names [in] Array of requested column names.
* @param column_names_len [in] Requested column count.
* @param offset [in] Number of leading rows to skip (>= 0).
* @param limit [in] Maximum rows to return. < 0 means unlimited.
* @param err_code [out] Error code. E_OK(0) on success.
* @return ResultSet handle on success; NULL on failure.
*/
ResultSet tsfile_reader_query_table_by_row(TsFileReader reader,
const char* table_name,
char** column_names,
int column_names_len, int offset,
int limit, ERRNO* err_code);
// ResultSet tsfile_reader_query_device(TsFileReader reader,
// const char* device_name,
// char** sensor_name, uint32_t sensor_num,
Expand Down
15 changes: 9 additions & 6 deletions cpp/src/file/tsfile_io_reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -298,12 +298,15 @@ int TsFileIOReader::load_device_index_entry(
if (device_id_comparable == nullptr) {
return E_INVALID_DATA_POINT;
}
auto index_node = tsfile_meta_.table_metadata_index_node_map_
[device_id_comparable->device_id_->get_table_name()];
assert(tsfile_meta_.table_metadata_index_node_map_.find(
device_id_comparable->device_id_->get_table_name()) !=
tsfile_meta_.table_metadata_index_node_map_.end());
assert(index_node != nullptr);
std::string table_name = device_id_comparable->device_id_->get_table_name();
auto it = tsfile_meta_.table_metadata_index_node_map_.find(table_name);
if (it == tsfile_meta_.table_metadata_index_node_map_.end()) {
return E_DEVICE_NOT_EXIST;
}
auto index_node = it->second;
if (index_node == nullptr) {
return E_DEVICE_NOT_EXIST;
}
if (index_node->node_type_ == LEAF_DEVICE) {
// FIXME
ret = index_node->binary_search_children(
Expand Down
98 changes: 98 additions & 0 deletions cpp/src/reader/aligned_chunk_reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,16 @@ void AlignedChunkReader::reset() {
}

void AlignedChunkReader::destroy() {
if (time_uncompressed_buf_ != nullptr && time_compressor_ != nullptr) {
time_compressor_->after_uncompress(time_uncompressed_buf_);
time_uncompressed_buf_ = nullptr;
}
if (value_uncompressed_buf_ != nullptr && value_compressor_ != nullptr) {
value_compressor_->after_uncompress(value_uncompressed_buf_);
value_uncompressed_buf_ = nullptr;
}
value_page_col_notnull_bitmap_.clear();
value_page_col_notnull_bitmap_.shrink_to_fit();
if (time_decoder_ != nullptr) {
time_decoder_->~Decoder();
DecoderFactory::free(time_decoder_);
Expand Down Expand Up @@ -711,4 +721,92 @@ int AlignedChunkReader::STRING_DECODE_TYPED_TV_INTO_TSBLOCK(
return ret;
}

bool AlignedChunkReader::should_skip_page_by_time(int64_t min_time_hint) {
if (min_time_hint < 0) {
return false;
}
// Use time page statistic for time-based skipping.
if (cur_time_page_header_.statistic_ != nullptr) {
return cur_time_page_header_.statistic_->end_time_ < min_time_hint;
}
if (cur_value_page_header_.statistic_ != nullptr) {
return cur_value_page_header_.statistic_->end_time_ < min_time_hint;
}
return false;
}

bool AlignedChunkReader::should_skip_page_by_offset(int& row_offset) {
if (row_offset <= 0) {
return false;
}
// Use time page statistic for count.
Statistic* stat = cur_time_page_header_.statistic_;
if (stat == nullptr) {
stat = cur_value_page_header_.statistic_;
}
if (stat == nullptr || stat->count_ == 0) {
return false;
}
int32_t count = stat->count_;
if (row_offset >= count) {
row_offset -= count;
return true;
}
return false;
}

int AlignedChunkReader::get_next_page(TsBlock* ret_tsblock,
Filter* oneshoot_filter, PageArena& pa,
int64_t min_time_hint, int& row_offset,
int& row_limit) {
int ret = E_OK;
Filter* filter =
(oneshoot_filter != nullptr ? oneshoot_filter : time_filter_);

if (row_limit == 0) {
return E_NO_MORE_DATA;
}

if (prev_time_page_not_finish() && prev_value_page_not_finish()) {
ret = decode_time_value_buf_into_tsblock(ret_tsblock, oneshoot_filter,
&pa);
return ret;
}
if (!prev_time_page_not_finish() && !prev_value_page_not_finish()) {
while (IS_SUCC(ret)) {
if (RET_FAIL(get_cur_page_header(
time_chunk_meta_, time_in_stream_, cur_time_page_header_,
time_chunk_visit_offset_, time_chunk_header_))) {
} else if (RET_FAIL(get_cur_page_header(
value_chunk_meta_, value_in_stream_,
cur_value_page_header_, value_chunk_visit_offset_,
value_chunk_header_))) {
} else if (!cur_page_statisify_filter(filter)) {
if (RET_FAIL(skip_cur_page())) {
}
} else if (should_skip_page_by_time(min_time_hint)) {
if (RET_FAIL(skip_cur_page())) {
}
} else if (should_skip_page_by_offset(row_offset)) {
if (RET_FAIL(skip_cur_page())) {
}
} else {
break;
}
if (!has_more_data()) {
ret = E_NO_MORE_DATA;
break;
}
}
if (IS_SUCC(ret)) {
ret = decode_cur_time_page_data() || decode_cur_value_page_data();
}
}
if (IS_SUCC(ret)) {
ret = decode_time_value_buf_into_tsblock(ret_tsblock, oneshoot_filter,
&pa);
}
return ret;
}

} // end namespace storage
6 changes: 6 additions & 0 deletions cpp/src/reader/aligned_chunk_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,13 @@ class AlignedChunkReader : public IChunkReader {
int get_next_page(common::TsBlock* tsblock, Filter* oneshoot_filter,
common::PageArena& pa) override;

int get_next_page(common::TsBlock* tsblock, Filter* oneshoot_filter,
common::PageArena& pa, int64_t min_time_hint,
int& row_offset, int& row_limit) override;

private:
bool should_skip_page_by_time(int64_t min_time_hint);
bool should_skip_page_by_offset(int& row_offset);
FORCE_INLINE bool chunk_has_only_one_page(
const ChunkHeader& chunk_header) const {
return (chunk_header.chunk_type_ & ONLY_ONE_PAGE_CHUNK_HEADER_MARKER) ==
Expand Down
32 changes: 27 additions & 5 deletions cpp/src/reader/block/device_ordered_tsblock_reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,34 @@ namespace storage {

int DeviceOrderedTsBlockReader::has_next(bool& has_next) {
int ret = common::E_OK;

if (remaining_limit_ == 0) {
has_next = false;
return common::E_OK;
}

if (current_reader_ != nullptr &&
IS_SUCC(current_reader_->has_next(has_next)) && has_next) {
return common::E_OK;
}
if (current_reader_ != nullptr) {
remaining_offset_ = current_reader_->get_remaining_offset();
remaining_limit_ = current_reader_->get_remaining_limit();
delete current_reader_;
current_reader_ = nullptr;
}
while (device_task_iterator_->has_next()) {
if (remaining_limit_ == 0) {
has_next = false;
return common::E_OK;
}
while (true) {
if (remaining_limit_ == 0) {
has_next = false;
return common::E_OK;
}
if (!device_task_iterator_->has_next()) {
break;
}
DeviceQueryTask* task = nullptr;
if (IS_FAIL(device_task_iterator_->next(task))) {
return ret;
Expand All @@ -47,7 +66,8 @@ int DeviceOrderedTsBlockReader::has_next(bool& has_next) {
return common::E_OOM;
}
if (RET_FAIL(current_reader_->init(task, block_size_, time_filter_,
field_filter_))) {
field_filter_, remaining_offset_,
remaining_limit_))) {
delete current_reader_;
current_reader_ = nullptr;
return ret;
Expand All @@ -56,13 +76,12 @@ int DeviceOrderedTsBlockReader::has_next(bool& has_next) {
if (RET_FAIL(current_reader_->has_next(has_next))) {
return ret;
}
// If current device has data, just return.
if (has_next) {
return ret;
}
// If current device does not have data, get next device.

// Free current device reader.
remaining_offset_ = current_reader_->get_remaining_offset();
remaining_limit_ = current_reader_->get_remaining_limit();
if (current_reader_) {
delete current_reader_;
current_reader_ = nullptr;
Expand All @@ -86,6 +105,9 @@ void DeviceOrderedTsBlockReader::close() {
delete current_reader_;
current_reader_ = nullptr;
}
if (device_task_iterator_) {
device_task_iterator_->flush_remaining_device_meta_cache();
}
if (time_filter_ != nullptr) {
delete time_filter_;
time_filter_ = nullptr;
Expand Down
8 changes: 6 additions & 2 deletions cpp/src/reader/block/device_ordered_tsblock_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,15 @@ class DeviceOrderedTsBlockReader : public TsBlockReader {
std::unique_ptr<DeviceTaskIterator> device_task_iterator,
IMetadataQuerier* metadata_querier, int32_t block_size,
TsFileIOReader* tsfile_io_reader, Filter* time_filter,
Filter* field_filter)
Filter* field_filter, int row_offset = 0, int row_limit = -1)
: device_task_iterator_(std::move(device_task_iterator)),
metadata_querier_(metadata_querier),
block_size_(block_size),
tsfile_io_reader_(tsfile_io_reader),
time_filter_(time_filter),
field_filter_(field_filter) {}
field_filter_(field_filter),
remaining_offset_(row_offset),
remaining_limit_(row_limit) {}
~DeviceOrderedTsBlockReader() override { close(); }

int has_next(bool& has_next) override;
Expand All @@ -54,6 +56,8 @@ class DeviceOrderedTsBlockReader : public TsBlockReader {
TsFileIOReader* tsfile_io_reader_;
Filter* time_filter_ = nullptr;
Filter* field_filter_ = nullptr;
int remaining_offset_ = 0;
int remaining_limit_ = -1;
};
} // namespace storage

Expand Down
Loading
Loading