Skip to content

Commit 1b01b52

Browse files
authored
[FLINK-38364][streaming-java] Implement async state version of ProcessingTimeoutTrigger (#27027)
1 parent a34faae commit 1b01b52

File tree

9 files changed

+954
-175
lines changed

9 files changed

+954
-175
lines changed

flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/windowing/triggers/AsyncCountTrigger.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,7 @@ public void onMerge(W window, OnMergeContext ctx) throws Exception {
9797

9898
@Override
9999
public String toString() {
100-
return "CountTrigger(" + maxCount + ")";
100+
return "AsyncCountTrigger(" + maxCount + ")";
101101
}
102102

103103
/**
Lines changed: 202 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,202 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.flink.streaming.runtime.operators.windowing;
20+
21+
import org.apache.flink.annotation.Internal;
22+
import org.apache.flink.annotation.VisibleForTesting;
23+
import org.apache.flink.api.common.state.State;
24+
import org.apache.flink.api.common.state.StateDescriptor;
25+
import org.apache.flink.api.common.state.v2.StateFuture;
26+
import org.apache.flink.core.state.StateFutureUtils;
27+
import org.apache.flink.metrics.MetricGroup;
28+
import org.apache.flink.runtime.asyncprocessing.operators.windowing.triggers.AsyncCountTrigger;
29+
import org.apache.flink.runtime.asyncprocessing.operators.windowing.triggers.AsyncEventTimeTrigger;
30+
import org.apache.flink.runtime.asyncprocessing.operators.windowing.triggers.AsyncProcessingTimeTrigger;
31+
import org.apache.flink.runtime.asyncprocessing.operators.windowing.triggers.AsyncPurgingTrigger;
32+
import org.apache.flink.runtime.asyncprocessing.operators.windowing.triggers.AsyncTrigger;
33+
import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
34+
import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
35+
import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger;
36+
import org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger;
37+
import org.apache.flink.streaming.api.windowing.triggers.PurgingTrigger;
38+
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
39+
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
40+
import org.apache.flink.streaming.api.windowing.windows.Window;
41+
42+
import javax.annotation.Nonnull;
43+
44+
/**
45+
* A converter from {@code Trigger} to {@code AsyncTrigger}.
46+
*
47+
* <p>Basic triggers (e.g., {@code CountTrigger}) are directly converted to their async version.
48+
*
49+
* <p>Async-support triggers which implement {@code AsyncTriggerConvertable} (e.g., {@code
50+
* ProcessingTimeoutTrigger}) will use self-defined async version.
51+
*
52+
* <p>Other triggers are wrapped as an {@code AsyncTrigger}, whose internal functions are executed
53+
* in sync mode.
54+
*/
55+
@Internal
56+
public interface AsyncTriggerConverter {
57+
58+
/**
59+
* Convert to an {@code AsyncTrigger}. The default implementation is only a wrapper of the
60+
* trigger, whose behaviours are all sync.
61+
*
62+
* <p>TODO: Return {@code AsyncTrigger} if {@code AsyncTrigger} becomes @PublicEvolving.
63+
*
64+
* @return The {@code AsyncTrigger} for async state processing.
65+
*/
66+
@Nonnull
67+
default Object convertToAsync() {
68+
return UserDefinedAsyncTrigger.of((Trigger<?, ?>) AsyncTriggerConverter.this);
69+
}
70+
71+
@SuppressWarnings("unchecked")
72+
static <T, W extends Window> AsyncTrigger<T, W> convertToAsync(Trigger<T, W> trigger) {
73+
if (trigger instanceof CountTrigger) {
74+
return (AsyncTrigger<T, W>)
75+
AsyncCountTrigger.of(((CountTrigger<?>) trigger).getMaxCount());
76+
} else if (trigger instanceof EventTimeTrigger) {
77+
return (AsyncTrigger<T, W>) AsyncEventTimeTrigger.create();
78+
} else if (trigger instanceof ProcessingTimeTrigger) {
79+
return (AsyncTrigger<T, W>) AsyncProcessingTimeTrigger.create();
80+
} else if (trigger instanceof PurgingTrigger) {
81+
return (AsyncTrigger<T, W>)
82+
AsyncPurgingTrigger.of(
83+
convertToAsync(((PurgingTrigger<?, ?>) trigger).getNestedTrigger()));
84+
} else if (trigger instanceof AsyncTriggerConverter) {
85+
return (AsyncTrigger<T, W>) ((AsyncTriggerConverter) trigger).convertToAsync();
86+
} else {
87+
return UserDefinedAsyncTrigger.of(trigger);
88+
}
89+
}
90+
91+
/** Convert non-support user-defined trigger to {@code AsyncTrigger}. */
92+
class UserDefinedAsyncTrigger<T, W extends Window> extends AsyncTrigger<T, W> {
93+
private final Trigger<T, W> userDefinedTrigger;
94+
95+
private UserDefinedAsyncTrigger(Trigger<T, W> userDefinedTrigger) {
96+
this.userDefinedTrigger = userDefinedTrigger;
97+
}
98+
99+
@Override
100+
public StateFuture<TriggerResult> onElement(
101+
T element, long timestamp, W window, TriggerContext ctx) throws Exception {
102+
return StateFutureUtils.completedFuture(
103+
userDefinedTrigger.onElement(
104+
element, timestamp, window, AsyncTriggerContextConvertor.of(ctx)));
105+
}
106+
107+
@Override
108+
public StateFuture<TriggerResult> onProcessingTime(long time, W window, TriggerContext ctx)
109+
throws Exception {
110+
return StateFutureUtils.completedFuture(
111+
userDefinedTrigger.onProcessingTime(
112+
time, window, AsyncTriggerContextConvertor.of(ctx)));
113+
}
114+
115+
@Override
116+
public StateFuture<TriggerResult> onEventTime(long time, W window, TriggerContext ctx)
117+
throws Exception {
118+
return StateFutureUtils.completedFuture(
119+
userDefinedTrigger.onEventTime(
120+
time, window, AsyncTriggerContextConvertor.of(ctx)));
121+
}
122+
123+
@Override
124+
public StateFuture<Void> clear(W window, TriggerContext ctx) throws Exception {
125+
userDefinedTrigger.clear(window, AsyncTriggerContextConvertor.of(ctx));
126+
return StateFutureUtils.completedVoidFuture();
127+
}
128+
129+
@Override
130+
public boolean isEndOfStreamTrigger() {
131+
return userDefinedTrigger instanceof GlobalWindows.EndOfStreamTrigger;
132+
}
133+
134+
public static <T, W extends Window> AsyncTrigger<T, W> of(
135+
Trigger<T, W> userDefinedTrigger) {
136+
return new UserDefinedAsyncTrigger<>(userDefinedTrigger);
137+
}
138+
139+
/**
140+
* A converter from {@link AsyncTrigger.TriggerContext} to {@link Trigger.TriggerContext}.
141+
*/
142+
private static class AsyncTriggerContextConvertor implements Trigger.TriggerContext {
143+
144+
private final AsyncTrigger.TriggerContext asyncTriggerContext;
145+
146+
private AsyncTriggerContextConvertor(AsyncTrigger.TriggerContext asyncTriggerContext) {
147+
this.asyncTriggerContext = asyncTriggerContext;
148+
}
149+
150+
@Override
151+
public long getCurrentProcessingTime() {
152+
return asyncTriggerContext.getCurrentProcessingTime();
153+
}
154+
155+
@Override
156+
public MetricGroup getMetricGroup() {
157+
return asyncTriggerContext.getMetricGroup();
158+
}
159+
160+
@Override
161+
public long getCurrentWatermark() {
162+
return asyncTriggerContext.getCurrentWatermark();
163+
}
164+
165+
@Override
166+
public void registerProcessingTimeTimer(long time) {
167+
asyncTriggerContext.registerProcessingTimeTimer(time);
168+
}
169+
170+
@Override
171+
public void registerEventTimeTimer(long time) {
172+
asyncTriggerContext.registerEventTimeTimer(time);
173+
}
174+
175+
@Override
176+
public void deleteProcessingTimeTimer(long time) {
177+
asyncTriggerContext.deleteProcessingTimeTimer(time);
178+
}
179+
180+
@Override
181+
public void deleteEventTimeTimer(long time) {
182+
asyncTriggerContext.deleteEventTimeTimer(time);
183+
}
184+
185+
@Override
186+
public <S extends State> S getPartitionedState(StateDescriptor<S, ?> stateDescriptor) {
187+
throw new UnsupportedOperationException(
188+
"Trigger is for state V1 APIs, window operator with async state enabled only accept state V2 APIs.");
189+
}
190+
191+
public static Trigger.TriggerContext of(
192+
AsyncTrigger.TriggerContext asyncTriggerContext) {
193+
return new AsyncTriggerContextConvertor(asyncTriggerContext);
194+
}
195+
}
196+
197+
@VisibleForTesting
198+
public Trigger<T, W> getUserDefinedTrigger() {
199+
return userDefinedTrigger;
200+
}
201+
}
202+
}

flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorBuilder.java

Lines changed: 0 additions & 141 deletions
Original file line numberDiff line numberDiff line change
@@ -28,15 +28,11 @@
2828
import org.apache.flink.api.common.state.AppendingState;
2929
import org.apache.flink.api.common.state.ListStateDescriptor;
3030
import org.apache.flink.api.common.state.ReducingStateDescriptor;
31-
import org.apache.flink.api.common.state.State;
3231
import org.apache.flink.api.common.state.StateDescriptor;
33-
import org.apache.flink.api.common.state.v2.StateFuture;
3432
import org.apache.flink.api.common.state.v2.StateIterator;
3533
import org.apache.flink.api.common.typeinfo.TypeInformation;
3634
import org.apache.flink.api.common.typeutils.TypeSerializer;
3735
import org.apache.flink.api.java.functions.KeySelector;
38-
import org.apache.flink.core.state.StateFutureUtils;
39-
import org.apache.flink.metrics.MetricGroup;
4036
import org.apache.flink.runtime.asyncprocessing.operators.windowing.AsyncEvictingWindowOperator;
4137
import org.apache.flink.runtime.asyncprocessing.operators.windowing.AsyncWindowOperator;
4238
import org.apache.flink.runtime.asyncprocessing.operators.windowing.functions.InternalAggregateProcessAsyncWindowFunction;
@@ -45,28 +41,17 @@
4541
import org.apache.flink.runtime.asyncprocessing.operators.windowing.functions.InternalIterableProcessAsyncWindowFunction;
4642
import org.apache.flink.runtime.asyncprocessing.operators.windowing.functions.InternalSingleValueAsyncWindowFunction;
4743
import org.apache.flink.runtime.asyncprocessing.operators.windowing.functions.InternalSingleValueProcessAsyncWindowFunction;
48-
import org.apache.flink.runtime.asyncprocessing.operators.windowing.triggers.AsyncCountTrigger;
49-
import org.apache.flink.runtime.asyncprocessing.operators.windowing.triggers.AsyncEventTimeTrigger;
50-
import org.apache.flink.runtime.asyncprocessing.operators.windowing.triggers.AsyncProcessingTimeTrigger;
51-
import org.apache.flink.runtime.asyncprocessing.operators.windowing.triggers.AsyncPurgingTrigger;
5244
import org.apache.flink.runtime.asyncprocessing.operators.windowing.triggers.AsyncTrigger;
5345
import org.apache.flink.streaming.api.functions.windowing.AggregateApplyWindowFunction;
5446
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
5547
import org.apache.flink.streaming.api.functions.windowing.ReduceApplyProcessWindowFunction;
5648
import org.apache.flink.streaming.api.functions.windowing.ReduceApplyWindowFunction;
5749
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
5850
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
59-
import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows.EndOfStreamTrigger;
6051
import org.apache.flink.streaming.api.windowing.assigners.MergingWindowAssigner;
6152
import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
6253
import org.apache.flink.streaming.api.windowing.evictors.Evictor;
63-
import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
64-
import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger;
65-
import org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger;
66-
import org.apache.flink.streaming.api.windowing.triggers.PurgingTrigger;
6754
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
68-
import org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerContext;
69-
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
7055
import org.apache.flink.streaming.api.windowing.windows.Window;
7156
import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalAggregateProcessWindowFunction;
7257
import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessWindowFunction;
@@ -570,130 +555,4 @@ public String generateOperatorDescription(Function function1, @Nullable Function
570555
public long getAllowedLateness() {
571556
return allowedLateness;
572557
}
573-
574-
private static class UserDefinedAsyncTrigger<T, W extends Window> extends AsyncTrigger<T, W> {
575-
private final Trigger<T, W> userDefinedTrigger;
576-
577-
private UserDefinedAsyncTrigger(Trigger<T, W> userDefinedTrigger) {
578-
this.userDefinedTrigger = userDefinedTrigger;
579-
}
580-
581-
@Override
582-
public StateFuture<TriggerResult> onElement(
583-
T element, long timestamp, W window, TriggerContext ctx) throws Exception {
584-
return StateFutureUtils.completedFuture(
585-
userDefinedTrigger.onElement(
586-
element, timestamp, window, AsyncTriggerContextConvertor.of(ctx)));
587-
}
588-
589-
@Override
590-
public StateFuture<TriggerResult> onProcessingTime(long time, W window, TriggerContext ctx)
591-
throws Exception {
592-
return StateFutureUtils.completedFuture(
593-
userDefinedTrigger.onProcessingTime(
594-
time, window, AsyncTriggerContextConvertor.of(ctx)));
595-
}
596-
597-
@Override
598-
public StateFuture<TriggerResult> onEventTime(long time, W window, TriggerContext ctx)
599-
throws Exception {
600-
return StateFutureUtils.completedFuture(
601-
userDefinedTrigger.onEventTime(
602-
time, window, AsyncTriggerContextConvertor.of(ctx)));
603-
}
604-
605-
@Override
606-
public StateFuture<Void> clear(W window, TriggerContext ctx) throws Exception {
607-
userDefinedTrigger.clear(window, AsyncTriggerContextConvertor.of(ctx));
608-
return StateFutureUtils.completedVoidFuture();
609-
}
610-
611-
@Override
612-
public boolean isEndOfStreamTrigger() {
613-
return userDefinedTrigger instanceof EndOfStreamTrigger;
614-
}
615-
616-
public static <T, W extends Window> AsyncTrigger<T, W> of(
617-
Trigger<T, W> userDefinedTrigger) {
618-
return new UserDefinedAsyncTrigger<>(userDefinedTrigger);
619-
}
620-
}
621-
622-
private static class AsyncTriggerConverter {
623-
624-
@SuppressWarnings("unchecked")
625-
public static <T, W extends Window> AsyncTrigger<T, W> convertToAsync(
626-
Trigger<T, W> trigger) {
627-
if (trigger instanceof CountTrigger) {
628-
return (AsyncTrigger<T, W>)
629-
AsyncCountTrigger.of(((CountTrigger<?>) trigger).getMaxCount());
630-
} else if (trigger instanceof EventTimeTrigger) {
631-
return (AsyncTrigger<T, W>) AsyncEventTimeTrigger.create();
632-
} else if (trigger instanceof ProcessingTimeTrigger) {
633-
return (AsyncTrigger<T, W>) AsyncProcessingTimeTrigger.create();
634-
} else if (trigger instanceof PurgingTrigger) {
635-
return (AsyncTrigger<T, W>)
636-
AsyncPurgingTrigger.of(
637-
convertToAsync(
638-
((PurgingTrigger<?, ?>) trigger).getNestedTrigger()));
639-
} else {
640-
return UserDefinedAsyncTrigger.of(trigger);
641-
}
642-
}
643-
}
644-
645-
/** A converter from {@link AsyncTrigger.TriggerContext} to {@link Trigger.TriggerContext}. */
646-
private static class AsyncTriggerContextConvertor implements TriggerContext {
647-
648-
private final AsyncTrigger.TriggerContext asyncTriggerContext;
649-
650-
private AsyncTriggerContextConvertor(AsyncTrigger.TriggerContext asyncTriggerContext) {
651-
this.asyncTriggerContext = asyncTriggerContext;
652-
}
653-
654-
@Override
655-
public long getCurrentProcessingTime() {
656-
return asyncTriggerContext.getCurrentProcessingTime();
657-
}
658-
659-
@Override
660-
public MetricGroup getMetricGroup() {
661-
return asyncTriggerContext.getMetricGroup();
662-
}
663-
664-
@Override
665-
public long getCurrentWatermark() {
666-
return asyncTriggerContext.getCurrentWatermark();
667-
}
668-
669-
@Override
670-
public void registerProcessingTimeTimer(long time) {
671-
asyncTriggerContext.registerProcessingTimeTimer(time);
672-
}
673-
674-
@Override
675-
public void registerEventTimeTimer(long time) {
676-
asyncTriggerContext.registerEventTimeTimer(time);
677-
}
678-
679-
@Override
680-
public void deleteProcessingTimeTimer(long time) {
681-
asyncTriggerContext.deleteProcessingTimeTimer(time);
682-
}
683-
684-
@Override
685-
public void deleteEventTimeTimer(long time) {
686-
asyncTriggerContext.deleteEventTimeTimer(time);
687-
}
688-
689-
@Override
690-
public <S extends State> S getPartitionedState(StateDescriptor<S, ?> stateDescriptor) {
691-
throw new UnsupportedOperationException(
692-
"Trigger is for state V1 APIs, window operator with async state enabled only accept state V2 APIs.");
693-
}
694-
695-
public static TriggerContext of(AsyncTrigger.TriggerContext asyncTriggerContext) {
696-
return new AsyncTriggerContextConvertor(asyncTriggerContext);
697-
}
698-
}
699558
}

0 commit comments

Comments
 (0)