Added RetryEmitFailureHandler (#358)
artem-v authored Apr 12, 2021
1 parent 13c89ac commit 32aeafb
Showing 7 changed files with 122 additions and 20 deletions.
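In short: every fire-and-forget tryEmitNext/tryEmitComplete call on the Reactor Sinks.Many instances is replaced by the emitNext/emitComplete overloads that take a Sinks.EmitFailureHandler. The handler introduced here retries an emission only when it failed with FAIL_NON_SERIALIZED, i.e. when concurrent threads raced on the same sink; all other failure results are still not retried. The pattern, as it repeats in each changed file:

// The handler added by this commit: retry only when the emission failed
// because another thread was emitting into the same sink at that moment.
private static class RetryEmitFailureHandler implements EmitFailureHandler {

  private static final RetryEmitFailureHandler INSTANCE = new RetryEmitFailureHandler();

  @Override
  public boolean onEmitFailure(SignalType signalType, EmitResult emitResult) {
    return emitResult == FAIL_NON_SERIALIZED;
  }
}

// Before: the EmitResult returned by tryEmitNext is silently discarded.
sink.tryEmitNext(event);

// After: emitNext re-attempts the emission for as long as the handler
// returns true for the EmitResult of the failed attempt.
sink.emitNext(event, RetryEmitFailureHandler.INSTANCE);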
25 changes: 22 additions & 3 deletions cluster/src/main/java/io/scalecube/cluster/ClusterImpl.java
@@ -1,5 +1,7 @@
package io.scalecube.cluster;

import static reactor.core.publisher.Sinks.EmitResult.FAIL_NON_SERIALIZED;

import io.scalecube.cluster.fdetector.FailureDetectorConfig;
import io.scalecube.cluster.fdetector.FailureDetectorImpl;
import io.scalecube.cluster.gossip.GossipConfig;
@@ -45,7 +47,10 @@
import reactor.core.Exceptions;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.SignalType;
import reactor.core.publisher.Sinks;
import reactor.core.publisher.Sinks.EmitFailureHandler;
import reactor.core.publisher.Sinks.EmitResult;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

@@ -283,7 +288,8 @@ private Mono<Cluster> doStart0() {
failureDetector,
gossip,
metadataStore,
config,
config.membershipConfig(),
config.failureDetectorConfig(),
scheduler,
cidGenerator,
monitorModelBuilder);
@@ -293,8 +299,11 @@ private Mono<Cluster> doStart0() {
membership
.listen()
/*.publishOn(scheduler)*/
// Don't uncomment, already being executed inside sc-cluster thread
.subscribe(sink::tryEmitNext, this::onError, sink::tryEmitComplete));
// Don't uncomment, already being executed inside scalecube-cluster thread
.subscribe(
event -> sink.emitNext(event, RetryEmitFailureHandler.INSTANCE),
th -> LOGGER.error("[{}][membership][error] cause:", localMember, th),
() -> sink.emitComplete(RetryEmitFailureHandler.INSTANCE)));

return Mono.fromRunnable(() -> failureDetector.start())
.then(Mono.fromRunnable(() -> gossip.start()))
@@ -580,4 +589,14 @@ private Message enhanceWithSender(Message message) {
return Message.with(message).sender(address).build();
}
}

private static class RetryEmitFailureHandler implements EmitFailureHandler {

private static final RetryEmitFailureHandler INSTANCE = new RetryEmitFailureHandler();

@Override
public boolean onEmitFailure(SignalType signalType, EmitResult emitResult) {
return emitResult == FAIL_NON_SERIALIZED;
}
}
}
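For context, FAIL_NON_SERIALIZED is the failure mode this targets: Reactor 3.4 sinks guard against concurrent emission, and tryEmitNext reports the race as an EmitResult instead of throwing, so without a handler the event is simply lost. Below is a minimal, self-contained sketch (hypothetical demo code, not part of this commit; class and variable names are invented) that provokes the race and absorbs it with the same handler predicate:

import reactor.core.publisher.Sinks;
import reactor.core.publisher.Sinks.EmitFailureHandler;
import reactor.core.publisher.Sinks.EmitResult;

public class RetryEmitDemo {

  // Same predicate as RetryEmitFailureHandler above, written as a lambda.
  private static final EmitFailureHandler RETRY_NON_SERIALIZED =
      (signalType, emitResult) -> emitResult == EmitResult.FAIL_NON_SERIALIZED;

  public static void main(String[] args) throws InterruptedException {
    Sinks.Many<Integer> sink = Sinks.many().multicast().directBestEffort();
    sink.asFlux().subscribe(v -> {});

    // Two threads racing on one sink: tryEmitNext would intermittently
    // return FAIL_NON_SERIALIZED; emitNext retries until the emission wins.
    Runnable emitter =
        () -> {
          for (int i = 0; i < 10_000; i++) {
            sink.emitNext(i, RETRY_NON_SERIALIZED);
          }
        };
    Thread t1 = new Thread(emitter);
    Thread t2 = new Thread(emitter);
    t1.start();
    t2.start();
    t1.join();
    t2.join();

    sink.emitComplete(RETRY_NON_SERIALIZED);
  }
}

The retry is effectively a busy-spin, which is acceptable because the contention window is a single racing emission; terminal failures such as FAIL_TERMINATED or FAIL_CANCELLED still make the handler return false, so they surface instead of spinning forever.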
cluster/src/main/java/io/scalecube/cluster/fdetector/FailureDetectorImpl.java
@@ -1,5 +1,7 @@
package io.scalecube.cluster.fdetector;

import static reactor.core.publisher.Sinks.EmitResult.FAIL_NON_SERIALIZED;

import io.scalecube.cluster.CorrelationIdGenerator;
import io.scalecube.cluster.Member;
import io.scalecube.cluster.fdetector.PingData.AckType;
@@ -21,7 +23,10 @@
import reactor.core.Disposable;
import reactor.core.Disposables;
import reactor.core.publisher.Flux;
import reactor.core.publisher.SignalType;
import reactor.core.publisher.Sinks;
import reactor.core.publisher.Sinks.EmitFailureHandler;
import reactor.core.publisher.Sinks.EmitResult;
import reactor.core.scheduler.Scheduler;

public final class FailureDetectorImpl implements FailureDetector {
@@ -106,7 +111,7 @@ public void stop() {
actionsDisposables.dispose();

// Stop publishing events
sink.tryEmitComplete();
sink.emitComplete(RetryEmitFailureHandler.INSTANCE);
}

@Override
@@ -371,7 +376,7 @@ private List<Member> selectPingReqMembers(Member pingMember) {

private void publishPingResult(long period, Member member, MemberStatus status) {
LOGGER.debug("[{}][{}] Member {} detected as {}", localMember, period, member, status);
sink.tryEmitNext(new FailureDetectorEvent(member, status));
sink.emitNext(new FailureDetectorEvent(member, status), RetryEmitFailureHandler.INSTANCE);
}

private MemberStatus computeMemberStatus(Message message, long period) {
@@ -419,4 +424,14 @@ private boolean isTransitPingAck(Message message) {
Transport getTransport() {
return transport;
}

private static class RetryEmitFailureHandler implements EmitFailureHandler {

private static final RetryEmitFailureHandler INSTANCE = new RetryEmitFailureHandler();

@Override
public boolean onEmitFailure(SignalType signalType, EmitResult emitResult) {
return emitResult == FAIL_NON_SERIALIZED;
}
}
}
cluster/src/main/java/io/scalecube/cluster/gossip/GossipProtocolImpl.java
@@ -1,5 +1,7 @@
package io.scalecube.cluster.gossip;

import static reactor.core.publisher.Sinks.EmitResult.FAIL_NON_SERIALIZED;

import io.scalecube.cluster.ClusterMath;
import io.scalecube.cluster.Member;
import io.scalecube.cluster.membership.MembershipEvent;
@@ -24,7 +26,10 @@
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoSink;
import reactor.core.publisher.SignalType;
import reactor.core.publisher.Sinks;
import reactor.core.publisher.Sinks.EmitFailureHandler;
import reactor.core.publisher.Sinks.EmitResult;
import reactor.core.scheduler.Scheduler;

public final class GossipProtocolImpl implements GossipProtocol {
@@ -113,7 +118,7 @@ public void stop() {
actionsDisposables.dispose();

// Stop publishing events
sink.tryEmitComplete();
sink.emitComplete(RetryEmitFailureHandler.INSTANCE);
}

@Override
@@ -201,7 +206,7 @@ private void onGossipReq(Message message) {
if (gossipState == null) { // new gossip
gossipState = new GossipState(gossip, period);
gossips.put(gossip.gossipId(), gossipState);
sink.tryEmitNext(gossip.message());
sink.emitNext(gossip.message(), RetryEmitFailureHandler.INSTANCE);
}
gossipState.addToInfected(gossipRequest.from());
}
@@ -378,4 +383,14 @@ Transport getTransport() {
Member getMember() {
return localMember;
}

private static class RetryEmitFailureHandler implements EmitFailureHandler {

private static final RetryEmitFailureHandler INSTANCE = new RetryEmitFailureHandler();

@Override
public boolean onEmitFailure(SignalType signalType, EmitResult emitResult) {
return emitResult == FAIL_NON_SERIALIZED;
}
}
}
cluster/src/main/java/io/scalecube/cluster/membership/MembershipProtocolImpl.java
@@ -3,8 +3,8 @@
import static io.scalecube.cluster.membership.MemberStatus.ALIVE;
import static io.scalecube.cluster.membership.MemberStatus.DEAD;
import static io.scalecube.cluster.membership.MemberStatus.LEAVING;
import static reactor.core.publisher.Sinks.EmitResult.FAIL_NON_SERIALIZED;

import io.scalecube.cluster.ClusterConfig;
import io.scalecube.cluster.ClusterMath;
import io.scalecube.cluster.CorrelationIdGenerator;
import io.scalecube.cluster.Member;
@@ -47,7 +47,10 @@
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoSink;
import reactor.core.publisher.SignalType;
import reactor.core.publisher.Sinks;
import reactor.core.publisher.Sinks.EmitFailureHandler;
import reactor.core.publisher.Sinks.EmitResult;
import reactor.core.scheduler.Scheduler;

public final class MembershipProtocolImpl implements MembershipProtocol {
@@ -108,7 +111,8 @@ private enum MembershipUpdateReason {
* @param failureDetector failure detector
* @param gossipProtocol gossip protocol
* @param metadataStore metadata store
* @param config cluster config parameters
* @param membershipConfig membershipConfig
* @param failureDetectorConfig failureDetectorConfig
* @param scheduler scheduler
* @param cidGenerator correlation id generator
* @param monitorModelBuilder monitor model builder
@@ -119,7 +123,8 @@ public MembershipProtocolImpl(
FailureDetector failureDetector,
GossipProtocol gossipProtocol,
MetadataStore metadataStore,
ClusterConfig config,
MembershipConfig membershipConfig,
FailureDetectorConfig failureDetectorConfig,
Scheduler scheduler,
CorrelationIdGenerator cidGenerator,
ClusterMonitorModel.Builder monitorModelBuilder) {
@@ -132,8 +137,8 @@ public MembershipProtocolImpl(
this.scheduler = Objects.requireNonNull(scheduler);
this.cidGenerator = Objects.requireNonNull(cidGenerator);
this.monitorModelBuilder = Objects.requireNonNull(monitorModelBuilder);
this.membershipConfig = Objects.requireNonNull(config).membershipConfig();
this.failureDetectorConfig = Objects.requireNonNull(config).failureDetectorConfig();
this.membershipConfig = Objects.requireNonNull(membershipConfig);
this.failureDetectorConfig = Objects.requireNonNull(failureDetectorConfig);

// Prepare seeds
seedMembers = cleanUpSeedMembers(membershipConfig.seedMembers());
@@ -302,7 +307,7 @@ public void stop() {
suspicionTimeoutTasks.clear();

// Stop publishing events
sink.tryEmitComplete();
sink.emitComplete(RetryEmitFailureHandler.INSTANCE);
}

@Override
@@ -730,7 +735,7 @@ private Mono<Void> onLeavingDetected(MembershipRecord r0, MembershipRecord r1) {

private void publishEvent(MembershipEvent event) {
LOGGER.info("[{}][publishEvent] {}", localMember, event);
sink.tryEmitNext(event);
sink.emitNext(event, RetryEmitFailureHandler.INSTANCE);
}

private Mono<Void> onDeadMemberDetected(MembershipRecord r1) {
@@ -937,4 +942,14 @@ private void onMemberRemoved(MembershipEvent event) {
removedMembersHistory.remove(0);
}
}

private static class RetryEmitFailureHandler implements EmitFailureHandler {

private static final RetryEmitFailureHandler INSTANCE = new RetryEmitFailureHandler();

@Override
public boolean onEmitFailure(SignalType signalType, EmitResult emitResult) {
return emitResult == FAIL_NON_SERIALIZED;
}
}
}
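Note the second change bundled into this commit, visible in the hunks above: MembershipProtocolImpl no longer takes the whole ClusterConfig but only the two slices it actually reads, MembershipConfig and FailureDetectorConfig, with the null checks moved onto each slice. Callers now pass config.membershipConfig() and config.failureDetectorConfig() explicitly, as the ClusterImpl and test diffs show.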
cluster/src/test/java/io/scalecube/cluster/membership/MembershipProtocolTest.java
@@ -2,6 +2,7 @@

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static reactor.core.publisher.Sinks.EmitResult.FAIL_NON_SERIALIZED;

import io.scalecube.cluster.BaseTest;
import io.scalecube.cluster.ClusterConfig;
@@ -34,7 +35,10 @@
import reactor.core.Exceptions;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.SignalType;
import reactor.core.publisher.Sinks;
import reactor.core.publisher.Sinks.EmitFailureHandler;
import reactor.core.publisher.Sinks.EmitResult;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
import reactor.util.retry.Retry;
@@ -1158,7 +1162,8 @@ private MembershipProtocolImpl createMembership(Transport transport, ClusterConf
failureDetector,
gossipProtocol,
metadataStore,
config,
config.membershipConfig(),
config.failureDetectorConfig(),
scheduler,
cidGenerator,
new ClusterMonitorModel.Builder());
@@ -1285,7 +1290,20 @@ private Sinks.Many<MembershipEvent> startRecordingRemoved(MembershipProtocolImpl
membership
.listen()
.filter(MembershipEvent::isRemoved)
.subscribe(sink::tryEmitNext, sink::tryEmitError, sink::tryEmitComplete);
.subscribe(
event -> sink.emitNext(event, RetryEmitFailureHandler.INSTANCE),
error -> sink.emitError(error, RetryEmitFailureHandler.INSTANCE),
() -> sink.emitComplete(RetryEmitFailureHandler.INSTANCE));
return sink;
}

private static class RetryEmitFailureHandler implements EmitFailureHandler {

private static final RetryEmitFailureHandler INSTANCE = new RetryEmitFailureHandler();

@Override
public boolean onEmitFailure(SignalType signalType, EmitResult emitResult) {
return emitResult == FAIL_NON_SERIALIZED;
}
}
}
4 changes: 3 additions & 1 deletion pom.xml
@@ -1,5 +1,7 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>

<parent>
transport-netty/src/main/java/io/scalecube/transport/netty/TransportImpl.java
@@ -1,5 +1,7 @@
package io.scalecube.transport.netty;

import static reactor.core.publisher.Sinks.EmitResult.FAIL_NON_SERIALIZED;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.ByteBufInputStream;
@@ -28,7 +30,10 @@
import reactor.core.Exceptions;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.SignalType;
import reactor.core.publisher.Sinks;
import reactor.core.publisher.Sinks.EmitFailureHandler;
import reactor.core.publisher.Sinks.EmitResult;
import reactor.netty.Connection;
import reactor.netty.DisposableServer;
import reactor.netty.resources.LoopResources;
@@ -39,7 +44,7 @@ public final class TransportImpl implements Transport {

private final MessageCodec messageCodec;

// Sink
// Subject
private final Sinks.Many<Message> sink = Sinks.many().multicast().directBestEffort();

// Close handler
@@ -156,7 +161,10 @@ public Mono<Transport> start() {
context ->
context.put(
ReceiverContext.class,
new ReceiverContext(loopResources, this::toMessage, sink::tryEmitNext)));
new ReceiverContext(
loopResources,
this::toMessage,
message -> sink.emitNext(message, RetryEmitFailureHandler.INSTANCE))));
}

@Override
@@ -183,7 +191,7 @@ private Mono<Void> doStop() {
() -> {
LOGGER.info("[{}][doStop] Stopping", address);
// Complete incoming messages observable
sink.tryEmitComplete();
sink.emitComplete(RetryEmitFailureHandler.INSTANCE);
return Flux.concatDelayError(closeServer(), shutdownLoopResources())
.then()
.doFinally(s -> connections.clear())
@@ -343,4 +351,14 @@ public Function<Message, ByteBuf> messageEncoder() {
return messageEncoder;
}
}

private static class RetryEmitFailureHandler implements EmitFailureHandler {

private static final RetryEmitFailureHandler INSTANCE = new RetryEmitFailureHandler();

@Override
public boolean onEmitFailure(SignalType signalType, EmitResult emitResult) {
return emitResult == FAIL_NON_SERIALIZED;
}
}
}
