From fa8b8020e44804da1a772ae8b30c64463df27f25 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Wed, 6 Sep 2023 00:27:54 -0700 Subject: [PATCH 1/2] Propagate error from spawned task reading spills --- datafusion/core/src/physical_plan/common.rs | 6 ++++- .../core/src/physical_plan/sorts/sort.rs | 4 ++- .../sorts/sort_preserving_merge.rs | 2 ++ datafusion/core/src/physical_plan/stream.rs | 25 +++++++++++++------ datafusion/core/src/test/exec.rs | 4 +++ 5 files changed, 32 insertions(+), 9 deletions(-) diff --git a/datafusion/core/src/physical_plan/common.rs b/datafusion/core/src/physical_plan/common.rs index 9e94fe4e25990..6133e397dbcb9 100644 --- a/datafusion/core/src/physical_plan/common.rs +++ b/datafusion/core/src/physical_plan/common.rs @@ -109,9 +109,13 @@ pub(crate) fn spawn_buffered( builder.spawn(async move { while let Some(item) = input.next().await { if sender.send(item).await.is_err() { - return; + return Err(DataFusionError::Execution( + "Failed to send record batch".to_string(), + )); } } + + Ok(()) }); builder.build() diff --git a/datafusion/core/src/physical_plan/sorts/sort.rs b/datafusion/core/src/physical_plan/sorts/sort.rs index 07bb4cd357295..82badb7d879c9 100644 --- a/datafusion/core/src/physical_plan/sorts/sort.rs +++ b/datafusion/core/src/physical_plan/sorts/sort.rs @@ -616,9 +616,11 @@ fn read_spill_as_stream( let sender = builder.tx(); builder.spawn_blocking(move || { - if let Err(e) = read_spill(sender, path.path()) { + let result = read_spill(sender, path.path()); + if let Err(e) = &result { error!("Failure while reading spill file: {:?}. Error: {}", path, e); } + result }); Ok(builder.build()) diff --git a/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs b/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs index 27c1f79db5bc3..5b12f1cdd93b2 100644 --- a/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs +++ b/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs @@ -848,6 +848,8 @@ mod tests { // This causes the MergeStream to wait for more input tokio::time::sleep(tokio::time::Duration::from_millis(10)).await; } + + Ok(()) }); streams.push(builder.build()); diff --git a/datafusion/core/src/physical_plan/stream.rs b/datafusion/core/src/physical_plan/stream.rs index 1147f288643c3..a1f63f29cbf7d 100644 --- a/datafusion/core/src/physical_plan/stream.rs +++ b/datafusion/core/src/physical_plan/stream.rs @@ -50,7 +50,7 @@ pub struct RecordBatchReceiverStreamBuilder { tx: Sender>, rx: Receiver>, schema: SchemaRef, - join_set: JoinSet<()>, + join_set: JoinSet>, } impl RecordBatchReceiverStreamBuilder { @@ -78,7 +78,7 @@ impl RecordBatchReceiverStreamBuilder { /// retrieved from `Self::tx` pub fn spawn(&mut self, task: F) where - F: Future, + F: Future>, F: Send + 'static, { self.join_set.spawn(task); @@ -91,7 +91,7 @@ impl RecordBatchReceiverStreamBuilder { /// retrieved from `Self::tx` pub fn spawn_blocking(&mut self, f: F) where - F: FnOnce(), + F: FnOnce() -> Result<()>, F: Send + 'static, { self.join_set.spawn_blocking(f); @@ -120,7 +120,7 @@ impl RecordBatchReceiverStreamBuilder { "Stopping execution: error executing input: {}", displayable(input.as_ref()).one_line() ); - return; + return Ok(()); } Ok(stream) => stream, }; @@ -137,7 +137,7 @@ impl RecordBatchReceiverStreamBuilder { "Stopping execution: output is gone, plan cancelling: {}", displayable(input.as_ref()).one_line() ); - return; + return Ok(()); } // stop after the first error is encontered (don't @@ -147,9 +147,11 @@ impl RecordBatchReceiverStreamBuilder { "Stopping execution: plan returned error: {}", displayable(input.as_ref()).one_line() ); - return; + return Ok(()); } } + + Ok(()) }); } @@ -169,7 +171,16 @@ impl RecordBatchReceiverStreamBuilder { let check = async move { while let Some(result) = join_set.join_next().await { match result { - Ok(()) => continue, // nothing to report + Ok(task_result) => { + match task_result { + // nothing to report + Ok(_) => continue, + // This means a blocking task error + Err(e) => { + return Some(internal_err!("Spawned Task error: {e}")); + } + } + } // This means a tokio task error, likely a panic Err(e) => { if e.is_panic() { diff --git a/datafusion/core/src/test/exec.rs b/datafusion/core/src/test/exec.rs index 682f31a7fe9a4..44ce5cf3282b1 100644 --- a/datafusion/core/src/test/exec.rs +++ b/datafusion/core/src/test/exec.rs @@ -228,6 +228,8 @@ impl ExecutionPlan for MockExec { println!("ERROR batch via delayed stream: {e}"); } } + + Ok(()) }); // returned stream simply reads off the rx stream Ok(builder.build()) @@ -364,6 +366,8 @@ impl ExecutionPlan for BarrierExec { println!("ERROR batch via barrier stream stream: {e}"); } } + + Ok(()) }); // returned stream simply reads off the rx stream From 9169892cf7b0e960080133b722255fd94551886c Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Fri, 8 Sep 2023 11:15:33 -0700 Subject: [PATCH 2/2] For review --- datafusion/core/src/physical_plan/common.rs | 6 +++--- datafusion/core/src/physical_plan/stream.rs | 4 ++-- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/datafusion/core/src/physical_plan/common.rs b/datafusion/core/src/physical_plan/common.rs index 6133e397dbcb9..787f3eed2673e 100644 --- a/datafusion/core/src/physical_plan/common.rs +++ b/datafusion/core/src/physical_plan/common.rs @@ -109,9 +109,9 @@ pub(crate) fn spawn_buffered( builder.spawn(async move { while let Some(item) = input.next().await { if sender.send(item).await.is_err() { - return Err(DataFusionError::Execution( - "Failed to send record batch".to_string(), - )); + // receiver dropped when query is shutdown early (e.g., limit) or error, + // no need to return propagate the send error. + return Ok(()); } } diff --git a/datafusion/core/src/physical_plan/stream.rs b/datafusion/core/src/physical_plan/stream.rs index a1f63f29cbf7d..69869e35a39aa 100644 --- a/datafusion/core/src/physical_plan/stream.rs +++ b/datafusion/core/src/physical_plan/stream.rs @@ -24,9 +24,9 @@ use std::task::Poll; use crate::physical_plan::displayable; use arrow::{datatypes::SchemaRef, record_batch::RecordBatch}; -use datafusion_common::internal_err; use datafusion_common::DataFusionError; use datafusion_common::Result; +use datafusion_common::{exec_err, internal_err}; use datafusion_execution::TaskContext; use futures::stream::BoxStream; use futures::{Future, Stream, StreamExt}; @@ -177,7 +177,7 @@ impl RecordBatchReceiverStreamBuilder { Ok(_) => continue, // This means a blocking task error Err(e) => { - return Some(internal_err!("Spawned Task error: {e}")); + return Some(exec_err!("Spawned Task error: {e}")); } } }