-
Notifications
You must be signed in to change notification settings - Fork 2.1k
Reduce output when sqllogictest runs successfully, and run tests in parallel
#6393
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
e4b7ff7
d44b569
2110aa7
deb9a4e
139b594
19f145c
e6e6e93
14f81f5
fc6fc39
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -15,16 +15,17 @@ | |
| // specific language governing permissions and limitations | ||
| // under the License. | ||
|
|
||
| use std::error::Error; | ||
| use std::path::{Path, PathBuf}; | ||
| #[cfg(target_family = "windows")] | ||
| use std::thread; | ||
|
|
||
| use futures::stream::StreamExt; | ||
| use log::info; | ||
| use sqllogictest::strict_column_validator; | ||
| use tempfile::TempDir; | ||
|
|
||
| use datafusion::prelude::{SessionConfig, SessionContext}; | ||
| use datafusion_common::{DataFusionError, Result}; | ||
|
|
||
| use crate::engines::datafusion::DataFusion; | ||
| use crate::engines::postgres::Postgres; | ||
|
|
@@ -42,7 +43,7 @@ pub fn main() { | |
| thread::Builder::new() | ||
| .stack_size(2 * 1024 * 1024) // 2 MB | ||
| .spawn(move || { | ||
| tokio::runtime::Builder::new_current_thread() | ||
| tokio::runtime::Builder::new_multi_thread() | ||
| .enable_all() | ||
| .build() | ||
| .unwrap() | ||
|
|
@@ -56,33 +57,72 @@ pub fn main() { | |
|
|
||
| #[tokio::main] | ||
| #[cfg(not(target_family = "windows"))] | ||
| pub async fn main() -> Result<(), Box<dyn Error>> { | ||
| pub async fn main() -> Result<()> { | ||
| run_tests().await | ||
| } | ||
|
|
||
| async fn run_tests() -> Result<(), Box<dyn Error>> { | ||
| async fn run_tests() -> Result<()> { | ||
| // Enable logging (e.g. set RUST_LOG=debug to see debug logs) | ||
| env_logger::init(); | ||
|
|
||
| let options = Options::new(); | ||
|
|
||
| for (path, relative_path) in read_test_files(&options) { | ||
| if options.complete_mode { | ||
| run_complete_file(&path, relative_path).await?; | ||
| } else if options.postgres_runner { | ||
| run_test_file_with_postgres(&path, relative_path).await?; | ||
| } else { | ||
| run_test_file(&path, relative_path).await?; | ||
| // Run all tests in parallel, reporting failures at the end | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Just wondering whether there can be a race condition if multiple tests work with table t1 and the table is dropped by the test that finishes first. The other tests could then fail.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Each slt file runs with its own `SessionContext`. I suppose if they all shared a temporary directory or something else that could be changed, that would be a problem. Perhaps I can add some comments in various places explaining why it is important to keep the tests from having externally visible side effects so they can be parallelized.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||
| // | ||
| // Doing so is safe because each slt file runs with its own | ||
| // `SessionContext` and should not have side effects (like | ||
| // modifying shared state like `/tmp/`) | ||
| let errors: Vec<_> = futures::stream::iter(read_test_files(&options)) | ||
| .map(|test_file| { | ||
| tokio::task::spawn(async move { | ||
| println!("Running {:?}", test_file.relative_path); | ||
| if options.complete_mode { | ||
| run_complete_file(test_file).await?; | ||
| } else if options.postgres_runner { | ||
| run_test_file_with_postgres(test_file).await?; | ||
| } else { | ||
| run_test_file(test_file).await?; | ||
| } | ||
| Ok(()) as Result<()> | ||
| }) | ||
| }) | ||
| // run up to num_cpus streams in parallel | ||
| .buffer_unordered(num_cpus::get()) | ||
| .flat_map(|result| { | ||
| // Filter out any Ok() leaving only the DataFusionErrors | ||
| futures::stream::iter(match result { | ||
| // Tokio panic error | ||
| Err(e) => Some(DataFusionError::External(Box::new(e))), | ||
| Ok(thread_result) => match thread_result { | ||
| // Test run error | ||
| Err(e) => Some(e), | ||
| // success | ||
| Ok(_) => None, | ||
| }, | ||
| }) | ||
| }) | ||
| .collect() | ||
| .await; | ||
|
|
||
| // report on any errors | ||
| if !errors.is_empty() { | ||
| for e in &errors { | ||
| println!("{e}"); | ||
| } | ||
| Err(DataFusionError::Execution(format!( | ||
| "{} failures", | ||
| errors.len() | ||
| ))) | ||
| } else { | ||
| Ok(()) | ||
| } | ||
|
|
||
| Ok(()) | ||
| } | ||
|
|
||
| async fn run_test_file( | ||
| path: &Path, | ||
| relative_path: PathBuf, | ||
| ) -> Result<(), Box<dyn Error>> { | ||
| async fn run_test_file(test_file: TestFile) -> Result<()> { | ||
| let TestFile { | ||
| path, | ||
| relative_path, | ||
| } = test_file; | ||
| info!("Running with DataFusion runner: {}", path.display()); | ||
| let Some(test_ctx) = context_for_test_file(&relative_path).await else { | ||
| info!("Skipping: {}", path.display()); | ||
|
|
@@ -91,26 +131,35 @@ async fn run_test_file( | |
| let ctx = test_ctx.session_ctx().clone(); | ||
| let mut runner = sqllogictest::Runner::new(DataFusion::new(ctx, relative_path)); | ||
| runner.with_column_validator(strict_column_validator); | ||
| runner.run_file_async(path).await?; | ||
| Ok(()) | ||
| runner | ||
| .run_file_async(path) | ||
| .await | ||
| .map_err(|e| DataFusionError::External(Box::new(e))) | ||
| } | ||
|
|
||
| async fn run_test_file_with_postgres( | ||
| path: &Path, | ||
| relative_path: PathBuf, | ||
| ) -> Result<(), Box<dyn Error>> { | ||
| async fn run_test_file_with_postgres(test_file: TestFile) -> Result<()> { | ||
| let TestFile { | ||
| path, | ||
| relative_path, | ||
| } = test_file; | ||
| info!("Running with Postgres runner: {}", path.display()); | ||
| let postgres_client = Postgres::connect(relative_path).await?; | ||
| let postgres_client = Postgres::connect(relative_path) | ||
| .await | ||
| .map_err(|e| DataFusionError::External(Box::new(e)))?; | ||
| let mut runner = sqllogictest::Runner::new(postgres_client); | ||
| runner.with_column_validator(strict_column_validator); | ||
| runner.run_file_async(path).await?; | ||
| runner | ||
| .run_file_async(path) | ||
| .await | ||
| .map_err(|e| DataFusionError::External(Box::new(e)))?; | ||
| Ok(()) | ||
| } | ||
|
|
||
| async fn run_complete_file( | ||
| path: &Path, | ||
| relative_path: PathBuf, | ||
| ) -> Result<(), Box<dyn Error>> { | ||
| async fn run_complete_file(test_file: TestFile) -> Result<()> { | ||
| let TestFile { | ||
| path, | ||
| relative_path, | ||
| } = test_file; | ||
| use sqllogictest::default_validator; | ||
|
|
||
| info!("Using complete mode to complete: {}", path.display()); | ||
|
|
@@ -120,7 +169,8 @@ async fn run_complete_file( | |
| return Ok(()) | ||
| }; | ||
| let ctx = test_ctx.session_ctx().clone(); | ||
| let mut runner = sqllogictest::Runner::new(DataFusion::new(ctx, relative_path)); | ||
| let mut runner = | ||
| sqllogictest::Runner::new(DataFusion::new(ctx, relative_path.clone())); | ||
| let col_separator = " "; | ||
| runner | ||
| .update_test_file( | ||
|
|
@@ -130,26 +180,42 @@ async fn run_complete_file( | |
| strict_column_validator, | ||
| ) | ||
| .await | ||
| .map_err(|e| e.to_string())?; | ||
| // Can't use e directly because it isn't marked Send, so turn it into a string. | ||
| .map_err(|e| { | ||
| DataFusionError::Execution(format!("Error completing {relative_path:?}: {e}")) | ||
| }) | ||
| } | ||
|
|
||
| Ok(()) | ||
| /// Represents a parsed test file | ||
| #[derive(Debug)] | ||
| struct TestFile { | ||
| /// The absolute path to the file | ||
| pub path: PathBuf, | ||
| /// The relative path of the file (used for display) | ||
| pub relative_path: PathBuf, | ||
| } | ||
|
|
||
| impl TestFile { | ||
| fn new(path: PathBuf) -> Self { | ||
| let relative_path = PathBuf::from( | ||
| path.to_string_lossy() | ||
| .strip_prefix(TEST_DIRECTORY) | ||
| .unwrap_or(""), | ||
| ); | ||
|
|
||
| Self { | ||
| path, | ||
| relative_path, | ||
| } | ||
| } | ||
| } | ||
|
|
||
| fn read_test_files<'a>( | ||
| options: &'a Options, | ||
| ) -> Box<dyn Iterator<Item = (PathBuf, PathBuf)> + 'a> { | ||
| fn read_test_files<'a>(options: &'a Options) -> Box<dyn Iterator<Item = TestFile> + 'a> { | ||
| Box::new( | ||
| read_dir_recursive(TEST_DIRECTORY) | ||
| .map(|path| { | ||
| ( | ||
| path.clone(), | ||
| PathBuf::from( | ||
| path.to_string_lossy().strip_prefix(TEST_DIRECTORY).unwrap(), | ||
| ), | ||
| ) | ||
| }) | ||
| .filter(|(_, relative_path)| options.check_test_file(relative_path)) | ||
| .filter(|(path, _)| options.check_pg_compat_file(path.as_path())), | ||
| .map(TestFile::new) | ||
| .filter(|f| options.check_test_file(&f.relative_path)) | ||
| .filter(|f| options.check_pg_compat_file(f.path.as_path())), | ||
| ) | ||
| } | ||
|
|
||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The `main` function above for Windows creates a single-threaded executor. I made it this way by mistake. It should be `tokio::runtime::Builder::new_multi_thread()`.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
in 19f145c 👍