feat: add client side transformers to parallel loader #710
This allows applying transformers like ImageProps, Resizer etc. by passing a list of transformer classes via the `transformers` argument to `ParallelLoader.ingest` and `ParallelQuery.query`. Fixes #326
Pull request overview
This PR adds first-class support for applying client-side dataset transformers directly via the parallel ingestion/query APIs (ParallelLoader.ingest and ParallelQuery.query), targeting the feature request in #326.
Changes:
- Added a `transformers` parameter to `ParallelLoader.ingest(...)`.
- Added a `transformers` parameter to `ParallelQuery.query(...)`.
- Implemented transformer application by wrapping the provided generator in each transformer, in sequence.
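For readers following along, the sequential wrapping in the last bullet can be sketched as below. This is a minimal illustration and not the code from this PR: `ListGenerator`, `AddTag`, `AddOtherTag`, and `apply_transformers` are hypothetical stand-ins, and the real aperturedb transformer classes may take additional arguments.

```python
class ListGenerator:
    """Hypothetical stand-in for a data generator: indexable, with a length."""

    def __init__(self, items):
        self.items = items

    def __len__(self):
        return len(self.items)

    def __getitem__(self, idx):
        return self.items[idx]


class AddTag:
    """Hypothetical transformer: wraps a generator and tags each item."""

    tag = "a"

    def __init__(self, data):
        self.data = data

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        # Return a new list so the wrapped item is left untouched.
        return self.data[idx] + [self.tag]


class AddOtherTag(AddTag):
    tag = "b"


def apply_transformers(generator, transformers=None):
    """Wrap the generator in each transformer class, in list order."""
    for transformer in transformers or []:
        generator = transformer(generator)
    return generator


wrapped = apply_transformers(
    ListGenerator([["x"], ["y"]]), [AddTag, AddOtherTag])
```

The first class in the list ends up innermost, so its transformation is applied first when an item is read.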
Reviewed changes
Copilot reviewed 2 out of 2 changed files in this pull request and generated 3 comments.
| File | Description |
|---|---|
| aperturedb/ParallelQuery.py | Extends query() with a transformers argument and wraps the generator before query execution. |
| aperturedb/ParallelLoader.py | Extends ingest() with a transformers argument and wraps the generator before delegating to query(). |
Comments suppressed due to low confidence (2)
aperturedb/ParallelQuery.py:288
- In `use_dask` mode this wraps the generator with `Transformer` instances, but `DaskManager.run()` assumes `generator` has `.df`, `.filename`, and `.blobs_relative_to_csv` (and later this method does `len(generator.df)`). Since `Transformer` doesn’t expose those attributes, `query(..., transformers=...)` will break for dask-backed generators. Consider applying transformers inside `DaskManager.run()` (wrap the per-partition `data` before calling `loader.query`) or explicitly rejecting `transformers` when `use_dask` is True with a clear error.
This issue also appears on line 279 of the same file.
use_dask = hasattr(generator, "use_dask") and generator.use_dask
if use_dask:
    self._reset(batchsize=batchsize, numthreads=numthreads)
    self.daskmanager = DaskManager(num_workers=numthreads)
if transformers:
    for transformer in transformers:
        generator = transformer(generator)
if hasattr(self, "query_setup"):
    self.query_setup(generator)
if use_dask:
    results, self.total_actions_time = self.daskmanager.run(
        self.__class__, self.client, generator, batchsize, stats=stats)
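One of the two options suggested above, rejecting `transformers` for dask-backed generators, might look roughly like this. It is only a sketch under assumptions: `reject_transformers_with_dask`, `DaskBackedGenerator`, and `PlainGenerator` are hypothetical names, and the error type and message are illustrative rather than what the library uses.

```python
def reject_transformers_with_dask(generator, transformers):
    """Hypothetical guard: fail fast instead of breaking later inside dask."""
    use_dask = hasattr(generator, "use_dask") and generator.use_dask
    if use_dask and transformers:
        raise ValueError(
            "transformers are not supported when the generator uses dask")
    return use_dask


class DaskBackedGenerator:
    """Hypothetical stand-in for a dask-backed CSV generator."""
    use_dask = True


class PlainGenerator:
    """Hypothetical stand-in for a regular in-memory generator."""
```

Calling the guard before any wrapping keeps the failure close to the call site, rather than surfacing as a missing `.df` attribute deeper in the dask path.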
aperturedb/ParallelQuery.py:285
`query_setup()` is invoked after wrapping the generator in transformers. This hides generator-specific methods like `get_indices()` used by `ParallelLoader.query_setup()` to create indexes, so index creation can silently stop working when transformers are provided. Consider calling `query_setup(generator)` before applying transformers, or make `Transformer` forward `get_indices`/other loader hooks to the wrapped dataset.
if transformers:
    for transformer in transformers:
        generator = transformer(generator)
if hasattr(self, "query_setup"):
    self.query_setup(generator)
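The reordering suggested in this comment can be illustrated with a small self-contained sketch. All names here (`IndexedGenerator`, `OpaqueTransformer`, `prepare`, `record_indices`) and the shape of the `get_indices()` return value are assumptions made for the example, not the library's definitions.

```python
class IndexedGenerator:
    """Hypothetical generator that advertises the indexes it needs."""

    def __init__(self, items):
        self.items = items

    def __len__(self):
        return len(self.items)

    def __getitem__(self, idx):
        return self.items[idx]

    def get_indices(self):
        # Illustrative payload only; the real format may differ.
        return {"entity": {"Image": ["id"]}}


class OpaqueTransformer:
    """Hypothetical wrapper that does NOT forward get_indices()."""

    def __init__(self, data):
        self.data = data

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        return self.data[idx]


def prepare(generator, transformers, query_setup):
    # Run setup on the raw generator first, while its hooks are still visible.
    query_setup(generator)
    for transformer in transformers or []:
        generator = transformer(generator)
    return generator


seen = []


def record_indices(generator):
    """Stand-in for query_setup(): records indexes if the hook exists."""
    if hasattr(generator, "get_indices"):
        seen.append(generator.get_indices())


wrapped = prepare(IndexedGenerator([1, 2]), [OpaqueTransformer], record_indices)
```

With setup running first, the wrapper no longer needs to know about `get_indices()` for index creation to keep working.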
Addressed all review comments (including the low confidence ones) in commit 163c99b:
Pull request overview
Copilot reviewed 4 out of 4 changed files in this pull request and generated 2 comments.
Comments suppressed due to low confidence (1)
test/test_Parallel.py:138
- This equivalence test also relies on `GeneratorWithErrors(..., error_pct=0)` being fully deterministic, but the generator’s current random error condition can still emit a `BadCommand` at `error_pct=0` (`randint==0`), which can cause intermittent failures and mismatched succeeded counts. Making `GeneratorWithErrors` deterministic for `error_pct=0` (or seeding random here) will stabilize the comparison.
elements = 10
# Manual wrapping
generator1 = GeneratorWithErrors(elements=elements, error_pct=0)
transformer1 = DummyTransformer(generator1, client=db)
loader1 = ParallelLoader(db)
loader1.ingest(transformer1, batchsize=2, numthreads=2, stats=False)
# transformers parameter
generator2 = GeneratorWithErrors(elements=elements, error_pct=0)
loader2 = ParallelLoader(db)
loader2.ingest(generator2, batchsize=2, numthreads=2, stats=False, transformers=[DummyTransformer])
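A deterministic error decision of the kind suggested above could be sketched as follows. The actual condition inside `GeneratorWithErrors` is not shown in this thread, so `should_fail` is a hypothetical helper that only illustrates the idea of short-circuiting at `error_pct=0` and using a seeded random source.

```python
import random


def should_fail(error_pct, rng):
    """Hypothetical helper: decide whether to emit a bad command."""
    if error_pct <= 0:
        # Never fail at 0 percent, regardless of what the RNG returns.
        return False
    # rng.random() is in [0.0, 1.0), so 100 percent always fails.
    return rng.random() * 100 < error_pct


rng = random.Random(1234)  # seeded so repeated test runs behave the same
failures_at_zero = sum(should_fail(0, rng) for _ in range(1000))
failures_at_full = sum(should_fail(100, rng) for _ in range(1000))
```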
- Add thread-local Connector clone in Transformer to avoid sharing across threads
- Restrict attribute delegation in Transformer to an allowlist
- Add validation for transformers parameter in ParallelQuery
- Add test coverage for single transformer and invalid transformer inputs
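The first three items can be pictured with the sketch below. It is not the code from the commit: `FakeClient`, `IndexedData`, `SketchTransformer`, `validate_transformers`, and the `_FORWARDED` allowlist are hypothetical, and the sketch assumes only that the client object offers some `clone()`-style method.

```python
import threading


class FakeClient:
    """Hypothetical client; assumes a clone() method returning a new client."""

    def clone(self):
        return FakeClient()


class IndexedData:
    """Hypothetical wrapped dataset exposing one loader hook."""

    def get_indices(self):
        return {"entity": {"Image": ["id"]}}


class SketchTransformer:
    # Only these attribute names are delegated to the wrapped dataset.
    _FORWARDED = frozenset({"get_indices"})

    def __init__(self, data, client=None):
        self.data = data
        self._client = client
        self._local = threading.local()

    @property
    def client(self):
        """Return a per-thread clone so threads never share a connection."""
        if self._client is None:
            return None
        if not hasattr(self._local, "client"):
            self._local.client = self._client.clone()
        return self._local.client

    def __getattr__(self, name):
        # Called only when normal attribute lookup fails.
        if name in self._FORWARDED:
            return getattr(self.data, name)
        raise AttributeError(name)


def validate_transformers(transformers):
    """Accept None or a list/tuple of classes; reject anything else."""
    if transformers is None:
        return []
    if not isinstance(transformers, (list, tuple)):
        raise TypeError("transformers must be a list or tuple of classes")
    for transformer in transformers:
        if not isinstance(transformer, type):
            raise TypeError(f"not a transformer class: {transformer!r}")
    return list(transformers)
```

An allowlist keeps delegation explicit: a typo or an unexpected attribute raises `AttributeError` on the wrapper instead of silently reaching into the wrapped dataset.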
Summary
Added a `transformers` parameter to `ParallelLoader.ingest` and `ParallelQuery.query` to allow the application of client-side transformers directly when using these APIs, resolving the feature request to support usage like:
Verification
`ParallelLoader.ingest` and `ParallelQuery.query` signatures.
Fixes #326