From 2e898ff8a7144485a822ac98c6a467efc4901947 Mon Sep 17 00:00:00 2001 From: Renjie Liu Date: Tue, 3 Sep 2019 20:29:13 +0800 Subject: [PATCH 1/5] test done for primitive array reader --- rust/arrow/src/bitmap.rs | 4 + rust/parquet/src/arrow/array_reader.rs | 1042 +++++++++++++++++ rust/parquet/src/arrow/mod.rs | 1 + rust/parquet/src/arrow/record_reader.rs | 127 +- rust/parquet/src/column/reader.rs | 92 +- rust/parquet/src/schema/types.rs | 22 + rust/parquet/src/util/test_common/mod.rs | 2 + .../parquet/src/util/test_common/page_util.rs | 141 ++- 8 files changed, 1324 insertions(+), 107 deletions(-) create mode 100644 rust/parquet/src/arrow/array_reader.rs diff --git a/rust/arrow/src/bitmap.rs b/rust/arrow/src/bitmap.rs index cd05b5955316..f82f773e6ba7 100644 --- a/rust/arrow/src/bitmap.rs +++ b/rust/arrow/src/bitmap.rs @@ -55,6 +55,10 @@ impl Bitmap { assert!(i < (self.bits.len() << 3)); unsafe { bit_util::get_bit_raw(self.bits.raw_data(), i) } } + + pub fn to_buffer(self) -> Buffer { + self.bits + } } impl<'a, 'b> BitAnd<&'b Bitmap> for &'a Bitmap { diff --git a/rust/parquet/src/arrow/array_reader.rs b/rust/parquet/src/arrow/array_reader.rs new file mode 100644 index 000000000000..3852614fbb70 --- /dev/null +++ b/rust/parquet/src/arrow/array_reader.rs @@ -0,0 +1,1042 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::cmp::min; +use std::collections::{HashMap, HashSet}; +use std::marker::PhantomData; +use std::mem::size_of; +use std::mem::transmute; +use std::rc::Rc; +use std::result::Result::Ok; +use std::slice::from_raw_parts; +use std::slice::from_raw_parts_mut; +use std::sync::Arc; +use std::vec::Vec; + +use arrow::array::ArrayDataRef; +use arrow::array::ArrayRef; +use arrow::array::BufferBuilderTrait; +use arrow::array::StructArray; +use arrow::array::{Array, ListArray}; +use arrow::array::{ArrayDataBuilder, Int32BufferBuilder}; +use arrow::array::{BooleanBufferBuilder, Int16BufferBuilder}; +use arrow::bitmap::Bitmap; +use arrow::buffer::Buffer; +use arrow::buffer::MutableBuffer; +use arrow::datatypes::DataType::List; +use arrow::datatypes::{DataType as ArrowType, Field}; + +use crate::arrow::converter::BooleanConverter; +use crate::arrow::converter::Converter; +use crate::arrow::converter::Float32Converter; +use crate::arrow::converter::Float64Converter; +use crate::arrow::converter::Int16Converter; +use crate::arrow::converter::Int32Converter; +use crate::arrow::converter::Int64Converter; +use crate::arrow::converter::Int8Converter; +use crate::arrow::converter::UInt16Converter; +use crate::arrow::converter::UInt32Converter; +use crate::arrow::converter::UInt64Converter; +use crate::arrow::converter::UInt8Converter; +use crate::arrow::record_reader::RecordReader; +use crate::arrow::schema::parquet_to_arrow_field; +use crate::basic::{Repetition, Type as PhysicalType}; +use crate::column::page::PageIterator; +use crate::data_type::DataType; +use crate::data_type::DoubleType; +use crate::data_type::FloatType; +use crate::data_type::Int32Type; +use crate::data_type::Int64Type; +use crate::data_type::{BoolType, ByteArrayType, Int96Type}; +use crate::errors::ParquetError; +use crate::errors::ParquetError::ArrowError; +use crate::errors::Result; +use crate::file::reader::{FilePageIterator, FileReader}; +use crate::schema::types::{ + ColumnDescPtr, ColumnDescriptor, ColumnPath, SchemaDescPtr, Type, TypePtr, +}; +use crate::schema::visitor::TypeVisitor; + +/// Array reader reads parquet data into arrow array. +pub trait ArrayReader { + /// Returns the arrow type of this array reader. + fn get_data_type(&self) -> &ArrowType; + + /// Reads at most `batch_size` records into an arrow array and return it. + fn next_batch(&mut self, batch_size: usize) -> Result; + + /// Returns the definition levels of data from last call of `next_batch`. + /// The result is used by parent array reader to calculate its own definition + /// levels and repetition levels, so that its parent can calculate null bitmap. + fn get_def_levels(&self) -> Option<&[i16]>; + + /// Return the repetition levels of data from last call of `next_batch`. + /// The result is used by parent array reader to calculate its own definition + /// levels and repetition levels, so that its parent can calculate null bitmap. + fn get_rep_levels(&self) -> Option<&[i16]>; +} + +/// Primitive array readers are leaves of array reader tree. They accept page iterator +/// and read them into primitive arrays. +pub struct PrimitiveArrayReader { + data_type: ArrowType, + pages: Box, + def_levels_buffer: Option, + rep_levels_buffer: Option, + column_desc: ColumnDescPtr, + record_reader: RecordReader, + _type_marker: PhantomData, +} + +impl PrimitiveArrayReader { + /// Construct primitive array reader. + pub fn new(mut pages: Box, column_desc: ColumnDescPtr) -> Result { + let data_type = parquet_to_arrow_field(column_desc.clone())? + .data_type() + .clone(); + + let mut record_reader = RecordReader::::new(column_desc.clone()); + record_reader.set_page_reader(pages.next().ok_or_else(|| { + general_err!( + "Can't \ + build array without pages!" + ) + })??)?; + + Ok(Self { + data_type, + pages, + def_levels_buffer: None, + rep_levels_buffer: None, + column_desc, + record_reader, + _type_marker: PhantomData, + }) + } +} + +/// Implementation of primitive array reader. +impl ArrayReader for PrimitiveArrayReader { + /// Returns data type of primitive array. + fn get_data_type(&self) -> &ArrowType { + &self.data_type + } + + /// Reads at most `batch_size` records into array. + fn next_batch(&mut self, batch_size: usize) -> Result> { + let mut records_read = 0usize; + while records_read < batch_size { + let records_to_read = batch_size - records_read; + + let records_read_once = self.record_reader.read_records(records_to_read)?; + records_read = records_read + records_read_once; + + // Record reader exhausted + if records_read_once < records_to_read { + if let Some(page_reader) = self.pages.next() { + // Read from new page reader + self.record_reader.set_page_reader(page_reader?)?; + } else { + // Page reader also exhausted + break; + } + } + } + + // convert to arrays + let array = match (&self.data_type, T::get_physical_type()) { + (ArrowType::Boolean, PhysicalType::BOOLEAN) => unsafe { + BooleanConverter::convert(transmute::< + &mut RecordReader, + &mut RecordReader, + >(&mut self.record_reader)) + }, + (ArrowType::Int8, PhysicalType::INT32) => unsafe { + Int8Converter::convert(transmute::< + &mut RecordReader, + &mut RecordReader, + >(&mut self.record_reader)) + }, + (ArrowType::Int16, PhysicalType::INT32) => unsafe { + Int16Converter::convert(transmute::< + &mut RecordReader, + &mut RecordReader, + >(&mut self.record_reader)) + }, + (ArrowType::Int32, PhysicalType::INT32) => unsafe { + Int32Converter::convert(transmute::< + &mut RecordReader, + &mut RecordReader, + >(&mut self.record_reader)) + }, + (ArrowType::UInt8, PhysicalType::INT32) => unsafe { + UInt8Converter::convert(transmute::< + &mut RecordReader, + &mut RecordReader, + >(&mut self.record_reader)) + }, + (ArrowType::UInt16, PhysicalType::INT32) => unsafe { + UInt16Converter::convert(transmute::< + &mut RecordReader, + &mut RecordReader, + >(&mut self.record_reader)) + }, + (ArrowType::UInt32, PhysicalType::INT32) => unsafe { + UInt32Converter::convert(transmute::< + &mut RecordReader, + &mut RecordReader, + >(&mut self.record_reader)) + }, + (ArrowType::Int64, PhysicalType::INT64) => unsafe { + Int64Converter::convert(transmute::< + &mut RecordReader, + &mut RecordReader, + >(&mut self.record_reader)) + }, + (ArrowType::UInt64, PhysicalType::INT64) => unsafe { + UInt64Converter::convert(transmute::< + &mut RecordReader, + &mut RecordReader, + >(&mut self.record_reader)) + }, + (ArrowType::Float32, PhysicalType::FLOAT) => unsafe { + Float32Converter::convert(transmute::< + &mut RecordReader, + &mut RecordReader, + >(&mut self.record_reader)) + }, + (ArrowType::Float64, PhysicalType::DOUBLE) => unsafe { + Float64Converter::convert(transmute::< + &mut RecordReader, + &mut RecordReader, + >(&mut self.record_reader)) + }, + (arrow_type, _) => Err(general_err!( + "Reading {:?} type from parquet is not supported yet.", + arrow_type + )), + }?; + + // save definition and repetition buffers + self.def_levels_buffer = self.record_reader.consume_def_levels(); + self.rep_levels_buffer = self.record_reader.consume_rep_levels(); + self.record_reader.reset(); + Ok(array) + } + + fn get_def_levels(&self) -> Option<&[i16]> { + self.def_levels_buffer.as_ref().map(|buf| buf.typed_data()) + } + + fn get_rep_levels(&self) -> Option<&[i16]> { + self.rep_levels_buffer.as_ref().map(|buf| buf.typed_data()) + } +} + +struct StructArrayReader { + children: Vec>, + data_type: ArrowType, + struct_def_level: i16, + struct_rep_level: i16, + def_level_buffer: Option, + rep_level_buffer: Option, +} + +impl StructArrayReader { + pub fn new( + data_type: ArrowType, + children: Vec>, + def_level: i16, + rep_level: i16, + ) -> Self { + Self { + data_type, + children, + struct_def_level: def_level, + struct_rep_level: rep_level, + def_level_buffer: None, + rep_level_buffer: None, + } + } +} + +impl ArrayReader for StructArrayReader { + fn get_data_type(&self) -> &ArrowType { + &self.data_type + } + + fn next_batch(&mut self, batch_size: usize) -> Result> { + if self.children.len() == 0 { + self.def_level_buffer = None; + return Ok(Arc::new(StructArray::from(Vec::new()))); + } + + let children_array = self + .children + .iter_mut() + .map(|reader| reader.next_batch(batch_size)) + .try_fold( + Vec::new(), + |mut result, child_array| -> Result>> { + result.push(child_array?); + Ok(result) + }, + )?; + + // check that array child data has same size + let children_array_len = children_array.first().unwrap().len(); + + let all_children_len_eq = children_array + .iter() + .all(|arr| arr.len() == children_array_len); + if !all_children_len_eq { + return Err(general_err!("Not all children array length are the same!")); + } + + // let data_type = children_array.iter() + // .map(|arr| arr.data_ref().data_type().clone()) + // .collect::>(); + // + // let data_type = ArrowType::Struct(data_type); + // calculate struct def level data + let buffer_size = children_array_len * size_of::(); + let mut def_level_data_buffer = MutableBuffer::new(buffer_size); + def_level_data_buffer.resize(buffer_size)?; + + let def_level_data = unsafe { + let ptr = transmute::<*const u8, *mut i16>(def_level_data_buffer.raw_data()); + from_raw_parts_mut(ptr, children_array_len) + }; + def_level_data + .iter_mut() + .for_each(|v| *v = self.struct_def_level); + + self.children.iter().try_for_each(|child| { + if let Some(current_child_def_levels) = child.get_def_levels() { + if current_child_def_levels.len() != children_array_len { + Err(general_err!("Child array length are not equal!")) + } else { + for i in 0..children_array_len { + def_level_data[i] = + min(def_level_data[i], current_child_def_levels[i]); + } + Ok(()) + } + } else { + Ok(()) + } + })?; + + // calculate bitmap for current array + let mut bitmap_builder = BooleanBufferBuilder::new(children_array_len); + let mut null_count = 0; + def_level_data.iter().try_for_each(|item| { + let is_null = *item < self.struct_def_level; + if is_null { + null_count += 1; + } + bitmap_builder.append(is_null) + })?; + + let array_data = ArrayDataBuilder::new(self.data_type.clone()) + .len(children_array_len) + .null_count(null_count) + .null_bit_buffer(bitmap_builder.finish()) + .child_data( + children_array + .iter() + .map(|x| x.data()) + .collect::>(), + ) + .build(); + + // calculate struct rep level data, since struct doesn't add to repetition + // levels, here we just need to keep repetition levels of first array + // TODO: Verify that all children array reader has same repetition levels + let rep_level_data = self + .children + .first() + .unwrap() + .get_rep_levels() + .map(|data| -> Result { + let mut buffer = Int16BufferBuilder::new(children_array_len); + buffer.append_slice(data)?; + Ok(buffer.finish()) + }) + .transpose()?; + + self.def_level_buffer = Some(def_level_data_buffer.freeze()); + self.rep_level_buffer = rep_level_data; + Ok(Arc::new(StructArray::from(array_data))) + } + + fn get_def_levels(&self) -> Option<&[i16]> { + self.def_level_buffer.as_ref().map(|buf| buf.typed_data()) + } + + fn get_rep_levels(&self) -> Option<&[i16]> { + self.rep_level_buffer.as_ref().map(|buf| buf.typed_data()) + } +} + +struct ListArrayReader { + data_type: ArrowType, + child: Box, + rep_level: i16, + def_level: i16, + def_level_buffer: Option, + rep_level_buffer: Option, +} + +impl ListArrayReader { + pub fn new(child: Box, rep_level: i16, def_level: i16) -> Self { + Self { + data_type: List(Box::new(child.get_data_type().clone())), + child, + rep_level, + def_level, + def_level_buffer: None, + rep_level_buffer: None, + } + } +} + +impl ArrayReader for ListArrayReader { + fn get_data_type(&self) -> &ArrowType { + &self.data_type + } + + fn next_batch(&mut self, batch_size: usize) -> Result> { + let child_array = self.child.next_batch(batch_size)?; + + let mut offset_builder = Int32BufferBuilder::new(child_array.len()); + let mut null_bitmap_builder = BooleanBufferBuilder::new(child_array.len()); + let mut rep_level_builder = Int16BufferBuilder::new(child_array.len()); + let mut def_level_builder = Int16BufferBuilder::new(child_array.len()); + let mut len = 0; + + let arrow_type = ArrowType::List(Box::new(child_array.data_type().clone())); + + if child_array.len() > 0 { + // Since this is a list, we assume that child array should have repetition + // levels and definition levels + let child_rep_levels = self.child.get_rep_levels().ok_or_else(|| { + general_err!( + "Repetition levels should exist for list \ + array!" + ) + })?; + let child_def_levels = self.child.get_def_levels().ok_or_else(|| { + general_err!( + "Definition levels should exist for list \ + array!" + ) + })?; + + let mut cur_rep_level = child_rep_levels[0]; + let mut cur_def_level = child_def_levels[0]; + offset_builder.append(0)?; + len = 1; + + for idx in 1..child_array.len() { + if child_rep_levels[idx] < self.rep_level { + len = len + 1; + rep_level_builder.append(cur_rep_level)?; + def_level_builder.append(cur_def_level)?; + + let current_not_null = cur_def_level >= self.def_level; + null_bitmap_builder.append(current_not_null)?; + + offset_builder.append(idx as i32)?; + cur_rep_level = child_rep_levels[idx]; + cur_def_level = child_def_levels[idx]; + } else { + cur_rep_level = min(cur_rep_level, child_rep_levels[idx]); + cur_def_level = min(cur_def_level, child_def_levels[idx]); + } + } + + null_bitmap_builder.append(cur_def_level >= self.def_level)?; + rep_level_builder.append(cur_rep_level)?; + def_level_builder.append(cur_def_level)?; + offset_builder.append(child_array.len() as i32)?; + + // dbg!(&self.data_type); + // dbg!(child_def_levels); + } + + self.rep_level_buffer = Some(rep_level_builder.finish()); + self.def_level_buffer = Some(def_level_builder.finish()); + + let offset_buffer = offset_builder.finish(); + let null_bitmap = null_bitmap_builder.finish(); + + let array_data = ArrayDataBuilder::new(arrow_type) + .len(len) + .add_buffer(offset_buffer) + .null_bit_buffer(null_bitmap) + .add_child_data(child_array.data()) + .build(); + + Ok(Arc::new(ListArray::from(array_data))) + } + + fn get_def_levels(&self) -> Option<&[i16]> { + self.def_level_buffer.as_ref().map(|buf| { + let len = buf.len() * size_of::() / size_of::(); + unsafe { + from_raw_parts(transmute::<*const u8, *const i16>(buf.raw_data()), len) + } + }) + } + + fn get_rep_levels(&self) -> Option<&[i16]> { + self.rep_level_buffer.as_ref().map(|buf| { + let len = buf.len() * size_of::() / size_of::(); + unsafe { + from_raw_parts(transmute::<*const u8, *const i16>(buf.raw_data()), len) + } + }) + } +} + +pub fn build_array_reader( + parquet_schema: SchemaDescPtr, + column_indices: T, + file_reader: Rc, +) -> Result> +where + T: IntoIterator, +{ + let mut base_nodes = Vec::new(); + let mut base_nodes_set = HashSet::new(); + let mut leaves = HashMap::<*const Type, usize>::new(); + + for c in column_indices { + let column = parquet_schema.column(c).self_type() as *const Type; + let root = parquet_schema.get_column_root_ptr(c); + let root_raw_ptr = root.clone().as_ref() as *const Type; + + leaves.insert(column, c); + if !base_nodes_set.contains(&root_raw_ptr) { + base_nodes.push(root); + base_nodes_set.insert(root_raw_ptr); + } + } + + if leaves.is_empty() { + return Err(general_err!("Can't build array reader without columns!")); + } + + ArrayReaderBuilder::new( + Rc::new(parquet_schema.root_schema().clone()), + Rc::new(leaves), + file_reader, + ) + .build_array_reader() +} + +struct ArrayReaderBuilder { + root_schema: TypePtr, + // Key: columns that need to be included in final array builder + // Value: column index in schema + columns_included: Rc>, + file_reader: Rc, +} + +#[derive(Clone)] +struct ArrayReaderBuilderContext { + def_level: i16, + rep_level: i16, + path: ColumnPath, +} + +impl Default for ArrayReaderBuilderContext { + fn default() -> Self { + Self { + def_level: 0i16, + rep_level: 0i16, + path: ColumnPath::new(Vec::new()), + } + } +} + +impl<'a> TypeVisitor>, &'a ArrayReaderBuilderContext> + for ArrayReaderBuilder +{ + /// Build array reader for primitive type. + fn visit_primitive( + &mut self, + cur_type: TypePtr, + context: &'a ArrayReaderBuilderContext, + ) -> Result>> { + if self.is_included(cur_type.as_ref()) { + let mut new_context = context.clone(); + new_context.path.append(vec![cur_type.name().to_string()]); + + match cur_type.get_basic_info().repetition() { + Repetition::REPEATED => { + new_context.def_level += 1; + new_context.rep_level += 1; + } + Repetition::OPTIONAL => { + new_context.def_level += 1; + } + _ => (), + } + + let reader = + self.build_for_primitive_type_inner(cur_type.clone(), &new_context)?; + + if cur_type.get_basic_info().repetition() == Repetition::REPEATED { + Ok(Some(Box::new(ListArrayReader::new( + reader, + new_context.rep_level, + new_context.def_level, + )))) + } else { + Ok(Some(reader)) + } + } else { + Ok(None) + } + } + + fn visit_struct( + &mut self, + cur_type: Rc, + context: &'a ArrayReaderBuilderContext, + ) -> Result>> { + let mut new_context = context.clone(); + new_context.path.append(vec![cur_type.name().to_string()]); + + if cur_type.get_basic_info().has_repetition() { + match cur_type.get_basic_info().repetition() { + Repetition::REPEATED => { + new_context.def_level += 1; + new_context.rep_level += 1; + } + Repetition::OPTIONAL => { + new_context.def_level += 1; + } + _ => (), + } + } + + if let Some(reader) = + self.build_for_struct_type_inner(cur_type.clone(), &new_context)? + { + if cur_type.get_basic_info().has_repetition() + && cur_type.get_basic_info().repetition() == Repetition::REPEATED + { + Ok(Some(Box::new(ListArrayReader::new( + reader, + new_context.rep_level, + new_context.def_level, + )))) + } else { + Ok(Some(reader)) + } + } else { + Ok(None) + } + } + + fn visit_map( + &mut self, + _cur_type: Rc, + _context: &'a ArrayReaderBuilderContext, + ) -> Result>> { + Err(ArrowError(format!( + "Reading parquet map array into arrow is not supported yet!" + ))) + } + + fn visit_list_with_item( + &mut self, + list_type: Rc, + item_type: &Type, + context: &'a ArrayReaderBuilderContext, + ) -> Result>> { + let mut new_context = context.clone(); + new_context.rep_level += 1; + new_context.def_level += 1; + new_context.path.append(vec![list_type.name().to_string()]); + + self.dispatch(Rc::new(item_type.clone()), &new_context) + .map(|child_opt| { + child_opt.map(|child| -> Box { + Box::new(ListArrayReader::new( + child, + new_context.rep_level, + new_context.def_level, + )) + }) + }) + } +} + +impl<'a> ArrayReaderBuilder { + fn new( + root_schema: TypePtr, + columns_included: Rc>, + file_reader: Rc, + ) -> Self { + Self { + root_schema, + columns_included, + file_reader, + } + } + + fn build_array_reader(&mut self) -> Result> { + let context = ArrayReaderBuilderContext::default(); + + self.visit_struct(self.root_schema.clone(), &context) + .map(|reader| reader.unwrap()) + } + + // Utility functions + fn is_included(&self, t: &Type) -> bool { + self.columns_included.contains_key(&(t as *const Type)) + } + + // Functions for primitive types. + fn build_for_primitive_type_inner( + &self, + cur_type: TypePtr, + context: &'a ArrayReaderBuilderContext, + ) -> Result> { + let column_desc = Rc::new(ColumnDescriptor::new( + cur_type.clone(), + Some(self.root_schema.clone()), + context.def_level, + context.rep_level, + context.path.clone(), + )); + let page_iterator = Box::new(FilePageIterator::new( + self.columns_included[&(cur_type.as_ref() as *const Type)], + self.file_reader.clone(), + )?); + + match cur_type.get_physical_type() { + PhysicalType::BOOLEAN => Ok(Box::new(PrimitiveArrayReader::::new( + page_iterator, + column_desc, + )?)), + PhysicalType::INT32 => Ok(Box::new(PrimitiveArrayReader::::new( + page_iterator, + column_desc, + )?)), + PhysicalType::INT64 => Ok(Box::new(PrimitiveArrayReader::::new( + page_iterator, + column_desc, + )?)), + PhysicalType::INT96 => Ok(Box::new(PrimitiveArrayReader::::new( + page_iterator, + column_desc, + )?)), + PhysicalType::FLOAT => Ok(Box::new(PrimitiveArrayReader::::new( + page_iterator, + column_desc, + )?)), + PhysicalType::DOUBLE => Ok(Box::new( + PrimitiveArrayReader::::new(page_iterator, column_desc)?, + )), + PhysicalType::BYTE_ARRAY => Ok(Box::new(PrimitiveArrayReader::< + ByteArrayType, + >::new( + page_iterator, column_desc + )?)), + other => Err(ArrowError(format!( + "Unable to create primite array reader for parquet physical type {}", + other + ))), + } + } + + fn build_for_struct_type_inner( + &mut self, + cur_type: TypePtr, + context: &'a ArrayReaderBuilderContext, + ) -> Result>> { + let mut fields = Vec::with_capacity(cur_type.get_fields().len()); + let mut children_reader = Vec::with_capacity(cur_type.get_fields().len()); + + for child in cur_type.get_fields() { + if let Some(child_reader) = self.dispatch(child.clone(), context)? { + fields.push(Field::new( + child.name(), + child_reader.get_data_type().clone(), + child.is_optional(), + )); + children_reader.push(child_reader); + } + } + + if !fields.is_empty() { + let arrow_type = ArrowType::Struct(fields); + Ok(Some(Box::new(StructArrayReader::new( + arrow_type, + children_reader, + context.def_level, + context.rep_level, + )))) + } else { + Ok(None) + } + } +} + +#[cfg(test)] +mod tests { + use crate::arrow::array_reader::{ArrayReader, PrimitiveArrayReader}; + use crate::basic::Encoding; + use crate::column::page::Page; + use crate::data_type::{DataType, Int32Type}; + use crate::schema::parser::parse_message_type; + use crate::schema::types::{ColumnDescPtr, SchemaDescriptor}; + use crate::util::test_common::make_pages; + use crate::util::test_common::page_util::{ + DataPageBuilder, DataPageBuilderImpl, InMemoryPageIterator, + }; + use arrow::array::PrimitiveArray; + use arrow::compute::max; + use arrow::datatypes::Int32Type as ArrowInt32; + use rand::distributions::range::SampleRange; + use std::collections::VecDeque; + use std::rc::Rc; + + fn make_column_chuncks( + column_desc: ColumnDescPtr, + encoding: Encoding, + num_levels: usize, + min_value: T::T, + max_value: T::T, + def_levels: &mut Vec, + rep_levels: &mut Vec, + values: &mut Vec, + page_lists: &mut Vec>, + use_v2: bool, + num_chuncks: usize, + ) where + T::T: PartialOrd + SampleRange + Copy, + { + for _i in 0..num_chuncks { + let mut pages = VecDeque::new(); + let mut data = Vec::new(); + let mut page_def_levels = Vec::new(); + let mut page_rep_levels = Vec::new(); + + make_pages::( + column_desc.clone(), + encoding, + 1, + num_levels, + min_value, + max_value, + &mut page_def_levels, + &mut page_rep_levels, + &mut data, + &mut pages, + use_v2, + ); + + def_levels.append(&mut page_def_levels); + rep_levels.append(&mut page_rep_levels); + values.append(&mut data); + page_lists.push(Vec::from(pages)); + } + } + + #[test] + fn test_primitive_array_reader_data() { + // Construct column schema + let message_type = " + message test_schema { + REQUIRED INT32 leaf; + } + "; + + let schema = parse_message_type(message_type) + .map(|t| Rc::new(SchemaDescriptor::new(Rc::new(t)))) + .unwrap(); + + let column_desc = schema.column(0); + + // Construct page iterator + { + let mut data = Vec::new(); + let mut page_lists = Vec::new(); + make_column_chuncks::( + column_desc.clone(), + Encoding::PLAIN, + 100, + 1, + 200, + &mut Vec::new(), + &mut Vec::new(), + &mut data, + &mut page_lists, + true, + 2, + ); + let page_iterator = InMemoryPageIterator::new( + schema.clone(), + column_desc.clone(), + page_lists, + ); + + let mut array_reader = PrimitiveArrayReader::::new( + Box::new(page_iterator), + column_desc.clone(), + ) + .unwrap(); + + // Read first 50 values, which are all from the first column chunck + let array = array_reader.next_batch(50).unwrap(); + let array = array + .as_any() + .downcast_ref::>() + .unwrap(); + + assert_eq!( + &PrimitiveArray::::from( + data[0..50].iter().cloned().collect::>() + ), + array + ); + + // Read next 100 values, the first 50 ones are from the first column chunk, + // and the last 50 ones are from the second column chunk + let array = array_reader.next_batch(100).unwrap(); + let array = array + .as_any() + .downcast_ref::>() + .unwrap(); + + assert_eq!( + &PrimitiveArray::::from( + data[50..150].iter().cloned().collect::>() + ), + array + ); + + // Try to read 100 values, however there are only 50 values + let array = array_reader.next_batch(100).unwrap(); + let array = array + .as_any() + .downcast_ref::>() + .unwrap(); + + assert_eq!( + &PrimitiveArray::::from( + data[150..200].iter().cloned().collect::>() + ), + array + ); + } + } + + #[test] + fn test_primitive_array_reader_def_and_rep_levels() { + // Construct column schema + let message_type = " + message test_schema { + REPEATED Group test_mid { + OPTIONAL INT32 leaf; + } + } + "; + + let schema = parse_message_type(message_type) + .map(|t| Rc::new(SchemaDescriptor::new(Rc::new(t)))) + .unwrap(); + + let column_desc = schema.column(0); + + // Construct page iterator + { + let mut def_levels = Vec::new(); + let mut rep_levels = Vec::new(); + let mut page_lists = Vec::new(); + make_column_chuncks::( + column_desc.clone(), + Encoding::PLAIN, + 100, + 1, + 200, + &mut def_levels, + &mut rep_levels, + &mut Vec::new(), + &mut page_lists, + true, + 2, + ); + + let page_iterator = InMemoryPageIterator::new( + schema.clone(), + column_desc.clone(), + page_lists, + ); + + let mut array_reader = PrimitiveArrayReader::::new( + Box::new(page_iterator), + column_desc.clone(), + ) + .unwrap(); + + let mut accu_len: usize = 0; + + // Read first 50 values, which are all from the first column chunck + let array = array_reader.next_batch(50).unwrap(); + assert_eq!( + Some(&def_levels[accu_len..(accu_len + array.len())]), + array_reader.get_def_levels() + ); + assert_eq!( + Some(&rep_levels[accu_len..(accu_len + array.len())]), + array_reader.get_rep_levels() + ); + accu_len += array.len(); + + // Read next 100 values, the first 50 ones are from the first column chunk, + // and the last 50 ones are from the second column chunk + let array = array_reader.next_batch(100).unwrap(); + assert_eq!( + Some(&def_levels[accu_len..(accu_len + array.len())]), + array_reader.get_def_levels() + ); + assert_eq!( + Some(&rep_levels[accu_len..(accu_len + array.len())]), + array_reader.get_rep_levels() + ); + accu_len += array.len(); + + // Try to read 100 values, however there are only 50 values + let array = array_reader.next_batch(100).unwrap(); + assert_eq!( + Some(&def_levels[accu_len..(accu_len + array.len())]), + array_reader.get_def_levels() + ); + assert_eq!( + Some(&rep_levels[accu_len..(accu_len + array.len())]), + array_reader.get_rep_levels() + ); + } + } +} diff --git a/rust/parquet/src/arrow/mod.rs b/rust/parquet/src/arrow/mod.rs index af1d00c91b24..a2c6031cfb8d 100644 --- a/rust/parquet/src/arrow/mod.rs +++ b/rust/parquet/src/arrow/mod.rs @@ -20,6 +20,7 @@ //! //! This mod provides API for converting between arrow and parquet. +pub(in crate::arrow) mod array_reader; pub(in crate::arrow) mod converter; pub(in crate::arrow) mod record_reader; pub mod schema; diff --git a/rust/parquet/src/arrow/record_reader.rs b/rust/parquet/src/arrow/record_reader.rs index 803f4a0d2f57..21632cff6389 100644 --- a/rust/parquet/src/arrow/record_reader.rs +++ b/rust/parquet/src/arrow/record_reader.rs @@ -16,9 +16,9 @@ // under the License. use std::cmp::{max, min}; -use std::mem::replace; use std::mem::size_of; use std::mem::transmute; +use std::mem::{replace, swap}; use std::slice; use crate::column::{page::PageReader, reader::ColumnReaderImpl}; @@ -187,40 +187,141 @@ impl RecordReader { } /// Returns definition level data. + /// The implementation has side effects. It will create a new buffer to hold those + /// definition level values that have already been read into memory but not counted + /// as record values, e.g. those from `self.num_values` to `self.values_written`. pub fn consume_def_levels(&mut self) -> Option { - let empty_def_buffer = if self.column_desc.max_def_level() > 0 { - Some(MutableBuffer::new(MIN_BATCH_SIZE)) + let new_buffer = if let Some(ref mut def_levels_buf) = &mut self.def_levels { + let num_left_values = self.values_written - self.num_values; + let mut new_buffer = MutableBuffer::new( + size_of::() * max(MIN_BATCH_SIZE, num_left_values), + ); + new_buffer + .resize(num_left_values * size_of::()) + .unwrap(); + + let new_def_levels = FatPtr::::with_offset(&new_buffer, 0); + let new_def_levels = new_def_levels.to_slice_mut(); + let left_def_levels = + FatPtr::::with_offset(&def_levels_buf, self.num_values); + let left_def_levels = left_def_levels.to_slice(); + + new_def_levels[0..num_left_values] + .copy_from_slice(&left_def_levels[0..num_left_values]); + + def_levels_buf + .resize(self.num_values * size_of::()) + .unwrap(); + Some(new_buffer) } else { None }; - replace(&mut self.def_levels, empty_def_buffer).map(|x| x.freeze()) + replace(&mut self.def_levels, new_buffer).map(|x| x.freeze()) } - /// Return repetition level data + /// Return repetition level data. + /// The side effect is similar to `consume_def_levels`. pub fn consume_rep_levels(&mut self) -> Option { - let empty_def_buffer = if self.column_desc.max_rep_level() > 0 { - Some(MutableBuffer::new(MIN_BATCH_SIZE)) + // TODO: Optimize to reduce the copy + let new_buffer = if let Some(ref mut rep_levels_buf) = &mut self.rep_levels { + let num_left_values = self.values_written - self.num_values; + let mut new_buffer = MutableBuffer::new( + size_of::() * max(MIN_BATCH_SIZE, num_left_values), + ); + new_buffer + .resize(num_left_values * size_of::()) + .unwrap(); + + let new_rep_levels = FatPtr::::with_offset(&new_buffer, 0); + let new_rep_levels = new_rep_levels.to_slice_mut(); + let left_rep_levels = + FatPtr::::with_offset(&rep_levels_buf, self.num_values); + let left_rep_levels = left_rep_levels.to_slice(); + + new_rep_levels[0..num_left_values] + .copy_from_slice(&left_rep_levels[0..num_left_values]); + + rep_levels_buf + .resize(self.num_values * size_of::()) + .unwrap(); + Some(new_buffer) } else { None }; - replace(&mut self.rep_levels, empty_def_buffer).map(|x| x.freeze()) + replace(&mut self.rep_levels, new_buffer).map(|x| x.freeze()) } /// Returns currently stored buffer data. + /// The side effect is similar to `consume_def_levels`. pub fn consume_record_data(&mut self) -> Buffer { - replace(&mut self.records, MutableBuffer::new(MIN_BATCH_SIZE)).freeze() + // TODO: Optimize to reduce the copy + let num_left_values = self.values_written - self.num_values; + let mut new_buffer = MutableBuffer::new(max(MIN_BATCH_SIZE, num_left_values)); + new_buffer + .resize(num_left_values * T::get_type_size()) + .unwrap(); + + let new_records = + FatPtr::::with_offset_and_size(&new_buffer, 0, T::get_type_size()); + let new_records = new_records.to_slice_mut(); + let left_records = FatPtr::::with_offset_and_size( + &self.records, + self.num_values, + T::get_type_size(), + ); + let left_records = left_records.to_slice_mut(); + + for idx in 0..num_left_values { + swap(&mut new_records[idx], &mut left_records[idx]); + } + + self.records + .resize(self.num_values * T::get_type_size()) + .unwrap(); + replace(&mut self.records, new_buffer).freeze() } + /// Returns currently stored null bitmap data. + /// The side effect is similar to `consume_def_levels`. pub fn consume_bitmap_buffer(&mut self) -> Option { - let bitmap_builder = if self.column_desc.max_def_level() > 0 { - Some(BooleanBufferBuilder::new(MIN_BATCH_SIZE)) + // TODO: Optimize to reduce the copy + if self.column_desc.max_def_level() > 0 { + let num_left_values = self.values_written - self.num_values; + let new_bitmap_builder = Some(BooleanBufferBuilder::new(max( + MIN_BATCH_SIZE, + num_left_values, + ))); + let old_bitmap = replace(&mut self.null_bitmap, new_bitmap_builder) + .map(|mut builder| builder.finish()) + .unwrap(); + + let old_bitmap = Bitmap::from(old_bitmap); + + for i in self.num_values..self.values_written { + self.null_bitmap + .as_mut() + .unwrap() + .append(old_bitmap.is_set(i)) + .unwrap(); + } + + Some(old_bitmap.to_buffer()) } else { None - }; + } + } - replace(&mut self.null_bitmap, bitmap_builder).map(|mut builder| builder.finish()) + /// Reset state of record reader. + /// Should be called after consuming data, e.g. `consume_rep_levels`, + /// `consume_rep_levels`, `consume_record_data` and `consume_bitmap_buffer`. + pub fn reset(&mut self) { + self.values_written = self.values_written - self.num_values; + self.num_records = 0; + self.num_values = 0; + self.values_seen = 0; + self.in_middle_of_record = false; } /// Returns bitmap data. diff --git a/rust/parquet/src/column/reader.rs b/rust/parquet/src/column/reader.rs index 8f7c7a3061aa..16af10113980 100644 --- a/rust/parquet/src/column/reader.rs +++ b/rust/parquet/src/column/reader.rs @@ -522,7 +522,7 @@ mod tests { use crate::util::{ memory::MemTracker, test_common::page_util::{DataPageBuilder, DataPageBuilderImpl}, - test_common::random_numbers_range, + test_common::{make_pages, random_numbers_range}, }; const NUM_LEVELS: usize = 128; @@ -1383,94 +1383,4 @@ mod tests { Ok(self.pages.next()) } } - - fn make_pages( - desc: ColumnDescPtr, - encoding: Encoding, - num_pages: usize, - levels_per_page: usize, - min: T::T, - max: T::T, - def_levels: &mut Vec, - rep_levels: &mut Vec, - values: &mut Vec, - pages: &mut VecDeque, - use_v2: bool, - ) where - T::T: PartialOrd + SampleRange + Copy, - { - let mut num_values = 0; - let max_def_level = desc.max_def_level(); - let max_rep_level = desc.max_rep_level(); - - let mem_tracker = Rc::new(MemTracker::new()); - let mut dict_encoder = DictEncoder::::new(desc.clone(), mem_tracker); - - for i in 0..num_pages { - let mut num_values_cur_page = 0; - let level_range = i * levels_per_page..(i + 1) * levels_per_page; - - if max_def_level > 0 { - random_numbers_range(levels_per_page, 0, max_def_level + 1, def_levels); - for dl in &def_levels[level_range.clone()] { - if *dl == max_def_level { - num_values_cur_page += 1; - } - } - } else { - num_values_cur_page = levels_per_page; - } - if max_rep_level > 0 { - random_numbers_range(levels_per_page, 0, max_rep_level + 1, rep_levels); - } - random_numbers_range(num_values_cur_page, min, max, values); - - // Generate the current page - - let mut pb = DataPageBuilderImpl::new( - desc.clone(), - num_values_cur_page as u32, - use_v2, - ); - if max_rep_level > 0 { - pb.add_rep_levels(max_rep_level, &rep_levels[level_range.clone()]); - } - if max_def_level > 0 { - pb.add_def_levels(max_def_level, &def_levels[level_range]); - } - - let value_range = num_values..num_values + num_values_cur_page; - match encoding { - Encoding::PLAIN_DICTIONARY | Encoding::RLE_DICTIONARY => { - let _ = dict_encoder.put(&values[value_range.clone()]); - let indices = dict_encoder - .write_indices() - .expect("write_indices() should be OK"); - pb.add_indices(indices); - } - Encoding::PLAIN => { - pb.add_values::(encoding, &values[value_range]); - } - enc @ _ => panic!("Unexpected encoding {}", enc), - } - - let data_page = pb.consume(); - pages.push_back(data_page); - num_values += num_values_cur_page; - } - - if encoding == Encoding::PLAIN_DICTIONARY || encoding == Encoding::RLE_DICTIONARY - { - let dict = dict_encoder - .write_dict() - .expect("write_dict() should be OK"); - let dict_page = Page::DictionaryPage { - buf: dict, - num_values: dict_encoder.num_entries() as u32, - encoding: Encoding::RLE_DICTIONARY, - is_sorted: false, - }; - pages.push_front(dict_page); - } - } } diff --git a/rust/parquet/src/schema/types.rs b/rust/parquet/src/schema/types.rs index 08dd2e14e6d8..4d4d54969d7a 100644 --- a/rust/parquet/src/schema/types.rs +++ b/rust/parquet/src/schema/types.rs @@ -164,6 +164,13 @@ impl Type { _ => false, } } + + /// Returns `true` if this type is repeated or optional. + /// If this type doesn't have repetition defined, we still treat it as optional. + pub fn is_optional(&self) -> bool { + self.get_basic_info().has_repetition() + && self.get_basic_info().repetition() != Repetition::REQUIRED + } } /// A builder for primitive types. All attributes are optional @@ -527,6 +534,21 @@ impl ColumnPath { pub fn string(&self) -> String { self.parts.join(".") } + + /// Appends more components to end of column path. + /// ```rust + /// use parquet::schema::types::ColumnPath; + /// + /// let mut path = ColumnPath::new(vec!["a".to_string(), "b".to_string(), "c" + /// .to_string()]); + /// assert_eq!(&path.string(), "a.b.c"); + /// + /// path.append(vec!["d".to_string(), "e".to_string()]); + /// assert_eq!(&path.string(), "a.b.c.d.e"); + /// ``` + pub fn append(&mut self, mut tail: Vec) -> () { + self.parts.append(&mut tail); + } } impl fmt::Display for ColumnPath { diff --git a/rust/parquet/src/util/test_common/mod.rs b/rust/parquet/src/util/test_common/mod.rs index c24afdf40abd..79a970e0a820 100644 --- a/rust/parquet/src/util/test_common/mod.rs +++ b/rust/parquet/src/util/test_common/mod.rs @@ -28,3 +28,5 @@ pub use self::rand_gen::RandGen; pub use self::file_util::get_temp_file; pub use self::file_util::get_test_file; pub use self::file_util::get_test_path; + +pub use self::page_util::make_pages; diff --git a/rust/parquet/src/util/test_common/page_util.rs b/rust/parquet/src/util/test_common/page_util.rs index d12b734f2d50..f8316d6f2c41 100644 --- a/rust/parquet/src/util/test_common/page_util.rs +++ b/rust/parquet/src/util/test_common/page_util.rs @@ -16,19 +16,23 @@ // under the License. use crate::basic::Encoding; -use crate::column::page::Page; use crate::column::page::PageReader; +use crate::column::page::{Page, PageIterator}; use crate::data_type::DataType; -use crate::encodings::encoding::{get_encoder, Encoder}; +use crate::encodings::encoding::{get_encoder, DictEncoder, Encoder}; use crate::encodings::levels::max_buffer_size; use crate::encodings::levels::LevelEncoder; use crate::errors::Result; -use crate::schema::types::ColumnDescPtr; +use crate::schema::types::{ColumnDescPtr, SchemaDescPtr}; use crate::util::memory::ByteBufferPtr; use crate::util::memory::MemTracker; use crate::util::memory::MemTrackerPtr; +use crate::util::test_common::random_numbers_range; +use rand::distributions::range::SampleRange; +use std::collections::VecDeque; use std::mem; use std::rc::Rc; +use std::vec::IntoIter; pub trait DataPageBuilder { fn add_rep_levels(&mut self, max_level: i16, rep_levels: &[i16]); @@ -176,3 +180,134 @@ impl PageReader for InMemoryPageReader { Ok(self.pages.next()) } } + +/// A utility page iterator which stores page readers in memory, used for tests. +pub struct InMemoryPageIterator { + schema: SchemaDescPtr, + column_desc: ColumnDescPtr, + page_readers: IntoIter>, +} + +impl InMemoryPageIterator { + pub fn new( + schema: SchemaDescPtr, + column_desc: ColumnDescPtr, + pages: Vec>, + ) -> Self { + let page_readers = pages + .into_iter() + .map(|pages| Box::new(InMemoryPageReader::new(pages)) as Box) + .collect::>>() + .into_iter(); + + Self { + schema, + column_desc, + page_readers, + } + } +} + +impl Iterator for InMemoryPageIterator { + type Item = Result>; + + fn next(&mut self) -> Option { + self.page_readers.next().map(|page_reader| Ok(page_reader)) + } +} + +impl PageIterator for InMemoryPageIterator { + fn schema(&mut self) -> Result { + Ok(self.schema.clone()) + } + + fn column_schema(&mut self) -> Result { + Ok(self.column_desc.clone()) + } +} + +pub fn make_pages( + desc: ColumnDescPtr, + encoding: Encoding, + num_pages: usize, + levels_per_page: usize, + min: T::T, + max: T::T, + def_levels: &mut Vec, + rep_levels: &mut Vec, + values: &mut Vec, + pages: &mut VecDeque, + use_v2: bool, +) where + T::T: PartialOrd + SampleRange + Copy, +{ + let mut num_values = 0; + let max_def_level = desc.max_def_level(); + let max_rep_level = desc.max_rep_level(); + + let mem_tracker = Rc::new(MemTracker::new()); + let mut dict_encoder = DictEncoder::::new(desc.clone(), mem_tracker); + + for i in 0..num_pages { + let mut num_values_cur_page = 0; + let level_range = i * levels_per_page..(i + 1) * levels_per_page; + + if max_def_level > 0 { + random_numbers_range(levels_per_page, 0, max_def_level + 1, def_levels); + for dl in &def_levels[level_range.clone()] { + if *dl == max_def_level { + num_values_cur_page += 1; + } + } + } else { + num_values_cur_page = levels_per_page; + } + if max_rep_level > 0 { + random_numbers_range(levels_per_page, 0, max_rep_level + 1, rep_levels); + } + random_numbers_range(num_values_cur_page, min, max, values); + + // Generate the current page + + let mut pb = + DataPageBuilderImpl::new(desc.clone(), num_values_cur_page as u32, use_v2); + if max_rep_level > 0 { + pb.add_rep_levels(max_rep_level, &rep_levels[level_range.clone()]); + } + if max_def_level > 0 { + pb.add_def_levels(max_def_level, &def_levels[level_range]); + } + + let value_range = num_values..num_values + num_values_cur_page; + match encoding { + Encoding::PLAIN_DICTIONARY | Encoding::RLE_DICTIONARY => { + let _ = dict_encoder.put(&values[value_range.clone()]); + let indices = dict_encoder + .write_indices() + .expect("write_indices() should be OK"); + pb.add_indices(indices); + } + Encoding::PLAIN => { + pb.add_values::(encoding, &values[value_range]); + } + enc @ _ => panic!("Unexpected encoding {}", enc), + } + + let data_page = pb.consume(); + pages.push_back(data_page); + num_values += num_values_cur_page; + } + + if encoding == Encoding::PLAIN_DICTIONARY || encoding == Encoding::RLE_DICTIONARY { + let dict = dict_encoder + .write_dict() + .expect("write_dict() should be OK"); + let dict_page = Page::DictionaryPage { + buf: dict, + num_values: dict_encoder.num_entries() as u32, + encoding: Encoding::RLE_DICTIONARY, + is_sorted: false, + }; + pages.push_front(dict_page); + } +} From 215f73b38a3758685141600f233916d766f285b0 Mon Sep 17 00:00:00 2001 From: Renjie Liu Date: Wed, 4 Sep 2019 18:43:50 +0800 Subject: [PATCH 2/5] struct array reader --- rust/parquet/src/arrow/array_reader.rs | 164 ++++++++++++++++++++----- 1 file changed, 133 insertions(+), 31 deletions(-) diff --git a/rust/parquet/src/arrow/array_reader.rs b/rust/parquet/src/arrow/array_reader.rs index 3852614fbb70..c8ea9085b191 100644 --- a/rust/parquet/src/arrow/array_reader.rs +++ b/rust/parquet/src/arrow/array_reader.rs @@ -94,7 +94,7 @@ pub trait ArrayReader { /// and read them into primitive arrays. pub struct PrimitiveArrayReader { data_type: ArrowType, - pages: Box, + pages: Box, def_levels_buffer: Option, rep_levels_buffer: Option, column_desc: ColumnDescPtr, @@ -104,7 +104,10 @@ pub struct PrimitiveArrayReader { impl PrimitiveArrayReader { /// Construct primitive array reader. - pub fn new(mut pages: Box, column_desc: ColumnDescPtr) -> Result { + pub fn new( + mut pages: Box, + column_desc: ColumnDescPtr, + ) -> Result { let data_type = parquet_to_arrow_field(column_desc.clone())? .data_type() .clone(); @@ -137,7 +140,7 @@ impl ArrayReader for PrimitiveArrayReader { } /// Reads at most `batch_size` records into array. - fn next_batch(&mut self, batch_size: usize) -> Result> { + fn next_batch(&mut self, batch_size: usize) -> Result { let mut records_read = 0usize; while records_read < batch_size { let records_to_read = batch_size - records_read; @@ -247,6 +250,7 @@ impl ArrayReader for PrimitiveArrayReader { } } +/// Implementation of struct array reader. struct StructArrayReader { children: Vec>, data_type: ArrowType, @@ -257,6 +261,7 @@ struct StructArrayReader { } impl StructArrayReader { + /// Construct struct array reader. pub fn new( data_type: ArrowType, children: Vec>, @@ -275,13 +280,33 @@ impl StructArrayReader { } impl ArrayReader for StructArrayReader { + /// Returns data type. + /// This must be a struct. fn get_data_type(&self) -> &ArrowType { &self.data_type } - fn next_batch(&mut self, batch_size: usize) -> Result> { + /// Read `batch_size` struct records. + /// + /// Definition levels of struct array is calculated as following: + /// ```ignore + /// def_levels[i] = min(child1_def_levels[i], child2_def_levels[i], ..., + /// childn_def_levels[i]); + /// ``` + /// + /// Repetition levels of struct array is calculated as following: + /// ```ignore + /// rep_levels[i] = child1_rep_levels[i]; + /// ``` + /// + /// The null bitmap of struct array is calculated from def_levels: + /// ```ignore + /// null_bitmap[i] = (def_levels[i] >= self.def_level); + /// ``` + fn next_batch(&mut self, batch_size: usize) -> Result { if self.children.len() == 0 { self.def_level_buffer = None; + self.rep_level_buffer = None; return Ok(Arc::new(StructArray::from(Vec::new()))); } @@ -291,7 +316,7 @@ impl ArrayReader for StructArrayReader { .map(|reader| reader.next_batch(batch_size)) .try_fold( Vec::new(), - |mut result, child_array| -> Result>> { + |mut result, child_array| -> Result> { result.push(child_array?); Ok(result) }, @@ -307,11 +332,6 @@ impl ArrayReader for StructArrayReader { return Err(general_err!("Not all children array length are the same!")); } - // let data_type = children_array.iter() - // .map(|arr| arr.data_ref().data_type().clone()) - // .collect::>(); - // - // let data_type = ArrowType::Struct(data_type); // calculate struct def level data let buffer_size = children_array_len * size_of::(); let mut def_level_data_buffer = MutableBuffer::new(buffer_size); @@ -321,37 +341,36 @@ impl ArrayReader for StructArrayReader { let ptr = transmute::<*const u8, *mut i16>(def_level_data_buffer.raw_data()); from_raw_parts_mut(ptr, children_array_len) }; + def_level_data .iter_mut() .for_each(|v| *v = self.struct_def_level); - self.children.iter().try_for_each(|child| { + for child in &self.children { if let Some(current_child_def_levels) = child.get_def_levels() { if current_child_def_levels.len() != children_array_len { - Err(general_err!("Child array length are not equal!")) + return Err(general_err!("Child array length are not equal!")); } else { for i in 0..children_array_len { def_level_data[i] = min(def_level_data[i], current_child_def_levels[i]); } - Ok(()) } - } else { - Ok(()) } - })?; + } // calculate bitmap for current array let mut bitmap_builder = BooleanBufferBuilder::new(children_array_len); let mut null_count = 0; - def_level_data.iter().try_for_each(|item| { - let is_null = *item < self.struct_def_level; - if is_null { + for def_level in def_level_data { + let not_null = *def_level >= self.struct_def_level; + if !not_null { null_count += 1; } - bitmap_builder.append(is_null) - })?; + bitmap_builder.append(not_null)?; + } + // Now we can build array data let array_data = ArrayDataBuilder::new(self.data_type.clone()) .len(children_array_len) .null_count(null_count) @@ -395,7 +414,7 @@ impl ArrayReader for StructArrayReader { struct ListArrayReader { data_type: ArrowType, - child: Box, + child: Box, rep_level: i16, def_level: i16, def_level_buffer: Option, @@ -403,7 +422,7 @@ struct ListArrayReader { } impl ListArrayReader { - pub fn new(child: Box, rep_level: i16, def_level: i16) -> Self { + pub fn new(child: Box, rep_level: i16, def_level: i16) -> Self { Self { data_type: List(Box::new(child.get_data_type().clone())), child, @@ -420,7 +439,7 @@ impl ArrayReader for ListArrayReader { &self.data_type } - fn next_batch(&mut self, batch_size: usize) -> Result> { + fn next_batch(&mut self, batch_size: usize) -> Result { let child_array = self.child.next_batch(batch_size)?; let mut offset_builder = Int32BufferBuilder::new(child_array.len()); @@ -474,9 +493,6 @@ impl ArrayReader for ListArrayReader { rep_level_builder.append(cur_rep_level)?; def_level_builder.append(cur_def_level)?; offset_builder.append(child_array.len() as i32)?; - - // dbg!(&self.data_type); - // dbg!(child_def_levels); } self.rep_level_buffer = Some(rep_level_builder.finish()); @@ -660,7 +676,7 @@ impl<'a> TypeVisitor>, &'a ArrayReaderBuilderContext> &mut self, _cur_type: Rc, _context: &'a ArrayReaderBuilderContext, - ) -> Result>> { + ) -> Result>> { Err(ArrowError(format!( "Reading parquet map array into arrow is not supported yet!" ))) @@ -694,7 +710,7 @@ impl<'a> ArrayReaderBuilder { fn new( root_schema: TypePtr, columns_included: Rc>, - file_reader: Rc, + file_reader: Rc, ) -> Self { Self { root_schema, @@ -804,22 +820,25 @@ impl<'a> ArrayReaderBuilder { #[cfg(test)] mod tests { - use crate::arrow::array_reader::{ArrayReader, PrimitiveArrayReader}; + use crate::arrow::array_reader::{ArrayReader, PrimitiveArrayReader, StructArrayReader}; use crate::basic::Encoding; use crate::column::page::Page; use crate::data_type::{DataType, Int32Type}; + use crate::errors::Result; use crate::schema::parser::parse_message_type; use crate::schema::types::{ColumnDescPtr, SchemaDescriptor}; use crate::util::test_common::make_pages; use crate::util::test_common::page_util::{ DataPageBuilder, DataPageBuilderImpl, InMemoryPageIterator, }; - use arrow::array::PrimitiveArray; + use arrow::array::{Array, ArrayRef, PrimitiveArray, StructArray}; use arrow::compute::max; use arrow::datatypes::Int32Type as ArrowInt32; + use arrow::datatypes::{DataType as ArrowType, Field}; use rand::distributions::range::SampleRange; use std::collections::VecDeque; use std::rc::Rc; + use std::sync::Arc; fn make_column_chuncks( column_desc: ColumnDescPtr, @@ -1039,4 +1058,87 @@ mod tests { ); } } + + /// Array reader for test. + struct InMemoryArrayReader { + data_type: ArrowType, + array: ArrayRef, + def_levels: Option>, + rep_levels: Option>, + } + + impl InMemoryArrayReader { + pub fn new( + data_type: ArrowType, + array: ArrayRef, + def_levels: Option>, + rep_levels: Option>, + ) -> Self { + Self { + data_type, + array, + def_levels, + rep_levels, + } + } + } + + impl ArrayReader for InMemoryArrayReader { + fn get_data_type(&self) -> &ArrowType { + &self.data_type + } + + fn next_batch(&mut self, batch_size: usize) -> Result { + Ok(self.array.clone()) + } + + fn get_def_levels(&self) -> Option<&[i16]> { + self.def_levels.as_ref().map(|v| v.as_slice()) + } + + fn get_rep_levels(&self) -> Option<&[i16]> { + self.rep_levels.as_ref().map(|v| v.as_slice()) + } + } + + #[test] + fn test_struct_array_reader() { + let array_1 = Arc::new(PrimitiveArray::::from(vec![1, 2, 3, 4, 5])); + let array_reader_1 = InMemoryArrayReader::new( + ArrowType::Int32, + array_1.clone(), + Some(vec![0, 1, 2, 3, 1]), + Some(vec![1, 1, 1, 1, 1]), + ); + + let array_2 = Arc::new(PrimitiveArray::::from(vec![5, 4, 3, 2, 1])); + let array_reader_2 = InMemoryArrayReader::new( + ArrowType::Int32, + array_2.clone(), + Some(vec![0, 1, 3, 1, 2]), + Some(vec![1, 1, 1, 1, 1]), + ); + + let struct_type = ArrowType::Struct(vec![ + Field::new("f1", array_1.data_type().clone(), true), + Field::new("f2", array_2.data_type().clone(), true), + ]); + + let mut struct_array_reader = StructArrayReader::new(struct_type, vec![Box::new + (array_reader_1), Box::new(array_reader_2)], 1, 1); + + + let struct_array = struct_array_reader.next_batch(5).unwrap(); + let struct_array = struct_array.as_any() + .downcast_ref::() + .unwrap(); + + assert_eq!(5, struct_array.len()); + assert_eq!(vec![true, false, false, false, false], (0..5).map(|idx| struct_array + .data_ref().is_null(idx)).collect::>()); + assert_eq!(Some(vec![0, 1, 1, 1, 1].as_slice()), struct_array_reader + .get_def_levels()); + assert_eq!(Some(vec![1, 1, 1, 1, 1].as_slice()), struct_array_reader + .get_rep_levels()); + } } From 4dd9a01ba10532dc8ed67917d1cdac9141731a02 Mon Sep 17 00:00:00 2001 From: Renjie Liu Date: Fri, 13 Sep 2019 20:13:08 +0800 Subject: [PATCH 3/5] Initial support for array reader --- rust/parquet/src/arrow/array_reader.rs | 312 ++++++++----------------- rust/parquet/src/column/reader.rs | 7 +- 2 files changed, 100 insertions(+), 219 deletions(-) diff --git a/rust/parquet/src/arrow/array_reader.rs b/rust/parquet/src/arrow/array_reader.rs index c8ea9085b191..6e80e915bb20 100644 --- a/rust/parquet/src/arrow/array_reader.rs +++ b/rust/parquet/src/arrow/array_reader.rs @@ -22,49 +22,27 @@ use std::mem::size_of; use std::mem::transmute; use std::rc::Rc; use std::result::Result::Ok; -use std::slice::from_raw_parts; use std::slice::from_raw_parts_mut; use std::sync::Arc; use std::vec::Vec; -use arrow::array::ArrayDataRef; -use arrow::array::ArrayRef; -use arrow::array::BufferBuilderTrait; -use arrow::array::StructArray; -use arrow::array::{Array, ListArray}; -use arrow::array::{ArrayDataBuilder, Int32BufferBuilder}; -use arrow::array::{BooleanBufferBuilder, Int16BufferBuilder}; -use arrow::bitmap::Bitmap; -use arrow::buffer::Buffer; -use arrow::buffer::MutableBuffer; -use arrow::datatypes::DataType::List; +use arrow::array::{ArrayDataRef, ArrayRef, BufferBuilderTrait, StructArray, + ArrayDataBuilder, BooleanBufferBuilder, + Int16BufferBuilder}; +use arrow::buffer::{Buffer, MutableBuffer}; use arrow::datatypes::{DataType as ArrowType, Field}; -use crate::arrow::converter::BooleanConverter; -use crate::arrow::converter::Converter; -use crate::arrow::converter::Float32Converter; -use crate::arrow::converter::Float64Converter; -use crate::arrow::converter::Int16Converter; -use crate::arrow::converter::Int32Converter; -use crate::arrow::converter::Int64Converter; -use crate::arrow::converter::Int8Converter; -use crate::arrow::converter::UInt16Converter; -use crate::arrow::converter::UInt32Converter; -use crate::arrow::converter::UInt64Converter; -use crate::arrow::converter::UInt8Converter; +use crate::arrow::converter::{BooleanConverter, Converter, Float32Converter, + Float64Converter, Int16Converter, Int32Converter, + Int64Converter, Int8Converter, UInt16Converter, + UInt32Converter, UInt64Converter, UInt8Converter}; use crate::arrow::record_reader::RecordReader; use crate::arrow::schema::parquet_to_arrow_field; use crate::basic::{Repetition, Type as PhysicalType}; use crate::column::page::PageIterator; -use crate::data_type::DataType; -use crate::data_type::DoubleType; -use crate::data_type::FloatType; -use crate::data_type::Int32Type; -use crate::data_type::Int64Type; -use crate::data_type::{BoolType, ByteArrayType, Int96Type}; -use crate::errors::ParquetError; -use crate::errors::ParquetError::ArrowError; -use crate::errors::Result; +use crate::data_type::{DataType, DoubleType, FloatType, Int32Type, Int64Type, BoolType, + ByteArrayType, Int96Type}; +use crate::errors::{ParquetError, ParquetError::ArrowError, Result}; use crate::file::reader::{FilePageIterator, FileReader}; use crate::schema::types::{ ColumnDescPtr, ColumnDescriptor, ColumnPath, SchemaDescPtr, Type, TypePtr, @@ -412,129 +390,12 @@ impl ArrayReader for StructArrayReader { } } -struct ListArrayReader { - data_type: ArrowType, - child: Box, - rep_level: i16, - def_level: i16, - def_level_buffer: Option, - rep_level_buffer: Option, -} - -impl ListArrayReader { - pub fn new(child: Box, rep_level: i16, def_level: i16) -> Self { - Self { - data_type: List(Box::new(child.get_data_type().clone())), - child, - rep_level, - def_level, - def_level_buffer: None, - rep_level_buffer: None, - } - } -} - -impl ArrayReader for ListArrayReader { - fn get_data_type(&self) -> &ArrowType { - &self.data_type - } - - fn next_batch(&mut self, batch_size: usize) -> Result { - let child_array = self.child.next_batch(batch_size)?; - - let mut offset_builder = Int32BufferBuilder::new(child_array.len()); - let mut null_bitmap_builder = BooleanBufferBuilder::new(child_array.len()); - let mut rep_level_builder = Int16BufferBuilder::new(child_array.len()); - let mut def_level_builder = Int16BufferBuilder::new(child_array.len()); - let mut len = 0; - - let arrow_type = ArrowType::List(Box::new(child_array.data_type().clone())); - - if child_array.len() > 0 { - // Since this is a list, we assume that child array should have repetition - // levels and definition levels - let child_rep_levels = self.child.get_rep_levels().ok_or_else(|| { - general_err!( - "Repetition levels should exist for list \ - array!" - ) - })?; - let child_def_levels = self.child.get_def_levels().ok_or_else(|| { - general_err!( - "Definition levels should exist for list \ - array!" - ) - })?; - - let mut cur_rep_level = child_rep_levels[0]; - let mut cur_def_level = child_def_levels[0]; - offset_builder.append(0)?; - len = 1; - - for idx in 1..child_array.len() { - if child_rep_levels[idx] < self.rep_level { - len = len + 1; - rep_level_builder.append(cur_rep_level)?; - def_level_builder.append(cur_def_level)?; - - let current_not_null = cur_def_level >= self.def_level; - null_bitmap_builder.append(current_not_null)?; - - offset_builder.append(idx as i32)?; - cur_rep_level = child_rep_levels[idx]; - cur_def_level = child_def_levels[idx]; - } else { - cur_rep_level = min(cur_rep_level, child_rep_levels[idx]); - cur_def_level = min(cur_def_level, child_def_levels[idx]); - } - } - - null_bitmap_builder.append(cur_def_level >= self.def_level)?; - rep_level_builder.append(cur_rep_level)?; - def_level_builder.append(cur_def_level)?; - offset_builder.append(child_array.len() as i32)?; - } - - self.rep_level_buffer = Some(rep_level_builder.finish()); - self.def_level_buffer = Some(def_level_builder.finish()); - - let offset_buffer = offset_builder.finish(); - let null_bitmap = null_bitmap_builder.finish(); - - let array_data = ArrayDataBuilder::new(arrow_type) - .len(len) - .add_buffer(offset_buffer) - .null_bit_buffer(null_bitmap) - .add_child_data(child_array.data()) - .build(); - - Ok(Arc::new(ListArray::from(array_data))) - } - - fn get_def_levels(&self) -> Option<&[i16]> { - self.def_level_buffer.as_ref().map(|buf| { - let len = buf.len() * size_of::() / size_of::(); - unsafe { - from_raw_parts(transmute::<*const u8, *const i16>(buf.raw_data()), len) - } - }) - } - - fn get_rep_levels(&self) -> Option<&[i16]> { - self.rep_level_buffer.as_ref().map(|buf| { - let len = buf.len() * size_of::() / size_of::(); - unsafe { - from_raw_parts(transmute::<*const u8, *const i16>(buf.raw_data()), len) - } - }) - } -} - +/// Create array reader from parquet schema, column indices, and parquet file reader. pub fn build_array_reader( parquet_schema: SchemaDescPtr, column_indices: T, - file_reader: Rc, -) -> Result> + file_reader: Rc, +) -> Result> where T: IntoIterator, { @@ -566,14 +427,16 @@ where .build_array_reader() } +/// Used to build array reader. struct ArrayReaderBuilder { root_schema: TypePtr, // Key: columns that need to be included in final array builder // Value: column index in schema columns_included: Rc>, - file_reader: Rc, + file_reader: Rc, } +/// Used in type visitor. #[derive(Clone)] struct ArrayReaderBuilderContext { def_level: i16, @@ -591,15 +454,18 @@ impl Default for ArrayReaderBuilderContext { } } -impl<'a> TypeVisitor>, &'a ArrayReaderBuilderContext> +/// Create array reader by visiting schema. +impl<'a> TypeVisitor>, &'a ArrayReaderBuilderContext> for ArrayReaderBuilder { /// Build array reader for primitive type. + /// Currently we don't have a list reader implementation, so repeated type is not + /// supported yet. fn visit_primitive( &mut self, cur_type: TypePtr, context: &'a ArrayReaderBuilderContext, - ) -> Result>> { + ) -> Result>> { if self.is_included(cur_type.as_ref()) { let mut new_context = context.clone(); new_context.path.append(vec![cur_type.name().to_string()]); @@ -619,11 +485,9 @@ impl<'a> TypeVisitor>, &'a ArrayReaderBuilderContext> self.build_for_primitive_type_inner(cur_type.clone(), &new_context)?; if cur_type.get_basic_info().repetition() == Repetition::REPEATED { - Ok(Some(Box::new(ListArrayReader::new( - reader, - new_context.rep_level, - new_context.def_level, - )))) + Err(ArrowError( + "Reading repeated field is not supported yet!".to_string(), + )) } else { Ok(Some(reader)) } @@ -632,6 +496,7 @@ impl<'a> TypeVisitor>, &'a ArrayReaderBuilderContext> } } + /// Build array reader for struct type. fn visit_struct( &mut self, cur_type: Rc, @@ -659,11 +524,9 @@ impl<'a> TypeVisitor>, &'a ArrayReaderBuilderContext> if cur_type.get_basic_info().has_repetition() && cur_type.get_basic_info().repetition() == Repetition::REPEATED { - Ok(Some(Box::new(ListArrayReader::new( - reader, - new_context.rep_level, - new_context.def_level, - )))) + Err(ArrowError( + "Reading repeated field is not supported yet!".to_string(), + )) } else { Ok(Some(reader)) } @@ -672,41 +535,34 @@ impl<'a> TypeVisitor>, &'a ArrayReaderBuilderContext> } } + /// Build array reader for map type. + /// Currently this is not supported. fn visit_map( &mut self, _cur_type: Rc, _context: &'a ArrayReaderBuilderContext, ) -> Result>> { - Err(ArrowError(format!( - "Reading parquet map array into arrow is not supported yet!" - ))) + Err(ArrowError( + "Reading parquet map array into arrow is not supported yet!".to_string(), + )) } + /// Build array reader for list type. + /// Currently this is not supported. fn visit_list_with_item( &mut self, - list_type: Rc, - item_type: &Type, - context: &'a ArrayReaderBuilderContext, - ) -> Result>> { - let mut new_context = context.clone(); - new_context.rep_level += 1; - new_context.def_level += 1; - new_context.path.append(vec![list_type.name().to_string()]); - - self.dispatch(Rc::new(item_type.clone()), &new_context) - .map(|child_opt| { - child_opt.map(|child| -> Box { - Box::new(ListArrayReader::new( - child, - new_context.rep_level, - new_context.def_level, - )) - }) - }) + _list_type: Rc, + _item_type: &Type, + _context: &'a ArrayReaderBuilderContext, + ) -> Result>> { + Err(ArrowError( + "Reading parquet list array into arrow is not supported yet!".to_string(), + )) } } impl<'a> ArrayReaderBuilder { + /// Construct array reader builder. fn new( root_schema: TypePtr, columns_included: Rc>, @@ -719,7 +575,8 @@ impl<'a> ArrayReaderBuilder { } } - fn build_array_reader(&mut self) -> Result> { + /// Main entry point. + fn build_array_reader(&mut self) -> Result> { let context = ArrayReaderBuilderContext::default(); self.visit_struct(self.root_schema.clone(), &context) @@ -727,16 +584,18 @@ impl<'a> ArrayReaderBuilder { } // Utility functions + + /// Check whether one column in included in this array reader builder. fn is_included(&self, t: &Type) -> bool { self.columns_included.contains_key(&(t as *const Type)) } - // Functions for primitive types. + /// Creates primitive array reader for each primitive type. fn build_for_primitive_type_inner( &self, cur_type: TypePtr, context: &'a ArrayReaderBuilderContext, - ) -> Result> { + ) -> Result> { let column_desc = Rc::new(ColumnDescriptor::new( cur_type.clone(), Some(self.root_schema.clone()), @@ -785,11 +644,12 @@ impl<'a> ArrayReaderBuilder { } } + /// Constructs struct array reader without considering repetition. fn build_for_struct_type_inner( &mut self, cur_type: TypePtr, context: &'a ArrayReaderBuilderContext, - ) -> Result>> { + ) -> Result>> { let mut fields = Vec::with_capacity(cur_type.get_fields().len()); let mut children_reader = Vec::with_capacity(cur_type.get_fields().len()); @@ -820,25 +680,22 @@ impl<'a> ArrayReaderBuilder { #[cfg(test)] mod tests { - use crate::arrow::array_reader::{ArrayReader, PrimitiveArrayReader, StructArrayReader}; + use crate::arrow::array_reader::{ArrayReader, PrimitiveArrayReader, StructArrayReader, build_array_reader}; use crate::basic::Encoding; use crate::column::page::Page; use crate::data_type::{DataType, Int32Type}; use crate::errors::Result; use crate::schema::parser::parse_message_type; use crate::schema::types::{ColumnDescPtr, SchemaDescriptor}; - use crate::util::test_common::make_pages; - use crate::util::test_common::page_util::{ - DataPageBuilder, DataPageBuilderImpl, InMemoryPageIterator, - }; + use crate::util::test_common::{make_pages, get_test_file}; + use crate::util::test_common::page_util::InMemoryPageIterator; use arrow::array::{Array, ArrayRef, PrimitiveArray, StructArray}; - use arrow::compute::max; - use arrow::datatypes::Int32Type as ArrowInt32; - use arrow::datatypes::{DataType as ArrowType, Field}; + use arrow::datatypes::{Int32Type as ArrowInt32, DataType as ArrowType, Field}; use rand::distributions::range::SampleRange; use std::collections::VecDeque; use std::rc::Rc; use std::sync::Arc; + use crate::file::reader::{FileReader, SerializedFileReader}; fn make_column_chuncks( column_desc: ColumnDescPtr, @@ -1088,7 +945,7 @@ mod tests { &self.data_type } - fn next_batch(&mut self, batch_size: usize) -> Result { + fn next_batch(&mut self, _batch_size: usize) -> Result { Ok(self.array.clone()) } @@ -1124,21 +981,50 @@ mod tests { Field::new("f2", array_2.data_type().clone(), true), ]); - let mut struct_array_reader = StructArrayReader::new(struct_type, vec![Box::new - (array_reader_1), Box::new(array_reader_2)], 1, 1); - + let mut struct_array_reader = StructArrayReader::new( + struct_type, + vec![Box::new(array_reader_1), Box::new(array_reader_2)], + 1, + 1, + ); let struct_array = struct_array_reader.next_batch(5).unwrap(); - let struct_array = struct_array.as_any() - .downcast_ref::() - .unwrap(); + let struct_array = struct_array.as_any().downcast_ref::().unwrap(); assert_eq!(5, struct_array.len()); - assert_eq!(vec![true, false, false, false, false], (0..5).map(|idx| struct_array - .data_ref().is_null(idx)).collect::>()); - assert_eq!(Some(vec![0, 1, 1, 1, 1].as_slice()), struct_array_reader - .get_def_levels()); - assert_eq!(Some(vec![1, 1, 1, 1, 1].as_slice()), struct_array_reader - .get_rep_levels()); + assert_eq!( + vec![true, false, false, false, false], + (0..5) + .map(|idx| struct_array.data_ref().is_null(idx)) + .collect::>() + ); + assert_eq!( + Some(vec![0, 1, 1, 1, 1].as_slice()), + struct_array_reader.get_def_levels() + ); + assert_eq!( + Some(vec![1, 1, 1, 1, 1].as_slice()), + struct_array_reader.get_rep_levels() + ); + } + + #[test] + fn test_create_array_reader() { + let file = get_test_file("nulls.snappy.parquet"); + let file_reader = Rc::new(SerializedFileReader::new(file).unwrap()); + + let array_reader = build_array_reader(file_reader.metadata().file_metadata() + .schema_descr_ptr(), + vec![0usize].into_iter(), + file_reader).unwrap(); + + // Create arrow types + let arrow_type = ArrowType::Struct(vec![ + Field::new("b_struct", ArrowType::Struct(vec![ + Field::new("b_c_int", ArrowType::Int32, true), + ]), true), + ]); + + assert_eq!(array_reader.get_data_type(), &arrow_type); } } diff --git a/rust/parquet/src/column/reader.rs b/rust/parquet/src/column/reader.rs index 16af10113980..cc3c26f72cd7 100644 --- a/rust/parquet/src/column/reader.rs +++ b/rust/parquet/src/column/reader.rs @@ -517,13 +517,8 @@ mod tests { use crate::basic::Type as PhysicalType; use crate::column::page::Page; - use crate::encodings::encoding::{DictEncoder, Encoder}; use crate::schema::types::{ColumnDescriptor, ColumnPath, Type as SchemaType}; - use crate::util::{ - memory::MemTracker, - test_common::page_util::{DataPageBuilder, DataPageBuilderImpl}, - test_common::{make_pages, random_numbers_range}, - }; + use crate::util::test_common::make_pages; const NUM_LEVELS: usize = 128; const NUM_PAGES: usize = 2; From 6407dee0825f2b32e293d4af5fc71ec7bee860e3 Mon Sep 17 00:00:00 2001 From: Renjie Liu Date: Sat, 14 Sep 2019 10:13:59 +0800 Subject: [PATCH 4/5] Fix format --- rust/parquet/src/arrow/array_reader.rs | 54 +++++++++++++++----------- rust/parquet/src/compression.rs | 1 - 2 files changed, 31 insertions(+), 24 deletions(-) diff --git a/rust/parquet/src/arrow/array_reader.rs b/rust/parquet/src/arrow/array_reader.rs index 6e80e915bb20..d2ad3310fc2d 100644 --- a/rust/parquet/src/arrow/array_reader.rs +++ b/rust/parquet/src/arrow/array_reader.rs @@ -26,22 +26,26 @@ use std::slice::from_raw_parts_mut; use std::sync::Arc; use std::vec::Vec; -use arrow::array::{ArrayDataRef, ArrayRef, BufferBuilderTrait, StructArray, - ArrayDataBuilder, BooleanBufferBuilder, - Int16BufferBuilder}; +use arrow::array::{ + ArrayDataBuilder, ArrayDataRef, ArrayRef, BooleanBufferBuilder, BufferBuilderTrait, + Int16BufferBuilder, StructArray, +}; use arrow::buffer::{Buffer, MutableBuffer}; use arrow::datatypes::{DataType as ArrowType, Field}; -use crate::arrow::converter::{BooleanConverter, Converter, Float32Converter, - Float64Converter, Int16Converter, Int32Converter, - Int64Converter, Int8Converter, UInt16Converter, - UInt32Converter, UInt64Converter, UInt8Converter}; +use crate::arrow::converter::{ + BooleanConverter, Converter, Float32Converter, Float64Converter, Int16Converter, + Int32Converter, Int64Converter, Int8Converter, UInt16Converter, UInt32Converter, + UInt64Converter, UInt8Converter, +}; use crate::arrow::record_reader::RecordReader; use crate::arrow::schema::parquet_to_arrow_field; use crate::basic::{Repetition, Type as PhysicalType}; use crate::column::page::PageIterator; -use crate::data_type::{DataType, DoubleType, FloatType, Int32Type, Int64Type, BoolType, - ByteArrayType, Int96Type}; +use crate::data_type::{ + BoolType, ByteArrayType, DataType, DoubleType, FloatType, Int32Type, Int64Type, + Int96Type, +}; use crate::errors::{ParquetError, ParquetError::ArrowError, Result}; use crate::file::reader::{FilePageIterator, FileReader}; use crate::schema::types::{ @@ -680,22 +684,24 @@ impl<'a> ArrayReaderBuilder { #[cfg(test)] mod tests { - use crate::arrow::array_reader::{ArrayReader, PrimitiveArrayReader, StructArrayReader, build_array_reader}; + use crate::arrow::array_reader::{ + build_array_reader, ArrayReader, PrimitiveArrayReader, StructArrayReader, + }; use crate::basic::Encoding; use crate::column::page::Page; use crate::data_type::{DataType, Int32Type}; use crate::errors::Result; + use crate::file::reader::{FileReader, SerializedFileReader}; use crate::schema::parser::parse_message_type; use crate::schema::types::{ColumnDescPtr, SchemaDescriptor}; - use crate::util::test_common::{make_pages, get_test_file}; use crate::util::test_common::page_util::InMemoryPageIterator; + use crate::util::test_common::{get_test_file, make_pages}; use arrow::array::{Array, ArrayRef, PrimitiveArray, StructArray}; - use arrow::datatypes::{Int32Type as ArrowInt32, DataType as ArrowType, Field}; + use arrow::datatypes::{DataType as ArrowType, Field, Int32Type as ArrowInt32}; use rand::distributions::range::SampleRange; use std::collections::VecDeque; use std::rc::Rc; use std::sync::Arc; - use crate::file::reader::{FileReader, SerializedFileReader}; fn make_column_chuncks( column_desc: ColumnDescPtr, @@ -1010,20 +1016,22 @@ mod tests { #[test] fn test_create_array_reader() { - let file = get_test_file("nulls.snappy.parquet"); + let file = get_test_file("nulls.snappy.parquet"); let file_reader = Rc::new(SerializedFileReader::new(file).unwrap()); - let array_reader = build_array_reader(file_reader.metadata().file_metadata() - .schema_descr_ptr(), - vec![0usize].into_iter(), - file_reader).unwrap(); + let array_reader = build_array_reader( + file_reader.metadata().file_metadata().schema_descr_ptr(), + vec![0usize].into_iter(), + file_reader, + ) + .unwrap(); // Create arrow types - let arrow_type = ArrowType::Struct(vec![ - Field::new("b_struct", ArrowType::Struct(vec![ - Field::new("b_c_int", ArrowType::Int32, true), - ]), true), - ]); + let arrow_type = ArrowType::Struct(vec![Field::new( + "b_struct", + ArrowType::Struct(vec![Field::new("b_c_int", ArrowType::Int32, true)]), + true, + )]); assert_eq!(array_reader.get_data_type(), &arrow_type); } diff --git a/rust/parquet/src/compression.rs b/rust/parquet/src/compression.rs index bdc9729b155c..d29024ed5c88 100644 --- a/rust/parquet/src/compression.rs +++ b/rust/parquet/src/compression.rs @@ -338,5 +338,4 @@ mod tests { fn test_codec_zstd() { test_codec(CodecType::ZSTD); } - } From 433ababfabc25545afdf43b7e265eb5e2da6672c Mon Sep 17 00:00:00 2001 From: Renjie Liu Date: Wed, 25 Sep 2019 14:24:00 +0800 Subject: [PATCH 5/5] Remove unwraps with result --- rust/parquet/src/arrow/array_reader.rs | 28 ++++---- rust/parquet/src/arrow/converter.rs | 4 +- rust/parquet/src/arrow/record_reader.rs | 86 +++++++++++++------------ 3 files changed, 65 insertions(+), 53 deletions(-) diff --git a/rust/parquet/src/arrow/array_reader.rs b/rust/parquet/src/arrow/array_reader.rs index d2ad3310fc2d..3a4a7864cbf4 100644 --- a/rust/parquet/src/arrow/array_reader.rs +++ b/rust/parquet/src/arrow/array_reader.rs @@ -95,12 +95,11 @@ impl PrimitiveArrayReader { .clone(); let mut record_reader = RecordReader::::new(column_desc.clone()); - record_reader.set_page_reader(pages.next().ok_or_else(|| { - general_err!( - "Can't \ - build array without pages!" - ) - })??)?; + record_reader.set_page_reader( + pages + .next() + .ok_or_else(|| general_err!("Can't build array without pages!"))??, + )?; Ok(Self { data_type, @@ -217,8 +216,8 @@ impl ArrayReader for PrimitiveArrayReader { }?; // save definition and repetition buffers - self.def_levels_buffer = self.record_reader.consume_def_levels(); - self.rep_levels_buffer = self.record_reader.consume_rep_levels(); + self.def_levels_buffer = self.record_reader.consume_def_levels()?; + self.rep_levels_buffer = self.record_reader.consume_rep_levels()?; self.record_reader.reset(); Ok(array) } @@ -305,7 +304,10 @@ impl ArrayReader for StructArrayReader { )?; // check that array child data has same size - let children_array_len = children_array.first().unwrap().len(); + let children_array_len = + children_array.first().map(|arr| arr.len()).ok_or_else(|| { + general_err!("Struct array reader should have at least one child!") + })?; let all_children_len_eq = children_array .iter() @@ -371,7 +373,9 @@ impl ArrayReader for StructArrayReader { let rep_level_data = self .children .first() - .unwrap() + .ok_or_else(|| { + general_err!("Struct array reader should have at least one child!") + })? .get_rep_levels() .map(|data| -> Result { let mut buffer = Int16BufferBuilder::new(children_array_len); @@ -584,7 +588,9 @@ impl<'a> ArrayReaderBuilder { let context = ArrayReaderBuilderContext::default(); self.visit_struct(self.root_schema.clone(), &context) - .map(|reader| reader.unwrap()) + .and_then(|reader_opt| { + reader_opt.ok_or_else(|| general_err!("Failed to build array reader!")) + }) } // Utility functions diff --git a/rust/parquet/src/arrow/converter.rs b/rust/parquet/src/arrow/converter.rs index 263e78a7fe67..6056271a7591 100644 --- a/rust/parquet/src/arrow/converter.rs +++ b/rust/parquet/src/arrow/converter.rs @@ -67,9 +67,9 @@ where let mut array_data = ArrayDataBuilder::new(ArrowSourceType::get_data_type()) .len(record_reader.num_values()) - .add_buffer(record_data); + .add_buffer(record_data?); - if let Some(b) = record_reader.consume_bitmap_buffer() { + if let Some(b) = record_reader.consume_bitmap_buffer()? { array_data = array_data.null_bit_buffer(b); } diff --git a/rust/parquet/src/arrow/record_reader.rs b/rust/parquet/src/arrow/record_reader.rs index 21632cff6389..de42ae7f953d 100644 --- a/rust/parquet/src/arrow/record_reader.rs +++ b/rust/parquet/src/arrow/record_reader.rs @@ -190,15 +190,13 @@ impl RecordReader { /// The implementation has side effects. It will create a new buffer to hold those /// definition level values that have already been read into memory but not counted /// as record values, e.g. those from `self.num_values` to `self.values_written`. - pub fn consume_def_levels(&mut self) -> Option { + pub fn consume_def_levels(&mut self) -> Result> { let new_buffer = if let Some(ref mut def_levels_buf) = &mut self.def_levels { let num_left_values = self.values_written - self.num_values; let mut new_buffer = MutableBuffer::new( size_of::() * max(MIN_BATCH_SIZE, num_left_values), ); - new_buffer - .resize(num_left_values * size_of::()) - .unwrap(); + new_buffer.resize(num_left_values * size_of::())?; let new_def_levels = FatPtr::::with_offset(&new_buffer, 0); let new_def_levels = new_def_levels.to_slice_mut(); @@ -209,29 +207,25 @@ impl RecordReader { new_def_levels[0..num_left_values] .copy_from_slice(&left_def_levels[0..num_left_values]); - def_levels_buf - .resize(self.num_values * size_of::()) - .unwrap(); + def_levels_buf.resize(self.num_values * size_of::())?; Some(new_buffer) } else { None }; - replace(&mut self.def_levels, new_buffer).map(|x| x.freeze()) + Ok(replace(&mut self.def_levels, new_buffer).map(|x| x.freeze())) } /// Return repetition level data. /// The side effect is similar to `consume_def_levels`. - pub fn consume_rep_levels(&mut self) -> Option { + pub fn consume_rep_levels(&mut self) -> Result> { // TODO: Optimize to reduce the copy let new_buffer = if let Some(ref mut rep_levels_buf) = &mut self.rep_levels { let num_left_values = self.values_written - self.num_values; let mut new_buffer = MutableBuffer::new( size_of::() * max(MIN_BATCH_SIZE, num_left_values), ); - new_buffer - .resize(num_left_values * size_of::()) - .unwrap(); + new_buffer.resize(num_left_values * size_of::())?; let new_rep_levels = FatPtr::::with_offset(&new_buffer, 0); let new_rep_levels = new_rep_levels.to_slice_mut(); @@ -242,26 +236,23 @@ impl RecordReader { new_rep_levels[0..num_left_values] .copy_from_slice(&left_rep_levels[0..num_left_values]); - rep_levels_buf - .resize(self.num_values * size_of::()) - .unwrap(); + rep_levels_buf.resize(self.num_values * size_of::())?; + Some(new_buffer) } else { None }; - replace(&mut self.rep_levels, new_buffer).map(|x| x.freeze()) + Ok(replace(&mut self.rep_levels, new_buffer).map(|x| x.freeze())) } /// Returns currently stored buffer data. /// The side effect is similar to `consume_def_levels`. - pub fn consume_record_data(&mut self) -> Buffer { + pub fn consume_record_data(&mut self) -> Result { // TODO: Optimize to reduce the copy let num_left_values = self.values_written - self.num_values; let mut new_buffer = MutableBuffer::new(max(MIN_BATCH_SIZE, num_left_values)); - new_buffer - .resize(num_left_values * T::get_type_size()) - .unwrap(); + new_buffer.resize(num_left_values * T::get_type_size())?; let new_records = FatPtr::::with_offset_and_size(&new_buffer, 0, T::get_type_size()); @@ -277,22 +268,23 @@ impl RecordReader { swap(&mut new_records[idx], &mut left_records[idx]); } - self.records - .resize(self.num_values * T::get_type_size()) - .unwrap(); - replace(&mut self.records, new_buffer).freeze() + self.records.resize(self.num_values * T::get_type_size())?; + + Ok(replace(&mut self.records, new_buffer).freeze()) } /// Returns currently stored null bitmap data. /// The side effect is similar to `consume_def_levels`. - pub fn consume_bitmap_buffer(&mut self) -> Option { + pub fn consume_bitmap_buffer(&mut self) -> Result> { // TODO: Optimize to reduce the copy if self.column_desc.max_def_level() > 0 { + assert!(self.null_bitmap.is_some()); let num_left_values = self.values_written - self.num_values; let new_bitmap_builder = Some(BooleanBufferBuilder::new(max( MIN_BATCH_SIZE, num_left_values, ))); + let old_bitmap = replace(&mut self.null_bitmap, new_bitmap_builder) .map(|mut builder| builder.finish()) .unwrap(); @@ -303,13 +295,12 @@ impl RecordReader { self.null_bitmap .as_mut() .unwrap() - .append(old_bitmap.is_set(i)) - .unwrap(); + .append(old_bitmap.is_set(i))?; } - Some(old_bitmap.to_buffer()) + Ok(Some(old_bitmap.to_buffer())) } else { - None + Ok(None) } } @@ -325,9 +316,9 @@ impl RecordReader { } /// Returns bitmap data. - pub fn consume_bitmap(&mut self) -> Option { + pub fn consume_bitmap(&mut self) -> Result> { self.consume_bitmap_buffer() - .map(|buffer| Bitmap::from(buffer)) + .map(|buffer| buffer.map(|b| Bitmap::from(b))) } /// Try to read one batch of data. @@ -589,9 +580,12 @@ mod tests { let mut bb = Int32BufferBuilder::new(7); bb.append_slice(&[4, 7, 6, 3, 2, 8, 9]).unwrap(); let expected_buffer = bb.finish(); - assert_eq!(expected_buffer, record_reader.consume_record_data()); - assert_eq!(None, record_reader.consume_def_levels()); - assert_eq!(None, record_reader.consume_bitmap()); + assert_eq!( + expected_buffer, + record_reader.consume_record_data().unwrap() + ); + assert_eq!(None, record_reader.consume_def_levels().unwrap()); + assert_eq!(None, record_reader.consume_bitmap().unwrap()); } #[test] @@ -674,7 +668,10 @@ mod tests { let mut bb = Int32BufferBuilder::new(7); bb.append_slice(&[0, 7, 0, 6, 3, 0, 8]).unwrap(); let expected_buffer = bb.finish(); - assert_eq!(expected_buffer, record_reader.consume_record_data()); + assert_eq!( + expected_buffer, + record_reader.consume_record_data().unwrap() + ); // Verify result def levels let mut bb = Int16BufferBuilder::new(7); @@ -683,7 +680,7 @@ mod tests { let expected_def_levels = bb.finish(); assert_eq!( Some(expected_def_levels), - record_reader.consume_def_levels() + record_reader.consume_def_levels().unwrap() ); // Verify bitmap @@ -691,7 +688,10 @@ mod tests { bb.append_slice(&[false, true, false, true, true, false, true]) .unwrap(); let expected_bitmap = Bitmap::from(bb.finish()); - assert_eq!(Some(expected_bitmap), record_reader.consume_bitmap()); + assert_eq!( + Some(expected_bitmap), + record_reader.consume_bitmap().unwrap() + ); } #[test] @@ -778,7 +778,10 @@ mod tests { let mut bb = Int32BufferBuilder::new(9); bb.append_slice(&[4, 0, 0, 7, 6, 3, 2, 8, 9]).unwrap(); let expected_buffer = bb.finish(); - assert_eq!(expected_buffer, record_reader.consume_record_data()); + assert_eq!( + expected_buffer, + record_reader.consume_record_data().unwrap() + ); // Verify result def levels let mut bb = Int16BufferBuilder::new(9); @@ -787,7 +790,7 @@ mod tests { let expected_def_levels = bb.finish(); assert_eq!( Some(expected_def_levels), - record_reader.consume_def_levels() + record_reader.consume_def_levels().unwrap() ); // Verify bitmap @@ -795,7 +798,10 @@ mod tests { bb.append_slice(&[true, false, false, true, true, true, true, true, true]) .unwrap(); let expected_bitmap = Bitmap::from(bb.finish()); - assert_eq!(Some(expected_bitmap), record_reader.consume_bitmap()); + assert_eq!( + Some(expected_bitmap), + record_reader.consume_bitmap().unwrap() + ); } #[test]