Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 46 additions & 21 deletions be/src/olap/rowset/beta_rowset_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
#include "olap/row_cursor.h"
#include "olap/rowset/rowset_meta.h"
#include "olap/rowset/rowset_reader_context.h"
#include "olap/rowset/segment_v2/lazy_init_segment_iterator.h"
#include "olap/rowset/segment_v2/segment.h"
#include "olap/schema.h"
#include "olap/schema_cache.h"
Expand Down Expand Up @@ -249,38 +250,66 @@ Status BetaRowsetReader::get_segment_iterators(RowsetReaderContext* read_context

// load segments
bool should_use_cache = use_cache || _read_context->reader_type == ReaderType::READER_QUERY;
RETURN_IF_ERROR(SegmentLoader::instance()->load_segments(_rowset, &_segment_cache_handle,
SegmentCacheHandle segment_cache_handle;
RETURN_IF_ERROR(SegmentLoader::instance()->load_segments(_rowset, &segment_cache_handle,
should_use_cache));

// create iterator for each segment
auto& segments = _segment_cache_handle.get_segments();
auto& segments = segment_cache_handle.get_segments();
_segments_rows.resize(segments.size());
for (size_t i = 0; i < segments.size(); i++) {
_segments_rows[i] = segments[i]->num_rows();
}

auto [seg_start, seg_end] = _segment_offsets;
if (seg_start == seg_end) {
seg_start = 0;
seg_end = segments.size();
}

const bool is_merge_iterator = _is_merge_iterator();
const bool use_lazy_init_iterators =
!is_merge_iterator && _read_context->reader_type == ReaderType::READER_QUERY;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why need check reader type?

for (int i = seg_start; i < seg_end; i++) {
auto& seg_ptr = segments[i];
std::unique_ptr<RowwiseIterator> iter;
Status status;

/// If `_segment_row_ranges` is empty, the segment is not split.
if (_segment_row_ranges.empty()) {
_read_options.row_ranges.clear();
status = seg_ptr->new_iterator(_input_schema, _read_options, &iter);
if (use_lazy_init_iterators) {
/// For non-merging iterators, we don't need to initialize them all at once when creating them.
/// Instead, we should initialize each iterator separately when really using them.
/// This optimization minimizes the lifecycle of resources like column readers
/// and prevents excessive memory consumption, especially for wide tables.
if (_segment_row_ranges.empty()) {
_read_options.row_ranges.clear();
iter = std::make_unique<LazyInitSegmentIterator>(seg_ptr, _input_schema,
_read_options);
} else {
DCHECK_EQ(seg_end - seg_start, _segment_row_ranges.size());
auto local_options = _read_options;
local_options.row_ranges = _segment_row_ranges[i - seg_start];
iter = std::make_unique<LazyInitSegmentIterator>(seg_ptr, _input_schema,
local_options);
}
} else {
DCHECK_EQ(seg_end - seg_start, _segment_row_ranges.size());
auto local_options = _read_options;
local_options.row_ranges = _segment_row_ranges[i - seg_start];
status = seg_ptr->new_iterator(_input_schema, local_options, &iter);
}
Status status;
/// If `_segment_row_ranges` is empty, the segment is not split.
if (_segment_row_ranges.empty()) {
_read_options.row_ranges.clear();
status = seg_ptr->new_iterator(_input_schema, _read_options, &iter);
} else {
DCHECK_EQ(seg_end - seg_start, _segment_row_ranges.size());
auto local_options = _read_options;
local_options.row_ranges = _segment_row_ranges[i - seg_start];
status = seg_ptr->new_iterator(_input_schema, local_options, &iter);
}

if (!status.ok()) {
LOG(WARNING) << "failed to create iterator[" << seg_ptr->id()
<< "]: " << status.to_string();
return Status::Error<ROWSET_READER_INIT>(status.to_string());
if (!status.ok()) {
LOG(WARNING) << "failed to create iterator[" << seg_ptr->id()
<< "]: " << status.to_string();
return Status::Error<ROWSET_READER_INIT>(status.to_string());
}
}

if (iter->empty()) {
continue;
}
Expand Down Expand Up @@ -388,11 +417,7 @@ bool BetaRowsetReader::_should_push_down_value_predicates() const {
}

Status BetaRowsetReader::get_segment_num_rows(std::vector<uint32_t>* segment_num_rows) {
auto& seg_ptrs = _segment_cache_handle.get_segments();
segment_num_rows->resize(seg_ptrs.size());
for (size_t i = 0; i < seg_ptrs.size(); i++) {
(*segment_num_rows)[i] = seg_ptrs[i]->num_rows();
}
segment_num_rows->assign(_segments_rows.cbegin(), _segments_rows.cend());
Copy link
Copy Markdown
Contributor

@yiguolei yiguolei May 27, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

could you make sure that this method is called after get_segment_iterators??? I think it is very dangerous to make such assumption

return Status::OK();
}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

some method like new_column_iterator also need load column readers


Expand Down
4 changes: 1 addition & 3 deletions be/src/olap/rowset/beta_rowset_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -120,9 +120,7 @@ class BetaRowsetReader : public RowsetReader {

std::unique_ptr<RowwiseIterator> _iterator;

// make sure this handle is initialized and valid before
// reading data.
SegmentCacheHandle _segment_cache_handle;
std::vector<uint32_t> _segments_rows;

StorageReadOptions _read_options;

Expand Down
38 changes: 38 additions & 0 deletions be/src/olap/rowset/segment_v2/lazy_init_segment_iterator.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include "olap/rowset/segment_v2/lazy_init_segment_iterator.h"

namespace doris::segment_v2 {

LazyInitSegmentIterator::LazyInitSegmentIterator(std::shared_ptr<Segment> segment,
SchemaSPtr schema, const StorageReadOptions& opts)
: _schema(std::move(schema)), _segment(std::move(segment)), _read_options(opts) {}

/// Here do not use the argument of `opts`,
/// see where the iterator is created in `BetaRowsetReader::get_segment_iterators`
Status LazyInitSegmentIterator::init(const StorageReadOptions& /*opts*/) {
_need_lazy_init = false;
if (_inner_iterator) {
return Status::OK();
}

RETURN_IF_ERROR(_segment->new_iterator(_schema, _read_options, &_inner_iterator));
return _inner_iterator->init(_read_options);
}

} // namespace doris::segment_v2
67 changes: 67 additions & 0 deletions be/src/olap/rowset/segment_v2/lazy_init_segment_iterator.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#pragma once

#include "olap/rowset/segment_v2/common.h"
#include "olap/rowset/segment_v2/segment.h"
#include "olap/rowset/segment_v2/segment_iterator.h"
#include "vec/core/block.h"

namespace doris::segment_v2 {

using namespace vectorized;

class LazyInitSegmentIterator : public RowwiseIterator {
public:
LazyInitSegmentIterator(std::shared_ptr<Segment> segment, SchemaSPtr schema,
const StorageReadOptions& opts);

~LazyInitSegmentIterator() override = default;

Status init(const StorageReadOptions& opts) override;

Status next_batch(Block* block) override {
if (UNLIKELY(_need_lazy_init)) {
RETURN_IF_ERROR(init(_read_options));
DCHECK(_inner_iterator != nullptr);
}

return _inner_iterator->next_batch(block);
}

const Schema& schema() const override { return *_schema; }

Status current_block_row_locations(std::vector<RowLocation>* locations) override {
return _inner_iterator->current_block_row_locations(locations);
}

bool update_profile(RuntimeProfile* profile) override {
if (_inner_iterator != nullptr) {
return _inner_iterator->update_profile(profile);
}
return false;
}

private:
bool _need_lazy_init {true};
SchemaSPtr _schema = nullptr;
std::shared_ptr<Segment> _segment;
StorageReadOptions _read_options;
RowwiseIteratorUPtr _inner_iterator;
};
} // namespace doris::segment_v2
29 changes: 22 additions & 7 deletions be/src/olap/rowset/segment_v2/segment.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -103,16 +103,15 @@ Segment::~Segment() {
}

Status Segment::_open() {
SegmentFooterPB footer;
RETURN_IF_ERROR(_parse_footer(&footer));
RETURN_IF_ERROR(_create_column_readers(footer));
_pk_index_meta.reset(footer.has_primary_key_index_meta()
? new PrimaryKeyIndexMetaPB(footer.primary_key_index_meta())
_footer_pb = std::make_unique<SegmentFooterPB>();
RETURN_IF_ERROR(_parse_footer(_footer_pb.get()));
_pk_index_meta.reset(_footer_pb->has_primary_key_index_meta()
? new PrimaryKeyIndexMetaPB(_footer_pb->primary_key_index_meta())
: nullptr);
// delete_bitmap_calculator_test.cpp
// DCHECK(footer.has_short_key_index_page());
_sk_index_page = footer.short_key_index_page();
_num_rows = footer.num_rows();
_sk_index_page = _footer_pb->short_key_index_page();
_num_rows = _footer_pb->num_rows();
return Status::OK();
}

Expand All @@ -132,6 +131,8 @@ Status Segment::_open_inverted_index() {

Status Segment::new_iterator(SchemaSPtr schema, const StorageReadOptions& read_options,
std::unique_ptr<RowwiseIterator>* iter) {
RETURN_IF_ERROR(_create_column_readers_once());

read_options.stats->total_segment_number++;
// trying to prune the current segment by segment-level zone map
for (auto& entry : read_options.col_id_to_predicates) {
Expand Down Expand Up @@ -384,6 +385,15 @@ vectorized::DataTypePtr Segment::get_data_type_of(vectorized::PathInDataPtr path
// TODO support normal column type
return nullptr;
}

Status Segment::_create_column_readers_once() {
return _create_column_readers_once_call.call([&] {
DCHECK(_footer_pb);
Defer defer([&]() { _footer_pb.reset(); });
return _create_column_readers(*_footer_pb);
});
}

Status Segment::_create_column_readers(const SegmentFooterPB& footer) {
std::unordered_map<uint32_t, uint32_t> column_id_to_footer_ordinal;
std::unordered_map<vectorized::PathInData, uint32_t, vectorized::PathInData::Hash>
Expand Down Expand Up @@ -588,6 +598,8 @@ Status Segment::new_column_iterator_with_path(const TabletColumn& tablet_column,
Status Segment::new_column_iterator(const TabletColumn& tablet_column,
std::unique_ptr<ColumnIterator>* iter,
const StorageReadOptions* opt) {
RETURN_IF_ERROR(_create_column_readers_once());

// init column iterator by path info
if (tablet_column.has_path_info() || tablet_column.is_variant_type()) {
return new_column_iterator_with_path(tablet_column, iter, opt);
Expand Down Expand Up @@ -615,6 +627,7 @@ Status Segment::new_column_iterator(const TabletColumn& tablet_column,
}

Status Segment::new_column_iterator(int32_t unique_id, std::unique_ptr<ColumnIterator>* iter) {
RETURN_IF_ERROR(_create_column_readers_once());
ColumnIterator* it;
RETURN_IF_ERROR(_column_readers.at(unique_id)->new_iterator(&it));
iter->reset(it);
Expand All @@ -640,6 +653,7 @@ ColumnReader* Segment::_get_column_reader(const TabletColumn& col) {

Status Segment::new_bitmap_index_iterator(const TabletColumn& tablet_column,
std::unique_ptr<BitmapIndexIterator>* iter) {
RETURN_IF_ERROR(_create_column_readers_once());
ColumnReader* reader = _get_column_reader(tablet_column);
if (reader != nullptr && reader->has_bitmap_index()) {
BitmapIndexIterator* it;
Expand All @@ -654,6 +668,7 @@ Status Segment::new_inverted_index_iterator(const TabletColumn& tablet_column,
const TabletIndex* index_meta,
const StorageReadOptions& read_options,
std::unique_ptr<InvertedIndexIterator>* iter) {
RETURN_IF_ERROR(_create_column_readers_once());
ColumnReader* reader = _get_column_reader(tablet_column);
if (reader != nullptr && index_meta) {
if (_inverted_index_file_reader == nullptr) {
Expand Down
7 changes: 7 additions & 0 deletions be/src/olap/rowset/segment_v2/segment.h
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,8 @@ class Segment : public std::enable_shared_from_this<Segment> {
Status _load_index_impl();
Status _open_inverted_index();

Status _create_column_readers_once();

private:
friend class SegmentIterator;
io::FileSystemSPtr _fs;
Expand Down Expand Up @@ -247,6 +249,11 @@ class Segment : public std::enable_shared_from_this<Segment> {
DorisCallOnce<Status> _load_index_once;
// used to guarantee that primary key bloom filter will be loaded at most once in a thread-safe way
DorisCallOnce<Status> _load_pk_bf_once;

DorisCallOnce<Status> _create_column_readers_once_call;

std::unique_ptr<SegmentFooterPB> _footer_pb;

// used to hold short key index page in memory
PageHandle _sk_index_handle;
// short key index decoder
Expand Down
7 changes: 4 additions & 3 deletions be/src/vec/olap/vgeneric_iterators.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -379,6 +379,7 @@ class VUnionIterator : public RowwiseIterator {
private:
const Schema* _schema = nullptr;
RowwiseIteratorUPtr _cur_iter = nullptr;
StorageReadOptions _read_options;
std::vector<RowwiseIteratorUPtr> _origin_iters;
};

Expand All @@ -392,10 +393,9 @@ Status VUnionIterator::init(const StorageReadOptions& opts) {
// in the same order as the original segments.
std::reverse(_origin_iters.begin(), _origin_iters.end());

for (auto& iter : _origin_iters) {
RETURN_IF_ERROR(iter->init(opts));
}
_read_options = opts;
_cur_iter = std::move(_origin_iters.back());
RETURN_IF_ERROR(_cur_iter->init(_read_options));
_schema = &_cur_iter->schema();
return Status::OK();
}
Expand All @@ -407,6 +407,7 @@ Status VUnionIterator::next_batch(Block* block) {
_origin_iters.pop_back();
if (!_origin_iters.empty()) {
_cur_iter = std::move(_origin_iters.back());
RETURN_IF_ERROR(_cur_iter->init(_read_options));
} else {
_cur_iter = nullptr;
}
Expand Down