Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion be/src/cloud/cloud_meta_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -747,7 +747,12 @@ Status CloudMetaMgr::update_tmp_rowset(const RowsetMeta& rs_meta) {
CreateRowsetResponse resp;
req.set_cloud_unique_id(config::cloud_unique_id);

RowsetMetaPB rs_meta_pb = rs_meta.get_rowset_pb(true);
// The variant schema may have been updated, so we need to upload the schema as well.
// The rowset meta updated by `rowset->merge_rowset_meta` in `BaseTablet::update_delete_bitmap`
// would be lost in `update_tmp_rowset` if the schema were skipped. So, to keep the latest
// schema, we must retain the schema in `update_tmp_rowset` for variant types.
bool skip_schema = rs_meta.tablet_schema()->num_variant_columns() == 0;
RowsetMetaPB rs_meta_pb = rs_meta.get_rowset_pb(skip_schema);
doris_rowset_meta_to_cloud(req.mutable_rowset_meta(), std::move(rs_meta_pb));
Status st =
retry_rpc("update committed rowset", req, &resp, &MetaService_Stub::update_tmp_rowset);
Expand Down
7 changes: 5 additions & 2 deletions be/src/cloud/cloud_tablet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -427,8 +427,11 @@ Result<std::unique_ptr<RowsetWriter>> CloudTablet::create_transient_rowset_write
RowsetWriterContext context;
context.rowset_state = PREPARED;
context.segments_overlap = OVERLAPPING;
context.tablet_schema = std::make_shared<TabletSchema>();
context.tablet_schema->copy_from(*(rowset.tablet_schema()));
// During a partial update, the extracted columns of a variant should not be included in the tablet schema.
// This is because the partial update for a variant needs to ignore the extracted columns.
// Otherwise, the schema types in different rowsets might be inconsistent. When performing a partial update,
// the complete variant is constructed by reading all the sub-columns of the variant.
context.tablet_schema = rowset.tablet_schema()->copy_without_variant_extracted_columns();
context.newest_write_timestamp = UnixSeconds();
context.tablet_id = table_id();
context.enable_segcompaction = false;
Expand Down
1 change: 1 addition & 0 deletions be/src/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1000,6 +1000,7 @@ DEFINE_Bool(enable_index_apply_preds_except_leafnode_of_andnode, "true");
DEFINE_mBool(variant_enable_flatten_nested, "false");
DEFINE_mDouble(variant_ratio_of_defaults_as_sparse_column, "1");
DEFINE_mInt64(variant_threshold_rows_to_estimate_sparse_column, "1000");
DEFINE_mBool(variant_throw_exeception_on_invalid_json, "false");

// block file cache
DEFINE_Bool(enable_file_cache, "false");
Expand Down
2 changes: 2 additions & 0 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -1182,6 +1182,8 @@ DECLARE_mDouble(variant_ratio_of_defaults_as_sparse_column);
// Threshold to estimate a column is sparsed
// Notice: TEST ONLY
DECLARE_mInt64(variant_threshold_rows_to_estimate_sparse_column);
// If false, an invalid JSON string written to a variant is stored as a plain string;
// if true, an exception is thrown instead.
DECLARE_mBool(variant_throw_exeception_on_invalid_json);

DECLARE_mBool(enable_merge_on_write_correctness_check);
// USED FOR DEBUGING
Expand Down
6 changes: 6 additions & 0 deletions be/src/olap/rowset/segment_v2/column_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1601,6 +1601,9 @@ Status VariantRootColumnIterator::next_batch(size_t* n, vectorized::MutableColum
if (obj.is_null_root()) {
obj.create_root();
}
if (!obj.is_finalized()) {
obj.finalize();
}
auto root_column = obj.get_root();
RETURN_IF_ERROR(_inner_iter->next_batch(n, root_column, has_null));
obj.incr_num_rows(*n);
Expand Down Expand Up @@ -1634,6 +1637,9 @@ Status VariantRootColumnIterator::read_by_rowids(const rowid_t* rowids, const si
if (obj.is_null_root()) {
obj.create_root();
}
if (!obj.is_finalized()) {
obj.finalize();
}
auto root_column = obj.get_root();
RETURN_IF_ERROR(_inner_iter->read_by_rowids(rowids, count, root_column));
obj.incr_num_rows(count);
Expand Down
129 changes: 94 additions & 35 deletions be/src/vec/columns/column_object.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include "vec/columns/column_object.h"

#include <assert.h>
#include <fmt/core.h>
#include <fmt/format.h>
#include <glog/logging.h>
#include <parallel_hashmap/phmap.h>
Expand All @@ -34,6 +35,7 @@
#include <map>
#include <memory>
#include <optional>
#include <sstream>
#include <vector>

#include "common/compiler_util.h" // IWYU pragma: keep
Expand Down Expand Up @@ -677,8 +679,6 @@ void ColumnObject::check_consistency() const {
}
for (const auto& leaf : subcolumns) {
if (num_rows != leaf->data.size()) {
// LOG(FATAL) << "unmatched column:" << leaf->path.get_path()
// << ", expeted rows:" << num_rows << ", but meet:" << leaf->data.size();
throw doris::Exception(doris::ErrorCode::INTERNAL_ERROR,
"unmatched column: {}, expeted rows: {}, but meet: {}",
leaf->path.get_path(), num_rows, leaf->data.size());
Expand Down Expand Up @@ -791,26 +791,61 @@ void ColumnObject::insert_default() {
++num_rows;
}

Field ColumnObject::operator[](size_t n) const {
if (!is_finalized()) {
const_cast<ColumnObject*>(this)->finalize();
}
VariantMap map;
for (const auto& entry : subcolumns) {
if (WhichDataType(remove_nullable(entry->data.data_types.back())).is_json()) {
void ColumnObject::Subcolumn::get(size_t n, Field& res) const {
if (is_finalized()) {
if (least_common_type.get_base_type_id() == TypeIndex::JSONB) {
// JsonbFiled is special case
Field f = JsonbField();
(*entry->data.data.back()).get(n, f);
map[entry->path.get_path()] = std::move(f);
continue;
res = JsonbField();
}
get_finalized_column().get(n, res);
return;
}

size_t ind = n;
if (ind < num_of_defaults_in_prefix) {
if (least_common_type.get_base_type_id() == TypeIndex::Nothing) {
res = Null();
return;
}
map[entry->path.get_path()] = (*entry->data.data.back())[n];
res = least_common_type.get()->get_default();
return;
}
return map;

ind -= num_of_defaults_in_prefix;
for (size_t i = 0; i < data.size(); ++i) {
const auto& part = data[i];
const auto& part_type = data_types[i];
if (ind < part->size()) {
res = vectorized::remove_nullable(part_type)->get_default();
part->get(ind, res);
Field new_field;
convert_field_to_type(res, *least_common_type.get(), &new_field);
res = new_field;
return;
}

ind -= part->size();
}

throw doris::Exception(ErrorCode::OUT_OF_BOUND, "Index ({}) for getting field is out of range",
n);
}

// Materialize row `n` as a Field by delegating to get().
Field ColumnObject::operator[](size_t n) const {
    Field result;
    get(n, result);
    return result;
}

// Build row `n` as a VariantMap {subcolumn path -> field value} in `res`.
// NOTE: a stale `res = (*this)[n];` line was removed here — operator[] itself
// delegates to get(), so that leftover caused infinite recursion and its
// result was immediately overwritten by `res = VariantMap()` anyway.
void ColumnObject::get(size_t n, Field& res) const {
    assert(n < size());
    res = VariantMap();
    auto& object = res.get<VariantMap&>();

    for (const auto& entry : subcolumns) {
        // try_emplace keeps one entry per path; Subcolumn::get fills the value.
        auto it = object.try_emplace(entry->path.get_path()).first;
        entry->data.get(n, it->second);
    }
}

Status ColumnObject::try_insert_indices_from(const IColumn& src, const int* indices_begin,
Expand Down Expand Up @@ -1380,7 +1415,10 @@ void ColumnObject::strip_outer_array() {

ColumnPtr ColumnObject::filter(const Filter& filter, ssize_t count) const {
if (!is_finalized()) {
const_cast<ColumnObject*>(this)->finalize();
auto finalized = clone_finalized();
auto& finalized_object = assert_cast<ColumnObject&>(*finalized);
return finalized_object.apply_for_subcolumns(
[&](const auto& subcolumn) { return subcolumn.filter(filter, count); });
}
auto new_column = ColumnObject::create(true, false);
for (auto& entry : subcolumns) {
Expand Down Expand Up @@ -1545,29 +1583,54 @@ void ColumnObject::insert_indices_from(const IColumn& src, const uint32_t* indic
}
}

void ColumnObject::update_hash_with_value(size_t n, SipHash& hash) const {
void ColumnObject::for_each_imutable_subcolumn(ImutableColumnCallback callback) const {
if (!is_finalized()) {
// finalize has no side effect and can be safely used in const functions
const_cast<ColumnObject*>(this)->finalize();
auto finalized = clone_finalized();
auto& finalized_object = assert_cast<ColumnObject&>(*finalized);
finalized_object.for_each_imutable_subcolumn(callback);
return;
}
for_each_imutable_subcolumn([&](const auto& subcolumn) {
if (n >= subcolumn.size()) {
LOG(FATAL) << n << " greater than column size " << subcolumn.size()
<< " sub_column_info:" << subcolumn.dump_structure()
<< " total lines of this column " << num_rows;
}
return subcolumn.update_hash_with_value(n, hash);
});
}

void ColumnObject::for_each_imutable_subcolumn(ImutableColumnCallback callback) const {
for (const auto& entry : subcolumns) {
for (auto& part : entry->data.data) {
callback(*part);
}
}
}

// Fold row `n` of every subcolumn part into `hash`.
void ColumnObject::update_hash_with_value(size_t n, SipHash& hash) const {
    auto hash_one_part = [&](const auto& part) { part.update_hash_with_value(n, hash); };
    for_each_imutable_subcolumn(hash_one_part);
}

// Fold every row of every subcolumn part into the per-row `hashes` array.
// NOTE(review): `null_data` is deliberately not forwarded (nullptr is passed) —
// subcolumn null maps need not align with the parent column; confirm intended.
void ColumnObject::update_hashes_with_value(uint64_t* __restrict hashes,
                                            const uint8_t* __restrict null_data) const {
    auto hash_one_part = [&](const auto& part) {
        part.update_hashes_with_value(hashes, nullptr);
    };
    for_each_imutable_subcolumn(hash_one_part);
}

// Fold rows [start, end) of every subcolumn part into `hash` via xxHash.
// NOTE(review): `null_data` is deliberately not forwarded (nullptr is passed);
// confirm subcolumn null maps are meant to be ignored here.
void ColumnObject::update_xxHash_with_value(size_t start, size_t end, uint64_t& hash,
                                            const uint8_t* __restrict null_data) const {
    auto hash_one_part = [&](const auto& part) {
        part.update_xxHash_with_value(start, end, hash, nullptr);
    };
    for_each_imutable_subcolumn(hash_one_part);
}

// Fold `rows` rows (starting at `offset`) of every subcolumn part into the
// per-row CRC array `hash`.
// NOTE(review): `null_data` is deliberately not forwarded (nullptr is passed);
// confirm intended.
void ColumnObject::update_crcs_with_value(uint32_t* __restrict hash, PrimitiveType type,
                                          uint32_t rows, uint32_t offset,
                                          const uint8_t* __restrict null_data) const {
    auto crc_one_part = [&](const auto& part) {
        part.update_crcs_with_value(hash, type, rows, offset, nullptr);
    };
    for_each_imutable_subcolumn(crc_one_part);
}

// Fold rows [start, end) of every subcolumn part into the single CRC `hash`.
// NOTE(review): `null_data` is deliberately not forwarded (nullptr is passed);
// confirm intended.
void ColumnObject::update_crc_with_value(size_t start, size_t end, uint32_t& hash,
                                         const uint8_t* __restrict null_data) const {
    auto crc_one_part = [&](const auto& part) {
        part.update_crc_with_value(start, end, hash, nullptr);
    };
    for_each_imutable_subcolumn(crc_one_part);
}

std::string ColumnObject::debug_string() const {
std::stringstream res;
res << get_family_name() << "(num_row = " << num_rows;
Expand Down Expand Up @@ -1600,8 +1663,4 @@ Status ColumnObject::sanitize() const {
return Status::OK();
}

// Row-level in-place replacement is not implemented for variant columns;
// reaching this is a programming error and aborts the process via LOG(FATAL).
void ColumnObject::replace_column_data(const IColumn& col, size_t row, size_t self_row) {
    LOG(FATAL) << "Method replace_column_data is not supported for " << get_name();
}

} // namespace doris::vectorized
Loading