Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 9 additions & 2 deletions be/src/vec/olap/block_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,8 @@ Status BlockReader::_init_agg_state(const ReaderParams& read_params) {
Status BlockReader::init(const ReaderParams& read_params) {
RETURN_IF_ERROR(TabletReader::init(read_params));

_arena = std::make_unique<Arena>();

int32_t return_column_size = read_params.origin_return_columns->size();
_return_columns_loc.resize(read_params.return_columns.size());
for (int i = 0; i < return_column_size; ++i) {
Expand Down Expand Up @@ -511,6 +513,10 @@ size_t BlockReader::_copy_agg_data() {
}

void BlockReader::_update_agg_value(MutableColumns& columns, int begin, int end, bool is_close) {
if (!_arena) [[unlikely]] {
return;
}

for (int i = 0; i < _agg_columns_idx.size(); i++) {
auto idx = _agg_columns_idx[i];

Expand All @@ -520,7 +526,7 @@ void BlockReader::_update_agg_value(MutableColumns& columns, int begin, int end,

if (begin <= end) {
function->add_batch_range(begin, end, place, const_cast<const IColumn**>(&column_ptr),
&_arena, _stored_has_null_tag[idx]);
_arena.get(), _stored_has_null_tag[idx]);
}

if (is_close) {
Expand All @@ -529,8 +535,9 @@ void BlockReader::_update_agg_value(MutableColumns& columns, int begin, int end,
function->reset(place);
}
}

if (is_close) {
_arena.clear();
_arena->clear();
}
}

Expand Down
3 changes: 2 additions & 1 deletion be/src/vec/olap/block_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,8 @@ class BlockReader final : public TabletReader {

bool _is_rowsets_overlapping = true;

Arena _arena;
// Use pointer to avoid allocing memory during construction
std::unique_ptr<Arena> _arena;
};

} // namespace vectorized
Expand Down
9 changes: 7 additions & 2 deletions be/src/vec/olap/vertical_block_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,8 @@ Status VerticalBlockReader::init(const ReaderParams& read_params) {
_reader_context.batch_size = opts.block_row_max;
RETURN_IF_ERROR(TabletReader::init(read_params));

_arena = std::make_unique<Arena>();

auto status = _init_collect_iter(read_params);
if (!status.ok()) [[unlikely]] {
if (!config::is_cloud_mode()) {
Expand Down Expand Up @@ -298,14 +300,17 @@ void VerticalBlockReader::_update_agg_data(MutableColumns& columns) {

void VerticalBlockReader::_update_agg_value(MutableColumns& columns, int begin, int end,
bool is_close) {
if (!_arena) [[unlikely]] {
return;
}
for (size_t idx = 0; idx < _return_columns.size(); ++idx) {
AggregateFunctionPtr function = _agg_functions[idx];
AggregateDataPtr place = _agg_places[idx];
auto* column_ptr = _stored_data_columns[idx].get();

if (begin <= end) {
function->add_batch_range(begin, end, place, const_cast<const IColumn**>(&column_ptr),
&_arena, _stored_has_null_tag[idx]);
_arena.get(), _stored_has_null_tag[idx]);
}

if (is_close) {
Expand All @@ -315,7 +320,7 @@ void VerticalBlockReader::_update_agg_value(MutableColumns& columns, int begin,
}
}
if (is_close) {
_arena.clear();
_arena->clear();
}
}

Expand Down
4 changes: 3 additions & 1 deletion be/src/vec/olap/vertical_block_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,9 @@ class VerticalBlockReader final : public TabletReader {
// for agg mode
std::vector<AggregateFunctionPtr> _agg_functions;
std::vector<AggregateDataPtr> _agg_places;
Arena _arena;

// Use pointer to avoid memory allocation during construction
std::unique_ptr<Arena> _arena;

std::vector<int> _normal_columns_idx;
std::vector<int> _agg_columns_idx;
Expand Down