Skip to content

Commit

Permalink
Make compile (WIP)
Browse files Browse the repository at this point in the history
  • Loading branch information
rostyslav.baldovskyi committed Aug 24, 2023
1 parent 884c42e commit b757da6
Show file tree
Hide file tree
Showing 26 changed files with 391 additions and 200 deletions.
12 changes: 11 additions & 1 deletion cluster-api/src/main/java/io/scalecube/cluster/Cluster.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import io.scalecube.cluster.transport.api.Message;
import io.scalecube.net.Address;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
import reactor.core.publisher.Mono;

Expand All @@ -14,7 +15,7 @@ public interface Cluster {
*
* @return cluster address
*/
Address address();
List<Address> addresses();

/**
* Send a msg from this member (src) to target member (specified in parameters).
Expand All @@ -34,6 +35,15 @@ public interface Cluster {
*/
Mono<Void> send(Address address, Message message);

/**
* Send a msg from this member (src) to target member (specified in parameters).
*
* @param addresses target addresses
* @param message msg
* @return promise telling success or failure
*/
Mono<Void> send(List<Address> addresses, Message message);

/**
* Sends message to the given address. It will issue connect in case if no transport channel by
* given transport {@code address} exists already. Send is an async operation and expecting a
Expand Down
13 changes: 13 additions & 0 deletions cluster-api/src/main/java/io/scalecube/cluster/Member.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.StringJoiner;
Expand Down Expand Up @@ -42,6 +43,18 @@ public Member(String id, String alias, List<Address> addresses, String namespace
this.namespace = Objects.requireNonNull(namespace, "namespace");
}

/**
* Constructor.
*
* @param id member id
* @param alias member alias (optional)
* @param address member address
* @param namespace namespace
*/
public Member(String id, String alias, Address address, String namespace) {
this(id, alias, Collections.singletonList(address), namespace);
}

/**
* Returns cluster member local id.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import java.time.Duration;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.StringJoiner;
import java.util.concurrent.ConcurrentHashMap;
Expand Down Expand Up @@ -213,15 +214,40 @@ public InboundSettings inboundSettings(Address destination) {
return inboundSettings.getOrDefault(destination, defaultInboundSettings);
}

/**
* Returns network inbound settings applied to the given destination.
*
* @param destinations addresses of target endpoint
* @return network inbound settings
*/
public InboundSettings inboundSettings(List<Address> destinations) {
if (destinations.isEmpty()) {
return defaultInboundSettings;
}

for (Address destination : destinations) {
InboundSettings inboundSettings = this.inboundSettings.get(destination);

if (inboundSettings != null) {
return inboundSettings;
}
}

return defaultInboundSettings;
}

/**
* Setter for network emulator inbound settings for specific destination.
*
* @param shallPass shallPass inbound flag
*/
public void inboundSettings(Address destination, boolean shallPass) {
public void inboundSettings(List<Address> destinations, boolean shallPass) {
InboundSettings settings = new InboundSettings(shallPass);
inboundSettings.put(destination, settings);
LOGGER.debug("[{}] Set inbound settings {} to {}", address, settings, destination);

destinations.forEach(destination -> {
inboundSettings.put(destination, settings);
LOGGER.debug("[{}] Set inbound settings {} to {}", address, settings, destination);
});
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import io.scalecube.cluster.transport.api.Message;
import io.scalecube.cluster.transport.api.Transport;
import io.scalecube.net.Address;
import java.util.Collections;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

Expand Down Expand Up @@ -83,6 +84,6 @@ public Flux<Message> listen() {
}

private Message enhanceWithSender(Message message) {
return Message.with(message).sender(transport.address()).build();
return Message.with(message).sender(Collections.singletonList(transport.address())).build();
}
}
27 changes: 17 additions & 10 deletions cluster/src/main/java/io/scalecube/cluster/ClusterImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import io.scalecube.cluster.transport.api.Transport;
import io.scalecube.cluster.transport.api.TransportConfig;
import io.scalecube.cluster.transport.api.TransportFactory;
import io.scalecube.cluster.transport.api.TransportWrapper;
import io.scalecube.net.Address;
import io.scalecube.utils.ServiceLoaderUtil;
import java.io.Serializable;
Expand Down Expand Up @@ -243,7 +244,8 @@ private Mono<Cluster> doStart0() {
.flatMap(
boundTransport -> {
localMember = createLocalMember(boundTransport.address());
transport = new SenderAwareTransport(boundTransport, localMember.address());

transport = new SenderAwareTransport(boundTransport, localMember.addresses());

final String name =
"sc-cluster-" + Integer.toHexString(System.identityHashCode(this));
Expand Down Expand Up @@ -379,7 +381,7 @@ private Member createLocalMember(Address address) {
// First address comes as "fair" listen address
memberAddresses.add(address);

// Tail goes as externalHosts, if the exist
// Tail goes as externalHosts, if exists
final List<String> externalHosts = config.externalHosts();
if (externalHosts != null) {
for (String externalHost : externalHosts) {
Expand All @@ -396,28 +398,33 @@ private Member createLocalMember(Address address) {
}

@Override
public Address address() {
return member().address();
public List<Address> addresses() {
return member().addresses();
}

@Override
public Mono<Void> send(Member member, Message message) {
return send(member.address(), message);
return TransportWrapper.send(transport, member.addresses(), message);
}

@Override
public Mono<Void> send(Address address, Message message) {
return transport.send(address, message);
}

@Override
public Mono<Void> send(List<Address> addresses, Message message) {
return TransportWrapper.send(transport, addresses, message);
}

@Override
public Mono<Message> requestResponse(Address address, Message request) {
return transport.requestResponse(address, request);
}

@Override
public Mono<Message> requestResponse(Member member, Message request) {
return transport.requestResponse(member.address(), request);
return TransportWrapper.requestResponse(transport, member.addresses(), request);
}

@Override
Expand Down Expand Up @@ -526,11 +533,11 @@ public Mono<Void> onShutdown() {
private static class SenderAwareTransport implements Transport {

private final Transport transport;
private final Address address;
private final List<Address> addresses;

private SenderAwareTransport(Transport transport, Address address) {
private SenderAwareTransport(Transport transport, List<Address> addresses) {
this.transport = Objects.requireNonNull(transport);
this.address = Objects.requireNonNull(address);
this.addresses = Objects.requireNonNull(addresses);
}

@Override
Expand Down Expand Up @@ -569,7 +576,7 @@ public Flux<Message> listen() {
}

private Message enhanceWithSender(Message message) {
return Message.with(message).sender(address).build();
return Message.with(message).sender(addresses).build();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import io.scalecube.cluster.membership.MembershipEvent;
import io.scalecube.cluster.transport.api.Message;
import io.scalecube.cluster.transport.api.Transport;
import io.scalecube.cluster.transport.api.TransportWrapper;
import io.scalecube.net.Address;
import java.time.Duration;
import java.util.ArrayList;
Expand Down Expand Up @@ -42,6 +43,8 @@ public final class FailureDetectorImpl implements FailureDetector {
private final Transport transport;
private final FailureDetectorConfig config;

private final TransportWrapper transportWrapper;

// State

private final List<Member> pingMembers = new ArrayList<>();
Expand Down Expand Up @@ -81,6 +84,8 @@ public FailureDetectorImpl(
this.config = Objects.requireNonNull(config);
this.scheduler = Objects.requireNonNull(scheduler);

this.transportWrapper = new TransportWrapper(this.transport);

// Subscribe
actionsDisposables.addAll(
Arrays.asList(
Expand Down Expand Up @@ -145,9 +150,9 @@ private void doPing() {
Message pingMsg = Message.withData(pingData).qualifier(PING).correlationId(cid).build();

LOGGER.debug("[{}][{}] Send Ping to {}", localMember, period, pingMember);
Address address = pingMember.address();
transport
.requestResponse(address, pingMsg)
List<Address> addresses = pingMember.addresses();
transportWrapper
.requestResponse(addresses, pingMsg)
.timeout(Duration.ofMillis(config.pingTimeout()), scheduler)
.publishOn(scheduler)
.subscribe(
Expand Down Expand Up @@ -189,8 +194,8 @@ private void doPingReq(
Duration timeout = Duration.ofMillis(config.pingInterval() - config.pingTimeout());
pingReqMembers.forEach(
member ->
transport
.requestResponse(member.address(), pingReqMsg)
transportWrapper
.requestResponse(member.addresses(), pingReqMsg)
.timeout(timeout, scheduler)
.publishOn(scheduler)
.subscribe(
Expand Down Expand Up @@ -232,7 +237,7 @@ private void onMessage(Message message) {
/** Listens to PING message and answers with ACK. */
private void onPing(Message message) {
long period = this.currentPeriod;
Address sender = message.sender();
List<Address> sender = message.sender();
LOGGER.debug("[{}][{}] Received Ping from {}", localMember, period, sender);
PingData data = message.data();
data = data.withAckType(AckType.DEST_OK);
Expand All @@ -249,18 +254,18 @@ private void onPing(Message message) {
String correlationId = message.correlationId();
Message ackMessage =
Message.withData(data).qualifier(PING_ACK).correlationId(correlationId).build();
Address address = data.getFrom().address();
LOGGER.debug("[{}][{}] Send PingAck to {}", localMember, period, address);
transport
.send(address, ackMessage)
List<Address> addresses = data.getFrom().addresses();
LOGGER.debug("[{}][{}] Send PingAck to {}", localMember, period, addresses);
transportWrapper
.send(addresses, ackMessage)
.subscribe(
null,
ex ->
LOGGER.debug(
"[{}][{}] Failed to send PingAck to {}, cause: {}",
localMember,
period,
address,
addresses,
ex.toString()));
}

Expand All @@ -275,18 +280,18 @@ private void onPingReq(Message message) {
PingData pingReqData = new PingData(localMember, target, originalIssuer);
Message pingMessage =
Message.withData(pingReqData).qualifier(PING).correlationId(correlationId).build();
Address address = target.address();
LOGGER.debug("[{}][{}] Send transit Ping to {}", localMember, period, address);
transport
.send(address, pingMessage)
List<Address> addresses = target.addresses();
LOGGER.debug("[{}][{}] Send transit Ping to {}", localMember, period, addresses);
transportWrapper
.send(addresses, pingMessage)
.subscribe(
null,
ex ->
LOGGER.debug(
"[{}][{}] Failed to send transit Ping to {}, cause: {}",
localMember,
period,
address,
addresses,
ex.toString()));
}

Expand All @@ -305,18 +310,18 @@ private void onTransitPingAck(Message message) {
PingData originalAckData = new PingData(target, data.getTo()).withAckType(ackType);
Message originalAckMessage =
Message.withData(originalAckData).qualifier(PING_ACK).correlationId(correlationId).build();
Address address = target.address();
LOGGER.debug("[{}][{}] Resend transit PingAck to {}", localMember, period, address);
transport
.send(address, originalAckMessage)
List<Address> addresses = target.addresses();
LOGGER.debug("[{}][{}] Resend transit PingAck to {}", localMember, period, addresses);
transportWrapper
.send(addresses, originalAckMessage)
.subscribe(
null,
ex ->
LOGGER.debug(
"[{}][{}] Failed to resend transit PingAck to {}, cause: {}",
localMember,
period,
address,
addresses,
ex.toString()));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import io.scalecube.cluster.membership.MembershipEvent;
import io.scalecube.cluster.transport.api.Message;
import io.scalecube.cluster.transport.api.Transport;
import io.scalecube.cluster.transport.api.TransportWrapper;
import io.scalecube.net.Address;
import java.util.ArrayList;
import java.util.Arrays;
Expand Down Expand Up @@ -287,14 +288,13 @@ private void spreadGossipsTo(long period, Member member) {
}

// Send gossip request
Address address = member.address();
List<Address> addresses = member.addresses();

gossips.stream()
.map(this::buildGossipRequestMessage)
.forEach(
message ->
transport
.send(address, message)
TransportWrapper.send(transport, addresses, message)
.subscribe(
null,
ex ->
Expand All @@ -303,7 +303,7 @@ private void spreadGossipsTo(long period, Member member) {
localMember,
period,
message,
address,
addresses,
ex.toString())));
}

Expand Down
Loading

0 comments on commit b757da6

Please sign in to comment.