diff --git a/service-netty/src/main/java/info/bitrich/xchangestream/service/netty/NettyStreamingService.java b/service-netty/src/main/java/info/bitrich/xchangestream/service/netty/NettyStreamingService.java index a401e3860..7724acd09 100644 --- a/service-netty/src/main/java/info/bitrich/xchangestream/service/netty/NettyStreamingService.java +++ b/service-netty/src/main/java/info/bitrich/xchangestream/service/netty/NettyStreamingService.java @@ -224,18 +224,18 @@ protected void initChannel(SocketChannel ch) { } reconnFailEmitters.forEach(emitter -> emitter.onNext(t)); }) - .doOnComplete(() -> { - LOG.warn("Resubscribing channels"); - resubscribeChannels(); - - connectionSuccessEmitters.forEach(emitter -> emitter.onNext(new Object())); - }); + .doOnComplete(() -> connectionSuccessEmitters.forEach(emitter -> emitter.onNext(new Object()))); } private void scheduleReconnect() { LOG.info("Scheduling reconnection"); webSocketChannel.eventLoop().schedule( - () -> connect().subscribe(), + () -> connect().subscribe( + () -> { + LOG.warn("Resubscribing channels"); + resubscribeChannels(); + } + ), retryDuration.toMillis(), TimeUnit.MILLISECONDS); }