Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 2 additions & 38 deletions datafusion/core/tests/sqllogictests/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,12 @@
// under the License.

use async_trait::async_trait;
use datafusion::arrow::csv::WriterBuilder;
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::prelude::{SessionConfig, SessionContext};
use datafusion_sql::parser::{DFParser, Statement};
use log::info;
use normalize::normalize_batch;
use sqllogictest::{ColumnType, DBOutput};
use normalize::convert_batches;
use sqllogictest::DBOutput;
use sqlparser::ast::Statement as SQLStatement;
use std::path::{Path, PathBuf};
use std::time::Duration;
Expand Down Expand Up @@ -173,41 +172,6 @@ async fn context_for_test_file(file_name: &str) -> SessionContext {
}
}

/// Converts a set of `RecordBatch`es into the `DBOutput` form expected by
/// sqllogictest.
///
/// An empty batch list is treated as a successfully completed statement
/// (zero rows reported).
fn convert_batches(batches: Vec<RecordBatch>) -> Result<DBOutput> {
    if batches.is_empty() {
        return Ok(DBOutput::StatementComplete(0));
    }

    // TODO: use the actual types
    let types = vec![ColumnType::Any; batches[0].num_columns()];

    // Render every (normalized) batch as headerless, tab-separated text using
    // the Arrow CSV writer; the writer borrows `bytes`, so scope it so the
    // borrow ends before the buffer is consumed below.
    let mut bytes = vec![];
    {
        let mut writer = WriterBuilder::new()
            .has_headers(false)
            .with_delimiter(b'\t')
            .build(&mut bytes);
        for batch in batches {
            writer.write(&normalize_batch(batch)).unwrap();
        }
    }

    // Split the rendered text back into rows and cells. An empty cell is the
    // CSV writer's rendering of a NULL value.
    let text = String::from_utf8(bytes).unwrap();
    let rows = text
        .lines()
        .map(|line| {
            line.split('\t')
                .map(|cell| {
                    if cell.is_empty() {
                        "NULL".to_string()
                    } else {
                        cell.to_string()
                    }
                })
                .collect()
        })
        .collect();

    Ok(DBOutput::Rows { types, rows })
}

async fn run_query(ctx: &SessionContext, sql: impl Into<String>) -> Result<DBOutput> {
let sql = sql.into();
// Check if the sql is `insert`
Expand Down
112 changes: 66 additions & 46 deletions datafusion/core/tests/sqllogictests/src/normalize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,61 +15,81 @@
// specific language governing permissions and limitations
// under the License.

use std::sync::Arc;
use crate::error::{DFSqlLogicTestError, Result};
use arrow::{array::ArrayRef, datatypes::DataType, record_batch::RecordBatch};
use datafusion::error::DataFusionError;
use sqllogictest::{ColumnType, DBOutput};

use arrow::{
array::{
as_largestring_array, as_string_array, ArrayRef, LargeStringArray, StringArray,
},
datatypes::DataType,
record_batch::RecordBatch,
};
/// Converts `batches` to a DBOutput as expected by sqllogicteset.
///
/// Assumes empty record batches are a successful statement completion
///
pub fn convert_batches(batches: Vec<RecordBatch>) -> Result<DBOutput> {
if batches.is_empty() {
// DataFusion doesn't report number of rows complete
return Ok(DBOutput::StatementComplete(0));
}

let schema = batches[0].schema();

/// Normalizes the content of a RecordBatch prior to printing.
// TODO: report the the actual types of the result
// https://github.com/apache/arrow-datafusion/issues/4499
let types = vec![ColumnType::Any; batches[0].num_columns()];

let mut rows = vec![];
for batch in batches {
// Verify schema
if schema != batch.schema() {
return Err(DFSqlLogicTestError::DataFusion(DataFusionError::Internal(
format!(
"Schema mismatch. Previously had\n{:#?}\n\nGot:\n{:#?}",
schema,
batch.schema()
),
)));
}
rows.append(&mut convert_batch(batch)?);
}

Ok(DBOutput::Rows { types, rows })
}

/// Converts a single `RecordBatch` into a row-major `Vec<Vec<String>>`
/// (one inner vector per row, one string per cell) for comparison.
fn convert_batch(batch: RecordBatch) -> Result<Vec<Vec<String>>> {
    let mut rows = Vec::with_capacity(batch.num_rows());
    for row_idx in 0..batch.num_rows() {
        let mut cells = Vec::with_capacity(batch.num_columns());
        for col in batch.columns() {
            cells.push(cell_to_string(col, row_idx)?);
        }
        rows.push(cells);
    }
    Ok(rows)
}

/// Normalizes the content of a single cell in RecordBatch prior to printing.
///
/// This is to make the output comparable to the semi-standard .slt format
///
/// Normalizations applied to [NULL Values and empty strings]
///
/// [NULL Values and empty strings]: https://duckdb.org/dev/sqllogictest/result_verification#null-values-and-empty-strings
pub fn normalize_batch(batch: RecordBatch) -> RecordBatch {
let new_columns = batch
.columns()
.iter()
.map(|array| {
match array.data_type() {
DataType::Utf8 => {
let arr: StringArray = as_string_array(array.as_ref())
.iter()
.map(normalize_string)
.collect();
Arc::new(arr) as ArrayRef
}
DataType::LargeUtf8 => {
let arr: LargeStringArray = as_largestring_array(array.as_ref())
.iter()
.map(normalize_string)
.collect();
Arc::new(arr) as ArrayRef
}
// todo normalize dictionary values

// no normalization on this type
_ => array.clone(),
}
})
.collect();
///
pub fn cell_to_string(col: &ArrayRef, row: usize) -> Result<String> {
// represent any null value with the string "NULL"
if !col.is_valid(row) {
return Ok("NULL".into());
}

RecordBatch::try_new(batch.schema(), new_columns).expect("creating normalized batch")
}
// Convert to normal string representation
let mut s = arrow::util::display::array_value_to_string(col, row)
.map_err(DFSqlLogicTestError::Arrow)?;

fn normalize_string(v: Option<&str>) -> Option<&str> {
v.map(|v| {
// apply subsequent normalization depending on type if
if matches!(col.data_type(), DataType::Utf8 | DataType::LargeUtf8) && s.is_empty() {
// All empty strings are replaced with this value
if v.is_empty() {
"(empty)"
} else {
v
}
})
s = "(empty)".to_string();
}

Ok(s)
}
Loading