Skip to content

Commit

Permalink
Fix deprecations after reactor upgrade (#355)
Browse files Browse the repository at this point in the history
* Minor fixes at TcpReceiver and TcpChannelInitializer
* Fixed deprecations
  • Loading branch information
artem-v authored Mar 23, 2021
1 parent 2dd5f98 commit 426f343
Show file tree
Hide file tree
Showing 15 changed files with 146 additions and 173 deletions.
43 changes: 21 additions & 22 deletions cluster/src/main/java/io/scalecube/cluster/ClusterImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,9 @@
import reactor.core.Disposable;
import reactor.core.Disposables;
import reactor.core.Exceptions;
import reactor.core.publisher.DirectProcessor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoProcessor;
import reactor.core.publisher.Sinks;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

Expand Down Expand Up @@ -78,18 +76,17 @@ public final class ClusterImpl implements Cluster {
private Function<Cluster, ? extends ClusterMessageHandler> handler =
cluster -> new ClusterMessageHandler() {};

// Subject
private final DirectProcessor<MembershipEvent> membershipEvents = DirectProcessor.create();
private final FluxSink<MembershipEvent> membershipSink = membershipEvents.sink();
// Sink
private final Sinks.Many<MembershipEvent> sink = Sinks.many().multicast().directBestEffort();

// Disposables
private final Disposable.Composite actionsDisposables = Disposables.composite();

// Lifecycle
private final MonoProcessor<Void> start = MonoProcessor.create();
private final MonoProcessor<Void> onStart = MonoProcessor.create();
private final MonoProcessor<Void> shutdown = MonoProcessor.create();
private final MonoProcessor<Void> onShutdown = MonoProcessor.create();
private final Sinks.One<Void> start = Sinks.one();
private final Sinks.One<Void> onStart = Sinks.one();
private final Sinks.One<Void> shutdown = Sinks.one();
private final Sinks.One<Void> onShutdown = Sinks.one();

// Cluster components
private Transport transport;
Expand Down Expand Up @@ -119,14 +116,16 @@ private ClusterImpl(ClusterImpl that) {

private void initLifecycle() {
start
.asMono()
.then(doStart())
.doOnSuccess(avoid -> onStart.onComplete())
.doOnError(onStart::onError)
.doOnSuccess(c -> onStart.tryEmitEmpty())
.doOnError(onStart::tryEmitError)
.subscribe(null, th -> LOGGER.error("[{}][doStart] Exception occurred:", localMember, th));

shutdown
.asMono()
.then(doShutdown())
.doFinally(s -> onShutdown.onComplete())
.doFinally(s -> onShutdown.tryEmitEmpty())
.subscribe(
null,
th ->
Expand Down Expand Up @@ -232,8 +231,8 @@ public ClusterImpl handler(Function<Cluster, ClusterMessageHandler> handler) {
public Mono<Cluster> start() {
return Mono.defer(
() -> {
start.onComplete();
return onStart.thenReturn(this);
start.tryEmitEmpty();
return onStart.asMono().thenReturn(this);
});
}

Expand All @@ -260,7 +259,7 @@ private Mono<Cluster> doStart0() {
new FailureDetectorImpl(
localMember,
transport,
membershipEvents.onBackpressureBuffer(),
sink.asFlux().onBackpressureBuffer(),
config.failureDetectorConfig(),
scheduler,
cidGenerator);
Expand All @@ -269,7 +268,7 @@ private Mono<Cluster> doStart0() {
new GossipProtocolImpl(
localMember,
transport,
membershipEvents.onBackpressureBuffer(),
sink.asFlux().onBackpressureBuffer(),
config.gossipConfig(),
scheduler);

Expand All @@ -295,7 +294,7 @@ private Mono<Cluster> doStart0() {
.listen()
/*.publishOn(scheduler)*/
                // Don't uncomment, already being executed inside sc-cluster thread
.subscribe(membershipSink::next, this::onError, membershipSink::complete));
.subscribe(sink::tryEmitNext, this::onError, sink::tryEmitComplete));

return Mono.fromRunnable(() -> failureDetector.start())
.then(Mono.fromRunnable(() -> gossip.start()))
Expand Down Expand Up @@ -373,7 +372,7 @@ private Flux<Message> listenGossip() {

private Flux<MembershipEvent> listenMembership() {
// listen on live stream
return membershipEvents.onBackpressureBuffer();
return sink.asFlux().onBackpressureBuffer();
}

/**
Expand Down Expand Up @@ -481,7 +480,7 @@ public <T> Mono<Void> updateMetadata(T metadata) {

@Override
public void shutdown() {
shutdown.onComplete();
shutdown.tryEmitEmpty();
}

private Mono<Void> doShutdown() {
Expand Down Expand Up @@ -524,12 +523,12 @@ private Mono<Void> dispose() {

@Override
public Mono<Void> onShutdown() {
return onShutdown;
return onShutdown.asMono();
}

@Override
public boolean isShutdown() {
return onShutdown.isDisposed();
return onShutdown.asMono().toFuture().isDone();
}

private static class SenderAwareTransport implements Transport {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,8 @@
import org.slf4j.LoggerFactory;
import reactor.core.Disposable;
import reactor.core.Disposables;
import reactor.core.publisher.DirectProcessor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxProcessor;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Sinks;
import reactor.core.scheduler.Scheduler;

public final class FailureDetectorImpl implements FailureDetector {
Expand All @@ -46,18 +44,15 @@ public final class FailureDetectorImpl implements FailureDetector {
// State

private long currentPeriod = 0;
private List<Member> pingMembers = new ArrayList<>();
private int pingMemberIndex = 0; // index for sequential ping member selection
private final List<Member> pingMembers = new ArrayList<>();

// Disposables

private final Disposable.Composite actionsDisposables = Disposables.composite();

// Subject
private final FluxProcessor<FailureDetectorEvent, FailureDetectorEvent> subject =
DirectProcessor.<FailureDetectorEvent>create().serialize();

private final FluxSink<FailureDetectorEvent> sink = subject.sink();
// Sink
private final Sinks.Many<FailureDetectorEvent> sink = Sinks.many().multicast().directBestEffort();

// Scheduled
private final Scheduler scheduler;
Expand Down Expand Up @@ -111,12 +106,12 @@ public void stop() {
actionsDisposables.dispose();

// Stop publishing events
sink.complete();
sink.tryEmitComplete();
}

@Override
public Flux<FailureDetectorEvent> listen() {
return subject.onBackpressureBuffer();
return sink.asFlux().onBackpressureBuffer();
}

// ================================================
Expand Down Expand Up @@ -376,7 +371,7 @@ private List<Member> selectPingReqMembers(Member pingMember) {

private void publishPingResult(long period, Member member, MemberStatus status) {
LOGGER.debug("[{}][{}] Member {} detected as {}", localMember, period, member, status);
sink.next(new FailureDetectorEvent(member, status));
sink.tryEmitNext(new FailureDetectorEvent(member, status));
}

private MemberStatus computeMemberStatus(Message message, long period) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,10 @@
import org.slf4j.LoggerFactory;
import reactor.core.Disposable;
import reactor.core.Disposables;
import reactor.core.publisher.DirectProcessor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxProcessor;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoSink;
import reactor.core.publisher.Sinks;
import reactor.core.scheduler.Scheduler;

public final class GossipProtocolImpl implements GossipProtocol {
Expand Down Expand Up @@ -58,12 +56,8 @@ public final class GossipProtocolImpl implements GossipProtocol {

private final Disposable.Composite actionsDisposables = Disposables.composite();

// Subject

private final FluxProcessor<Message, Message> subject =
DirectProcessor.<Message>create().serialize();

private final FluxSink<Message> sink = subject.sink();
// Sink
private final Sinks.Many<Message> sink = Sinks.many().multicast().directBestEffort();

// Scheduled

Expand Down Expand Up @@ -119,7 +113,7 @@ public void stop() {
actionsDisposables.dispose();

// Stop publishing events
sink.complete();
sink.tryEmitComplete();
}

@Override
Expand All @@ -131,7 +125,7 @@ public Mono<String> spread(Message message) {

@Override
public Flux<Message> listen() {
return subject.onBackpressureBuffer();
return sink.asFlux().onBackpressureBuffer();
}

// ================================================
Expand Down Expand Up @@ -207,7 +201,7 @@ private void onGossipReq(Message message) {
if (gossipState == null) { // new gossip
gossipState = new GossipState(gossip, period);
gossips.put(gossip.gossipId(), gossipState);
sink.next(gossip.message());
sink.tryEmitNext(gossip.message());
}
gossipState.addToInfected(gossipRequest.from());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,12 +44,10 @@
import org.slf4j.LoggerFactory;
import reactor.core.Disposable;
import reactor.core.Disposables;
import reactor.core.publisher.DirectProcessor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxProcessor;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoSink;
import reactor.core.publisher.Sinks;
import reactor.core.scheduler.Scheduler;

public final class MembershipProtocolImpl implements MembershipProtocol {
Expand Down Expand Up @@ -91,11 +89,8 @@ private enum MembershipUpdateReason {
private final List<MembershipEvent> removedMembersHistory = new CopyOnWriteArrayList<>();
private final Set<String> aliveEmittedSet = new HashSet<>();

// Subject

private final FluxProcessor<MembershipEvent, MembershipEvent> subject =
DirectProcessor.<MembershipEvent>create().serialize();
private final FluxSink<MembershipEvent> sink = subject.sink();
// Sink
private final Sinks.Many<MembershipEvent> sink = Sinks.many().multicast().directBestEffort();

// Disposables
private final Disposable.Composite actionsDisposables = Disposables.composite();
Expand Down Expand Up @@ -204,7 +199,7 @@ private boolean checkAddressesNotEqual(Address address0, Address address1) {

@Override
public Flux<MembershipEvent> listen() {
return subject.onBackpressureBuffer();
return sink.asFlux().onBackpressureBuffer();
}

/**
Expand Down Expand Up @@ -307,7 +302,7 @@ public void stop() {
suspicionTimeoutTasks.clear();

// Stop publishing events
sink.complete();
sink.tryEmitComplete();
}

@Override
Expand Down Expand Up @@ -735,7 +730,7 @@ private Mono<Void> onLeavingDetected(MembershipRecord r0, MembershipRecord r1) {

private void publishEvent(MembershipEvent event) {
LOGGER.info("[{}][publishEvent] {}", localMember, event);
sink.next(event);
sink.tryEmitNext(event);
}

private Mono<Void> onDeadMemberDetected(MembershipRecord r1) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,7 @@ public void testIsolatedParentNamespaces() {
parent1.address(), parent2.address(), bob.address(), carol.address()))
.startAwait();

//noinspection unused
Cluster eve =
new ClusterImpl()
.transportFactory(WebsocketTransportFactory::new)
Expand Down
18 changes: 10 additions & 8 deletions cluster/src/test/java/io/scalecube/cluster/ClusterTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
import org.junit.jupiter.api.Test;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.ReplayProcessor;
import reactor.core.publisher.Sinks;

public class ClusterTest extends BaseTest {

Expand Down Expand Up @@ -449,7 +449,7 @@ public void onMembershipEvent(MembershipEvent event) {
@Test
public void testMemberMetadataRemoved() throws InterruptedException {
// Start seed member
ReplayProcessor<MembershipEvent> seedEvents = ReplayProcessor.create();
final Sinks.Many<MembershipEvent> sink = Sinks.many().replay().all();
Map<String, String> seedMetadata = new HashMap<>();
seedMetadata.put("seed", "shmid");
final Cluster seedNode =
Expand All @@ -460,7 +460,7 @@ public void testMemberMetadataRemoved() throws InterruptedException {
new ClusterMessageHandler() {
@Override
public void onMembershipEvent(MembershipEvent event) {
seedEvents.onNext(event);
sink.tryEmitNext(event);
}
})
.startAwait();
Expand All @@ -469,7 +469,7 @@ public void onMembershipEvent(MembershipEvent event) {
// Start member with metadata
Map<String, String> node1Metadata = new HashMap<>();
node1Metadata.put("node", "shmod");
ReplayProcessor<MembershipEvent> node1Events = ReplayProcessor.create();
final Sinks.Many<MembershipEvent> sink1 = Sinks.many().replay().all();
final Cluster node1 =
new ClusterImpl()
.config(opts -> opts.metadata(node1Metadata))
Expand All @@ -479,16 +479,18 @@ public void onMembershipEvent(MembershipEvent event) {
new ClusterMessageHandler() {
@Override
public void onMembershipEvent(MembershipEvent event) {
node1Events.onNext(event);
sink1.tryEmitNext(event);
}
})
.startAwait();

// Check events
MembershipEvent nodeAddedEvent = seedEvents.as(Mono::from).block(Duration.ofSeconds(3));
MembershipEvent nodeAddedEvent = sink.asFlux().blockFirst(Duration.ofSeconds(3));
//noinspection ConstantConditions
assertEquals(Type.ADDED, nodeAddedEvent.type());

MembershipEvent seedAddedEvent = node1Events.as(Mono::from).block(Duration.ofSeconds(3));
MembershipEvent seedAddedEvent = sink1.asFlux().blockFirst(Duration.ofSeconds(3));
//noinspection ConstantConditions
assertEquals(Type.ADDED, seedAddedEvent.type());

// Check metadata
Expand All @@ -498,7 +500,7 @@ public void onMembershipEvent(MembershipEvent event) {
// Remove node1 from cluster
CountDownLatch latch = new CountDownLatch(1);
AtomicReference<Map<String, String>> removedMetadata = new AtomicReference<>();
seedEvents
sink.asFlux()
.filter(MembershipEvent::isRemoved)
.subscribe(
event -> {
Expand Down
Loading

0 comments on commit 426f343

Please sign in to comment.