fix: gc string view arrays in RepartitionExec#20500
Samyak2 wants to merge 6 commits into apache:main
Conversation
(force-pushed from 6a1547d to 471de13)
```rust
#[tokio::test]
async fn hash_repartition_string_view_compaction() -> Result<()> {
```

---

This test does not actually exercise the regression it is meant to cover.
It only checks that repartition returns all rows. That would also pass before the gc() change.
As a result, we still do not have a test that would catch the over-counting bug if this logic regresses.
Please add an assertion that observes the compaction or accounting behavior directly. For example:
- Check that the total `get_array_memory_size()` across the repartitioned outputs stays close to the original batch, instead of scaling with the number of output partitions.
- Test spill behavior under a tight memory limit (e.g., spilled bytes).
- Verify `StringViewArray` buffer ownership after repartition, so outputs no longer all retain the original shared payload buffer.

---

Yeah, I did try to add a test that checks for memory size specifically, but it seemed a bit fragile to assert on those numbers. Let me try the other approaches, thanks!

---

I've updated the test, please take a look!
Without the fix, the mem usage blows up to 4x the original size. With the fix, it's actually less than the original size. To leave some margin for error, I have used a threshold of 2x for the mem usage assertion.
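For readers following along, the kind of bound described above can be sketched as a small helper. This is illustrative only: the function name, the byte counts, and the use of a plain factor are assumptions, not the actual test code in the PR.

```rust
// Hypothetical sketch of the assertion shape (names and numbers assumed):
// total memory across the repartitioned outputs should stay within a small
// constant factor of the input batch, instead of scaling ~4x without gc.
fn assert_memory_bounded(input_bytes: usize, total_output_bytes: usize, factor: usize) {
    assert!(
        total_output_bytes <= factor * input_bytes,
        "outputs use {total_output_bytes} bytes, more than {factor}x the {input_bytes}-byte input"
    );
}

fn main() {
    // With the fix, outputs come in below the original batch size, so a 2x
    // threshold leaves comfortable margin without being fragile.
    assert_memory_bounded(1024, 900, 2);
    println!("memory bound holds");
}
```

Asserting a loose ratio like this, rather than exact byte counts, is what keeps the test from being fragile across Arrow versions.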
```rust
if let Some(sv) =
    col.as_any().downcast_ref::<StringViewArray>()
{
    Arc::new(sv.gc())
```

---

The new `StringViewArray::gc()` pass in repartition is very similar to the existing `organize_stringview_arrays` logic in `datafusion/physical-plan/src/sorts/sort.rs`.
A small shared helper would keep the workaround in one place and reduce the chance that sort/repartition diverge when Arrow view handling changes again.

---

Agreed. I'll change this

---

I have made this change.
- I wasn't sure where to put this util. I have made it `pub(crate)` and kept it in `physical-plan/src/common.rs`.
- In the case of repartition, we would be doing the `RecordBatch` constructor validation twice when string view arrays are present (when there's no string view array, we return the same batch back). But this should be okay since the actual gc time will dominate over the time for schema validation, etc.
(will fix the other comment tomorrow)
(force-pushed from 4385fb4 to 0733758)
Fixes apache#20491

- Took the fix from `ExternalSorter` introduced in apache#14823
- If any `StringViewArray` columns are present in the repartitioned input, we gc them to reduce duplicate tracking of the same string view buffer.
- Fixes over-counting when there's a `RepartitionExec` above a partial agg on a `StringViewArray` column.
This is not needed for round-robin repartition.
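To illustrate why hash repartitioning inflates tracked memory for view arrays, here is a minimal std-only model. This is not the arrow-rs API: `ViewArray`, `tracked_bytes`, and this `gc` are simplified stand-ins. The point it demonstrates is that every partition's slice retains the whole shared payload buffer, so a memory pool that charges each consumer counts the buffer once per partition, while a gc-style copy leaves each partition owning only its referenced bytes.

```rust
use std::rc::Rc;

// Toy model of a string-view column: views are (offset, len) pairs into a
// shared payload buffer, so any slice keeps the entire buffer alive.
struct ViewArray {
    buffer: Rc<Vec<u8>>,        // shared payload
    views: Vec<(usize, usize)>, // (offset, len) into `buffer`
}

impl ViewArray {
    // Bytes a memory pool would charge this array for: the full shared buffer.
    fn tracked_bytes(&self) -> usize {
        self.buffer.len()
    }

    // Copy only the bytes this array's views reference into a fresh buffer,
    // analogous to what StringViewArray::gc() does.
    fn gc(&self) -> ViewArray {
        let mut buffer = Vec::new();
        let views = self
            .views
            .iter()
            .map(|&(off, len)| {
                let new_off = buffer.len();
                buffer.extend_from_slice(&self.buffer[off..off + len]);
                (new_off, len)
            })
            .collect();
        ViewArray { buffer: Rc::new(buffer), views }
    }
}

fn main() {
    // 32-byte payload, hash-partitioned two ways: each output still retains
    // the whole payload, so accounting doubles it.
    let payload = Rc::new(vec![b'a'; 32]);
    let parts = vec![
        ViewArray { buffer: Rc::clone(&payload), views: vec![(0, 16)] },
        ViewArray { buffer: Rc::clone(&payload), views: vec![(16, 16)] },
    ];
    let before: usize = parts.iter().map(|p| p.tracked_bytes()).sum();
    let after: usize = parts.iter().map(|p| p.gc().tracked_bytes()).sum();
    assert_eq!(before, 64); // over-counted: 2 partitions x 32-byte buffer
    assert_eq!(after, 32);  // after gc, each partition owns only its 16 bytes
    println!("before={before} after={after}");
}
```

Round-robin repartition forwards whole batches rather than row subsets, which is why the gc pass is unnecessary there.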
(force-pushed from 3fb8e02 to 979decc)
CI is green now after rebasing on main.
run benchmarks

🤖 Benchmark running (GKE)
🤖 Benchmark completed (GKE): tpch — base (merge-base) vs branch [results table omitted]
🤖 Benchmark completed (GKE): tpcds — base (merge-base) vs branch [results table omitted]
🤖 Benchmark completed (GKE): clickbench_partitioned — base (merge-base) vs branch [results table omitted]
Do these benchmarks have string view arrays enabled? If not, I don't see why the numbers are getting affected (although it's a small delta).
For the above benchmark runs, the Parquet-backed benchmark data is expected to use view types by default. That means both of the benchmark outputs under discussion should be assumed to have string view arrays enabled for Parquet-backed string columns, unless view types were explicitly disabled.
Yes, they do all use string views by default.
run benchmarks

🤖 Benchmark running (GKE)
```rust
// pool to count the same data buffers multiple times, once for each
// consumer of the repartition.
// So we gc the output arrays, which creates new data buffers.
let batch = gc_stringview_arrays(batch)?;
```

---

I think it would be best to use the coalesce kernels (which do GC-ing already I think) before sending them to the upstream partitions.
I got some mixed performance results from that before, but I think the upcoming morsel / workstealing changes might be able to improve this (as it won't benefit from pushing the copying work over to a new (possibly idling) thread)
🤖 Benchmark completed (GKE): tpch — base (merge-base) vs branch [results table omitted]
🤖 Benchmark completed (GKE): tpcds — base (merge-base) vs branch [results table omitted]
🤖 Benchmark completed (GKE): clickbench_partitioned — base (merge-base) vs branch [results table omitted]
🤔 Some of the benchmarks look like they got slower. I'll rerun to be sure.

run benchmark clickbench_partitioned
```rust
for array in batch.columns() {
    if let Some(string_view_array) = array.as_any().downcast_ref::<StringViewArray>()
    {
        let new_array = string_view_array.gc();
```

---

Calling gc here basically forces all the data to be copied, which is a lot of work. I am not sure it is a good idea to do so unconditionally.
We had to do some pretty sophisticated heuristics of when to do a GC as part of the arrow BatchCoalescer
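One possible shape for such a heuristic, sketched with assumed names and an assumed 50% threshold (this is not the actual `BatchCoalescer` logic): only pay the copy cost when the views keep a small enough fraction of the shared buffers alive that gc reclaims more memory than the copy costs.

```rust
// Assumed heuristic for illustration: gc pays off only when most of the
// shared buffer is dead weight relative to what the views still reference.
fn should_gc(referenced_bytes: usize, total_buffer_bytes: usize) -> bool {
    // Copying everything to reclaim little is wasted work; here we only gc
    // when less than half of the shared buffer bytes are still referenced.
    referenced_bytes.saturating_mul(2) < total_buffer_bytes
}

fn main() {
    assert!(should_gc(10, 100));  // views keep 10% alive: gc frees a lot
    assert!(!should_gc(90, 100)); // mostly live: gc would mostly just copy
    println!("heuristic behaves as expected");
}
```

The threshold itself is the tunable part; an unconditional gc is equivalent to always returning `true`, which is what the concern above is about.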

---

Interesting. I see that here. Repartition does use the batch coalescer, but on the receiver side, not the sender side. Memory tracking happens on the sender, before coalesce.
@Dandandan mentioned above that coalescing before sending had mixed results. I can try re-doing that on latest main to see if it still has the same effect. If so, I'll try branching out from the morsel PR and try it out.

---

Yes, I think in theory it should be slightly better to do the coalescing directly when batches are potentially already in cache, and it saves `partitions - 1` wake-ups (which are not used).
But it seems (my theory) pushing it upstream might create some parallelism / mitigating some skew as also for a slow partition it can do the coalescing in parallel.

---

@Dandandan I see that you had tried moving the coalesce upstream in repartition here: #21550
But it looks like you closed it. I see an improvement in clickbench from the benchmarks there. Why was it closed?
I noticed that the morsel PRs are now merged, so I was planning to try out moving the coalesce to the producer.

---

Yeah, I tried it again, but I did still see a small regression compared to the morsel PR coalesce upstream.
Perhaps it can still extract some more parallelism in certain cases (e.g. it can still do coalescing in another task/thread and start triggering IO request again in the current task?). Or when we only have a few files left we still can use some more threads to do the coalesce in another thread.
I couldn't solve it yet but feel free to try again!

---

Alright, let me try this.
But if your guess is correct, then we're blocking the next IO call from happening due to CPU compute above the data source. So keeping the coalesce downstream is just removing some compute from that path. The core issue seems to be that we're not doing pre-fetching of IO while the compute is running?
I'm not sure, but I hope to get more insights when I try this out.
🤖 Benchmark running (GKE): comparing fix-repartition-string-view-counting (979decc) to cdaecf0 (merge-base) using clickbench_partitioned
🤖 Benchmark completed (GKE): clickbench_partitioned — base (merge-base) vs branch [results table omitted]
alamb left a comment:

I think we need to resolve the performance regression before merging this PR.
Which issue does this PR close?

- Closes `Utf8View`/`StringViewArray` #20491.

Rationale for this change

- Fixes over-counting when there's a `RepartitionExec` above a partial agg on a `StringViewArray` column.
- Took the fix from `ExternalSorter` introduced in "Fix: External sort failing on `StringView` due to shared buffers" #14823.

What changes are included in this PR?

- If any `StringViewArray` columns are present in the repartitioned input, we gc them to reduce duplicate tracking of the same string view buffer.

Are these changes tested?

- `Utf8View`/`StringViewArray` #20491

Are there any user-facing changes?

No