From 483ee9461bcaea633b7044a11ae0ea201d0e9208 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Sat, 23 Dec 2023 11:48:51 -0700 Subject: [PATCH 1/5] implement CSV writer serde --- datafusion/proto/proto/datafusion.proto | 20 ++ datafusion/proto/src/generated/pbjson.rs | 231 ++++++++++++++++++ datafusion/proto/src/generated/prost.rs | 32 ++- datafusion/proto/src/logical_plan/mod.rs | 64 +++++ .../proto/src/physical_plan/from_proto.rs | 12 +- .../tests/cases/roundtrip_logical_plan.rs | 69 +++++- 6 files changed, 424 insertions(+), 4 deletions(-) diff --git a/datafusion/proto/proto/datafusion.proto b/datafusion/proto/proto/datafusion.proto index d02fc8e91b41d..a8e95aa95ca2b 100644 --- a/datafusion/proto/proto/datafusion.proto +++ b/datafusion/proto/proto/datafusion.proto @@ -1207,6 +1207,7 @@ message FileTypeWriterOptions { oneof FileType { JsonWriterOptions json_options = 1; ParquetWriterOptions parquet_options = 2; + CsvWriterOptions csv_options = 3; } } @@ -1218,6 +1219,25 @@ message ParquetWriterOptions { WriterProperties writer_properties = 1; } +message CsvWriterOptions { + // Optional column delimiter. Defaults to `b','` + string delimiter = 1; + // Whether to write column names as file headers. Defaults to `true` + bool has_header = 2; + // Optional date format for date arrays + string date_format = 3; + // Optional datetime format for datetime arrays + string datetime_format = 4; + // Optional timestamp format for timestamp arrays + string timestamp_format = 5; + // Optional timestamp format for timestamp with timezone arrays + //string timestamp_tz_format = 6; + // Optional time format for time arrays + string time_format = 7; + // Optional value to represent null + string null_value = 8; +} + message WriterProperties { uint64 data_page_size_limit = 1; uint64 dictionary_page_size_limit = 2; diff --git a/datafusion/proto/src/generated/pbjson.rs b/datafusion/proto/src/generated/pbjson.rs index f860b1f1e6a01..3a9067fbdbf41 100644 --- a/datafusion/proto/src/generated/pbjson.rs +++ b/datafusion/proto/src/generated/pbjson.rs @@ -5151,6 +5151,223 @@ impl<'de> serde::Deserialize<'de> for CsvScanExecNode { deserializer.deserialize_struct("datafusion.CsvScanExecNode", FIELDS, GeneratedVisitor) } } +impl serde::Serialize for CsvWriterOptions { + #[allow(deprecated)] + fn serialize(&self, serializer: S) -> std::result::Result + where + S: serde::Serializer, + { + use serde::ser::SerializeStruct; + let mut len = 0; + if !self.delimiter.is_empty() { + len += 1; + } + if self.has_header { + len += 1; + } + if !self.date_format.is_empty() { + len += 1; + } + if !self.datetime_format.is_empty() { + len += 1; + } + if !self.timestamp_format.is_empty() { + len += 1; + } + if !self.timestamp_tz_format.is_empty() { + len += 1; + } + if !self.time_format.is_empty() { + len += 1; + } + if !self.null_value.is_empty() { + len += 1; + } + let mut struct_ser = serializer.serialize_struct("datafusion.CsvWriterOptions", len)?; + if !self.delimiter.is_empty() { + struct_ser.serialize_field("delimiter", &self.delimiter)?; + } + if self.has_header { + struct_ser.serialize_field("hasHeader", &self.has_header)?; + } + if !self.date_format.is_empty() { + struct_ser.serialize_field("dateFormat", &self.date_format)?; + } + if !self.datetime_format.is_empty() { + struct_ser.serialize_field("datetimeFormat", &self.datetime_format)?; + } + if !self.timestamp_format.is_empty() { + struct_ser.serialize_field("timestampFormat", &self.timestamp_format)?; + } + if !self.timestamp_tz_format.is_empty() { + struct_ser.serialize_field("timestampTzFormat", &self.timestamp_tz_format)?; + } + if !self.time_format.is_empty() { + struct_ser.serialize_field("timeFormat", &self.time_format)?; + } + if !self.null_value.is_empty() { + struct_ser.serialize_field("nullValue", &self.null_value)?; + } + struct_ser.end() + } +} +impl<'de> serde::Deserialize<'de> for CsvWriterOptions { + #[allow(deprecated)] + fn deserialize(deserializer: D) -> std::result::Result + where + D: serde::Deserializer<'de>, + { + const FIELDS: &[&str] = &[ + "delimiter", + "has_header", + "hasHeader", + "date_format", + "dateFormat", + "datetime_format", + "datetimeFormat", + "timestamp_format", + "timestampFormat", + "timestamp_tz_format", + "timestampTzFormat", + "time_format", + "timeFormat", + "null_value", + "nullValue", + ]; + + #[allow(clippy::enum_variant_names)] + enum GeneratedField { + Delimiter, + HasHeader, + DateFormat, + DatetimeFormat, + TimestampFormat, + TimestampTzFormat, + TimeFormat, + NullValue, + } + impl<'de> serde::Deserialize<'de> for GeneratedField { + fn deserialize(deserializer: D) -> std::result::Result + where + D: serde::Deserializer<'de>, + { + struct GeneratedVisitor; + + impl<'de> serde::de::Visitor<'de> for GeneratedVisitor { + type Value = GeneratedField; + + fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(formatter, "expected one of: {:?}", &FIELDS) + } + + #[allow(unused_variables)] + fn visit_str(self, value: &str) -> std::result::Result + where + E: serde::de::Error, + { + match value { + "delimiter" => Ok(GeneratedField::Delimiter), + "hasHeader" | "has_header" => Ok(GeneratedField::HasHeader), + "dateFormat" | "date_format" => Ok(GeneratedField::DateFormat), + "datetimeFormat" | "datetime_format" => Ok(GeneratedField::DatetimeFormat), + "timestampFormat" | "timestamp_format" => Ok(GeneratedField::TimestampFormat), + "timestampTzFormat" | "timestamp_tz_format" => Ok(GeneratedField::TimestampTzFormat), + "timeFormat" | "time_format" => Ok(GeneratedField::TimeFormat), + "nullValue" | "null_value" => Ok(GeneratedField::NullValue), + _ => Err(serde::de::Error::unknown_field(value, FIELDS)), + } + } + } + deserializer.deserialize_identifier(GeneratedVisitor) + } + } + struct GeneratedVisitor; + impl<'de> serde::de::Visitor<'de> for GeneratedVisitor { + type Value = CsvWriterOptions; + + fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + formatter.write_str("struct datafusion.CsvWriterOptions") + } + + fn visit_map(self, mut map_: V) -> std::result::Result + where + V: serde::de::MapAccess<'de>, + { + let mut delimiter__ = None; + let mut has_header__ = None; + let mut date_format__ = None; + let mut datetime_format__ = None; + let mut timestamp_format__ = None; + let mut timestamp_tz_format__ = None; + let mut time_format__ = None; + let mut null_value__ = None; + while let Some(k) = map_.next_key()? { + match k { + GeneratedField::Delimiter => { + if delimiter__.is_some() { + return Err(serde::de::Error::duplicate_field("delimiter")); + } + delimiter__ = Some(map_.next_value()?); + } + GeneratedField::HasHeader => { + if has_header__.is_some() { + return Err(serde::de::Error::duplicate_field("hasHeader")); + } + has_header__ = Some(map_.next_value()?); + } + GeneratedField::DateFormat => { + if date_format__.is_some() { + return Err(serde::de::Error::duplicate_field("dateFormat")); + } + date_format__ = Some(map_.next_value()?); + } + GeneratedField::DatetimeFormat => { + if datetime_format__.is_some() { + return Err(serde::de::Error::duplicate_field("datetimeFormat")); + } + datetime_format__ = Some(map_.next_value()?); + } + GeneratedField::TimestampFormat => { + if timestamp_format__.is_some() { + return Err(serde::de::Error::duplicate_field("timestampFormat")); + } + timestamp_format__ = Some(map_.next_value()?); + } + GeneratedField::TimestampTzFormat => { + if timestamp_tz_format__.is_some() { + return Err(serde::de::Error::duplicate_field("timestampTzFormat")); + } + timestamp_tz_format__ = Some(map_.next_value()?); + } + GeneratedField::TimeFormat => { + if time_format__.is_some() { + return Err(serde::de::Error::duplicate_field("timeFormat")); + } + time_format__ = Some(map_.next_value()?); + } + GeneratedField::NullValue => { + if null_value__.is_some() { + return Err(serde::de::Error::duplicate_field("nullValue")); + } + null_value__ = Some(map_.next_value()?); + } + } + } + Ok(CsvWriterOptions { + delimiter: delimiter__.unwrap_or_default(), + has_header: has_header__.unwrap_or_default(), + date_format: date_format__.unwrap_or_default(), + datetime_format: datetime_format__.unwrap_or_default(), + timestamp_format: timestamp_format__.unwrap_or_default(), + timestamp_tz_format: timestamp_tz_format__.unwrap_or_default(), + time_format: time_format__.unwrap_or_default(), + null_value: null_value__.unwrap_or_default(), + }) + } + } + deserializer.deserialize_struct("datafusion.CsvWriterOptions", FIELDS, GeneratedVisitor) + } +} impl serde::Serialize for CubeNode { #[allow(deprecated)] fn serialize(&self, serializer: S) -> std::result::Result @@ -7893,6 +8110,9 @@ impl serde::Serialize for FileTypeWriterOptions { file_type_writer_options::FileType::ParquetOptions(v) => { struct_ser.serialize_field("parquetOptions", v)?; } + file_type_writer_options::FileType::CsvOptions(v) => { + struct_ser.serialize_field("csvOptions", v)?; + } } } struct_ser.end() @@ -7909,12 +8129,15 @@ impl<'de> serde::Deserialize<'de> for FileTypeWriterOptions { "jsonOptions", "parquet_options", "parquetOptions", + "csv_options", + "csvOptions", ]; #[allow(clippy::enum_variant_names)] enum GeneratedField { JsonOptions, ParquetOptions, + CsvOptions, } impl<'de> serde::Deserialize<'de> for GeneratedField { fn deserialize(deserializer: D) -> std::result::Result @@ -7938,6 +8161,7 @@ impl<'de> serde::Deserialize<'de> for FileTypeWriterOptions { match value { "jsonOptions" | "json_options" => Ok(GeneratedField::JsonOptions), "parquetOptions" | "parquet_options" => Ok(GeneratedField::ParquetOptions), + "csvOptions" | "csv_options" => Ok(GeneratedField::CsvOptions), _ => Err(serde::de::Error::unknown_field(value, FIELDS)), } } @@ -7972,6 +8196,13 @@ impl<'de> serde::Deserialize<'de> for FileTypeWriterOptions { return Err(serde::de::Error::duplicate_field("parquetOptions")); } file_type__ = map_.next_value::<::std::option::Option<_>>()?.map(file_type_writer_options::FileType::ParquetOptions) +; + } + GeneratedField::CsvOptions => { + if file_type__.is_some() { + return Err(serde::de::Error::duplicate_field("csvOptions")); + } + file_type__ = map_.next_value::<::std::option::Option<_>>()?.map(file_type_writer_options::FileType::CsvOptions) ; } } diff --git a/datafusion/proto/src/generated/prost.rs b/datafusion/proto/src/generated/prost.rs index 459d5a965cd3e..2bcd9c57985ad 100644 --- a/datafusion/proto/src/generated/prost.rs +++ b/datafusion/proto/src/generated/prost.rs @@ -1642,7 +1642,7 @@ pub struct PartitionColumn { #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct FileTypeWriterOptions { - #[prost(oneof = "file_type_writer_options::FileType", tags = "1, 2")] + #[prost(oneof = "file_type_writer_options::FileType", tags = "1, 2, 3")] pub file_type: ::core::option::Option, } /// Nested message and enum types in `FileTypeWriterOptions`. @@ -1654,6 +1654,8 @@ pub mod file_type_writer_options { JsonOptions(super::JsonWriterOptions), #[prost(message, tag = "2")] ParquetOptions(super::ParquetWriterOptions), + #[prost(message, tag = "3")] + CsvOptions(super::CsvWriterOptions), } } #[allow(clippy::derive_partial_eq_without_eq)] @@ -1670,6 +1672,34 @@ pub struct ParquetWriterOptions { } #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] +pub struct CsvWriterOptions { + /// Optional column delimiter. Defaults to `b','` + #[prost(string, tag = "1")] + pub delimiter: ::prost::alloc::string::String, + /// Whether to write column names as file headers. Defaults to `true` + #[prost(bool, tag = "2")] + pub has_header: bool, + /// Optional date format for date arrays + #[prost(string, tag = "3")] + pub date_format: ::prost::alloc::string::String, + /// Optional datetime format for datetime arrays + #[prost(string, tag = "4")] + pub datetime_format: ::prost::alloc::string::String, + /// Optional timestamp format for timestamp arrays + #[prost(string, tag = "5")] + pub timestamp_format: ::prost::alloc::string::String, + /// Optional timestamp format for timestamp with timezone arrays + #[prost(string, tag = "6")] + pub timestamp_tz_format: ::prost::alloc::string::String, + /// Optional time format for time arrays + #[prost(string, tag = "7")] + pub time_format: ::prost::alloc::string::String, + /// Optional value to represent null + #[prost(string, tag = "8")] + pub null_value: ::prost::alloc::string::String, +} +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] pub struct WriterProperties { #[prost(uint64, tag = "1")] pub data_page_size_limit: u64, diff --git a/datafusion/proto/src/logical_plan/mod.rs b/datafusion/proto/src/logical_plan/mod.rs index d137a41fa19bf..e29a3787e0a5a 100644 --- a/datafusion/proto/src/logical_plan/mod.rs +++ b/datafusion/proto/src/logical_plan/mod.rs @@ -15,6 +15,7 @@ // specific language governing permissions and limitations // under the License. +use arrow::csv::WriterBuilder; use std::collections::HashMap; use std::fmt::Debug; use std::str::FromStr; @@ -64,6 +65,7 @@ use datafusion_expr::{ }; use datafusion::parquet::file::properties::{WriterProperties, WriterVersion}; +use datafusion_common::file_options::csv_writer::CsvWriterOptions; use datafusion_common::file_options::parquet_writer::ParquetWriterOptions; use datafusion_expr::dml::CopyOptions; use prost::bytes::BufMut; @@ -846,6 +848,20 @@ impl AsLogicalPlan for LogicalPlanNode { Some(copy_to_node::CopyOptions::WriterOptions(opt)) => { match &opt.file_type { Some(ft) => match ft { + file_type_writer_options::FileType::CsvOptions( + writer_options, + ) => { + let writer_builder = + csv_writer_options_from_proto(writer_options); + CopyOptions::WriterOptions(Box::new( + FileTypeWriterOptions::CSV( + CsvWriterOptions::new( + writer_builder, + CompressionTypeVariant::UNCOMPRESSED, + ), + ), + )) + } file_type_writer_options::FileType::ParquetOptions( writer_options, ) => { @@ -1630,6 +1646,40 @@ impl AsLogicalPlan for LogicalPlanNode { } CopyOptions::WriterOptions(opt) => { match opt.as_ref() { + FileTypeWriterOptions::CSV(csv_opts) => { + let csv_options = &csv_opts.writer_options; + let csv_writer_options = protobuf::CsvWriterOptions { + delimiter: csv_options.delimiter().to_string(), + has_header: csv_options.header(), + date_format: csv_options + .date_format() + .unwrap_or("") + .to_owned(), + datetime_format: csv_options + .datetime_format() + .unwrap_or("") + .to_owned(), + timestamp_format: csv_options + .timestamp_format() + .unwrap_or("") + .to_owned(), + timestamp_tz_format: "".to_string(), + time_format: csv_options + .time_format() + .unwrap_or("") + .to_owned(), + null_value: csv_options.null().to_owned(), + }; + let csv_options = + file_type_writer_options::FileType::CsvOptions( + csv_writer_options, + ); + Some(copy_to_node::CopyOptions::WriterOptions( + protobuf::FileTypeWriterOptions { + file_type: Some(csv_options), + }, + )) + } FileTypeWriterOptions::Parquet(parquet_opts) => { let parquet_writer_options = protobuf::ParquetWriterOptions { @@ -1674,6 +1724,20 @@ impl AsLogicalPlan for LogicalPlanNode { } } +pub(crate) fn csv_writer_options_from_proto( + writer_options: &protobuf::CsvWriterOptions, +) -> WriterBuilder { + WriterBuilder::new() + //.with_delimiter(writer_options.delimiter) + .with_header(writer_options.has_header) + .with_date_format(writer_options.date_format.clone()) + .with_datetime_format(writer_options.datetime_format.clone()) + .with_timestamp_format(writer_options.timestamp_format.clone()) + // .with_timestamp_tz_format(writer_options.timestamp_tz_format.clone()) + .with_time_format(writer_options.time_format.clone()) + .with_null(writer_options.null_value.clone()) +} + pub(crate) fn writer_properties_to_proto( props: &WriterProperties, ) -> protobuf::WriterProperties { diff --git a/datafusion/proto/src/physical_plan/from_proto.rs b/datafusion/proto/src/physical_plan/from_proto.rs index 824eb60a57155..d8c4044132a44 100644 --- a/datafusion/proto/src/physical_plan/from_proto.rs +++ b/datafusion/proto/src/physical_plan/from_proto.rs @@ -39,6 +39,7 @@ use datafusion::physical_plan::windows::create_window_expr; use datafusion::physical_plan::{ functions, ColumnStatistics, Partitioning, PhysicalExpr, Statistics, WindowExpr, }; +use datafusion_common::file_options::csv_writer::CsvWriterOptions; use datafusion_common::file_options::json_writer::JsonWriterOptions; use datafusion_common::file_options::parquet_writer::ParquetWriterOptions; use datafusion_common::parsers::CompressionTypeVariant; @@ -53,7 +54,7 @@ use crate::logical_plan; use crate::protobuf; use crate::protobuf::physical_expr_node::ExprType; -use crate::logical_plan::writer_properties_from_proto; +use crate::logical_plan::{csv_writer_options_from_proto, writer_properties_from_proto}; use chrono::{TimeZone, Utc}; use object_store::path::Path; use object_store::ObjectMeta; @@ -766,11 +767,18 @@ impl TryFrom<&protobuf::FileTypeWriterOptions> for FileTypeWriterOptions { let file_type = value .file_type .as_ref() - .ok_or_else(|| proto_error("Missing required field in protobuf"))?; + .ok_or_else(|| proto_error("Missing required file_type field in protobuf"))?; match file_type { protobuf::file_type_writer_options::FileType::JsonOptions(opts) => Ok( Self::JSON(JsonWriterOptions::new(opts.compression().into())), ), + protobuf::file_type_writer_options::FileType::CsvOptions(opt) => { + let write_options = csv_writer_options_from_proto(opt); + Ok(Self::CSV(CsvWriterOptions::new( + write_options, + CompressionTypeVariant::UNCOMPRESSED, + ))) + } protobuf::file_type_writer_options::FileType::ParquetOptions(opt) => { let props = opt.writer_properties.clone().unwrap_or_default(); let writer_properties = writer_properties_from_proto(&props)?; diff --git a/datafusion/proto/tests/cases/roundtrip_logical_plan.rs b/datafusion/proto/tests/cases/roundtrip_logical_plan.rs index 3eeae01a643e0..c1925261ab3e3 100644 --- a/datafusion/proto/tests/cases/roundtrip_logical_plan.rs +++ b/datafusion/proto/tests/cases/roundtrip_logical_plan.rs @@ -20,6 +20,7 @@ use std::fmt::{self, Debug, Formatter}; use std::sync::Arc; use arrow::array::{ArrayRef, FixedSizeListArray}; +use arrow::csv::WriterBuilder; use arrow::datatypes::{ DataType, Field, Fields, Int32Type, IntervalDayTimeType, IntervalMonthDayNanoType, IntervalUnit, Schema, SchemaRef, TimeUnit, UnionFields, UnionMode, @@ -35,8 +36,10 @@ use datafusion::parquet::file::properties::{WriterProperties, WriterVersion}; use datafusion::physical_plan::functions::make_scalar_function; use datafusion::prelude::{create_udf, CsvReadOptions, SessionConfig, SessionContext}; use datafusion::test_util::{TestTableFactory, TestTableProvider}; +use datafusion_common::file_options::csv_writer::CsvWriterOptions; use datafusion_common::file_options::parquet_writer::ParquetWriterOptions; use datafusion_common::file_options::StatementOptions; +use datafusion_common::parsers::CompressionTypeVariant; use datafusion_common::{internal_err, not_impl_err, plan_err, FileTypeWriterOptions}; use datafusion_common::{DFField, DFSchema, DFSchemaRef, DataFusionError, ScalarValue}; use datafusion_common::{FileType, Result}; @@ -386,10 +389,74 @@ async fn roundtrip_logical_plan_copy_to_writer_options() -> Result<()> { } _ => panic!(), } - Ok(()) } +#[tokio::test] +async fn roundtrip_logical_plan_copy_to_csv() -> Result<()> { + let ctx = SessionContext::new(); + + let input = create_csv_scan(&ctx).await?; + + let writer_properties = WriterBuilder::new() + .with_delimiter(b'*') + .with_date_format("dd/MM/yyyy".to_string()) + .with_datetime_format("dd/MM/yyyy HH:mm:ss".to_string()) + .with_timestamp_format("HH:mm:ss.SSSSSS".to_string()) + //.with_timestamp_tz_format("HH:mm:ss.SSSSSS".to_string()) + .with_time_format("HH:mm:ss".to_string()) + .with_null("NIL".to_string()); + + let plan = LogicalPlan::Copy(CopyTo { + input: Arc::new(input), + output_url: "test.csv".to_string(), + file_format: FileType::CSV, + single_file_output: true, + copy_options: CopyOptions::WriterOptions(Box::new(FileTypeWriterOptions::CSV( + CsvWriterOptions::new( + writer_properties, + CompressionTypeVariant::UNCOMPRESSED, + ), + ))), + }); + + let bytes = logical_plan_to_bytes(&plan)?; + let logical_round_trip = logical_plan_from_bytes(&bytes, &ctx)?; + assert_eq!(format!("{plan:?}"), format!("{logical_round_trip:?}")); + + match logical_round_trip { + LogicalPlan::Copy(copy_to) => { + assert_eq!("test.csv", copy_to.output_url); + assert_eq!(FileType::CSV, copy_to.file_format); + assert!(copy_to.single_file_output); + match ©_to.copy_options { + CopyOptions::WriterOptions(y) => match y.as_ref() { + FileTypeWriterOptions::CSV(p) => { + let props = &p.writer_options; + assert_eq!(b'*', props.delimiter()); + assert_eq!("dd/MM/yyyy", props.date_format().unwrap()); + assert_eq!( + "dd/MM/yyyy HH:mm:ss", + props.datetime_format().unwrap() + ); + assert_eq!( + "dd/MM/yyyy HH:mm:ss", + props.timestamp_format().unwrap() + ); + // assert_eq!("dd/MM/yyyy HH:mm:ss", props.timestamp_tz_format().unwrap()); + assert_eq!("HH:mm:ss", props.time_format().unwrap()); + assert_eq!("NIL", props.null()); + } + _ => panic!(), + }, + _ => panic!(), + } + } + _ => panic!(), + } + + Ok(()) +} async fn create_csv_scan(ctx: &SessionContext) -> Result { ctx.register_csv("t1", "tests/testdata/test.csv", CsvReadOptions::default()) .await?; From d7432b63960cdd21dcae95a8560d2af281241e74 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Sat, 23 Dec 2023 12:08:00 -0700 Subject: [PATCH 2/5] test passes --- datafusion/proto/src/logical_plan/mod.rs | 24 ++++++++++++++----- .../proto/src/physical_plan/from_proto.rs | 2 +- .../tests/cases/roundtrip_logical_plan.rs | 7 ++---- 3 files changed, 21 insertions(+), 12 deletions(-) diff --git a/datafusion/proto/src/logical_plan/mod.rs b/datafusion/proto/src/logical_plan/mod.rs index e29a3787e0a5a..5a4a8d61524fc 100644 --- a/datafusion/proto/src/logical_plan/mod.rs +++ b/datafusion/proto/src/logical_plan/mod.rs @@ -852,7 +852,7 @@ impl AsLogicalPlan for LogicalPlanNode { writer_options, ) => { let writer_builder = - csv_writer_options_from_proto(writer_options); + csv_writer_options_from_proto(writer_options)?; CopyOptions::WriterOptions(Box::new( FileTypeWriterOptions::CSV( CsvWriterOptions::new( @@ -1649,7 +1649,8 @@ impl AsLogicalPlan for LogicalPlanNode { FileTypeWriterOptions::CSV(csv_opts) => { let csv_options = &csv_opts.writer_options; let csv_writer_options = protobuf::CsvWriterOptions { - delimiter: csv_options.delimiter().to_string(), + delimiter: (csv_options.delimiter() as char) + .to_string(), has_header: csv_options.header(), date_format: csv_options .date_format() @@ -1726,16 +1727,27 @@ impl AsLogicalPlan for LogicalPlanNode { pub(crate) fn csv_writer_options_from_proto( writer_options: &protobuf::CsvWriterOptions, -) -> WriterBuilder { - WriterBuilder::new() - //.with_delimiter(writer_options.delimiter) +) -> Result { + let mut builder = WriterBuilder::new(); + if writer_options.delimiter.len() > 0 { + if let Some(delimiter) = writer_options.delimiter.chars().next() { + if delimiter.is_ascii() { + builder = builder.with_delimiter(delimiter as u8); + } else { + return Err(proto_error("CSV Delimiter is not ASCII")); + } + } else { + return Err(proto_error("Error parsing CSV Delimiter")); + } + } + Ok(builder .with_header(writer_options.has_header) .with_date_format(writer_options.date_format.clone()) .with_datetime_format(writer_options.datetime_format.clone()) .with_timestamp_format(writer_options.timestamp_format.clone()) // .with_timestamp_tz_format(writer_options.timestamp_tz_format.clone()) .with_time_format(writer_options.time_format.clone()) - .with_null(writer_options.null_value.clone()) + .with_null(writer_options.null_value.clone())) } pub(crate) fn writer_properties_to_proto( diff --git a/datafusion/proto/src/physical_plan/from_proto.rs b/datafusion/proto/src/physical_plan/from_proto.rs index d8c4044132a44..6f1e811510c62 100644 --- a/datafusion/proto/src/physical_plan/from_proto.rs +++ b/datafusion/proto/src/physical_plan/from_proto.rs @@ -773,7 +773,7 @@ impl TryFrom<&protobuf::FileTypeWriterOptions> for FileTypeWriterOptions { Self::JSON(JsonWriterOptions::new(opts.compression().into())), ), protobuf::file_type_writer_options::FileType::CsvOptions(opt) => { - let write_options = csv_writer_options_from_proto(opt); + let write_options = csv_writer_options_from_proto(opt)?; Ok(Self::CSV(CsvWriterOptions::new( write_options, CompressionTypeVariant::UNCOMPRESSED, diff --git a/datafusion/proto/tests/cases/roundtrip_logical_plan.rs b/datafusion/proto/tests/cases/roundtrip_logical_plan.rs index c1925261ab3e3..2116d28a3cc5d 100644 --- a/datafusion/proto/tests/cases/roundtrip_logical_plan.rs +++ b/datafusion/proto/tests/cases/roundtrip_logical_plan.rs @@ -439,11 +439,8 @@ async fn roundtrip_logical_plan_copy_to_csv() -> Result<()> { "dd/MM/yyyy HH:mm:ss", props.datetime_format().unwrap() ); - assert_eq!( - "dd/MM/yyyy HH:mm:ss", - props.timestamp_format().unwrap() - ); - // assert_eq!("dd/MM/yyyy HH:mm:ss", props.timestamp_tz_format().unwrap()); + assert_eq!("HH:mm:ss.SSSSSS", props.timestamp_format().unwrap()); + // assert_eq!("HH:mm:ss.SSSSSS", props.timestamp_tz_format().unwrap()); assert_eq!("HH:mm:ss", props.time_format().unwrap()); assert_eq!("NIL", props.null()); } From 1a2bd84e9696caa88a1a93294cb5bfc6af1116dd Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Sat, 23 Dec 2023 12:41:37 -0700 Subject: [PATCH 3/5] regen --- datafusion/proto/src/generated/pbjson.rs | 18 ------------------ datafusion/proto/src/generated/prost.rs | 3 +-- datafusion/proto/src/logical_plan/mod.rs | 2 +- 3 files changed, 2 insertions(+), 21 deletions(-) diff --git a/datafusion/proto/src/generated/pbjson.rs b/datafusion/proto/src/generated/pbjson.rs index 3a9067fbdbf41..956244ffdbc25 100644 --- a/datafusion/proto/src/generated/pbjson.rs +++ b/datafusion/proto/src/generated/pbjson.rs @@ -5174,9 +5174,6 @@ impl serde::Serialize for CsvWriterOptions { if !self.timestamp_format.is_empty() { len += 1; } - if !self.timestamp_tz_format.is_empty() { - len += 1; - } if !self.time_format.is_empty() { len += 1; } @@ -5199,9 +5196,6 @@ impl serde::Serialize for CsvWriterOptions { if !self.timestamp_format.is_empty() { struct_ser.serialize_field("timestampFormat", &self.timestamp_format)?; } - if !self.timestamp_tz_format.is_empty() { - struct_ser.serialize_field("timestampTzFormat", &self.timestamp_tz_format)?; - } if !self.time_format.is_empty() { struct_ser.serialize_field("timeFormat", &self.time_format)?; } @@ -5227,8 +5221,6 @@ impl<'de> serde::Deserialize<'de> for CsvWriterOptions { "datetimeFormat", "timestamp_format", "timestampFormat", - "timestamp_tz_format", - "timestampTzFormat", "time_format", "timeFormat", "null_value", @@ -5242,7 +5234,6 @@ impl<'de> serde::Deserialize<'de> for CsvWriterOptions { DateFormat, DatetimeFormat, TimestampFormat, - TimestampTzFormat, TimeFormat, NullValue, } @@ -5271,7 +5262,6 @@ impl<'de> serde::Deserialize<'de> for CsvWriterOptions { "dateFormat" | "date_format" => Ok(GeneratedField::DateFormat), "datetimeFormat" | "datetime_format" => Ok(GeneratedField::DatetimeFormat), "timestampFormat" | "timestamp_format" => Ok(GeneratedField::TimestampFormat), - "timestampTzFormat" | "timestamp_tz_format" => Ok(GeneratedField::TimestampTzFormat), "timeFormat" | "time_format" => Ok(GeneratedField::TimeFormat), "nullValue" | "null_value" => Ok(GeneratedField::NullValue), _ => Err(serde::de::Error::unknown_field(value, FIELDS)), @@ -5298,7 +5288,6 @@ impl<'de> serde::Deserialize<'de> for CsvWriterOptions { let mut date_format__ = None; let mut datetime_format__ = None; let mut timestamp_format__ = None; - let mut timestamp_tz_format__ = None; let mut time_format__ = None; let mut null_value__ = None; while let Some(k) = map_.next_key()? { @@ -5333,12 +5322,6 @@ impl<'de> serde::Deserialize<'de> for CsvWriterOptions { } timestamp_format__ = Some(map_.next_value()?); } - GeneratedField::TimestampTzFormat => { - if timestamp_tz_format__.is_some() { - return Err(serde::de::Error::duplicate_field("timestampTzFormat")); - } - timestamp_tz_format__ = Some(map_.next_value()?); - } GeneratedField::TimeFormat => { if time_format__.is_some() { return Err(serde::de::Error::duplicate_field("timeFormat")); @@ -5359,7 +5342,6 @@ impl<'de> serde::Deserialize<'de> for CsvWriterOptions { date_format: date_format__.unwrap_or_default(), datetime_format: datetime_format__.unwrap_or_default(), timestamp_format: timestamp_format__.unwrap_or_default(), - timestamp_tz_format: timestamp_tz_format__.unwrap_or_default(), time_format: time_format__.unwrap_or_default(), null_value: null_value__.unwrap_or_default(), }) diff --git a/datafusion/proto/src/generated/prost.rs b/datafusion/proto/src/generated/prost.rs index 2bcd9c57985ad..2032340e66ac6 100644 --- a/datafusion/proto/src/generated/prost.rs +++ b/datafusion/proto/src/generated/prost.rs @@ -1689,8 +1689,7 @@ pub struct CsvWriterOptions { #[prost(string, tag = "5")] pub timestamp_format: ::prost::alloc::string::String, /// Optional timestamp format for timestamp with timezone arrays - #[prost(string, tag = "6")] - pub timestamp_tz_format: ::prost::alloc::string::String, + /// string timestamp_tz_format = 6; /// Optional time format for time arrays #[prost(string, tag = "7")] pub time_format: ::prost::alloc::string::String, diff --git a/datafusion/proto/src/logical_plan/mod.rs b/datafusion/proto/src/logical_plan/mod.rs index 5a4a8d61524fc..3060969fa1c7d 100644 --- a/datafusion/proto/src/logical_plan/mod.rs +++ b/datafusion/proto/src/logical_plan/mod.rs @@ -1664,7 +1664,7 @@ impl AsLogicalPlan for LogicalPlanNode { .timestamp_format() .unwrap_or("") .to_owned(), - timestamp_tz_format: "".to_string(), + //timestamp_tz_format: "".to_string(), time_format: csv_options .time_format() .unwrap_or("") From 08898a46515551d154b45defd76257384db1cb4a Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Sun, 24 Dec 2023 09:53:05 -0700 Subject: [PATCH 4/5] clippy --- datafusion/proto/proto/datafusion.proto | 6 ++---- datafusion/proto/src/generated/prost.rs | 6 ++---- datafusion/proto/src/logical_plan/mod.rs | 4 +--- 3 files changed, 5 insertions(+), 11 deletions(-) diff --git a/datafusion/proto/proto/datafusion.proto b/datafusion/proto/proto/datafusion.proto index a8e95aa95ca2b..59b82efcbb43a 100644 --- a/datafusion/proto/proto/datafusion.proto +++ b/datafusion/proto/proto/datafusion.proto @@ -1230,12 +1230,10 @@ message CsvWriterOptions { string datetime_format = 4; // Optional timestamp format for timestamp arrays string timestamp_format = 5; - // Optional timestamp format for timestamp with timezone arrays - //string timestamp_tz_format = 6; // Optional time format for time arrays - string time_format = 7; + string time_format = 6; // Optional value to represent null - string null_value = 8; + string null_value = 7; } message WriterProperties { diff --git a/datafusion/proto/src/generated/prost.rs b/datafusion/proto/src/generated/prost.rs index 2032340e66ac6..32e892e663ef3 100644 --- a/datafusion/proto/src/generated/prost.rs +++ b/datafusion/proto/src/generated/prost.rs @@ -1688,13 +1688,11 @@ pub struct CsvWriterOptions { /// Optional timestamp format for timestamp arrays #[prost(string, tag = "5")] pub timestamp_format: ::prost::alloc::string::String, - /// Optional timestamp format for timestamp with timezone arrays - /// string timestamp_tz_format = 6; /// Optional time format for time arrays - #[prost(string, tag = "7")] + #[prost(string, tag = "6")] pub time_format: ::prost::alloc::string::String, /// Optional value to represent null - #[prost(string, tag = "8")] + #[prost(string, tag = "7")] pub null_value: ::prost::alloc::string::String, } #[allow(clippy::derive_partial_eq_without_eq)] diff --git a/datafusion/proto/src/logical_plan/mod.rs b/datafusion/proto/src/logical_plan/mod.rs index 3060969fa1c7d..e997bcde426e5 100644 --- a/datafusion/proto/src/logical_plan/mod.rs +++ b/datafusion/proto/src/logical_plan/mod.rs @@ -1664,7 +1664,6 @@ impl AsLogicalPlan for LogicalPlanNode { .timestamp_format() .unwrap_or("") .to_owned(), - //timestamp_tz_format: "".to_string(), time_format: csv_options .time_format() .unwrap_or("") @@ -1729,7 +1728,7 @@ pub(crate) fn csv_writer_options_from_proto( writer_options: &protobuf::CsvWriterOptions, ) -> Result { let mut builder = WriterBuilder::new(); - if writer_options.delimiter.len() > 0 { + if !writer_options.delimiter.is_empty() { if let Some(delimiter) = writer_options.delimiter.chars().next() { if delimiter.is_ascii() { builder = builder.with_delimiter(delimiter as u8); @@ -1745,7 +1744,6 @@ pub(crate) fn csv_writer_options_from_proto( .with_date_format(writer_options.date_format.clone()) .with_datetime_format(writer_options.datetime_format.clone()) .with_timestamp_format(writer_options.timestamp_format.clone()) - // .with_timestamp_tz_format(writer_options.timestamp_tz_format.clone()) .with_time_format(writer_options.time_format.clone()) .with_null(writer_options.null_value.clone())) } From 28dc3fc9ded1ca00cf78f9fabbf8499e3234e9ac Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Sun, 24 Dec 2023 09:53:46 -0700 Subject: [PATCH 5/5] remove commented code --- datafusion/proto/tests/cases/roundtrip_logical_plan.rs | 2 -- 1 file changed, 2 deletions(-) diff --git a/datafusion/proto/tests/cases/roundtrip_logical_plan.rs b/datafusion/proto/tests/cases/roundtrip_logical_plan.rs index 2116d28a3cc5d..2d7d85abda969 100644 --- a/datafusion/proto/tests/cases/roundtrip_logical_plan.rs +++ b/datafusion/proto/tests/cases/roundtrip_logical_plan.rs @@ -403,7 +403,6 @@ async fn roundtrip_logical_plan_copy_to_csv() -> Result<()> { .with_date_format("dd/MM/yyyy".to_string()) .with_datetime_format("dd/MM/yyyy HH:mm:ss".to_string()) .with_timestamp_format("HH:mm:ss.SSSSSS".to_string()) - //.with_timestamp_tz_format("HH:mm:ss.SSSSSS".to_string()) .with_time_format("HH:mm:ss".to_string()) .with_null("NIL".to_string()); @@ -440,7 +439,6 @@ async fn roundtrip_logical_plan_copy_to_csv() -> Result<()> { props.datetime_format().unwrap() ); assert_eq!("HH:mm:ss.SSSSSS", props.timestamp_format().unwrap()); - // assert_eq!("HH:mm:ss.SSSSSS", props.timestamp_tz_format().unwrap()); assert_eq!("HH:mm:ss", props.time_format().unwrap()); assert_eq!("NIL", props.null()); }