From b757da636b90274846b3f073c0ace4457afd5fbf Mon Sep 17 00:00:00 2001 From: "rostyslav.baldovskyi" Date: Thu, 24 Aug 2023 18:20:57 +0300 Subject: [PATCH] Make compile (WIP) --- .../java/io/scalecube/cluster/Cluster.java | 12 ++- .../java/io/scalecube/cluster/Member.java | 13 +++ .../cluster/utils/NetworkEmulator.java | 32 ++++++- .../utils/NetworkEmulatorTransport.java | 3 +- .../io/scalecube/cluster/ClusterImpl.java | 27 +++--- .../fdetector/FailureDetectorImpl.java | 47 ++++++----- .../cluster/gossip/GossipProtocolImpl.java | 8 +- .../membership/MembershipProtocolImpl.java | 84 ++++++++++++------- .../cluster/metadata/MetadataStoreImpl.java | 26 +++--- .../cluster/ClusterNamespacesTest.java | 46 +++++----- .../io/scalecube/cluster/ClusterTest.java | 36 ++++---- .../fdetector/FailureDetectorTest.java | 6 +- .../membership/MembershipProtocolTest.java | 10 +-- .../examples/ClusterJoinExamples.java | 24 +++--- .../ClusterJoinNamespacesExamples.java | 26 +++--- .../examples/ClusterMetadataExample.java | 2 +- .../CustomMetadataEncodingExample.java | 4 +- .../io/scalecube/examples/GossipExample.java | 8 +- .../examples/MembershipEventsExample.java | 4 +- .../scalecube/examples/MessagingExample.java | 4 +- .../examples/WebsocketMessagingExample.java | 16 ++-- .../cluster/transport/api/Message.java | 32 +++++-- .../transport/api/TransportWrapper.java | 53 ++++++++++++ .../scalecube/transport/netty/BaseTest.java | 24 ++++++ .../transport/netty/tcp/TcpTransportTest.java | 22 ++--- .../websocket/WebsocketTransportTest.java | 22 ++--- 26 files changed, 391 insertions(+), 200 deletions(-) create mode 100644 transport-parent/transport-api/src/main/java/io/scalecube/cluster/transport/api/TransportWrapper.java diff --git a/cluster-api/src/main/java/io/scalecube/cluster/Cluster.java b/cluster-api/src/main/java/io/scalecube/cluster/Cluster.java index b0f15582..00edafbb 100644 --- a/cluster-api/src/main/java/io/scalecube/cluster/Cluster.java +++ b/cluster-api/src/main/java/io/scalecube/cluster/Cluster.java @@ -3,6 +3,7 @@ import io.scalecube.cluster.transport.api.Message; import io.scalecube.net.Address; import java.util.Collection; +import java.util.List; import java.util.Optional; import reactor.core.publisher.Mono; @@ -14,7 +15,7 @@ public interface Cluster { * * @return cluster address */ - Address address(); + List
addresses(); /** * Send a msg from this member (src) to target member (specified in parameters). @@ -34,6 +35,15 @@ public interface Cluster { */ Mono send(Address address, Message message); + /** + * Send a msg from this member (src) to target member (specified in parameters). + * + * @param addresses target addresses + * @param message msg + * @return promise telling success or failure + */ + Mono send(List
addresses, Message message); + /** * Sends message to the given address. It will issue connect in case if no transport channel by * given transport {@code address} exists already. Send is an async operation and expecting a diff --git a/cluster-api/src/main/java/io/scalecube/cluster/Member.java b/cluster-api/src/main/java/io/scalecube/cluster/Member.java index b9798687..42b958c9 100644 --- a/cluster-api/src/main/java/io/scalecube/cluster/Member.java +++ b/cluster-api/src/main/java/io/scalecube/cluster/Member.java @@ -7,6 +7,7 @@ import java.io.ObjectInput; import java.io.ObjectOutput; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.Objects; import java.util.StringJoiner; @@ -42,6 +43,18 @@ public Member(String id, String alias, List
addresses, String namespace this.namespace = Objects.requireNonNull(namespace, "namespace"); } + /** + * Constructor. + * + * @param id member id + * @param alias member alias (optional) + * @param address member address + * @param namespace namespace + */ + public Member(String id, String alias, Address address, String namespace) { + this(id, alias, Collections.singletonList(address), namespace); + } + /** * Returns cluster member local id. * diff --git a/cluster-testlib/src/main/java/io/scalecube/cluster/utils/NetworkEmulator.java b/cluster-testlib/src/main/java/io/scalecube/cluster/utils/NetworkEmulator.java index d51a7526..d3e6ab3d 100644 --- a/cluster-testlib/src/main/java/io/scalecube/cluster/utils/NetworkEmulator.java +++ b/cluster-testlib/src/main/java/io/scalecube/cluster/utils/NetworkEmulator.java @@ -5,6 +5,7 @@ import java.time.Duration; import java.util.Arrays; import java.util.Collection; +import java.util.List; import java.util.Map; import java.util.StringJoiner; import java.util.concurrent.ConcurrentHashMap; @@ -213,15 +214,40 @@ public InboundSettings inboundSettings(Address destination) { return inboundSettings.getOrDefault(destination, defaultInboundSettings); } + /** + * Returns network inbound settings applied to the given destination. + * + * @param destinations addresses of target endpoint + * @return network inbound settings + */ + public InboundSettings inboundSettings(List
destinations) { + if (destinations.isEmpty()) { + return defaultInboundSettings; + } + + for (Address destination : destinations) { + InboundSettings inboundSettings = this.inboundSettings.get(destination); + + if (inboundSettings != null) { + return inboundSettings; + } + } + + return defaultInboundSettings; + } + /** * Setter for network emulator inbound settings for specific destination. * * @param shallPass shallPass inbound flag */ - public void inboundSettings(Address destination, boolean shallPass) { + public void inboundSettings(List
destinations, boolean shallPass) { InboundSettings settings = new InboundSettings(shallPass); - inboundSettings.put(destination, settings); - LOGGER.debug("[{}] Set inbound settings {} to {}", address, settings, destination); + + destinations.forEach(destination -> { + inboundSettings.put(destination, settings); + LOGGER.debug("[{}] Set inbound settings {} to {}", address, settings, destination); + }); } /** diff --git a/cluster-testlib/src/main/java/io/scalecube/cluster/utils/NetworkEmulatorTransport.java b/cluster-testlib/src/main/java/io/scalecube/cluster/utils/NetworkEmulatorTransport.java index 381042c5..c75ab24f 100644 --- a/cluster-testlib/src/main/java/io/scalecube/cluster/utils/NetworkEmulatorTransport.java +++ b/cluster-testlib/src/main/java/io/scalecube/cluster/utils/NetworkEmulatorTransport.java @@ -3,6 +3,7 @@ import io.scalecube.cluster.transport.api.Message; import io.scalecube.cluster.transport.api.Transport; import io.scalecube.net.Address; +import java.util.Collections; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -83,6 +84,6 @@ public Flux listen() { } private Message enhanceWithSender(Message message) { - return Message.with(message).sender(transport.address()).build(); + return Message.with(message).sender(Collections.singletonList(transport.address())).build(); } } diff --git a/cluster/src/main/java/io/scalecube/cluster/ClusterImpl.java b/cluster/src/main/java/io/scalecube/cluster/ClusterImpl.java index f38f5e97..a530fd98 100644 --- a/cluster/src/main/java/io/scalecube/cluster/ClusterImpl.java +++ b/cluster/src/main/java/io/scalecube/cluster/ClusterImpl.java @@ -16,6 +16,7 @@ import io.scalecube.cluster.transport.api.Transport; import io.scalecube.cluster.transport.api.TransportConfig; import io.scalecube.cluster.transport.api.TransportFactory; +import io.scalecube.cluster.transport.api.TransportWrapper; import io.scalecube.net.Address; import io.scalecube.utils.ServiceLoaderUtil; import java.io.Serializable; @@ -243,7 +244,8 @@ private Mono doStart0() { .flatMap( boundTransport -> { localMember = createLocalMember(boundTransport.address()); - transport = new SenderAwareTransport(boundTransport, localMember.address()); + + transport = new SenderAwareTransport(boundTransport, localMember.addresses()); final String name = "sc-cluster-" + Integer.toHexString(System.identityHashCode(this)); @@ -379,7 +381,7 @@ private Member createLocalMember(Address address) { // First address comes as "fair" listen address memberAddresses.add(address); - // Tail goes as externalHosts, if the exist + // Tail goes as externalHosts, if exists final List externalHosts = config.externalHosts(); if (externalHosts != null) { for (String externalHost : externalHosts) { @@ -396,13 +398,13 @@ private Member createLocalMember(Address address) { } @Override - public Address address() { - return member().address(); + public List
addresses() { + return member().addresses(); } @Override public Mono send(Member member, Message message) { - return send(member.address(), message); + return TransportWrapper.send(transport, member.addresses(), message); } @Override @@ -410,6 +412,11 @@ public Mono send(Address address, Message message) { return transport.send(address, message); } + @Override + public Mono send(List
addresses, Message message) { + return TransportWrapper.send(transport, addresses, message); + } + @Override public Mono requestResponse(Address address, Message request) { return transport.requestResponse(address, request); @@ -417,7 +424,7 @@ public Mono requestResponse(Address address, Message request) { @Override public Mono requestResponse(Member member, Message request) { - return transport.requestResponse(member.address(), request); + return TransportWrapper.requestResponse(transport, member.addresses(), request); } @Override @@ -526,11 +533,11 @@ public Mono onShutdown() { private static class SenderAwareTransport implements Transport { private final Transport transport; - private final Address address; + private final List
addresses; - private SenderAwareTransport(Transport transport, Address address) { + private SenderAwareTransport(Transport transport, List
addresses) { this.transport = Objects.requireNonNull(transport); - this.address = Objects.requireNonNull(address); + this.addresses = Objects.requireNonNull(addresses); } @Override @@ -569,7 +576,7 @@ public Flux listen() { } private Message enhanceWithSender(Message message) { - return Message.with(message).sender(address).build(); + return Message.with(message).sender(addresses).build(); } } } diff --git a/cluster/src/main/java/io/scalecube/cluster/fdetector/FailureDetectorImpl.java b/cluster/src/main/java/io/scalecube/cluster/fdetector/FailureDetectorImpl.java index ac270c4e..719437b5 100644 --- a/cluster/src/main/java/io/scalecube/cluster/fdetector/FailureDetectorImpl.java +++ b/cluster/src/main/java/io/scalecube/cluster/fdetector/FailureDetectorImpl.java @@ -8,6 +8,7 @@ import io.scalecube.cluster.membership.MembershipEvent; import io.scalecube.cluster.transport.api.Message; import io.scalecube.cluster.transport.api.Transport; +import io.scalecube.cluster.transport.api.TransportWrapper; import io.scalecube.net.Address; import java.time.Duration; import java.util.ArrayList; @@ -42,6 +43,8 @@ public final class FailureDetectorImpl implements FailureDetector { private final Transport transport; private final FailureDetectorConfig config; + private final TransportWrapper transportWrapper; + // State private final List pingMembers = new ArrayList<>(); @@ -81,6 +84,8 @@ public FailureDetectorImpl( this.config = Objects.requireNonNull(config); this.scheduler = Objects.requireNonNull(scheduler); + this.transportWrapper = new TransportWrapper(this.transport); + // Subscribe actionsDisposables.addAll( Arrays.asList( @@ -145,9 +150,9 @@ private void doPing() { Message pingMsg = Message.withData(pingData).qualifier(PING).correlationId(cid).build(); LOGGER.debug("[{}][{}] Send Ping to {}", localMember, period, pingMember); - Address address = pingMember.address(); - transport - .requestResponse(address, pingMsg) + List
addresses = pingMember.addresses(); + transportWrapper + .requestResponse(addresses, pingMsg) .timeout(Duration.ofMillis(config.pingTimeout()), scheduler) .publishOn(scheduler) .subscribe( @@ -189,8 +194,8 @@ private void doPingReq( Duration timeout = Duration.ofMillis(config.pingInterval() - config.pingTimeout()); pingReqMembers.forEach( member -> - transport - .requestResponse(member.address(), pingReqMsg) + transportWrapper + .requestResponse(member.addresses(), pingReqMsg) .timeout(timeout, scheduler) .publishOn(scheduler) .subscribe( @@ -232,7 +237,7 @@ private void onMessage(Message message) { /** Listens to PING message and answers with ACK. */ private void onPing(Message message) { long period = this.currentPeriod; - Address sender = message.sender(); + List
sender = message.sender(); LOGGER.debug("[{}][{}] Received Ping from {}", localMember, period, sender); PingData data = message.data(); data = data.withAckType(AckType.DEST_OK); @@ -249,10 +254,10 @@ private void onPing(Message message) { String correlationId = message.correlationId(); Message ackMessage = Message.withData(data).qualifier(PING_ACK).correlationId(correlationId).build(); - Address address = data.getFrom().address(); - LOGGER.debug("[{}][{}] Send PingAck to {}", localMember, period, address); - transport - .send(address, ackMessage) + List
addresses = data.getFrom().addresses(); + LOGGER.debug("[{}][{}] Send PingAck to {}", localMember, period, addresses); + transportWrapper + .send(addresses, ackMessage) .subscribe( null, ex -> @@ -260,7 +265,7 @@ private void onPing(Message message) { "[{}][{}] Failed to send PingAck to {}, cause: {}", localMember, period, - address, + addresses, ex.toString())); } @@ -275,10 +280,10 @@ private void onPingReq(Message message) { PingData pingReqData = new PingData(localMember, target, originalIssuer); Message pingMessage = Message.withData(pingReqData).qualifier(PING).correlationId(correlationId).build(); - Address address = target.address(); - LOGGER.debug("[{}][{}] Send transit Ping to {}", localMember, period, address); - transport - .send(address, pingMessage) + List
addresses = target.addresses(); + LOGGER.debug("[{}][{}] Send transit Ping to {}", localMember, period, addresses); + transportWrapper + .send(addresses, pingMessage) .subscribe( null, ex -> @@ -286,7 +291,7 @@ private void onPingReq(Message message) { "[{}][{}] Failed to send transit Ping to {}, cause: {}", localMember, period, - address, + addresses, ex.toString())); } @@ -305,10 +310,10 @@ private void onTransitPingAck(Message message) { PingData originalAckData = new PingData(target, data.getTo()).withAckType(ackType); Message originalAckMessage = Message.withData(originalAckData).qualifier(PING_ACK).correlationId(correlationId).build(); - Address address = target.address(); - LOGGER.debug("[{}][{}] Resend transit PingAck to {}", localMember, period, address); - transport - .send(address, originalAckMessage) + List
addresses = target.addresses(); + LOGGER.debug("[{}][{}] Resend transit PingAck to {}", localMember, period, addresses); + transportWrapper + .send(addresses, originalAckMessage) .subscribe( null, ex -> @@ -316,7 +321,7 @@ private void onTransitPingAck(Message message) { "[{}][{}] Failed to resend transit PingAck to {}, cause: {}", localMember, period, - address, + addresses, ex.toString())); } diff --git a/cluster/src/main/java/io/scalecube/cluster/gossip/GossipProtocolImpl.java b/cluster/src/main/java/io/scalecube/cluster/gossip/GossipProtocolImpl.java index 2f24beb8..1406dad9 100644 --- a/cluster/src/main/java/io/scalecube/cluster/gossip/GossipProtocolImpl.java +++ b/cluster/src/main/java/io/scalecube/cluster/gossip/GossipProtocolImpl.java @@ -7,6 +7,7 @@ import io.scalecube.cluster.membership.MembershipEvent; import io.scalecube.cluster.transport.api.Message; import io.scalecube.cluster.transport.api.Transport; +import io.scalecube.cluster.transport.api.TransportWrapper; import io.scalecube.net.Address; import java.util.ArrayList; import java.util.Arrays; @@ -287,14 +288,13 @@ private void spreadGossipsTo(long period, Member member) { } // Send gossip request - Address address = member.address(); + List
addresses = member.addresses(); gossips.stream() .map(this::buildGossipRequestMessage) .forEach( message -> - transport - .send(address, message) + TransportWrapper.send(transport, addresses, message) .subscribe( null, ex -> @@ -303,7 +303,7 @@ private void spreadGossipsTo(long period, Member member) { localMember, period, message, - address, + addresses, ex.toString()))); } diff --git a/cluster/src/main/java/io/scalecube/cluster/membership/MembershipProtocolImpl.java b/cluster/src/main/java/io/scalecube/cluster/membership/MembershipProtocolImpl.java index 53fed376..af16480e 100644 --- a/cluster/src/main/java/io/scalecube/cluster/membership/MembershipProtocolImpl.java +++ b/cluster/src/main/java/io/scalecube/cluster/membership/MembershipProtocolImpl.java @@ -15,6 +15,7 @@ import io.scalecube.cluster.metadata.MetadataStore; import io.scalecube.cluster.transport.api.Message; import io.scalecube.cluster.transport.api.Transport; +import io.scalecube.cluster.transport.api.TransportWrapper; import io.scalecube.net.Address; import java.net.InetAddress; import java.nio.ByteBuffer; @@ -39,7 +40,6 @@ import java.util.concurrent.TimeUnit; import java.util.function.Predicate; import java.util.stream.Collectors; -import java.util.stream.Stream; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import reactor.core.Disposable; @@ -80,6 +80,8 @@ private enum MembershipUpdateReason { private final GossipProtocol gossipProtocol; private final MetadataStore metadataStore; + private final TransportWrapper transportWrapper; + // State private final Map membershipTable = new HashMap<>(); @@ -127,6 +129,8 @@ public MembershipProtocolImpl( this.membershipConfig = Objects.requireNonNull(config).membershipConfig(); this.failureDetectorConfig = Objects.requireNonNull(config).failureDetectorConfig(); + this.transportWrapper = new TransportWrapper(this.transport); + // Prepare seeds seedMembers = cleanUpSeedMembers(membershipConfig.seedMembers()); @@ -170,24 +174,31 @@ private List
cleanUpSeedMembers(Collection
seedMembers) { String hostAddress = localIpAddress.getHostAddress(); String hostName = localIpAddress.getHostName(); - Address memberAddr = localMember.address(); Address transportAddr = transport.address(); - Address memberAddrByHostAddress = Address.create(hostAddress, memberAddr.port()); Address transportAddrByHostAddress = Address.create(hostAddress, transportAddr.port()); - Address memberAddByHostName = Address.create(hostName, memberAddr.port()); Address transportAddrByHostName = Address.create(hostName, transportAddr.port()); return new LinkedHashSet<>(seedMembers) .stream() - .filter(addr -> checkAddressesNotEqual(addr, memberAddr)) + .filter(addr -> checkAddressesNotEqual(addr, localMember, hostAddress, hostName)) .filter(addr -> checkAddressesNotEqual(addr, transportAddr)) - .filter(addr -> checkAddressesNotEqual(addr, memberAddrByHostAddress)) .filter(addr -> checkAddressesNotEqual(addr, transportAddrByHostAddress)) - .filter(addr -> checkAddressesNotEqual(addr, memberAddByHostName)) .filter(addr -> checkAddressesNotEqual(addr, transportAddrByHostName)) .collect(Collectors.toList()); } + private boolean checkAddressesNotEqual( + Address smAddress, Member localMember, String hostAddress, String hostName) { + return localMember.addresses().stream() + .allMatch( + memberAddress -> + checkAddressesNotEqual(smAddress, memberAddress) + && checkAddressesNotEqual( + smAddress, Address.create(hostAddress, memberAddress.port())) + && checkAddressesNotEqual( + smAddress, Address.create(hostName, memberAddress.port()))); + } + private boolean checkAddressesNotEqual(Address address0, Address address1) { if (!address0.equals(address1)) { return true; @@ -330,27 +341,29 @@ public Optional member(String id) { @Override public Optional member(Address address) { - return new ArrayList<>(members.values()) - .stream().filter(member -> member.address().equals(address)).findFirst(); + return members.values().stream() + .filter(member -> member.addresses().stream().anyMatch(address::equals)) + .findFirst(); } private void doSync() { - Address address = selectSyncAddress().orElse(null); - if (address == null) { + List
addresses = selectSyncAddress(); + + if (addresses.isEmpty()) { return; } Message message = prepareSyncDataMsg(SYNC, null); - LOGGER.debug("[{}][doSync] Send Sync to {}", localMember, address); - transport - .send(address, message) + LOGGER.debug("[{}][doSync] Send Sync to {}", localMember, addresses); + transportWrapper + .send(addresses, message) .subscribe( null, ex -> LOGGER.debug( "[{}][doSync] Failed to send Sync to {}, cause: {}", localMember, - address, + addresses, ex.toString())); } @@ -394,13 +407,13 @@ private Mono onSyncAck(Message syncAckMsg, boolean onStart) { private Mono onSync(Message syncMsg) { return Mono.defer( () -> { - final Address sender = syncMsg.sender(); + final List
sender = syncMsg.sender(); LOGGER.debug("[{}] Received Sync from {}", localMember, sender); return syncMembership(syncMsg.data(), false) .doOnSuccess( avoid -> { Message message = prepareSyncDataMsg(SYNC_ACK, syncMsg.correlationId()); - transport + transportWrapper .send(sender, message) .subscribe( null, @@ -429,16 +442,16 @@ private void onFailureDetectorEvent(FailureDetectorEvent fdEvent) { // Alive won't override SUSPECT so issue instead extra sync with member to force it spread // alive with inc + 1 Message syncMsg = prepareSyncDataMsg(SYNC, null); - Address address = fdEvent.member().address(); - transport - .send(address, syncMsg) + List
addresses = fdEvent.member().addresses(); + transportWrapper + .send(addresses, syncMsg) .subscribe( null, ex -> LOGGER.debug( "[{}][onFailureDetectorEvent] Failed to send Sync to {}, cause: {}", localMember, - address, + addresses, ex.toString())); } else { MembershipRecord record = @@ -468,17 +481,24 @@ private void onMembershipGossip(Message message) { } } - private Optional
selectSyncAddress() { - List
addresses = - Stream.concat(seedMembers.stream(), otherMembers().stream().map(Member::address)) - .collect(Collectors.collectingAndThen(Collectors.toSet(), ArrayList::new)); - Collections.shuffle(addresses); - if (addresses.isEmpty()) { - return Optional.empty(); - } else { - int i = ThreadLocalRandom.current().nextInt(addresses.size()); - return Optional.of(addresses.get(i)); + private List
selectSyncAddress() { + Collection otherMembers = otherMembers(); + + if (seedMembers.isEmpty() && otherMembers.isEmpty()) { + return Collections.emptyList(); + } + + int totalSize = seedMembers.size() + otherMembers.size(); + int randomIndex = ThreadLocalRandom.current().nextInt(totalSize); + + if (randomIndex < seedMembers.size()) { + return Collections.singletonList(seedMembers.get(randomIndex)); } + + List otherMembersList = new ArrayList<>(otherMembers); + Member member = otherMembersList.get(randomIndex - seedMembers.size()); + + return member.addresses(); } // ================================================ @@ -593,7 +613,7 @@ private Mono updateMembership(MembershipRecord r1, MembershipUpdateReason } // If received updated for local member then increase incarnation and spread Alive gossip - if (r1.member().address().equals(localMember.address())) { + if (r1.member().addresses().equals(localMember.addresses())) { if (r1.member().id().equals(localMember.id())) { return onSelfMemberDetected(r0, r1, reason); } else { diff --git a/cluster/src/main/java/io/scalecube/cluster/metadata/MetadataStoreImpl.java b/cluster/src/main/java/io/scalecube/cluster/metadata/MetadataStoreImpl.java index 35ba5328..1ace90eb 100644 --- a/cluster/src/main/java/io/scalecube/cluster/metadata/MetadataStoreImpl.java +++ b/cluster/src/main/java/io/scalecube/cluster/metadata/MetadataStoreImpl.java @@ -4,10 +4,12 @@ import io.scalecube.cluster.Member; import io.scalecube.cluster.transport.api.Message; import io.scalecube.cluster.transport.api.Transport; +import io.scalecube.cluster.transport.api.TransportWrapper; import io.scalecube.net.Address; import java.nio.ByteBuffer; import java.time.Duration; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Optional; @@ -37,6 +39,8 @@ public class MetadataStoreImpl implements MetadataStore { private final Transport transport; private final ClusterConfig config; + private final TransportWrapper transportWrapper; + // State private final Map membersMetadata = new HashMap<>(); @@ -69,6 +73,8 @@ public MetadataStoreImpl( this.config = Objects.requireNonNull(config); this.scheduler = Objects.requireNonNull(scheduler); this.localMetadata = localMetadata; // optional + + this.transportWrapper = new TransportWrapper(this.transport); } @Override @@ -148,7 +154,6 @@ public Mono fetchMetadata(Member member) { return Mono.defer( () -> { final String cid = UUID.randomUUID().toString(); - final Address targetAddress = member.address(); LOGGER.debug("[{}][{}] Getting metadata for member {}", localMember, cid, member); @@ -159,17 +164,18 @@ public Mono fetchMetadata(Member member) { .data(new GetMetadataRequest(member)) .build(); - return transport - .requestResponse(targetAddress, request) + // TODO. Make transport abstraction around this logic + + List
addresses = member.addresses(); + + return transportWrapper + .requestResponse(addresses, request) .timeout(Duration.ofMillis(config.metadataTimeout()), scheduler) .publishOn(scheduler) .doOnSuccess( s -> LOGGER.debug( - "[{}][{}] Received GetMetadataResp from {}", - localMember, - cid, - targetAddress)) + "[{}][{}] Received GetMetadataResp from {}", localMember, cid, addresses)) .map(Message::data) .map(GetMetadataResponse::getMetadata) .doOnError( @@ -179,7 +185,7 @@ public Mono fetchMetadata(Member member) { + "from {} within {} ms, cause: {}", localMember, cid, - targetAddress, + addresses, config.metadataTimeout(), th.toString())); }); @@ -196,7 +202,7 @@ private void onMessage(Message message) { } private void onMetadataRequest(Message message) { - final Address sender = message.sender(); + final List
sender = message.sender(); // TODO. Log LOGGER.debug("[{}] Received GetMetadataReq from {}", localMember, sender); GetMetadataRequest reqData = message.data(); @@ -224,7 +230,7 @@ private void onMetadataRequest(Message message) { .build(); LOGGER.debug("[{}] Send GetMetadataResp to {}", localMember, sender); - transport + transportWrapper .send(sender, response) .subscribe( null, diff --git a/cluster/src/test/java/io/scalecube/cluster/ClusterNamespacesTest.java b/cluster/src/test/java/io/scalecube/cluster/ClusterNamespacesTest.java index bbcbd1ab..6c41fd3e 100644 --- a/cluster/src/test/java/io/scalecube/cluster/ClusterNamespacesTest.java +++ b/cluster/src/test/java/io/scalecube/cluster/ClusterNamespacesTest.java @@ -66,14 +66,14 @@ public void testSeparateEmptyNamespaces() { new ClusterImpl() .transportFactory(WebsocketTransportFactory::new) .membership(opts -> opts.namespace("root1")) - .membership(opts -> opts.seedMembers(root.address())) + .membership(opts -> opts.seedMembers(root.addresses())) .startAwait(); Cluster root2 = new ClusterImpl() .transportFactory(WebsocketTransportFactory::new) .membership(opts -> opts.namespace("root2")) - .membership(opts -> opts.seedMembers(root.address())) + .membership(opts -> opts.seedMembers(root.addresses())) .startAwait(); assertThat(root.otherMembers(), iterableWithSize(0)); @@ -93,21 +93,21 @@ public void testSeparateNonEmptyNamespaces() { new ClusterImpl() .transportFactory(WebsocketTransportFactory::new) .membership(opts -> opts.namespace("root")) - .membership(opts -> opts.seedMembers(root.address())) + .membership(opts -> opts.seedMembers(root.addresses())) .startAwait(); Cluster carol = new ClusterImpl() .transportFactory(WebsocketTransportFactory::new) .membership(opts -> opts.namespace("root")) - .membership(opts -> opts.seedMembers(root.address(), bob.address())) + .membership(opts -> opts.seedMembers(root.addresses().get(0), bob.addresses().get(0))) .startAwait(); Cluster root2 = new ClusterImpl() .transportFactory(WebsocketTransportFactory::new) .membership(opts -> opts.namespace("root2")) - .membership(opts -> opts.seedMembers(root.address())) + .membership(opts -> opts.seedMembers(root.addresses())) .startAwait(); Cluster dan = @@ -117,7 +117,7 @@ public void testSeparateNonEmptyNamespaces() { .membership( opts -> opts.seedMembers( - root.address(), root2.address(), bob.address(), carol.address())) + root.addresses().get(0), root2.addresses().get(0), bob.addresses().get(0), carol.addresses().get(0))) .startAwait(); Cluster eve = @@ -127,11 +127,11 @@ public void testSeparateNonEmptyNamespaces() { .membership( opts -> opts.seedMembers( - root.address(), - root2.address(), - dan.address(), - bob.address(), - carol.address())) + root.addresses().get(0), + root2.addresses().get(0), + dan.addresses().get(0), + bob.addresses().get(0), + carol.addresses().get(0))) .startAwait(); assertThat(root.otherMembers(), containsInAnyOrder(bob.member(), carol.member())); @@ -155,14 +155,14 @@ public void testSimpleNamespacesHierarchy() { new ClusterImpl() .transportFactory(WebsocketTransportFactory::new) .membership(opts -> opts.namespace("develop/develop")) - .membership(opts -> opts.seedMembers(rootDevelop.address())) + .membership(opts -> opts.seedMembers(rootDevelop.addresses())) .startAwait(); Cluster carol = new ClusterImpl() .transportFactory(WebsocketTransportFactory::new) .membership(opts -> opts.namespace("develop/develop")) - .membership(opts -> opts.seedMembers(rootDevelop.address(), bob.address())) + .membership(opts -> opts.seedMembers(rootDevelop.addresses().get(0), bob.addresses().get(0))) .startAwait(); Cluster dan = @@ -170,7 +170,7 @@ public void testSimpleNamespacesHierarchy() { .transportFactory(WebsocketTransportFactory::new) .membership(opts -> opts.namespace("develop/develop-2")) .membership( - opts -> opts.seedMembers(rootDevelop.address(), bob.address(), carol.address())) + opts -> opts.seedMembers(rootDevelop.addresses().get(0), bob.addresses().get(0), carol.addresses().get(0))) .startAwait(); Cluster eve = @@ -180,7 +180,7 @@ public void testSimpleNamespacesHierarchy() { .membership( opts -> opts.seedMembers( - rootDevelop.address(), bob.address(), carol.address(), dan.address())) + rootDevelop.addresses().get(0), bob.addresses().get(0), carol.addresses().get(0), dan.addresses().get(0))) .startAwait(); assertThat( @@ -206,14 +206,14 @@ public void testIsolatedParentNamespaces() { new ClusterImpl() .transportFactory(WebsocketTransportFactory::new) .membership(opts -> opts.namespace("a/1/c")) - .membership(opts -> opts.seedMembers(parent1.address())) + .membership(opts -> opts.seedMembers(parent1.addresses())) .startAwait(); Cluster carol = new ClusterImpl() .transportFactory(WebsocketTransportFactory::new) .membership(opts -> opts.namespace("a/1/c")) - .membership(opts -> opts.seedMembers(parent1.address(), bob.address())) + .membership(opts -> opts.seedMembers(parent1.addresses().get(0), bob.addresses().get(0))) .startAwait(); Cluster parent2 = @@ -229,7 +229,7 @@ public void testIsolatedParentNamespaces() { .membership( opts -> opts.seedMembers( - parent1.address(), parent2.address(), bob.address(), carol.address())) + parent1.addresses().get(0), parent2.addresses().get(0), bob.addresses().get(0), carol.addresses().get(0))) .startAwait(); //noinspection unused @@ -240,11 +240,11 @@ public void testIsolatedParentNamespaces() { .membership( opts -> opts.seedMembers( - parent1.address(), - parent2.address(), - bob.address(), - carol.address(), - dan.address())) + parent1.addresses().get(0), + parent2.addresses().get(0), + bob.addresses().get(0), + carol.addresses().get(0), + dan.addresses().get(0))) .startAwait(); assertThat(parent1.otherMembers(), containsInAnyOrder(bob.member(), carol.member())); diff --git a/cluster/src/test/java/io/scalecube/cluster/ClusterTest.java b/cluster/src/test/java/io/scalecube/cluster/ClusterTest.java index b99f6189..906d7e2f 100644 --- a/cluster/src/test/java/io/scalecube/cluster/ClusterTest.java +++ b/cluster/src/test/java/io/scalecube/cluster/ClusterTest.java @@ -92,7 +92,7 @@ public void testMembersAccessFromScheduler() { Cluster seedNode = new ClusterImpl().transportFactory(TcpTransportFactory::new).startAwait(); Cluster otherNode = new ClusterImpl() - .membership(opts -> opts.seedMembers(seedNode.address())) + .membership(opts -> opts.seedMembers(seedNode.addresses())) .transportFactory(TcpTransportFactory::new) .startAwait(); @@ -101,8 +101,8 @@ public void testMembersAccessFromScheduler() { // Members by address - Optional otherNodeOnSeedNode = seedNode.member(otherNode.address()); - Optional seedNodeOnOtherNode = otherNode.member(seedNode.address()); + Optional otherNodeOnSeedNode = seedNode.member(otherNode.addresses().get(0)); + Optional seedNodeOnOtherNode = otherNode.member(seedNode.addresses().get(0)); assertEquals(otherNode.member(), otherNodeOnSeedNode.orElse(null)); assertEquals(seedNode.member(), seedNodeOnOtherNode.orElse(null)); @@ -181,7 +181,7 @@ public void testJoinDynamicPort() { for (int i = 0; i < membersNum; i++) { otherNodes.add( new ClusterImpl() - .membership(opts -> opts.seedMembers(seedNode.address())) + .membership(opts -> opts.seedMembers(seedNode.addresses())) .transportFactory(TcpTransportFactory::new) .startAwait()); } @@ -212,7 +212,7 @@ public void testUpdateMetadata() throws Exception { metadataNode = new ClusterImpl() .config(opts -> opts.metadata(metadata)) - .membership(opts -> opts.seedMembers(seedNode.address())) + .membership(opts -> opts.seedMembers(seedNode.addresses())) .transportFactory(TcpTransportFactory::new) .startAwait(); @@ -221,7 +221,7 @@ public void testUpdateMetadata() throws Exception { .flatMap( integer -> new ClusterImpl() - .membership(opts -> opts.seedMembers(seedNode.address())) + .membership(opts -> opts.seedMembers(seedNode.addresses())) .transportFactory(TcpTransportFactory::new) .handler( cluster -> @@ -285,7 +285,7 @@ public void testUpdateMetadataProperty() throws Exception { metadataNode = new ClusterImpl() .config(opts -> opts.metadata(metadata)) - .membership(opts -> opts.seedMembers(seedNode.address())) + .membership(opts -> opts.seedMembers(seedNode.addresses())) .transportFactory(TcpTransportFactory::new) .startAwait(); @@ -294,7 +294,7 @@ public void testUpdateMetadataProperty() throws Exception { .flatMap( integer -> new ClusterImpl() - .membership(opts -> opts.seedMembers(seedNode.address())) + .membership(opts -> opts.seedMembers(seedNode.addresses())) .transportFactory(TcpTransportFactory::new) .handler( cluster -> @@ -363,7 +363,7 @@ public void testRemoveMetadataProperty() throws Exception { metadataNode = new ClusterImpl() .config(opts -> opts.metadata(metadata)) - .membership(opts -> opts.seedMembers(seedNode.address())) + .membership(opts -> opts.seedMembers(seedNode.addresses())) .transportFactory(TcpTransportFactory::new) .startAwait(); @@ -372,7 +372,7 @@ public void testRemoveMetadataProperty() throws Exception { .flatMap( integer -> new ClusterImpl() - .membership(opts -> opts.seedMembers(seedNode.address())) + .membership(opts -> opts.seedMembers(seedNode.addresses())) .transportFactory(TcpTransportFactory::new) .handler( cluster -> @@ -452,19 +452,19 @@ public void onMembershipEvent(MembershipEvent event) { // Start nodes final Cluster node1 = new ClusterImpl() - .membership(opts -> opts.seedMembers(seedNode.address())) + .membership(opts -> opts.seedMembers(seedNode.addresses())) .transportFactory(TcpTransportFactory::new) .handler(cluster -> listener) .startAwait(); final Cluster node2 = new ClusterImpl() - .membership(opts -> opts.seedMembers(seedNode.address())) + .membership(opts -> opts.seedMembers(seedNode.addresses())) .transportFactory(TcpTransportFactory::new) .handler(cluster -> listener) .startAwait(); final Cluster node3 = new ClusterImpl() - .membership(opts -> opts.seedMembers(seedNode.address())) + .membership(opts -> opts.seedMembers(seedNode.addresses())) .transportFactory(TcpTransportFactory::new) .handler(cluster -> listener) .startAwait(); @@ -506,7 +506,7 @@ public void onMembershipEvent(MembershipEvent event) { final Cluster node1 = new ClusterImpl() .config(opts -> opts.metadata(node1Metadata)) - .membership(opts -> opts.seedMembers(seedNode.address())) + .membership(opts -> opts.seedMembers(seedNode.addresses())) .transportFactory(TcpTransportFactory::new) .handler( cluster -> @@ -559,9 +559,11 @@ public void testJoinSeedClusterWithNoExistingSeedMember() { // Start seed node Cluster seedNode = new ClusterImpl().transportFactory(TcpTransportFactory::new).startAwait(); - Address nonExistingSeed1 = Address.from("localhost:1234"); - Address nonExistingSeed2 = Address.from("localhost:5678"); - Address[] seeds = new Address[] {nonExistingSeed1, nonExistingSeed2, seedNode.address()}; + List
seeds = new ArrayList<>(); + + seeds.add(Address.from("localhost:1234")); // Not existent + seeds.add(Address.from("localhost:5678")); // Not existent + seeds.addAll(seedNode.addresses()); Cluster otherNode = new ClusterImpl() diff --git a/cluster/src/test/java/io/scalecube/cluster/fdetector/FailureDetectorTest.java b/cluster/src/test/java/io/scalecube/cluster/fdetector/FailureDetectorTest.java index 316f814e..bf762570 100644 --- a/cluster/src/test/java/io/scalecube/cluster/fdetector/FailureDetectorTest.java +++ b/cluster/src/test/java/io/scalecube/cluster/fdetector/FailureDetectorTest.java @@ -445,7 +445,8 @@ private void assertStatus( events.stream() .filter(event -> event.status() == status) .map(FailureDetectorEvent::member) - .map(Member::address) + .map(Member::addresses) + .flatMap(Collection::stream) .collect(Collectors.toList()); String msg1 = @@ -472,7 +473,8 @@ private Future> listenNextEventFor( List> resultFuture = new ArrayList<>(); for (final Address member : addresses) { final CompletableFuture future = new CompletableFuture<>(); - fd.listen().filter(event -> event.member().address() == member).subscribe(future::complete); + fd.listen() + .filter(event -> event.member().addresses().contains(member)).subscribe(future::complete); resultFuture.add(future); } diff --git a/cluster/src/test/java/io/scalecube/cluster/membership/MembershipProtocolTest.java b/cluster/src/test/java/io/scalecube/cluster/membership/MembershipProtocolTest.java index ef559256..0678bd45 100644 --- a/cluster/src/test/java/io/scalecube/cluster/membership/MembershipProtocolTest.java +++ b/cluster/src/test/java/io/scalecube/cluster/membership/MembershipProtocolTest.java @@ -750,19 +750,19 @@ public void testOverrideMemberAddress() throws UnknownHostException { NetworkEmulatorTransport e = createTransport(); MembershipProtocolImpl cmA = - createMembership(a, testConfig(Collections.emptyList()).externalHost(localAddress)); + createMembership(a, testConfig(Collections.emptyList()).externalHosts(localAddress)); MembershipProtocolImpl cmB = createMembership( - b, testConfig(Collections.singletonList(a.address())).externalHost(localAddress)); + b, testConfig(Collections.singletonList(a.address())).externalHosts(localAddress)); MembershipProtocolImpl cmC = createMembership( - c, testConfig(Collections.singletonList(a.address())).externalHost(localAddress)); + c, testConfig(Collections.singletonList(a.address())).externalHosts(localAddress)); MembershipProtocolImpl cmD = createMembership( - d, testConfig(Collections.singletonList(b.address())).externalHost(localAddress)); + d, testConfig(Collections.singletonList(b.address())).externalHosts(localAddress)); MembershipProtocolImpl cmE = createMembership( - e, testConfig(Collections.singletonList(b.address())).externalHost(localAddress)); + e, testConfig(Collections.singletonList(b.address())).externalHosts(localAddress)); try { awaitSeconds(3); diff --git a/examples/src/main/java/io/scalecube/examples/ClusterJoinExamples.java b/examples/src/main/java/io/scalecube/examples/ClusterJoinExamples.java index cff607f2..b2312f15 100644 --- a/examples/src/main/java/io/scalecube/examples/ClusterJoinExamples.java +++ b/examples/src/main/java/io/scalecube/examples/ClusterJoinExamples.java @@ -30,7 +30,7 @@ public static void main(String[] args) { Cluster bob = new ClusterImpl() .config(opts -> opts.memberAlias("Bob")) - .membership(opts -> opts.seedMembers(alice.address())) + .membership(opts -> opts.seedMembers(alice.addresses())) .transportFactory(TcpTransportFactory::new) .startAwait(); @@ -39,7 +39,7 @@ public static void main(String[] args) { Cluster carol = new ClusterImpl() .config(opts -> opts.memberAlias("Carol").metadata(metadata)) - .membership(opts -> opts.seedMembers(alice.address())) + .membership(opts -> opts.seedMembers(alice.addresses())) .transportFactory(TcpTransportFactory::new) .startAwait(); @@ -47,7 +47,7 @@ public static void main(String[] args) { ClusterConfig configWithFixedPort = new ClusterConfig() .memberAlias("Dan") - .membership(opts -> opts.seedMembers(alice.address())) + .membership(opts -> opts.seedMembers(alice.addresses())) .transport(opts -> opts.port(3000)); Cluster dan = new ClusterImpl(configWithFixedPort) @@ -61,10 +61,10 @@ public static void main(String[] args) { .membership( opts -> opts.seedMembers( - alice.address(), - bob.address(), - carol.address(), - dan.address()) // won't join anyway + alice.addresses().get(0), + bob.addresses().get(0), + carol.addresses().get(0), + dan.addresses().get(0)) // won't join anyway .namespace("another-cluster")); Cluster eve = new ClusterImpl(configWithSyncGroup) @@ -75,31 +75,31 @@ public static void main(String[] args) { System.out.println( "Alice (" - + alice.address() + + alice.addresses() + ") cluster: " + alice.members().stream().map(Member::toString).collect(joining("\n", "\n", "\n"))); System.out.println( "Bob (" - + bob.address() + + bob.addresses() + ") cluster: " + bob.members().stream().map(Member::toString).collect(joining("\n", "\n", "\n"))); System.out.println( "Carol (" - + carol.address() + + carol.addresses() + ") cluster: " + carol.members().stream().map(Member::toString).collect(joining("\n", "\n", "\n"))); System.out.println( "Dan (" - + dan.address() + + dan.addresses() + ") cluster: " + dan.members().stream().map(Member::toString).collect(joining("\n", "\n", "\n"))); System.out.println( "Eve (" - + eve.address() + + eve.addresses() + ") cluster: " // alone in cluster + eve.members().stream().map(Member::toString).collect(joining("\n", "\n", "\n"))); } diff --git a/examples/src/main/java/io/scalecube/examples/ClusterJoinNamespacesExamples.java b/examples/src/main/java/io/scalecube/examples/ClusterJoinNamespacesExamples.java index bb1910b1..611c5cfa 100644 --- a/examples/src/main/java/io/scalecube/examples/ClusterJoinNamespacesExamples.java +++ b/examples/src/main/java/io/scalecube/examples/ClusterJoinNamespacesExamples.java @@ -24,7 +24,7 @@ public static void main(String[] args) { new ClusterImpl() .config(opts -> opts.memberAlias("Bob")) .membership(opts -> opts.namespace("alice/bob-and-carol")) - .membership(opts -> opts.seedMembers(alice.address())) + .membership(opts -> opts.seedMembers(alice.addresses())) .transportFactory(TcpTransportFactory::new) .startAwait(); @@ -33,7 +33,7 @@ public static void main(String[] args) { new ClusterImpl() .config(opts -> opts.memberAlias("Carol")) .membership(opts -> opts.namespace("alice/bob-and-carol")) - .membership(opts -> opts.seedMembers(alice.address())) + .membership(opts -> opts.seedMembers(alice.addresses())) .transportFactory(TcpTransportFactory::new) .startAwait(); @@ -41,7 +41,7 @@ public static void main(String[] args) { new ClusterImpl() .config(opts -> opts.memberAlias("Bob-and-Carol-Child-1")) .membership(opts -> opts.namespace("alice/bob-and-carol/child-1")) - .membership(opts -> opts.seedMembers(alice.address())) + .membership(opts -> opts.seedMembers(alice.addresses())) .transportFactory(TcpTransportFactory::new) .startAwait(); @@ -49,7 +49,7 @@ public static void main(String[] args) { new ClusterImpl() .config(opts -> opts.memberAlias("Bob-and-Carol-Child-2")) .membership(opts -> opts.namespace("alice/bob-and-carol/child-2")) - .membership(opts -> opts.seedMembers(alice.address())) + .membership(opts -> opts.seedMembers(alice.addresses())) .transportFactory(TcpTransportFactory::new) .startAwait(); @@ -58,7 +58,7 @@ public static void main(String[] args) { new ClusterImpl() .config(opts -> opts.memberAlias("Dan")) .membership(opts -> opts.namespace("alice/dan-and-eve")) - .membership(opts -> opts.seedMembers(alice.address())) + .membership(opts -> opts.seedMembers(alice.addresses())) .transportFactory(TcpTransportFactory::new) .startAwait(); @@ -67,7 +67,7 @@ public static void main(String[] args) { new ClusterImpl() .config(opts -> opts.memberAlias("Eve")) .membership(opts -> opts.namespace("alice/dan-and-eve")) - .membership(opts -> opts.seedMembers(alice.address())) + .membership(opts -> opts.seedMembers(alice.addresses())) .transportFactory(TcpTransportFactory::new) .startAwait(); @@ -75,37 +75,37 @@ public static void main(String[] args) { System.out.println( "Alice (" - + alice.address() + + alice.addresses() + ") cluster: " + alice.members().stream().map(Member::toString).collect(joining("\n", "\n", "\n"))); System.out.println( "Bob (" - + bob.address() + + bob.addresses() + ") cluster: " + bob.members().stream().map(Member::toString).collect(joining("\n", "\n", "\n"))); System.out.println( "Carol (" - + carol.address() + + carol.addresses() + ") cluster: " + carol.members().stream().map(Member::toString).collect(joining("\n", "\n", "\n"))); System.out.println( "Dan (" - + dan.address() + + dan.addresses() + ") cluster: " + dan.members().stream().map(Member::toString).collect(joining("\n", "\n", "\n"))); System.out.println( "Eve (" - + eve.address() + + eve.addresses() + ") cluster: " // alone in cluster + eve.members().stream().map(Member::toString).collect(joining("\n", "\n", "\n"))); System.out.println( "Bob-And-Carol-Child-1 (" - + bobAndCarolChild1.address() + + bobAndCarolChild1.addresses() + ") cluster: " // alone in cluster + bobAndCarolChild1.members().stream() .map(Member::toString) @@ -113,7 +113,7 @@ public static void main(String[] args) { System.out.println( "Bob-And-Carol-Child-2 (" - + carolChild2.address() + + carolChild2.addresses() + ") cluster: " // alone in cluster + carolChild2.members().stream() .map(Member::toString) diff --git a/examples/src/main/java/io/scalecube/examples/ClusterMetadataExample.java b/examples/src/main/java/io/scalecube/examples/ClusterMetadataExample.java index 95824c93..14ce4056 100644 --- a/examples/src/main/java/io/scalecube/examples/ClusterMetadataExample.java +++ b/examples/src/main/java/io/scalecube/examples/ClusterMetadataExample.java @@ -30,7 +30,7 @@ public static void main(String[] args) throws Exception { Cluster joe = new ClusterImpl() .config(opts -> opts.metadata(Collections.singletonMap("name", "Joe"))) - .membership(opts -> opts.seedMembers(alice.address())) + .membership(opts -> opts.seedMembers(alice.addresses())) .transportFactory(TcpTransportFactory::new) .handler( cluster -> { diff --git a/examples/src/main/java/io/scalecube/examples/CustomMetadataEncodingExample.java b/examples/src/main/java/io/scalecube/examples/CustomMetadataEncodingExample.java index 90604932..6dd1f2fe 100644 --- a/examples/src/main/java/io/scalecube/examples/CustomMetadataEncodingExample.java +++ b/examples/src/main/java/io/scalecube/examples/CustomMetadataEncodingExample.java @@ -24,7 +24,7 @@ public static void main(String[] args) throws Exception { new ClusterImpl() .transportFactory(WebsocketTransportFactory::new) .config(opts -> opts.metadataCodec(new LongMetadataCodec()).metadata(123L)) - .membership(opts -> opts.seedMembers(alice.address())) + .membership(opts -> opts.seedMembers(alice.addresses())) .startAwait(); System.out.println( "[" + joe.member().id() + "] Joe's metadata: " + joe.metadata().orElse(null)); @@ -33,7 +33,7 @@ public static void main(String[] args) throws Exception { new ClusterImpl() .transportFactory(WebsocketTransportFactory::new) .config(opts -> opts.metadataCodec(new LongMetadataCodec()).metadata(456L)) - .membership(opts -> opts.seedMembers(alice.address())) + .membership(opts -> opts.seedMembers(alice.addresses())) .startAwait(); System.out.println( "[" + bob.member().id() + "] Bob's metadata: " + bob.metadata().orElse(null)); diff --git a/examples/src/main/java/io/scalecube/examples/GossipExample.java b/examples/src/main/java/io/scalecube/examples/GossipExample.java index 2135880c..d4919541 100644 --- a/examples/src/main/java/io/scalecube/examples/GossipExample.java +++ b/examples/src/main/java/io/scalecube/examples/GossipExample.java @@ -34,7 +34,7 @@ public void onGossip(Message gossip) { //noinspection unused Cluster bob = new ClusterImpl() - .membership(opts -> opts.seedMembers(alice.address())) + .membership(opts -> opts.seedMembers(alice.addresses())) .transportFactory(TcpTransportFactory::new) .handler( cluster -> { @@ -50,7 +50,7 @@ public void onGossip(Message gossip) { //noinspection unused Cluster carol = new ClusterImpl() - .membership(opts -> opts.seedMembers(alice.address())) + .membership(opts -> opts.seedMembers(alice.addresses())) .transportFactory(TcpTransportFactory::new) .handler( cluster -> { @@ -66,7 +66,7 @@ public void onGossip(Message gossip) { //noinspection unused Cluster dan = new ClusterImpl() - .membership(opts -> opts.seedMembers(alice.address())) + .membership(opts -> opts.seedMembers(alice.addresses())) .transportFactory(TcpTransportFactory::new) .handler( cluster -> { @@ -82,7 +82,7 @@ public void onGossip(Message gossip) { // Start cluster node Eve that joins cluster and spreads gossip Cluster eve = new ClusterImpl() - .membership(opts -> opts.seedMembers(alice.address())) + .membership(opts -> opts.seedMembers(alice.addresses())) .transportFactory(TcpTransportFactory::new) .startAwait(); eve.spreadGossip(Message.fromData("Gossip from Eve")) diff --git a/examples/src/main/java/io/scalecube/examples/MembershipEventsExample.java b/examples/src/main/java/io/scalecube/examples/MembershipEventsExample.java index 2040588f..b68bb3b0 100644 --- a/examples/src/main/java/io/scalecube/examples/MembershipEventsExample.java +++ b/examples/src/main/java/io/scalecube/examples/MembershipEventsExample.java @@ -47,7 +47,7 @@ public void onMembershipEvent(MembershipEvent event) { new ClusterImpl() .config(opts -> opts.memberAlias("Bob")) .config(opts -> opts.metadata(Collections.singletonMap("name", "Bob"))) - .membership(opts -> opts.seedMembers(alice.address())) + .membership(opts -> opts.seedMembers(alice.addresses())) .transportFactory(TcpTransportFactory::new) .handler( cluster -> { @@ -66,7 +66,7 @@ public void onMembershipEvent(MembershipEvent event) { new ClusterImpl() .config(opts -> opts.memberAlias("Carol")) .config(opts -> opts.metadata(Collections.singletonMap("name", "Carol"))) - .membership(opts -> opts.seedMembers(bob.address())) + .membership(opts -> opts.seedMembers(bob.addresses())) .transportFactory(TcpTransportFactory::new) .handler( cluster -> { diff --git a/examples/src/main/java/io/scalecube/examples/MessagingExample.java b/examples/src/main/java/io/scalecube/examples/MessagingExample.java index b747b229..37dd445a 100644 --- a/examples/src/main/java/io/scalecube/examples/MessagingExample.java +++ b/examples/src/main/java/io/scalecube/examples/MessagingExample.java @@ -39,7 +39,7 @@ public void onMessage(Message msg) { // messages Cluster bob = new ClusterImpl() - .membership(opts -> opts.seedMembers(alice.address())) + .membership(opts -> opts.seedMembers(alice.addresses())) .transportFactory(TcpTransportFactory::new) .handler( cluster -> { @@ -58,7 +58,7 @@ public void onMessage(Message msg) { // Join cluster node Carol to cluster with Alice and Bob Cluster carol = new ClusterImpl() - .membership(opts -> opts.seedMembers(alice.address(), bob.address())) + .membership(opts -> opts.seedMembers(alice.addresses().get(0), bob.addresses().get(0))) .transportFactory(TcpTransportFactory::new) .handler( cluster -> { diff --git a/examples/src/main/java/io/scalecube/examples/WebsocketMessagingExample.java b/examples/src/main/java/io/scalecube/examples/WebsocketMessagingExample.java index 7049e1a1..cb888573 100644 --- a/examples/src/main/java/io/scalecube/examples/WebsocketMessagingExample.java +++ b/examples/src/main/java/io/scalecube/examples/WebsocketMessagingExample.java @@ -40,7 +40,7 @@ public void onMessage(Message msg) { Cluster bob = new ClusterImpl() .transportFactory(WebsocketTransportFactory::new) - .membership(opts -> opts.seedMembers(alice.address())) + .membership(opts -> opts.seedMembers(alice.addresses())) .handler( cluster -> { return new ClusterMessageHandler() { @@ -59,15 +59,13 @@ public void onMessage(Message msg) { Cluster carol = new ClusterImpl() .transportFactory(WebsocketTransportFactory::new) - .membership(opts -> opts.seedMembers(alice.address(), bob.address())) + .membership(opts -> opts.seedMembers(alice.addresses().get(0), bob.addresses().get(0))) .handler( - cluster -> { - return new ClusterMessageHandler() { - @Override - public void onMessage(Message msg) { - System.out.println("Carol received: " + msg.data()); - } - }; + cluster -> new ClusterMessageHandler() { + @Override + public void onMessage(Message msg) { + System.out.println("Carol received: " + msg.data()); + } }) .startAwait(); diff --git a/transport-parent/transport-api/src/main/java/io/scalecube/cluster/transport/api/Message.java b/transport-parent/transport-api/src/main/java/io/scalecube/cluster/transport/api/Message.java index b5e3e879..e9a3c7a9 100644 --- a/transport-parent/transport-api/src/main/java/io/scalecube/cluster/transport/api/Message.java +++ b/transport-parent/transport-api/src/main/java/io/scalecube/cluster/transport/api/Message.java @@ -5,13 +5,15 @@ import java.io.IOException; import java.io.ObjectInput; import java.io.ObjectOutput; +import java.util.Arrays; import java.util.Collections; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Objects; -import java.util.Optional; import java.util.StringJoiner; +import java.util.stream.Collectors; /** * The Class Message introduces generic protocol used for point to point communication by transport. @@ -36,7 +38,8 @@ public final class Message implements Externalizable { * This header represents sender address of type {@link Address}. It's an address of message * originator. This header is optional. */ - public static final String HEADER_SENDER = "sender"; + public static final String HEADER_SENDER = + "sender"; // TODO. Value should be list of addresses (comma separated) private Map headers = Collections.emptyMap(); private Object data; @@ -190,8 +193,17 @@ public T data() { * * @return address */ - public Address sender() { - return Optional.ofNullable(header(HEADER_SENDER)).map(Address::from).orElse(null); + public List
sender() { + String headerValue = header(HEADER_SENDER); + + if (headerValue == null) { + return Collections.emptyList(); + } + + return Arrays.stream(headerValue.split(",")) + .map(String::trim) // Removes leading and trailing spaces. + .map(Address::from) + .collect(Collectors.toList()); } @Override @@ -281,8 +293,16 @@ public Builder correlationId(String correlationId) { return header(HEADER_CORRELATION_ID, correlationId); } - public Builder sender(Address sender) { - return header(HEADER_SENDER, sender.toString()); + /** + * Setter for header. + * + * @param addresses addresses + * @return builder + */ + public Builder sender(List
addresses) { + return header( + HEADER_SENDER, + addresses.stream().map(Address::toString).collect(Collectors.joining(","))); } public Message build() { diff --git a/transport-parent/transport-api/src/main/java/io/scalecube/cluster/transport/api/TransportWrapper.java b/transport-parent/transport-api/src/main/java/io/scalecube/cluster/transport/api/TransportWrapper.java new file mode 100644 index 00000000..2d025a8b --- /dev/null +++ b/transport-parent/transport-api/src/main/java/io/scalecube/cluster/transport/api/TransportWrapper.java @@ -0,0 +1,53 @@ +package io.scalecube.cluster.transport.api; + +import io.scalecube.net.Address; +import java.util.List; +import reactor.core.publisher.Mono; + +public class TransportWrapper { + + private final Transport transport; + + public TransportWrapper(Transport transport) { + this.transport = transport; + } + + public Mono requestResponse(List
addresses, Message request) { + return requestResponse(transport, addresses, 0, request); + } + + public static Mono requestResponse( + Transport transport, List
addresses, Message request) { + return requestResponse(transport, addresses, 0, request); + } + + private static Mono requestResponse( + Transport transport, List
addresses, int currentIndex, Message request) { + if (currentIndex >= addresses.size()) { + return Mono.error(new RuntimeException("All addresses have been tried and failed.")); + } + + return transport + .requestResponse(addresses.get(currentIndex), request) + .onErrorResume(th -> requestResponse(transport, addresses, currentIndex + 1, request)); + } + + public Mono send(List
addresses, Message request) { + return send(transport, addresses, 0, request); + } + + public static Mono send(Transport transport, List
addresses, Message request) { + return send(transport, addresses, 0, request); + } + + private static Mono send( + Transport transport, List
addresses, int currentIndex, Message request) { + if (currentIndex >= addresses.size()) { + return Mono.error(new RuntimeException("All addresses have been tried and failed.")); + } + + return transport + .send(addresses.get(currentIndex), request) + .onErrorResume(th -> send(transport, addresses, currentIndex + 1, request)); + } +} diff --git a/transport-parent/transport-netty/src/test/java/io/scalecube/transport/netty/BaseTest.java b/transport-parent/transport-netty/src/test/java/io/scalecube/transport/netty/BaseTest.java index 3d07e84d..3f24d0bd 100644 --- a/transport-parent/transport-netty/src/test/java/io/scalecube/transport/netty/BaseTest.java +++ b/transport-parent/transport-netty/src/test/java/io/scalecube/transport/netty/BaseTest.java @@ -3,11 +3,14 @@ import io.scalecube.cluster.transport.api.Message; import io.scalecube.cluster.transport.api.Transport; import io.scalecube.cluster.transport.api.TransportConfig; +import io.scalecube.cluster.transport.api.TransportWrapper; import io.scalecube.cluster.utils.NetworkEmulatorTransport; import io.scalecube.net.Address; import io.scalecube.transport.netty.tcp.TcpTransportFactory; import io.scalecube.transport.netty.websocket.WebsocketTransportFactory; import java.time.Duration; +import java.util.List; + import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.TestInfo; @@ -15,6 +18,8 @@ import org.slf4j.LoggerFactory; import reactor.core.publisher.Mono; +import javax.sql.rowset.spi.TransactionalWriter; + /** Base test class. */ public class BaseTest { @@ -50,6 +55,25 @@ protected Mono send(Transport transport, Address to, Message msg) { th.toString())); } + /** + * Sending message from src to destination. + * + * @param transport src + * @param to destinations + * @param msg request + */ + protected Mono send(Transport transport, List
to, Message msg) { + return TransportWrapper.send(transport, to, msg) + .doOnError( + th -> + LOGGER.error( + "Failed to send {} to {} from transport: {}, cause: {}", + msg, + to, + transport, + th.toString())); + } + /** * Stopping transport. * diff --git a/transport-parent/transport-netty/src/test/java/io/scalecube/transport/netty/tcp/TcpTransportTest.java b/transport-parent/transport-netty/src/test/java/io/scalecube/transport/netty/tcp/TcpTransportTest.java index f8bb8daa..2bd14e96 100644 --- a/transport-parent/transport-netty/src/test/java/io/scalecube/transport/netty/tcp/TcpTransportTest.java +++ b/transport-parent/transport-netty/src/test/java/io/scalecube/transport/netty/tcp/TcpTransportTest.java @@ -7,6 +7,7 @@ import static org.junit.jupiter.api.Assertions.fail; import io.scalecube.cluster.transport.api.Message; +import io.scalecube.cluster.transport.api.TransportWrapper; import io.scalecube.cluster.utils.NetworkEmulatorTransport; import io.scalecube.net.Address; import io.scalecube.transport.netty.BaseTest; @@ -94,9 +95,11 @@ public void testPingPongClientTfListenAndServerTfListen() throws Exception { .listen() .subscribe( message -> { - Address address = message.sender(); - assertEquals(client.address(), address, "Expected clientAddress"); - send(server, address, Message.fromQualifier("hi client")).subscribe(); + List
addresses = message.sender(); + assertTrue( + addresses.stream().anyMatch(a -> client.address().equals(a)), + "Expected clientAddress"); + send(server, addresses, Message.fromQualifier("hi client")).subscribe(); }); CompletableFuture messageFuture = new CompletableFuture<>(); @@ -145,8 +148,7 @@ public void testPingPongOnSingleChannel() throws Exception { messages -> { for (Message message : messages) { Message echo = Message.withData("echo/" + message.qualifier()).build(); - server - .send(message.sender(), echo) + TransportWrapper.send(server, message.sender(), echo) .subscribe(null, th -> LOGGER.error("Failed to send message", th)); } }); @@ -215,8 +217,7 @@ public void testPingPongOnSeparateChannel() throws Exception { messages -> { for (Message message : messages) { Message echo = Message.withData("echo/" + message.qualifier()).build(); - server - .send(message.sender(), echo) + TransportWrapper.send(server, message.sender(), echo) .subscribe(null, th -> LOGGER.error("Failed to send message", th)); } }); @@ -280,8 +281,7 @@ public void testObserverThrowsException() throws Exception { } if (qualifier.startsWith("q")) { Message echo = Message.withData("echo/" + message.qualifier()).build(); - server - .send(message.sender(), echo) + TransportWrapper.send(server, message.sender(), echo) .subscribe(null, th -> LOGGER.error("Failed to send message", th)); } }, @@ -320,7 +320,9 @@ public void testBlockAndUnblockTraffic() throws Exception { client = createTcpTransport(); server = createTcpTransport(); - server.listen().subscribe(message -> server.send(message.sender(), message).subscribe()); + server + .listen() + .subscribe(message -> TransportWrapper.send(server, message.sender(), message).subscribe()); Sinks.Many responses = Sinks.many().replay().all(); client diff --git a/transport-parent/transport-netty/src/test/java/io/scalecube/transport/netty/websocket/WebsocketTransportTest.java b/transport-parent/transport-netty/src/test/java/io/scalecube/transport/netty/websocket/WebsocketTransportTest.java index 050474af..da9b00d3 100644 --- a/transport-parent/transport-netty/src/test/java/io/scalecube/transport/netty/websocket/WebsocketTransportTest.java +++ b/transport-parent/transport-netty/src/test/java/io/scalecube/transport/netty/websocket/WebsocketTransportTest.java @@ -7,6 +7,7 @@ import static org.junit.jupiter.api.Assertions.fail; import io.scalecube.cluster.transport.api.Message; +import io.scalecube.cluster.transport.api.TransportWrapper; import io.scalecube.cluster.utils.NetworkEmulatorTransport; import io.scalecube.net.Address; import io.scalecube.transport.netty.BaseTest; @@ -94,9 +95,11 @@ public void testPingPongClientTfListenAndServerTfListen() throws Exception { .listen() .subscribe( message -> { - Address address = message.sender(); - assertEquals(client.address(), address, "Expected clientAddress"); - send(server, address, Message.fromQualifier("hi client")).subscribe(); + List
addresses = message.sender(); + assertTrue( + addresses.stream().anyMatch(a -> client.address().equals(a)), + "Expected clientAddress"); + send(server, addresses, Message.fromQualifier("hi client")).subscribe(); }); CompletableFuture messageFuture = new CompletableFuture<>(); @@ -145,8 +148,7 @@ public void testPingPongOnSingleChannel() throws Exception { messages -> { for (Message message : messages) { Message echo = Message.withData("echo/" + message.qualifier()).build(); - server - .send(message.sender(), echo) + TransportWrapper.send(server, message.sender(), echo) .subscribe(null, th -> LOGGER.error("Failed to send message", th)); } }); @@ -215,8 +217,7 @@ public void testPingPongOnSeparateChannel() throws Exception { messages -> { for (Message message : messages) { Message echo = Message.withData("echo/" + message.qualifier()).build(); - server - .send(message.sender(), echo) + TransportWrapper.send(server, message.sender(), echo) .subscribe(null, th -> LOGGER.error("Failed to send message", th)); } }); @@ -280,8 +281,7 @@ public void testObserverThrowsException() throws Exception { } if (qualifier.startsWith("q")) { Message echo = Message.withData("echo/" + message.qualifier()).build(); - server - .send(message.sender(), echo) + TransportWrapper.send(server, message.sender(), echo) .subscribe(null, th -> LOGGER.error("Failed to send message", th)); } }, @@ -320,7 +320,9 @@ public void testBlockAndUnblockTraffic() throws Exception { client = createWebsocketTransport(); server = createWebsocketTransport(); - server.listen().subscribe(message -> server.send(message.sender(), message).subscribe()); + server + .listen() + .subscribe(message -> TransportWrapper.send(server, message.sender(), message).subscribe()); Sinks.Many responses = Sinks.many().replay().all(); client