diff --git a/cpp/src/parquet/CMakeLists.txt b/cpp/src/parquet/CMakeLists.txt index 04028431ba15..19a9ccf58aa3 100644 --- a/cpp/src/parquet/CMakeLists.txt +++ b/cpp/src/parquet/CMakeLists.txt @@ -162,6 +162,7 @@ set(PARQUET_SRCS arrow/writer.cc bloom_filter.cc bloom_filter_reader.cc + row_range.cc column_reader.cc column_scanner.cc column_writer.cc @@ -366,6 +367,8 @@ add_parquet_test(reader-test level_conversion_test.cc column_scanner_test.cc reader_test.cc + range_reader_test.cc + row_range_test.cc stream_reader_test.cc test_util.cc) diff --git a/cpp/src/parquet/arrow/reader.cc b/cpp/src/parquet/arrow/reader.cc index d6ad7c25bc7c..e151fb8c4dee 100644 --- a/cpp/src/parquet/arrow/reader.cc +++ b/cpp/src/parquet/arrow/reader.cc @@ -17,6 +17,8 @@ #include "parquet/arrow/reader.h" +#include "parquet/page_index.h" + #include #include #include @@ -199,10 +201,11 @@ class FileReaderImpl : public FileReader { return ReadRowGroups(Iota(reader_->metadata()->num_row_groups()), indices, out); } - Status GetFieldReader(int i, - const std::shared_ptr>& included_leaves, - const std::vector& row_groups, - std::unique_ptr* out) { + Status GetFieldReader( + int i, const std::shared_ptr>& included_leaves, + const std::vector& row_groups, + const std::shared_ptr>>& row_ranges_per_rg, + std::unique_ptr* out) { // Should be covered by GetRecordBatchReader checks but // manifest_.schema_fields is a separate variable so be extra careful. 
if (ARROW_PREDICT_FALSE(i < 0 || @@ -218,13 +221,16 @@ class FileReaderImpl : public FileReader { ctx->iterator_factory = SomeRowGroupsFactory(row_groups); ctx->filter_leaves = true; ctx->included_leaves = included_leaves; + ctx->row_ranges_per_rg = + row_ranges_per_rg; // copy the shared pointer to extend its lifecycle return GetReader(manifest_.schema_fields[i], ctx, out); } - Status GetFieldReaders(const std::vector& column_indices, - const std::vector& row_groups, - std::vector>* out, - std::shared_ptr<::arrow::Schema>* out_schema) { + Status GetFieldReaders( + const std::vector& column_indices, const std::vector& row_groups, + const std::shared_ptr>>& row_ranges_per_rg, + std::vector>* out, + std::shared_ptr<::arrow::Schema>* out_schema) { // We only need to read schema fields which have columns indicated // in the indices vector ARROW_ASSIGN_OR_RAISE(std::vector field_indices, @@ -236,8 +242,8 @@ class FileReaderImpl : public FileReader { ::arrow::FieldVector out_fields(field_indices.size()); for (size_t i = 0; i < out->size(); ++i) { std::unique_ptr reader; - RETURN_NOT_OK( - GetFieldReader(field_indices[i], included_leaves, row_groups, &reader)); + RETURN_NOT_OK(GetFieldReader(field_indices[i], included_leaves, row_groups, + row_ranges_per_rg, &reader)); out_fields[i] = reader->field(); out->at(i) = std::move(reader); @@ -325,19 +331,61 @@ class FileReaderImpl : public FileReader { return ReadRowGroup(i, Iota(reader_->metadata()->num_columns()), table); } + // This is a internal API owned by FileReaderImpl, not exposed in FileReader + Status GetRecordBatchReaderWithRowRanges( + const std::vector& row_group_indices, const std::vector& column_indices, + const std::shared_ptr>>& row_ranges_per_rg, + std::unique_ptr* out); + + Status GetRecordBatchReader(const RowRanges& rows_to_return, + const std::vector& column_indices, + std::unique_ptr* out) override { + const auto metadata = reader_->metadata(); + // check if the row ranges are within the row group 
boundaries + if (rows_to_return.num_rows() != 0 && + rows_to_return.last_row() >= metadata->num_rows()) { + return Status::Invalid("The provided row range " + rows_to_return.ToString() + + " exceeds the number of rows in the file: " + + std::to_string(metadata->num_rows())); + } + if (rows_to_return.num_rows() == 0) { + return GetRecordBatchReaderWithRowRanges({}, column_indices, {}, out); + } + + std::vector rows_per_rg; + for (int i = 0; i < metadata->num_row_groups(); i++) { + rows_per_rg.push_back(metadata->RowGroup(i)->num_rows()); + } + // We'll assign a RowRanges for each RG, even if it's not required to return any rows + std::vector> row_ranges_per_rg = + rows_to_return.SplitByRowRange(rows_per_rg); + std::vector row_group_indices; + for (int i = 0; i < metadata->num_row_groups(); i++) { + if (row_ranges_per_rg.at(i)->num_rows() > 0) row_group_indices.push_back(i); + } + + return GetRecordBatchReaderWithRowRanges( + row_group_indices, column_indices, + std::make_shared>>( + std::move(row_ranges_per_rg)), + out); + } + Status GetRecordBatchReader(const std::vector& row_group_indices, const std::vector& column_indices, - std::unique_ptr* out) override; + std::unique_ptr* out) override { + return GetRecordBatchReaderWithRowRanges(row_group_indices, column_indices, {}, out); + } Status GetRecordBatchReader(const std::vector& row_group_indices, std::unique_ptr* out) override { - return GetRecordBatchReader(row_group_indices, - Iota(reader_->metadata()->num_columns()), out); + return GetRecordBatchReaderWithRowRanges( + row_group_indices, Iota(reader_->metadata()->num_columns()), {}, out); } Status GetRecordBatchReader(std::unique_ptr* out) override { - return GetRecordBatchReader(Iota(num_row_groups()), - Iota(reader_->metadata()->num_columns()), out); + return GetRecordBatchReaderWithRowRanges( + Iota(num_row_groups()), Iota(reader_->metadata()->num_columns()), {}, out); } ::arrow::Result<::arrow::AsyncGenerator>> @@ -440,6 +488,64 @@ class RowGroupReaderImpl : 
public RowGroupReader { // ---------------------------------------------------------------------- // Column reader implementations +// This class is used to skip decompressing & decoding unnecessary pages by comparing +// user-specified row_ranges and page_ranges from metadata. Only support IntervalRange +// case for now. +class RowRangesPageFilter { + public: + RowRangesPageFilter(const RowRanges& row_ranges, + const std::shared_ptr& page_ranges) + : row_ranges_(row_ranges), page_ranges_(page_ranges) {} + + // To avoid error "std::function target must be copy-constructible", we must define copy + // constructor + RowRangesPageFilter(const RowRangesPageFilter& other) + : row_ranges_(other.row_ranges_), page_ranges_(other.page_ranges_) {} + + bool operator()(const DataPageStats& stats) { + if (!initted) { + row_ranges_itr_ = row_ranges_.NewIterator(); + page_ranges_itr_ = page_ranges_->NewIterator(); + + current_row_range_ = row_ranges_itr_->NextRange(); + + if (current_row_range_.index() != 0) { + throw ParquetException( + "RowRangesPageFilter expects first NextRange() to be a IntervalRange"); + } + initted = true; + } + + current_page_range_ = page_ranges_itr_->NextRange(); + if (current_page_range_.index() != 0) { + throw ParquetException( + "RowRangesPageFilter expects first NextRange() to be a IntervalRange"); + } + + while (current_row_range_.index() == 0 && + IntervalRangeUtils::IsAfter(std::get(current_page_range_), + std::get(current_row_range_))) { + current_row_range_ = row_ranges_itr_->NextRange(); + } + + if (current_row_range_.index() != 0) { + return true; + } + + return IntervalRangeUtils::IsBefore(std::get(current_page_range_), + std::get(current_row_range_)); + } + + private: + const RowRanges& row_ranges_; + const std::shared_ptr page_ranges_; + std::unique_ptr row_ranges_itr_ = NULLPTR; + std::unique_ptr page_ranges_itr_ = NULLPTR; + std::variant current_row_range_ = End(); + std::variant current_page_range_ = End(); + bool initted = false; +}; + 
// Leaf reader is for primitive arrays and primitive children of nested arrays class LeafReader : public ColumnReaderImpl { public: @@ -501,8 +607,79 @@ class LeafReader : public ColumnReaderImpl { private: std::shared_ptr out_; + + void checkAndGetPageRanges(const RowRanges& row_ranges, + std::shared_ptr& page_ranges) const { + // check offset exists + const auto rg_pg_index_reader = + ctx_->reader->GetPageIndexReader()->RowGroup(input_->current_row_group()); + + if (!rg_pg_index_reader) { + throw ParquetException( + "Attempting to read with Ranges but Page Index is not found for Row " + "Group: " + + std::to_string(input_->current_row_group())); + } + const auto offset_index = rg_pg_index_reader->GetOffsetIndex(input_->column_index()); + + if (!offset_index) { + throw ParquetException( + "Attempting to read with Ranges but Offset index is not found for " + "column: " + + field_->name()); + } + + const auto page_locations = offset_index->page_locations(); + page_ranges = std::make_shared(); + for (size_t i = 0; i < page_locations.size() - 1; i++) { + page_ranges->Add( + {page_locations[i].first_row_index, page_locations[i + 1].first_row_index - 1}); + } + if (page_locations.size() >= 1) { + page_ranges->Add( + {page_locations[page_locations.size() - 1].first_row_index, + ctx_->reader->metadata()->RowGroup(input_->current_row_group())->num_rows() - + 1}); + } + + if (row_ranges.num_rows() > 0) { + if (row_ranges.last_row() > page_ranges->last_row()) { + throw ParquetException( + "The provided row range " + row_ranges.ToString() + " exceeds last page :" + + IntervalRangeUtils::ToString(page_ranges->GetRanges().back())); + } + } + } + void NextRowGroup() { std::unique_ptr page_reader = input_->NextChunk(); + + /// using page index to reduce cost + if (page_reader != nullptr && ctx_->row_ranges_per_rg) { + // reset skipper + record_reader_->reset_record_skipper(); + + const auto& row_ranges = (*ctx_->row_ranges_per_rg)[input_->current_row_group()]; + // if specific 
row range is provided for this rg + if (row_ranges->num_rows() != 0) { + // Use IntervalRanges to represent pages + std::shared_ptr page_ranges; + checkAndGetPageRanges(*row_ranges, page_ranges); + + // part 1, skip decompressing & decoding unnecessary pages + page_reader->set_data_page_filter(RowRangesPageFilter(*row_ranges, page_ranges)); + + // part 2, skip unnecessary rows in necessary pages + record_reader_->set_record_skipper( + std::make_unique(*page_ranges, + *row_ranges)); + } else { + NextRowGroup(); + return; + } + } + + record_reader_->reset_current_rg_processed_records(); record_reader_->SetPageReader(std::move(page_reader)); } @@ -971,9 +1148,10 @@ Status GetReader(const SchemaField& field, const std::shared_ptr& } // namespace -Status FileReaderImpl::GetRecordBatchReader(const std::vector& row_groups, - const std::vector& column_indices, - std::unique_ptr* out) { +Status FileReaderImpl::GetRecordBatchReaderWithRowRanges( + const std::vector& row_groups, const std::vector& column_indices, + const std::shared_ptr>>& row_ranges_per_rg, + std::unique_ptr* out) { RETURN_NOT_OK(BoundsCheck(row_groups, column_indices)); if (reader_properties_.pre_buffer()) { @@ -986,7 +1164,8 @@ Status FileReaderImpl::GetRecordBatchReader(const std::vector& row_groups, std::vector> readers; std::shared_ptr<::arrow::Schema> batch_schema; - RETURN_NOT_OK(GetFieldReaders(column_indices, row_groups, &readers, &batch_schema)); + RETURN_NOT_OK(GetFieldReaders(column_indices, row_groups, row_ranges_per_rg, &readers, + &batch_schema)); if (readers.empty()) { // Just generate all batches right now; they're cheap since they have no columns. 
@@ -1241,7 +1420,8 @@ Future> FileReaderImpl::DecodeRowGroups( // in a sync context too so use `this` over `self` std::vector> readers; std::shared_ptr<::arrow::Schema> result_schema; - RETURN_NOT_OK(GetFieldReaders(column_indices, row_groups, &readers, &result_schema)); + RETURN_NOT_OK( + GetFieldReaders(column_indices, row_groups, {}, &readers, &result_schema)); // OptionalParallelForAsync requires an executor if (!cpu_executor) cpu_executor = ::arrow::internal::GetCpuThreadPool(); diff --git a/cpp/src/parquet/arrow/reader.h b/cpp/src/parquet/arrow/reader.h index 6e46ca43f7b1..ad330a7c7352 100644 --- a/cpp/src/parquet/arrow/reader.h +++ b/cpp/src/parquet/arrow/reader.h @@ -23,6 +23,7 @@ #include #include +#include "parquet/column_reader.h" #include "parquet/file_reader.h" #include "parquet/platform.h" #include "parquet/properties.h" @@ -180,6 +181,19 @@ class PARQUET_EXPORT FileReader { const std::vector& row_group_indices, const std::vector& column_indices, std::unique_ptr<::arrow::RecordBatchReader>* out) = 0; + /// \brief Return a RecordBatchReader of row groups selected from + /// rows_to_return, whose columns are selected by column_indices. + /// + /// Notice that rows_to_return is file based, it not only decides which row groups to + /// read, but also which rows to read in each row group. + /// + /// + /// \returns error Status if either rows_to_return or column_indices + /// contains an invalid index + virtual ::arrow::Status GetRecordBatchReader( + const RowRanges& rows_to_return, const std::vector& column_indices, + std::unique_ptr<::arrow::RecordBatchReader>* out) = 0; + /// \brief Return a RecordBatchReader of row groups selected from /// row_group_indices, whose columns are selected by column_indices. 
/// diff --git a/cpp/src/parquet/arrow/reader_internal.h b/cpp/src/parquet/arrow/reader_internal.h index cf9dbb86577b..b30aef2691c1 100644 --- a/cpp/src/parquet/arrow/reader_internal.h +++ b/cpp/src/parquet/arrow/reader_internal.h @@ -76,6 +76,7 @@ class FileColumnIterator { } auto row_group_reader = reader_->RowGroup(row_groups_.front()); + current_rg_ = row_groups_.front(); row_groups_.pop_front(); return row_group_reader->GetColumnPageReader(column_index_); } @@ -88,11 +89,14 @@ class FileColumnIterator { int column_index() const { return column_index_; } + int current_row_group() const { return current_rg_; } + protected: int column_index_; ParquetFileReader* reader_; const SchemaDescriptor* schema_; std::deque row_groups_; + int current_rg_ = 0; }; using FileColumnIteratorFactory = @@ -109,6 +113,7 @@ struct ReaderContext { FileColumnIteratorFactory iterator_factory; bool filter_leaves; std::shared_ptr> included_leaves; + std::shared_ptr>> row_ranges_per_rg; bool IncludesLeaf(int leaf_index) const { if (this->filter_leaves) { diff --git a/cpp/src/parquet/column_reader.cc b/cpp/src/parquet/column_reader.cc index ac4627d69c0f..69f3f6bd824d 100644 --- a/cpp/src/parquet/column_reader.cc +++ b/cpp/src/parquet/column_reader.cc @@ -1363,7 +1363,7 @@ class TypedRecordReader : public TypedColumnReaderImpl, int64_t records_read = 0; if (has_values_to_process()) { - records_read += ReadRecordData(num_records); + records_read += ReadRecordDataWithSkipCheck(num_records); } int64_t level_batch_size = std::max(kMinLevelBatchSize, num_records); @@ -1417,11 +1417,11 @@ class TypedRecordReader : public TypedColumnReaderImpl, } levels_written_ += levels_read; - records_read += ReadRecordData(num_records - records_read); + records_read += ReadRecordDataWithSkipCheck(num_records - records_read); } else { // No repetition or definition levels batch_size = std::min(num_records - records_read, batch_size); - records_read += ReadRecordData(batch_size); + records_read += 
ReadRecordDataWithSkipCheck(batch_size); } } @@ -1624,10 +1624,12 @@ class TypedRecordReader : public TypedColumnReaderImpl, // Top level required field. Number of records equals to number of levels, // and there is not read-ahead for levels. + int64_t skipped_records = 0; if (this->max_rep_level_ == 0 && this->max_def_level_ == 0) { - return this->Skip(num_records); + skipped_records = this->Skip(num_records); + current_rg_processed_records_ += skipped_records; + return skipped_records; } - int64_t skipped_records = 0; if (this->max_rep_level_ == 0) { // Non-repeated optional field. // First consume whatever is in the buffer. @@ -1643,6 +1645,8 @@ class TypedRecordReader : public TypedColumnReaderImpl, } else { skipped_records += this->SkipRecordsRepeated(num_records); } + + current_rg_processed_records_ += skipped_records; return skipped_records; } @@ -1971,9 +1975,27 @@ class TypedRecordReader : public TypedColumnReaderImpl, this->ConsumeBufferedValues(values_to_read); } + current_rg_processed_records_ += records_read; return records_read; } + int64_t ReadRecordDataWithSkipCheck(const int64_t num_records) { + if (!skipper_) { + return ReadRecordData(num_records); + } + + while (true) { + const auto advise = skipper_->AdviseNext(current_rg_processed_records_); + if (advise == 0) { + return 0; + } + if (advise > 0) { + return ReadRecordData(std::min(num_records, advise)); + } + SkipRecords(-advise); + } + } + void DebugPrintState() override { const int16_t* def_levels = this->def_levels(); const int16_t* rep_levels = this->rep_levels(); @@ -2300,5 +2322,71 @@ std::shared_ptr RecordReader::Make(const ColumnDescriptor* descr, return nullptr; } +RecordSkipper::RecordSkipper(IntervalRanges& pages, const RowRanges& orig_row_ranges) { + // copy row_ranges + IntervalRanges skip_pages; + for (auto& page : pages.GetRanges()) { + if (!orig_row_ranges.IsOverlapping(page.start, page.end)) { + skip_pages.Add(page); + } + } + + AdjustRanges(skip_pages, orig_row_ranges, 
row_ranges_); + range_iter_ = row_ranges_->NewIterator(); + current_range_variant = range_iter_->NextRange(); + + total_rows_to_process_ = pages.num_rows() - skip_pages.num_rows(); +} + +int64_t RecordSkipper::AdviseNext(const int64_t current_rg_processed) { + if (current_range_variant.index() == 2) { + return 0; + } + + auto& current_range = std::get(current_range_variant); + + if (current_range.end < current_rg_processed) { + current_range_variant = range_iter_->NextRange(); + if (current_range_variant.index() == 2) { + // negative, skip the ramaining rows + return current_rg_processed - total_rows_to_process_; + } + } + + current_range = std::get(current_range_variant); + + if (current_range.start > current_rg_processed) { + // negative, skip + return current_rg_processed - current_range.start; + } + + const auto ret = current_range.end - current_rg_processed + 1; + return ret; +} + +void RecordSkipper::AdjustRanges(IntervalRanges& skip_pages, + const RowRanges& orig_row_ranges, + std::unique_ptr& ret) { + std::unique_ptr temp = std::make_unique(); + + size_t skipped_rows = 0; + const auto orig_range_iter = orig_row_ranges.NewIterator(); + auto orig_range_variant = orig_range_iter->NextRange(); + auto skip_iter = skip_pages.GetRanges().begin(); + while (orig_range_variant.index() != 2) { + const auto& origin_range = std::get(orig_range_variant); + while (skip_iter != skip_pages.GetRanges().end() && + IntervalRangeUtils::IsBefore(*skip_iter, origin_range)) { + skipped_rows += IntervalRangeUtils::Count(*skip_iter); + ++skip_iter; + } + + temp->Add(IntervalRange(origin_range.start - skipped_rows, + origin_range.end - skipped_rows)); + orig_range_variant = orig_range_iter->NextRange(); + } + ret = std::move(temp); +} + } // namespace internal } // namespace parquet diff --git a/cpp/src/parquet/column_reader.h b/cpp/src/parquet/column_reader.h index 086f6c0e5580..d07273deab85 100644 --- a/cpp/src/parquet/column_reader.h +++ b/cpp/src/parquet/column_reader.h @@ -27,6 
+27,7 @@ #include "parquet/metadata.h" #include "parquet/platform.h" #include "parquet/properties.h" +#include "parquet/row_range.h" #include "parquet/schema.h" #include "parquet/types.h" @@ -304,6 +305,30 @@ class TypedColumnReader : public ColumnReader { namespace internal { +// A RecordSkipper is used to skip uncessary rows within each pages. +class PARQUET_EXPORT RecordSkipper { + public: + RecordSkipper(IntervalRanges& pages, const RowRanges& orig_row_ranges); + /// Return the number of records to read or to skip + /// if return values is positive, it means to read N records + /// if return values is negative, it means to skip N records + /// if return values is 0, it means end of RG + int64_t AdviseNext(const int64_t current_rg_processed); + + private: + /// Since the skipped pages will be silently skipped without updating + /// current_rg_processed_records or records_read_, we need to pre-process the row + /// ranges as if these skipped pages never existed + static void AdjustRanges(IntervalRanges& skip_pages, const RowRanges& orig_row_ranges, + std::unique_ptr& ret); + + std::unique_ptr row_ranges_; + std::unique_ptr range_iter_; + std::variant current_range_variant = End(); + + size_t total_rows_to_process_ = 0; +}; + /// \brief Stateful column reader that delimits semantic records for both flat /// and nested columns /// @@ -424,6 +449,14 @@ class PARQUET_EXPORT RecordReader { /// \brief True if reading dense for nullable columns. bool read_dense_for_nullable() const { return read_dense_for_nullable_; } + void reset_current_rg_processed_records() { current_rg_processed_records_ = 0; } + + void set_record_skipper(std::unique_ptr skipper) { + skipper_ = std::move(skipper); + } + + void reset_record_skipper() { skipper_.reset(); } + protected: /// \brief Indicates if we can have nullable values. Note that repeated fields /// may or may not be nullable. 
@@ -432,6 +465,8 @@ class PARQUET_EXPORT RecordReader { bool at_record_start_; int64_t records_read_; + int64_t current_rg_processed_records_ = 0; // counting both read and skip records + /// \brief Stores values. These values are populated based on each ReadRecords /// call. No extra values are buffered for the next call. SkipRecords will not /// add any value to this buffer. @@ -473,6 +508,8 @@ class PARQUET_EXPORT RecordReader { // If true, we will not leave any space for the null values in the values_ // vector. bool read_dense_for_nullable_ = false; + + std::unique_ptr skipper_ = NULLPTR; }; class BinaryRecordReader : virtual public RecordReader { diff --git a/cpp/src/parquet/range_reader_test.cc b/cpp/src/parquet/range_reader_test.cc new file mode 100644 index 000000000000..04510143e54c --- /dev/null +++ b/cpp/src/parquet/range_reader_test.cc @@ -0,0 +1,498 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include "arrow/api.h" +#include "arrow/io/api.h" +#include "arrow/io/memory.h" +#include "arrow/result.h" +#include "arrow/util/type_fwd.h" +#include "parquet/arrow/reader.h" +#include "parquet/arrow/writer.h" + +#include +#include +#include +#include +#include +#include + +#include +#include + +using parquet::IntervalRange; +using parquet::IntervalRanges; + +std::string random_string(std::string::size_type length) { + static auto& chrs = "ABCDEFGHIJKLMNOPQRSTUVWXYZ"; + + static std::mt19937 rg = std::mt19937(std::random_device()()); + static std::uniform_int_distribution pick(0, sizeof(chrs) - 2); + + std::string s; + s.reserve(length); + while (length--) s += chrs[pick(rg)]; + + return s; +} + +/// The table looks like (with_nulls = false): +// { +// { a: {x: 0, y: 0}, b: {0, 0, 0}, c: "0", d: 0}, +// { a: {x: 1, y: 1}, b: {1, 1, 1}, c: "1", d: 1}, +// ... +// { a: {x: 99, y: 99}, b: {99, 99, 99}, c: "99", d: 99} +// } +arrow::Result> GetTable(bool with_nulls = false) { + // if with_nulls, the generated table should null values + // set first 10 rows and last 10 rows to null + std::shared_ptr null_bitmap; + std::vector flags(100, true); + if (with_nulls) { + std::fill_n(flags.begin(), 10, false); + std::fill_n(flags.begin() + 90, 10, false); + + size_t length = flags.size(); + + ARROW_ASSIGN_OR_RAISE(null_bitmap, arrow::AllocateEmptyBitmap(length)); + + uint8_t* bitmap = null_bitmap->mutable_data(); + for (size_t i = 0; i < length; ++i) { + if (flags[i]) { + arrow::bit_util::SetBit(bitmap, i); + } + } + } + + auto int32_builder = arrow::Int32Builder(); + + // Struct col + std::shared_ptr arr_a_x; + std::shared_ptr arr_a_y; + ARROW_RETURN_NOT_OK(int32_builder.AppendValues(arrow::internal::Iota(0, 100))); + ARROW_RETURN_NOT_OK(int32_builder.Finish(&arr_a_x)); + ARROW_RETURN_NOT_OK(int32_builder.AppendValues(arrow::internal::Iota(0, 100))); + ARROW_RETURN_NOT_OK(int32_builder.Finish(&arr_a_y)); + ARROW_ASSIGN_OR_RAISE(auto arr_a, arrow::StructArray::Make( + 
{arr_a_x, arr_a_y}, + std::vector{"x", "y"}, null_bitmap)); + + // List col + std::shared_ptr arr_b_values; + std::shared_ptr arr_b_offsets; + std::vector b_values; + for (int i = 0; i < 100; ++i) { + for (int j = 0; j < 3; ++j) { + b_values.push_back(i); + } + } + ARROW_RETURN_NOT_OK(int32_builder.AppendValues(b_values)); + ARROW_RETURN_NOT_OK(int32_builder.Finish(&arr_b_values)); + std::vector offsets = arrow::internal::Iota(0, 101); + std::transform(offsets.begin(), offsets.end(), offsets.begin(), + [](const int x) { return x * 3; }); + ARROW_RETURN_NOT_OK(int32_builder.AppendValues(offsets)); + ARROW_RETURN_NOT_OK(int32_builder.Finish(&arr_b_offsets)); + ARROW_ASSIGN_OR_RAISE(auto arr_b, arrow::ListArray::FromArrays( + *arr_b_offsets, *arr_b_values, + arrow::default_memory_pool(), null_bitmap)); + + // string col + auto string_builder = arrow::StringBuilder(); + std::shared_ptr arr_c; + std::vector strs; + uint8_t valid_bytes[100]; + for (size_t i = 0; i < 100; i++) { + // add more chars to make this column unaligned with other columns' page + strs.push_back(std::to_string(i) + random_string(20)); + valid_bytes[i] = flags[i]; + } + ARROW_RETURN_NOT_OK(string_builder.AppendValues(strs, &valid_bytes[0])); + ARROW_RETURN_NOT_OK(string_builder.Finish(&arr_c)); + + // int col + std::shared_ptr arr_d; + ARROW_RETURN_NOT_OK(int32_builder.AppendValues(arrow::internal::Iota(0, 100), flags)); + ARROW_RETURN_NOT_OK(int32_builder.Finish(&arr_d)); + + auto schema = arrow::schema({ + // complex types prior to simple types + field("a", arr_a->type()), + field("b", list(arrow::int32())), + field("c", arrow::utf8()), + field("d", arrow::int32()), + }); + + return arrow::Table::Make(schema, {arr_a, arr_b, arr_c, arr_d}); +} + +arrow::Result> WriteFullFile( + const bool with_nulls = false) { + using parquet::ArrowWriterProperties; + using parquet::WriterProperties; + + ARROW_ASSIGN_OR_RAISE(const auto table, GetTable(with_nulls)); + + const std::shared_ptr props = + 
WriterProperties::Builder() + .max_row_group_length(30) + ->enable_write_page_index() + ->disable_dictionary() + ->write_batch_size(1) + ->data_pagesize(30) // small pages + ->compression(arrow::Compression::SNAPPY) + ->build(); + + const std::shared_ptr arrow_props = + ArrowWriterProperties::Builder().store_schema()->build(); + + ARROW_ASSIGN_OR_RAISE(const auto out_stream, ::arrow::io::BufferOutputStream::Create()); + + ARROW_RETURN_NOT_OK(parquet::arrow::WriteTable(*table.get(), + arrow::default_memory_pool(), out_stream, + /*chunk_size=*/100, props, arrow_props)); + + // { + // // output to a local file for debugging + // ARROW_ASSIGN_OR_RAISE(auto outfile, arrow::io::FileOutputStream::Open( + // "/tmp/range_reader_test.parquet")); + // + // ARROW_RETURN_NOT_OK( + // parquet::arrow::WriteTable(*table.get(), arrow::default_memory_pool(), outfile, + // /*chunk_size=*/100, props, arrow_props)); + // } + + return out_stream->Finish(); +} + +bool checking_col(const std::string& col_name, + const std::vector& column_names) { + return std::find(column_names.begin(), column_names.end(), col_name) != + column_names.end(); +} + +void check_rb(std::unique_ptr rb_reader, + const size_t expected_rows, const int64_t expected_sum) { + const std::vector column_names = rb_reader->schema()->field_names(); + + size_t total_rows = 0; + int64_t sum_a = 0; + int64_t sum_b = 0; + int64_t sum_c = 0; + int64_t sum_d = 0; + for (arrow::Result> maybe_batch : *rb_reader) { + ASSERT_OK_AND_ASSIGN(auto batch, maybe_batch); + total_rows += batch->num_rows(); + + if (checking_col("a", column_names)) { + auto a_array = + std::dynamic_pointer_cast(batch->GetColumnByName("a")); + auto a_x_array = std::dynamic_pointer_cast(a_array->field(0)); + auto a_y_array = std::dynamic_pointer_cast(a_array->field(1)); + for (auto iter = a_x_array->begin(); iter != a_x_array->end(); ++iter) { + sum_a += (*iter).has_value() ? 
(*iter).value() : 0; + } + for (auto iter = a_y_array->begin(); iter != a_y_array->end(); ++iter) { + sum_a += (*iter).has_value() ? (*iter).value() : 0; + } + } + + if (checking_col("b", column_names)) { + auto b_array = + std::dynamic_pointer_cast(batch->GetColumnByName("b")); + ASSERT_OK_AND_ASSIGN(auto flatten_b_array, b_array->Flatten()); + auto b_array_values = std::dynamic_pointer_cast(flatten_b_array); + for (auto iter = b_array_values->begin(); iter != b_array_values->end(); ++iter) { + sum_b += (*iter).has_value() ? (*iter).value() : 0; + } + } + + if (checking_col("c", column_names)) { + auto c_array = + std::dynamic_pointer_cast(batch->GetColumnByName("c")); + for (auto iter = c_array->begin(); iter != c_array->end(); ++iter) { + sum_c += std::stoi(std::string( + (*iter).has_value() ? (*iter).value().substr(0, (*iter).value().size() - 20) + : "0")); + } + } + + if (checking_col("d", column_names)) { + auto d_array = + std::dynamic_pointer_cast(batch->GetColumnByName("d")); + for (auto iter = d_array->begin(); iter != d_array->end(); ++iter) { + sum_d += (*iter).has_value() ? 
(*iter).value() : 0; + } + } + } + ASSERT_EQ(expected_rows, total_rows); + + if (checking_col("a", column_names)) { + ASSERT_EQ(expected_sum * 2, sum_a); + } + if (checking_col("b", column_names)) { + ASSERT_EQ(expected_sum * 3, sum_b); + } + if (checking_col("c", column_names)) { + ASSERT_EQ(expected_sum, sum_c); + } + if (checking_col("d", column_names)) { + ASSERT_EQ(expected_sum, sum_d); + } +} + +class TestRecordBatchReaderWithRanges : public testing::Test { + public: + void SetUp() { + ASSERT_OK_AND_ASSIGN(auto buffer, WriteFullFile()); + + arrow::MemoryPool* pool = arrow::default_memory_pool(); + + auto reader_properties = parquet::ReaderProperties(pool); + reader_properties.set_buffer_size(4096 * 4); + reader_properties.enable_buffered_stream(); + + auto arrow_reader_props = parquet::ArrowReaderProperties(); + arrow_reader_props.set_batch_size(10); // default 64 * 1024 + + parquet::arrow::FileReaderBuilder reader_builder; + const auto in_file = std::make_shared(buffer); + ASSERT_OK(reader_builder.Open(in_file, /*memory_map=*/reader_properties)); + reader_builder.memory_pool(pool); + reader_builder.properties(arrow_reader_props); + + ASSERT_OK_AND_ASSIGN(arrow_reader, reader_builder.Build()); + } + + void TearDown() {} + + protected: + std::unique_ptr arrow_reader; +}; + +TEST_F(TestRecordBatchReaderWithRanges, TestRangesSplit) {} + +TEST_F(TestRecordBatchReaderWithRanges, SelectOnePageForEachRG) { + std::unique_ptr rb_reader; + IntervalRanges rows{{{0, 9}, {40, 49}, {80, 89}, {90, 99}}}; + + const std::vector column_indices{0, 1, 2, 3, 4}; + ASSERT_OK(arrow_reader->GetRecordBatchReader(rows, column_indices, &rb_reader)); + + // (0+...+9) + (40+...+49) + (80+...+89) + (90+...+99) = 2280 + check_rb(std::move(rb_reader), 40, 2280); +} + +TEST_F(TestRecordBatchReaderWithRanges, SelectSomePageForOneRG) { + std::unique_ptr rb_reader; + IntervalRanges rows{{IntervalRange{0, 7}, IntervalRange{16, 23}}}; + + const std::vector column_indices{0, 1, 2, 3, 4}; + 
ASSERT_OK(arrow_reader->GetRecordBatchReader(rows, column_indices, &rb_reader)); + + // (0+...+7) + (16+...+23) = 184 + check_rb(std::move(rb_reader), 16, 184); +} + +TEST_F(TestRecordBatchReaderWithRanges, SelectAllRange) { + std::unique_ptr rb_reader; + IntervalRanges rows{{IntervalRange{0, 29}, IntervalRange{30, 59}, IntervalRange{60, 89}, + IntervalRange{90, 99}}}; + + const std::vector column_indices{0, 1, 2, 3, 4}; + ASSERT_OK(arrow_reader->GetRecordBatchReader(rows, column_indices, &rb_reader)); + + // (0+...+99) = 4950 + check_rb(std::move(rb_reader), 100, 4950); +} + +TEST_F(TestRecordBatchReaderWithRanges, SelectEmptyRange) { + std::unique_ptr rb_reader; + IntervalRanges rows{}; + + const std::vector column_indices{0, 1, 2, 3, 4}; + const auto status = + arrow_reader->GetRecordBatchReader(rows, column_indices, &rb_reader); + ASSERT_OK(status); + check_rb(std::move(rb_reader), 0, 0); +} + +TEST_F(TestRecordBatchReaderWithRanges, SelectOneRowSkipOneRow) { + // case 1: only care about RG 0 + { + std::unique_ptr rb_reader; + std::vector ranges; + for (int64_t i = 0; i < 30; i++) { + if (i % 2 == 0) ranges.push_back({i, i}); + } + const std::vector column_indices{0, 1, 2, 3, 4}; + ASSERT_OK(arrow_reader->GetRecordBatchReader(IntervalRanges(ranges), column_indices, + &rb_reader)); + + check_rb(std::move(rb_reader), 15, 210); // 0 + 2 + ... + 28 = 210 + } + + // case 2: care about RG 0 and 2 + { + std::unique_ptr rb_reader; + std::vector ranges; + for (int64_t i = 0; i < 30; i++) { + if (i % 2 == 0) { + ranges.push_back({i, i}); + } + } + + for (int64_t i = 60; i < 90; i++) { + if (i % 2 == 0) { + ranges.push_back({i, i}); + } + } + const std::vector column_indices{0, 1, 2, 3, 4}; + ASSERT_OK(arrow_reader->GetRecordBatchReader(IntervalRanges(ranges), column_indices, + &rb_reader)); + + check_rb(std::move(rb_reader), 30, + 1320); // (0 + 2 + ... + 28) + (60 + 62 ... 
+ 88) = 1320 + } +} + +TEST_F(TestRecordBatchReaderWithRanges, InvalidRanges) { + std::unique_ptr rb_reader; + { + auto create_ranges = []() -> IntervalRanges { + return IntervalRanges{{IntervalRange{-1, 5}}}; + }; + EXPECT_THROW(create_ranges(), parquet::ParquetException); + } + + { + auto create_ranges = []() -> IntervalRanges { + return IntervalRanges{{{0, 4}, {2, 5}}}; + }; + EXPECT_THROW(create_ranges(), parquet::ParquetException); + } + { + // row 100 is out of bounds (the file holds rows 0..99), so the request is rejected + IntervalRanges rows{{IntervalRange{0, 100}}}; + const std::vector column_indices{0, 1, 2, 3, 4}; + const auto status = + arrow_reader->GetRecordBatchReader(rows, column_indices, &rb_reader); + ASSERT_NOT_OK(status); + EXPECT_TRUE(status.message().find("The provided row range [(0, 100)] exceeds the " + "number of rows in the file: 100") != + std::string::npos); + } +} + +TEST(TestRecordBatchReaderWithRangesBadCases, NoPageIndex) { + using parquet::ArrowWriterProperties; + using parquet::WriterProperties; + + // write a file without page index + ASSERT_OK_AND_ASSIGN(std::shared_ptr table, GetTable()); + std::shared_ptr props = + WriterProperties::Builder() + .max_row_group_length(30) + ->disable_write_page_index() // NO INDEX !!!!
+ ->write_batch_size(13) + ->data_pagesize(1) + ->compression(arrow::Compression::SNAPPY) + ->build(); + std::shared_ptr arrow_props = + ArrowWriterProperties::Builder().store_schema()->build(); + ASSERT_OK_AND_ASSIGN(auto out_stream, ::arrow::io::BufferOutputStream::Create()); + ASSERT_OK(parquet::arrow::WriteTable(*table.get(), arrow::default_memory_pool(), + out_stream, + /*chunk_size=*/100, props, arrow_props)); + ASSERT_OK_AND_ASSIGN(auto buffer, out_stream->Finish()); + + // try to read the file with Range + arrow::MemoryPool* pool = arrow::default_memory_pool(); + auto reader_properties = parquet::ReaderProperties(pool); + reader_properties.set_buffer_size(4096 * 4); + reader_properties.enable_buffered_stream(); + auto arrow_reader_props = parquet::ArrowReaderProperties(); + arrow_reader_props.set_batch_size(10); // default 64 * 1024 + + parquet::arrow::FileReaderBuilder reader_builder; + auto in_file = std::make_shared(buffer); + ASSERT_OK(reader_builder.Open(in_file, /*memory_map=*/reader_properties)); + reader_builder.memory_pool(pool); + reader_builder.properties(arrow_reader_props); + ASSERT_OK_AND_ASSIGN(auto arrow_reader, reader_builder.Build()); + + std::unique_ptr rb_reader; + IntervalRanges rows{{IntervalRange{0, 29}}}; + std::vector column_indices{0, 1, 2, 3, 4}; + auto status = arrow_reader->GetRecordBatchReader(rows, column_indices, &rb_reader); + ASSERT_NOT_OK(status); + EXPECT_TRUE(status.message().find("Attempting to read with Ranges but Page Index is " + "not found for Row Group: 0") != std::string::npos); +} + +class TestRecordBatchReaderWithRangesWithNulls : public testing::Test { + public: + void SetUp() { + ASSERT_OK_AND_ASSIGN(auto buffer, WriteFullFile(true)); + + arrow::MemoryPool* pool = arrow::default_memory_pool(); + + auto reader_properties = parquet::ReaderProperties(pool); + reader_properties.set_buffer_size(4096 * 4); + reader_properties.enable_buffered_stream(); + + auto arrow_reader_props = parquet::ArrowReaderProperties(); + 
arrow_reader_props.set_batch_size(10); // default 64 * 1024 + + parquet::arrow::FileReaderBuilder reader_builder; + const auto in_file = std::make_shared(buffer); + ASSERT_OK(reader_builder.Open(in_file, /*memory_map=*/reader_properties)); + reader_builder.memory_pool(pool); + reader_builder.properties(arrow_reader_props); + + ASSERT_OK_AND_ASSIGN(arrow_reader, reader_builder.Build()); + } + + void TearDown() {} + + protected: + std::unique_ptr arrow_reader; +}; + +TEST_F(TestRecordBatchReaderWithRangesWithNulls, SelectOneRowSkipOneRow) { + { + std::unique_ptr rb_reader; + std::vector ranges; + for (int64_t i = 0; i < 30; i++) { + if (i % 2 == 0) { + ranges.push_back({i, i}); + } + } + + for (int64_t i = 60; i < 90; i++) { + if (i % 2 == 0) { + ranges.push_back({i, i}); + } + } + const std::vector column_indices{0, 1, 2, 3, 4}; + ASSERT_OK(arrow_reader->GetRecordBatchReader(IntervalRanges(ranges), column_indices, + &rb_reader)); + + // 0-9 is masked as null, so the remaining is: + // (10 + 12 + ... + 28) + (60 + 62 ... + 88) = 1300 + check_rb(std::move(rb_reader), 30, 1300); + } +} diff --git a/cpp/src/parquet/row_range.cc b/cpp/src/parquet/row_range.cc new file mode 100644 index 000000000000..fa996a198f43 --- /dev/null +++ b/cpp/src/parquet/row_range.cc @@ -0,0 +1,190 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied.
See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "parquet/row_range.h" + +#include + +#include "parquet/exception.h" + +namespace parquet { +// ---------------------------------------------------------------------- +// RowRanges and ins implementations +bool IsValid(const std::vector& ranges) { + if (ranges.size() == 0) return true; + if (ranges[0].start < 0) { + return false; + } + for (size_t i = 0; i < ranges.size(); i++) { + if (!IntervalRangeUtils::IsValid(ranges[i])) { + return false; + } + } + for (size_t i = 1; i < ranges.size(); i++) { + if (ranges[i].start <= ranges[i - 1].end) { + return false; + } + } + return true; +} + +IntervalRanges::IntervalRanges() = default; + +IntervalRanges::IntervalRanges(const IntervalRange& range) { + ranges_.push_back(range); + if (!IsValid(ranges_)) { + throw ParquetException("Invalid range with start: " + std::to_string(range.start) + + " and end: " + std::to_string(range.end) + + ", keep it monotone and non-interleaving"); + } +} + +IntervalRanges::IntervalRanges(const std::vector& ranges) { + this->ranges_ = ranges; + if (!IsValid(ranges_)) { + throw ParquetException("Invalid ranges: " + this->IntervalRanges::ToString() + + ", keep it monotone and non-interleaving"); + } +} + +std::unique_ptr IntervalRanges::NewIterator() const { + return std::make_unique(ranges_); +} + +size_t IntervalRanges::num_rows() const { + size_t cnt = 0; + for (const IntervalRange& range : ranges_) { + cnt += IntervalRangeUtils::Count(range); + } + return cnt; +} + +int64_t IntervalRanges::first_row() const { + if (ranges_.empty()) { + throw ParquetException("first_row() called on empty IntervalRanges"); + } + return ranges_.front().start; +} + +int64_t IntervalRanges::last_row() const { + if (ranges_.empty()) { + throw ParquetException("last_row() called on empty IntervalRanges"); + } + return ranges_.back().end; +} + +bool IntervalRanges::IsOverlapping(const int64_t start, 
const int64_t end) const { + auto searchRange = IntervalRange{start, end}; + auto it = std::lower_bound(ranges_.begin(), ranges_.end(), searchRange, + [](const IntervalRange& r1, const IntervalRange& r2) { + return IntervalRangeUtils::IsBefore(r1, r2); + }); + return it != ranges_.end() && !IntervalRangeUtils::IsAfter(*it, searchRange); +} + +std::string IntervalRanges::ToString() const { + std::string result = "["; + for (const IntervalRange& range : ranges_) { + result += IntervalRangeUtils::ToString(range) + ", "; + } + if (!ranges_.empty()) { + result = result.substr(0, result.size() - 2); + } + result += "]"; + return result; +} + +std::vector> IntervalRanges::SplitByRowRange( + const std::vector& num_rows_per_sub_ranges) const { + if (num_rows_per_sub_ranges.size() <= 1) { + std::unique_ptr single = + std::make_unique(*this); // return a copy of itself + auto ret = std::vector>(); + ret.push_back(std::move(single)); + return ret; + } + + std::vector> result; + + IntervalRanges spaces; + int64_t rows_so_far = 0; + for (size_t i = 0; i < num_rows_per_sub_ranges.size(); ++i) { + auto start = rows_so_far; + rows_so_far += num_rows_per_sub_ranges[i]; + auto end = rows_so_far - 1; + spaces.Add({start, end}); + } + + // each RG's row range forms a space, we need to adjust RowRanges in each space to + // zero based. 
+ for (IntervalRange space : spaces.GetRanges()) { + auto intersection = Intersection(IntervalRanges(space), *this); + + std::unique_ptr zero_based_ranges = + std::make_unique(); + for (const IntervalRange& range : intersection.GetRanges()) { + zero_based_ranges->Add({range.start - space.start, range.end - space.start}); + } + result.push_back(std::move(zero_based_ranges)); + } + + return result; +} + +IntervalRanges IntervalRanges::Intersection(const IntervalRanges& left, + const IntervalRanges& right) { + IntervalRanges result; + + size_t rightIndex = 0; + for (const IntervalRange& l : left.ranges_) { + for (size_t i = rightIndex, n = right.ranges_.size(); i < n; ++i) { + const IntervalRange& r = right.ranges_[i]; + if (IntervalRangeUtils::IsBefore(l, r)) { + break; + } else if (IntervalRangeUtils::IsAfter(l, r)) { + rightIndex = i + 1; + continue; + } + result.Add(IntervalRangeUtils::Intersection(l, r)); + } + } + + return result; +} + +void IntervalRanges::Add(const IntervalRange& range) { + const IntervalRange rangeToAdd = range; + if (ranges_.size() > 1 && rangeToAdd.start <= ranges_.back().end) { + throw ParquetException("Ranges must be added in order"); + } + ranges_.push_back(rangeToAdd); +} + +const std::vector& IntervalRanges::GetRanges() const { return ranges_; } + +IntervalRowRangesIterator::IntervalRowRangesIterator( + const std::vector& ranges) + : ranges_(ranges) {} + +IntervalRowRangesIterator::~IntervalRowRangesIterator() {} + +std::variant IntervalRowRangesIterator::NextRange() { + if (current_index_ >= ranges_.size()) return End(); + + return ranges_[current_index_++]; +} +} // namespace parquet diff --git a/cpp/src/parquet/row_range.h b/cpp/src/parquet/row_range.h new file mode 100644 index 000000000000..4e7c2631eb6a --- /dev/null +++ b/cpp/src/parquet/row_range.h @@ -0,0 +1,156 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. 
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// This module contains the logical parquet-cpp types (independent of Thrift +// structures), schema nodes, and related type tools + +#pragma once +#include + +#include "parquet/exception.h" + +namespace parquet { + +// Represent a range to read. The range is inclusive on both ends. +struct IntervalRange { + IntervalRange(const int64_t start_, const int64_t end_) : start(start_), end(end_) { + if (start > end) { + throw ParquetException("Invalid range with start: " + std::to_string(start) + + " bigger than end: " + std::to_string(end)); + } + } + + // inclusive + int64_t start = -1; + // inclusive + int64_t end = -1; +}; + +class IntervalRangeUtils { + public: + static IntervalRange Intersection(const IntervalRange& left, + const IntervalRange& right) { + if (left.start <= right.start) { + if (left.end >= right.start) { + return {right.start, std::min(left.end, right.end)}; + } + } else if (right.end >= left.start) { + return {left.start, std::min(left.end, right.end)}; + } + return {-1, -1}; // Return a default Range object if no intersection range found + } + + static std::string ToString(const IntervalRange& range) { + return "(" + std::to_string(range.start) + ", " + std::to_string(range.end) + ")"; + } + + static bool IsValid(const IntervalRange& range) { + return 
range.start >= 0 && range.end >= 0 && range.end >= range.start; + } + + static size_t Count(const IntervalRange& range) { + if (!IsValid(range)) { + throw ParquetException("Invalid range: " + ToString(range)); + } + return range.end - range.start + 1; + } + + static bool IsBefore(const IntervalRange& self, const IntervalRange& other) { + return self.end < other.start; + } + + static bool IsAfter(const IntervalRange& self, const IntervalRange& other) { + return self.start > other.end; + } + + static bool IsOverlap(const IntervalRange& self, const IntervalRange& other) { + return !IsBefore(self, other) && !IsAfter(self, other); + } +}; + +struct BitmapRange { + int64_t offset; + // zero added to, if there are less than 64 elements left in the column. + uint64_t bitmap; +}; + +struct End {}; + +// Represent a set of ranges to read. The ranges are sorted and non-overlapping. +class RowRanges { + public: + virtual ~RowRanges() = default; + /// \brief Total number of rows in the row ranges. + virtual size_t num_rows() const = 0; + /// \brief First row in the ranges + virtual int64_t first_row() const = 0; + /// \brief Last row in the ranges + virtual int64_t last_row() const = 0; + /// \brief Whether the given range from start to end overlaps with the row ranges. + virtual bool IsOverlapping(int64_t start, int64_t end) const = 0; + /// \brief Split the row ranges into sub row ranges according to the + /// specified number of rows per sub row ranges. A typical use case is + /// to convert file based RowRanges to row group based RowRanges. + /// + /// \param num_rows_per_sub_ranges number of rows per sub row range. 
+ virtual std::vector> SplitByRowRange( + const std::vector& num_rows_per_sub_ranges) const = 0; + /// \brief Readable string representation + virtual std::string ToString() const = 0; + + class Iterator { + public: + virtual std::variant NextRange() = 0; + virtual ~Iterator() = default; + }; + /// \brief Create an iterator to iterate over the ranges + virtual std::unique_ptr NewIterator() const = 0; +}; + +class IntervalRanges : public RowRanges { + public: + IntervalRanges(); + explicit IntervalRanges(const IntervalRange& range); + explicit IntervalRanges(const std::vector& ranges); + std::unique_ptr NewIterator() const override; + size_t num_rows() const override; + int64_t first_row() const override; + int64_t last_row() const override; + bool IsOverlapping(int64_t start, int64_t end) const override; + std::string ToString() const override; + std::vector> SplitByRowRange( + const std::vector& num_rows_per_sub_ranges) const override; + static IntervalRanges Intersection(const IntervalRanges& left, + const IntervalRanges& right); + void Add(const IntervalRange& range); + const std::vector& GetRanges() const; + + private: + std::vector ranges_; +}; + +class IntervalRowRangesIterator : public RowRanges::Iterator { + public: + explicit IntervalRowRangesIterator(const std::vector& ranges); + ~IntervalRowRangesIterator() override; + std::variant NextRange() override; + + private: + const std::vector& ranges_; + size_t current_index_ = 0; +}; +} // namespace parquet diff --git a/cpp/src/parquet/row_range_test.cc b/cpp/src/parquet/row_range_test.cc new file mode 100644 index 000000000000..bf0563211b8e --- /dev/null +++ b/cpp/src/parquet/row_range_test.cc @@ -0,0 +1,196 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +#include +#include "parquet/column_reader.h" + +using parquet::IntervalRange; +using parquet::IntervalRanges; + +class RowRangesTest : public ::testing::Test { + protected: + IntervalRanges row_ranges; +}; + +TEST_F(RowRangesTest, EmptyRG_ReturnsOriginalRowRanges) { + row_ranges.Add(IntervalRange(0, 10)); + std::vector rows_per_rg; + + auto result = row_ranges.SplitByRowRange(rows_per_rg); + ASSERT_EQ(result.size(), 1); + + auto iter = result[0]->NewIterator(); + auto range = std::get(iter->NextRange()); + ASSERT_EQ(range.start, 0); + ASSERT_EQ(range.end, 10); + ASSERT_EQ(iter->NextRange().index(), 2); +} + +TEST_F(RowRangesTest, SingleRG_ReturnsOriginalRowRanges2) { + row_ranges.Add(IntervalRange(0, 10)); + std::vector rows_per_rg = {11}; + + auto result = row_ranges.SplitByRowRange(rows_per_rg); + ASSERT_EQ(result.size(), 1); + + auto iter = result[0]->NewIterator(); + auto range = std::get(iter->NextRange()); + ASSERT_EQ(range.start, 0); + ASSERT_EQ(range.end, 10); + ASSERT_EQ(iter->NextRange().index(), 2); +} + +TEST_F(RowRangesTest, ReturnsTwoRowRanges) { + row_ranges.Add(IntervalRange(0, 10)); + std::vector rows_per_rg = {5, 6}; + + auto result = row_ranges.SplitByRowRange(rows_per_rg); + ASSERT_EQ(result.size(), 2); + { + auto iter = result[0]->NewIterator(); + auto range = std::get(iter->NextRange()); + ASSERT_EQ(range.start, 0); + ASSERT_EQ(range.end, 4); + 
ASSERT_EQ(iter->NextRange().index(), 2); + } + { + auto iter = result[1]->NewIterator(); + auto range = std::get(iter->NextRange()); + ASSERT_EQ(range.start, 0); + ASSERT_EQ(range.end, 5); + ASSERT_EQ(iter->NextRange().index(), 2); + } +} + +TEST_F(RowRangesTest, ReturnsMultipleRowRanges) { + row_ranges.Add(IntervalRange(0, 11)); + std::vector rows_per_rg = {3, 4, 100}; + + auto result = row_ranges.SplitByRowRange(rows_per_rg); + ASSERT_EQ(result.size(), 3); + { + auto iter = result[0]->NewIterator(); + auto range = std::get(iter->NextRange()); + ASSERT_EQ(range.start, 0); + ASSERT_EQ(range.end, 2); + ASSERT_EQ(iter->NextRange().index(), 2); + } + { + auto iter = result[1]->NewIterator(); + auto range = std::get(iter->NextRange()); + ASSERT_EQ(range.start, 0); + ASSERT_EQ(range.end, 3); + ASSERT_EQ(iter->NextRange().index(), 2); + } + { + auto iter = result[2]->NewIterator(); + auto range = std::get(iter->NextRange()); + ASSERT_EQ(range.start, 0); + ASSERT_EQ(range.end, 4); + ASSERT_EQ(iter->NextRange().index(), 2); + } +} + +TEST_F(RowRangesTest, MultipleInputRange) { + row_ranges.Add(IntervalRange(0, 10)); + row_ranges.Add(IntervalRange(90, 111)); + row_ranges.Add(IntervalRange(191, 210)); + + std::vector rows_per_rg = {100, 100}; + + auto result = row_ranges.SplitByRowRange(rows_per_rg); + ASSERT_EQ(result.size(), 2); + { + auto iter = result[0]->NewIterator(); + auto range = std::get(iter->NextRange()); + ASSERT_EQ(range.start, 0); + ASSERT_EQ(range.end, 10); + + range = std::get(iter->NextRange()); + ASSERT_EQ(range.start, 90); + ASSERT_EQ(range.end, 99); + + ASSERT_EQ(iter->NextRange().index(), 2); + } + { + auto iter = result[1]->NewIterator(); + auto range = std::get(iter->NextRange()); + ASSERT_EQ(range.start, 0); + ASSERT_EQ(range.end, 11); + + range = std::get(iter->NextRange()); + ASSERT_EQ(range.start, 91); + ASSERT_EQ(range.end, 99); + + ASSERT_EQ(iter->NextRange().index(), 2); + } +} + +TEST_F(RowRangesTest, 
MultipleSplitPoints_ReturnWithEmptyRowRanges) { + row_ranges.Add(IntervalRange(11, 18)); + std::vector rows_per_rg = {5, 5, 5, 5, 5}; + + auto result = row_ranges.SplitByRowRange(rows_per_rg); + ASSERT_EQ(result.size(), 5); + { + auto iter = result[0]->NewIterator(); + ASSERT_EQ(iter->NextRange().index(), 2); + } + { + auto iter = result[1]->NewIterator(); + ASSERT_EQ(iter->NextRange().index(), 2); + } + { + auto iter = result[2]->NewIterator(); + auto range = std::get(iter->NextRange()); + ASSERT_EQ(range.start, 1); + ASSERT_EQ(range.end, 4); + ASSERT_EQ(iter->NextRange().index(), 2); + } + { + auto iter = result[3]->NewIterator(); + auto range = std::get(iter->NextRange()); + ASSERT_EQ(range.start, 0); + ASSERT_EQ(range.end, 3); + ASSERT_EQ(iter->NextRange().index(), 2); + } + { + auto iter = result[4]->NewIterator(); + ASSERT_EQ(iter->NextRange().index(), 2); + } +} + +TEST_F(RowRangesTest, RangeExceedRG) { + row_ranges.Add(IntervalRange(0, 10)); + std::vector rows_per_rg = {5, 3}; + + auto result = row_ranges.SplitByRowRange(rows_per_rg); + ASSERT_EQ(result.size(), 2); + { + auto iter = result[0]->NewIterator(); + auto range = std::get(iter->NextRange()); + ASSERT_EQ(range.start, 0); + ASSERT_EQ(range.end, 4); + ASSERT_EQ(iter->NextRange().index(), 2); + } + { + auto iter = result[1]->NewIterator(); + auto range = std::get(iter->NextRange()); + ASSERT_EQ(range.start, 0); + ASSERT_EQ(range.end, 2); + ASSERT_EQ(iter->NextRange().index(), 2); + } +}