Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@

* Shared library for simplifying management of large shared objects added to Python SDK. Example use case is sharing a large TF model object across threads ([BEAM-10417](https://issues.apache.org/jira/browse/BEAM-10417)).
* X feature added (Java/Python) ([BEAM-X](https://issues.apache.org/jira/browse/BEAM-X)).
* OnTimerContext should not create a new one when processing each element/timer in FnApiDoFnRunner ([BEAM-9839](https://issues.apache.org/jira/browse/BEAM-9839))

## Breaking Changes

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ static class Factory<InputT, RestrictionT, PositionT, WatermarkEstimatorStateT,
private final DoFnInvoker<InputT, OutputT> doFnInvoker;
private final StartBundleArgumentProvider startBundleArgumentProvider;
private final ProcessBundleContextBase processContext;
private OnTimerContext onTimerContext;
private final OnTimerContext<?> onTimerContext;
private final FinishBundleArgumentProvider finishBundleArgumentProvider;

/**
Expand Down Expand Up @@ -426,6 +426,7 @@ static class Factory<InputT, RestrictionT, PositionT, WatermarkEstimatorStateT,
tagToSideInputSpecMap = tagToSideInputSpecMapBuilder.build();
this.splitListener = splitListener;
this.bundleFinalizer = bundleFinalizer;
this.onTimerContext = new OnTimerContext();

try {
this.mainInputId = ParDoTranslation.getMainInputName(pTransform);
Expand Down Expand Up @@ -1242,7 +1243,6 @@ private <K> void processTimer(
String timerIdOrTimerFamilyId, TimeDomain timeDomain, Timer<K> timer) {
currentTimer = timer;
currentTimeDomain = timeDomain;
onTimerContext = new OnTimerContext<>(timer.getUserKey());
// The timerIdOrTimerFamilyId contains either a timerId from timer declaration or timerFamilyId
// from timer family declaration.
String timerId =
Expand Down Expand Up @@ -2014,11 +2014,6 @@ public WatermarkEstimator<?> watermarkEstimator() {

/** Provides arguments for a {@link DoFnInvoker} for {@link DoFn.OnTimer @OnTimer}. */
private class OnTimerContext<K> extends BaseArgumentProvider<InputT, OutputT> {
private final K key;

public OnTimerContext(K key) {
this.key = key;
}

private class Context extends DoFn<InputT, OutputT>.OnTimerContext {
private Context() {
Expand Down Expand Up @@ -2119,7 +2114,7 @@ public TimeDomain timeDomain(DoFn<InputT, OutputT> doFn) {

@Override
public K key() {
return key;
return (K) currentTimer.getUserKey();
}

@Override
Expand Down