diff --git a/datafusion/physical-plan/src/union.rs b/datafusion/physical-plan/src/union.rs index 108e42e7be429..433dda870defa 100644 --- a/datafusion/physical-plan/src/union.rs +++ b/datafusion/physical-plan/src/union.rs @@ -468,35 +468,41 @@ pub fn can_interleave>>( } fn union_schema(inputs: &[Arc]) -> SchemaRef { - let fields: Vec = (0..inputs[0].schema().fields().len()) + let first_schema = inputs[0].schema(); + + let fields = (0..first_schema.fields().len()) .map(|i| { inputs .iter() - .filter_map(|input| { - if input.schema().fields().len() > i { - let field = input.schema().field(i).clone(); - let right_hand_metdata = inputs - .get(1) - .map(|right_input| { - right_input.schema().field(i).metadata().clone() - }) - .unwrap_or_default(); - let mut metadata = field.metadata().clone(); - metadata.extend(right_hand_metdata); - Some(field.with_metadata(metadata)) - } else { - None - } + .enumerate() + .map(|(input_idx, input)| { + let field = input.schema().field(i).clone(); + let mut metadata = field.metadata().clone(); + + let other_metadatas = inputs + .iter() + .enumerate() + .filter(|(other_idx, _)| *other_idx != input_idx) + .flat_map(|(_, other_input)| { + other_input.schema().field(i).metadata().clone().into_iter() + }); + + metadata.extend(other_metadatas); + field.with_metadata(metadata) }) - .find_or_first(|f| f.is_nullable()) + .find_or_first(Field::is_nullable) + // We can unwrap this because if inputs was empty, this would've already panic'ed when we + // indexed into inputs[0]. .unwrap() }) + .collect::>(); + + let all_metadata_merged = inputs + .iter() + .flat_map(|i| i.schema().metadata().clone().into_iter()) .collect(); - Arc::new(Schema::new_with_metadata( - fields, - inputs[0].schema().metadata().clone(), - )) + Arc::new(Schema::new_with_metadata(fields, all_metadata_merged)) } /// CombinedRecordBatchStream can be used to combine a Vec of SendableRecordBatchStreams into one diff --git a/datafusion/sqllogictest/src/test_context.rs b/datafusion/sqllogictest/src/test_context.rs index 2143b3089ee5a..deeacb1b88190 100644 --- a/datafusion/sqllogictest/src/test_context.rs +++ b/datafusion/sqllogictest/src/test_context.rs @@ -319,17 +319,27 @@ pub async fn register_metadata_tables(ctx: &SessionContext) { String::from("metadata_key"), String::from("the l_name field"), )])); + let ts = Field::new("ts", DataType::Timestamp(TimeUnit::Nanosecond, None), false) .with_metadata(HashMap::from([( String::from("metadata_key"), String::from("ts non-nullable field"), )])); - let schema = - Schema::new(vec![id, name, l_name, ts]).with_metadata(HashMap::from([( + let nonnull_name = + Field::new("nonnull_name", DataType::Utf8, false).with_metadata(HashMap::from([ + ( + String::from("metadata_key"), + String::from("the nonnull_name field"), + ), + ])); + + let schema = Schema::new(vec![id, name, l_name, ts, nonnull_name]).with_metadata( + HashMap::from([( String::from("metadata_key"), String::from("the entire schema"), - )])); + )]), + ); let batch = RecordBatch::try_new( Arc::new(schema), @@ -342,6 +352,11 @@ pub async fn register_metadata_tables(ctx: &SessionContext) { 1599572549190855123, 1599572549190855123, ])) as _, + Arc::new(StringArray::from(vec![ + Some("no_foo"), + Some("no_bar"), + Some("no_baz"), + ])) as _, ], ) .unwrap(); diff --git a/datafusion/sqllogictest/test_files/metadata.slt b/datafusion/sqllogictest/test_files/metadata.slt index 588a36e3d515a..8f787254c0967 100644 --- a/datafusion/sqllogictest/test_files/metadata.slt +++ b/datafusion/sqllogictest/test_files/metadata.slt @@ -123,7 +123,34 @@ ORDER BY id, name, l_name; NULL bar NULL NULL NULL l_bar +# Regression test: missing field metadata from left side of the union when right side is chosen +query T +select name from ( + SELECT nonnull_name as name FROM "table_with_metadata" + UNION ALL + SELECT NULL::string as name +) group by name order by name; +---- +no_bar +no_baz +no_foo +NULL +# Regression test: missing schema metadata from union when schema with metadata isn't the first one +# and also ensure it works fine with multiple unions +query T +select name from ( + SELECT NULL::string as name + UNION ALL + SELECT nonnull_name as name FROM "table_with_metadata" + UNION ALL + SELECT NULL::string as name +) group by name order by name; +---- +no_bar +no_baz +no_foo +NULL query P rowsort SELECT ts