diff --git a/cluster/src/main/java/io/scalecube/cluster/ClusterImpl.java b/cluster/src/main/java/io/scalecube/cluster/ClusterImpl.java index 1bca31a9..714c9dae 100644 --- a/cluster/src/main/java/io/scalecube/cluster/ClusterImpl.java +++ b/cluster/src/main/java/io/scalecube/cluster/ClusterImpl.java @@ -123,14 +123,14 @@ private void initLifecycle() { start .asMono() .then(doStart()) - .doOnSuccess(c -> onStart.tryEmitEmpty()) - .doOnError(onStart::tryEmitError) + .doOnSuccess(c -> onStart.emitEmpty(RetryEmitFailureHandler.INSTANCE)) + .doOnError(th -> onStart.emitError(th, RetryEmitFailureHandler.INSTANCE)) .subscribe(null, th -> LOGGER.error("[{}][doStart] Exception occurred:", localMember, th)); shutdown .asMono() .then(doShutdown()) - .doFinally(s -> onShutdown.tryEmitEmpty()) + .doFinally(s -> onShutdown.emitEmpty(RetryEmitFailureHandler.INSTANCE)) .subscribe( null, th -> @@ -236,7 +236,7 @@ public ClusterImpl handler(Function handler) { public Mono start() { return Mono.defer( () -> { - start.tryEmitEmpty(); + start.emitEmpty(RetryEmitFailureHandler.INSTANCE); return onStart.asMono().thenReturn(this); }); } @@ -489,7 +489,7 @@ public Mono updateMetadata(T metadata) { @Override public void shutdown() { - shutdown.tryEmitEmpty(); + shutdown.emitEmpty(RetryEmitFailureHandler.INSTANCE); } private Mono doShutdown() { diff --git a/transport-parent/transport-netty/src/main/java/io/scalecube/transport/netty/TransportImpl.java b/transport-parent/transport-netty/src/main/java/io/scalecube/transport/netty/TransportImpl.java index 99e0f272..32eb74b9 100644 --- a/transport-parent/transport-netty/src/main/java/io/scalecube/transport/netty/TransportImpl.java +++ b/transport-parent/transport-netty/src/main/java/io/scalecube/transport/netty/TransportImpl.java @@ -90,7 +90,7 @@ private void init(DisposableServer server) { // Setup cleanup stop.asMono() .then(doStop()) - .doFinally(s -> onStop.tryEmitEmpty()) + .doFinally(s -> onStop.emitEmpty(RetryEmitFailureHandler.INSTANCE)) .subscribe( null, ex -> LOGGER.warn("[{}][doStop] Exception occurred: {}", address, ex.toString())); } @@ -181,7 +181,7 @@ public boolean isStopped() { public final Mono stop() { return Mono.defer( () -> { - stop.tryEmitEmpty(); + stop.emitEmpty(RetryEmitFailureHandler.INSTANCE); return onStop.asMono(); }); }