21
21
import java .time .Duration ;
22
22
import java .util .concurrent .CompletableFuture ;
23
23
import java .util .concurrent .TimeUnit ;
24
+ import java .util .function .Consumer ;
24
25
25
26
import org .reactivestreams .Subscriber ;
26
27
import org .reactivestreams .Subscription ;
@@ -94,7 +95,7 @@ public CompletableFuture<Void> whenComplete() {
94
95
public void subscribe (Subscriber <? super T > subscriber , EventExecutor executor ,
95
96
SubscriptionOption ... options ) {
96
97
upstream .subscribe (new TimeoutSubscriber <>(subscriber , executor , timeoutDuration , timeoutMode ,
97
- upstream . whenComplete () ), executor , options );
98
+ this :: abort ), executor , options );
98
99
}
99
100
100
101
@ Override
@@ -122,16 +123,16 @@ static final class TimeoutSubscriber<T> implements Runnable, Subscriber<T>, Subs
122
123
private long lastEventTimeNanos ;
123
124
private boolean completed ;
124
125
private volatile boolean canceled ;
125
- private final CompletableFuture < Void > completionFuture ;
126
+ private final Consumer < Throwable > upstreamAbort ;
126
127
127
128
TimeoutSubscriber (Subscriber <? super T > downstream , EventExecutor executor , Duration timeoutDuration ,
128
- StreamTimeoutMode timeoutMode , CompletableFuture < Void > completionFuture ) {
129
+ StreamTimeoutMode timeoutMode , Consumer < Throwable > upstreamAbort ) {
129
130
this .downstream = requireNonNull (downstream , "downstream" );
130
131
this .executor = requireNonNull (executor , "executor" );
131
132
this .timeoutDuration = requireNonNull (timeoutDuration , "timeoutDuration" );
132
133
timeoutNanos = timeoutDuration .toNanos ();
133
134
this .timeoutMode = requireNonNull (timeoutMode , "timeoutMode" );
134
- this .completionFuture = requireNonNull (completionFuture , "completionFuture " );
135
+ this .upstreamAbort = requireNonNull (upstreamAbort , "upstreamAbort " );
135
136
}
136
137
137
138
private ScheduledFuture <?> scheduleTimeout (long delay ) {
@@ -161,10 +162,8 @@ public void run() {
161
162
final StreamTimeoutException ex = new StreamTimeoutException (
162
163
String .format (TIMEOUT_MESSAGE , timeoutDuration .toMillis (), timeoutMode ));
163
164
164
- completionFuture .completeExceptionally (ex );
165
165
downstream .onError (ex );
166
- assert subscription != null ;
167
- subscription .cancel ();
166
+ upstreamAbort .accept (ex );
168
167
}
169
168
170
169
@ Override
@@ -205,7 +204,6 @@ public void onError(Throwable throwable) {
205
204
}
206
205
completed = true ;
207
206
cancelSchedule ();
208
- completionFuture .completeExceptionally (throwable );
209
207
downstream .onError (throwable );
210
208
}
211
209
@@ -216,7 +214,6 @@ public void onComplete() {
216
214
}
217
215
completed = true ;
218
216
cancelSchedule ();
219
- completionFuture .complete (null );
220
217
downstream .onComplete ();
221
218
}
222
219
@@ -231,7 +228,6 @@ public void cancel() {
231
228
canceled = true ;
232
229
cancelSchedule ();
233
230
assert subscription != null ;
234
- completionFuture .completeExceptionally (CancelledSubscriptionException .get ());
235
231
subscription .cancel ();
236
232
}
237
233
}
0 commit comments