Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
121 changes: 49 additions & 72 deletions be/src/vec/aggregate_functions/aggregate_function_window_funnel.h
Original file line number Diff line number Diff line change
Expand Up @@ -83,14 +83,11 @@ struct WindowFunnelState {
int64_t window;
bool enable_mode;
WindowFunnelMode window_funnel_mode;
mutable MutableColumnPtr timestamp_column;
mutable MutableColumns event_columns;
mutable vectorized::MutableBlock mutable_block;
ColumnVector<NativeType>::Container* timestamp_column_data;
std::vector<ColumnVector<UInt8>::Container*> event_columns_datas;
Block block;
SortDescription sort_description {1};
bool sorted;
bool is_merge;

WindowFunnelState() {
event_count = 0;
Expand All @@ -101,30 +98,38 @@ struct WindowFunnelState {
sort_description[0].direction = 1;
sort_description[0].nulls_direction = -1;
sorted = false;
is_merge = false;
}
WindowFunnelState(int arg_event_count) : WindowFunnelState() {
timestamp_column = ColumnVector<NativeType>::create();
event_count = arg_event_count;
auto timestamp_column = ColumnVector<NativeType>::create();
timestamp_column_data =
&assert_cast<ColumnVector<NativeType>&>(*timestamp_column).get_data();
event_count = arg_event_count;
event_columns.resize(event_count);

MutableColumns event_columns;
for (int i = 0; i < event_count; i++) {
event_columns[i] = ColumnVector<UInt8>::create();
auto event_column = ColumnVector<UInt8>::create();
event_columns_datas.emplace_back(
&assert_cast<ColumnVector<UInt8>&>(*event_columns[i]).get_data());
&assert_cast<ColumnVector<UInt8>&>(*event_column).get_data());
event_columns.emplace_back(std::move(event_column));
}
Block tmp_block;
tmp_block.insert({std::move(timestamp_column),
DataTypeFactory::instance().create_data_type(TYPE_INDEX), "timestamp"});
for (int i = 0; i < event_count; i++) {
tmp_block.insert({std::move(event_columns[i]),
DataTypeFactory::instance().create_data_type(TypeIndex::UInt8),
"event_" + std::to_string(i)});
}

mutable_block = MutableBlock(std::move(tmp_block));
}

void reset() {
window = 0;
timestamp_column->clear();
for (auto& column : event_columns) {
column->clear();
}
block.clear_column_data();
mutable_block.clear();
timestamp_column_data = nullptr;
event_columns_datas.clear();
sorted = false;
is_merge = false;
}

void add(const IColumn** arg_columns, ssize_t row_num, int64_t win, WindowFunnelMode mode) {
Expand All @@ -144,24 +149,11 @@ struct WindowFunnelState {
if (sorted) {
return;
}
if (!is_merge) {
Block tmp_block;
tmp_block.insert({std::move(timestamp_column),
DataTypeFactory::instance().create_data_type(TYPE_INDEX),
"timestamp"});
for (int i = 0; i < event_count; i++) {
tmp_block.insert({std::move(event_columns[i]),
DataTypeFactory::instance().create_data_type(TypeIndex::UInt8),
"event_" + std::to_string(i)});
}

block = tmp_block.clone_without_columns();
sort_block(tmp_block, block, sort_description, 0);
} else {
auto tmp_block = block.clone_without_columns();
sort_block(block, tmp_block, sort_description, 0);
block = std::move(tmp_block);
}
Block tmp_block = mutable_block.to_block();
auto block = tmp_block.clone_without_columns();
sort_block(tmp_block, block, sort_description, 0);
mutable_block = MutableBlock(std::move(block));
sorted = true;
}

Expand All @@ -174,9 +166,9 @@ struct WindowFunnelState {
TimeInterval interval(SECOND, window, false);

int column_idx = 1;
const auto& first_event_column = block.get_by_position(column_idx);
const auto& first_event_column = mutable_block.get_column_by_position(column_idx);
const auto& first_event_data =
assert_cast<const ColumnVector<UInt8>&>(*first_event_column.column).get_data();
assert_cast<const ColumnVector<UInt8>&>(*first_event_column).get_data();
auto match_row = simd::find_one(first_event_data.data(), start_row, row_count);
start_row = match_row + 1;
if (match_row < row_count) {
Expand All @@ -188,12 +180,13 @@ struct WindowFunnelState {

column_idx++;
auto last_match_row = match_row;
for (; column_idx < event_count + 1; column_idx++) {
const auto& event_column = block.get_by_position(column_idx);
++match_row;
for (; column_idx < event_count + 1 && match_row < row_count;
column_idx++, match_row++) {
const auto& event_column = mutable_block.get_column_by_position(column_idx);
const auto& event_data =
assert_cast<const ColumnVector<UInt8>&>(*event_column.column).get_data();
assert_cast<const ColumnVector<UInt8>&>(*event_column).get_data();
if constexpr (WINDOW_FUNNEL_MODE == WindowFunnelMode::FIXED) {
++match_row;
if (event_data[match_row] == 1) {
auto current_timestamp =
binary_cast<NativeType, DateValueType>(timestamp_data[match_row]);
Expand All @@ -204,7 +197,7 @@ struct WindowFunnelState {
}
break;
}
match_row = simd::find_one(event_data.data(), match_row + 1, row_count);
match_row = simd::find_one(event_data.data(), match_row, row_count);
if (match_row < row_count) {
auto current_timestamp =
binary_cast<NativeType, DateValueType>(timestamp_data[match_row]);
Expand All @@ -227,10 +220,9 @@ struct WindowFunnelState {
for (int tmp_column_idx = 1; tmp_column_idx < column_idx;
tmp_column_idx++) {
const auto& tmp_event_column =
block.get_by_position(tmp_column_idx);
mutable_block.get_column_by_position(tmp_column_idx);
const auto& tmp_event_data =
assert_cast<const ColumnVector<UInt8>&>(
*tmp_event_column.column)
assert_cast<const ColumnVector<UInt8>&>(*tmp_event_column)
.get_data();
auto dup_match_row = simd::find_one(tmp_event_data.data(),
last_match_row + 1, match_row);
Expand Down Expand Up @@ -258,11 +250,11 @@ struct WindowFunnelState {
int _get_internal() const {
size_t start_row = 0;
int max_found_event_count = 0;
const auto& ts_column = block.get_by_position(0).column->get_ptr();
const auto& ts_column = mutable_block.get_column_by_position(0)->get_ptr();
const auto& timestamp_data =
assert_cast<const ColumnVector<NativeType>&>(*ts_column).get_data().data();

auto row_count = block.rows();
auto row_count = mutable_block.rows();
while (start_row < row_count) {
auto found_event_count =
_match_event_list<WINDOW_FUNNEL_MODE>(start_row, row_count, timestamp_data);
Expand All @@ -274,7 +266,7 @@ struct WindowFunnelState {
return max_found_event_count;
}
int get() const {
auto row_count = block.rows();
auto row_count = mutable_block.rows();
if (event_count == 0 || row_count == 0) {
return 0;
}
Expand All @@ -294,16 +286,13 @@ struct WindowFunnelState {
}

void merge(const WindowFunnelState& other) {
is_merge = true;
MutableBlock mutable_block(&block);
if (!other.block.empty()) {
auto st = mutable_block.merge(other.block);
if (!other.mutable_block.empty()) {
auto st = mutable_block.merge(other.mutable_block.to_block());
if (!st.ok()) {
throw doris::Exception(ErrorCode::INTERNAL_ERROR, st.to_string());
return;
}
}
block = mutable_block.to_block();

event_count = event_count > 0 ? event_count : other.event_count;
window = window > 0 ? window : other.window;
Expand All @@ -328,27 +317,13 @@ struct WindowFunnelState {
size_t compressed_bytes = 0;
Status status;
std::string buff;
if (is_merge) {
// as the branch-2.1 is used the new impl of window funnel, and the be_exec_version is 5
// but in branch-3.0 this be_exec_version have update to 7, so when upgrade from branch-2.1 to branch-3.0
// maybe have error send the branch-3.0 version of version 7 to branch-2.1([0---version--5])
status = block.serialize(
5, &pblock, &uncompressed_bytes, &compressed_bytes,
segment_v2::CompressionTypePB::ZSTD); // ZSTD for better compression ratio
} else {
Block tmp_block;
tmp_block.insert({std::move(timestamp_column),
DataTypeFactory::instance().create_data_type(TYPE_INDEX),
"timestamp"});
for (int i = 0; i < event_count; i++) {
tmp_block.insert({std::move(event_columns[i]),
DataTypeFactory::instance().create_data_type(TypeIndex::UInt8),
"event_" + std::to_string(i)});
}
status = tmp_block.serialize(
5, &pblock, &uncompressed_bytes, &compressed_bytes,
segment_v2::CompressionTypePB::ZSTD); // ZSTD for better compression ratio
}
Block block = mutable_block.to_block();
// as the branch-2.1 is used the new impl of window funnel, and the be_exec_version is 5
// but in branch-3.0 this be_exec_version have update to 7, so when upgrade from branch-2.1 to branch-3.0
// maybe have error send the branch-3.0 version of version 7 to branch-2.1([0---version--5])
status = block.serialize(
5, &pblock, &uncompressed_bytes, &compressed_bytes,
segment_v2::CompressionTypePB::ZSTD); // ZSTD for better compression ratio
if (!status.ok()) {
throw doris::Exception(ErrorCode::INTERNAL_ERROR, status.to_string());
return;
Expand Down Expand Up @@ -385,10 +360,12 @@ struct WindowFunnelState {
throw doris::Exception(ErrorCode::INTERNAL_ERROR,
"Failed to parse window_funnel data to block");
}
Block block;
auto status = block.deserialize(pblock);
if (!status.ok()) {
throw doris::Exception(ErrorCode::INTERNAL_ERROR, status.to_string());
}
mutable_block = MutableBlock(std::move(block));
}
};

Expand Down