Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
artem-v committed Aug 21, 2023
1 parent bd22533 commit 884c42e
Show file tree
Hide file tree
Showing 4 changed files with 87 additions and 80 deletions.
68 changes: 28 additions & 40 deletions cluster-api/src/main/java/io/scalecube/cluster/ClusterConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@
import io.scalecube.cluster.membership.MembershipConfig;
import io.scalecube.cluster.metadata.MetadataCodec;
import io.scalecube.cluster.transport.api.TransportConfig;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.StringJoiner;
import java.util.function.UnaryOperator;
Expand Down Expand Up @@ -35,8 +38,7 @@ public final class ClusterConfig implements Cloneable {

private String memberId;
private String memberAlias;
private String externalHost;
private Integer externalPort;
private List<String> externalHosts;

private TransportConfig transportConfig = TransportConfig.defaultConfig();
private FailureDetectorConfig failureDetectorConfig = FailureDetectorConfig.defaultConfig();
Expand Down Expand Up @@ -136,27 +138,39 @@ public ClusterConfig metadataCodec(MetadataCodec metadataCodec) {
}

/**
* Returns externalHost. {@code externalHost} is a config property for container environments,
* it's being set for advertising to scalecube cluster some connectable hostname which maps to
* Returns externalHosts. {@code externalHosts} is a config property for container environments,
* it's being set for advertising to scalecube cluster some connectable hostnames which maps to
* scalecube transport's hostname on which scalecube transport is listening.
*
* @return external host
* @return external hosts
*/
public String externalHost() {
return externalHost;
public List<String> externalHosts() {
return externalHosts;
}

/**
* Setter for externalHost. {@code externalHost} is a config property for container environments,
* it's being set for advertising to scalecube cluster some connectable hostname which maps to
* scalecube transport's hostname on which scalecube transport is listening.
* Setter for externalHosts. {@code externalHosts} is a config property for container
* environments, it's being set for advertising to scalecube cluster some connectable hostnames
* which maps to scalecube transport's hostname on which scalecube transport is listening.
*
* @param externalHosts external hosts
* @return new {@code ClusterConfig} instance
*/
public ClusterConfig externalHosts(String... externalHosts) {
return externalHosts(Arrays.asList(externalHosts));
}

/**
* Setter for externalHosts. {@code externalHosts} is a config property for container
* environments, it's being set for advertising to scalecube cluster some connectable hostnames
* which maps to scalecube transport's hostname on which scalecube transport is listening.
*
* @param externalHost external host
* @param externalHosts external hosts
* @return new {@code ClusterConfig} instance
*/
public ClusterConfig externalHost(String externalHost) {
public ClusterConfig externalHosts(List<String> externalHosts) {
ClusterConfig c = clone();
c.externalHost = externalHost;
c.externalHosts = new ArrayList<>(externalHosts);
return c;
}

Expand Down Expand Up @@ -205,31 +219,6 @@ public ClusterConfig memberAlias(String memberAlias) {
return c;
}

/**
* Returns externalPort. {@code externalPort} is a config property for container environments,
* it's being set for advertising to scalecube cluster a port which mapped to scalecube
* transport's listening port.
*
* @return external port
*/
public Integer externalPort() {
return externalPort;
}

/**
* Setter for externalPort. {@code externalPort} is a config property for container environments,
* it's being set for advertising to scalecube cluster a port which mapped to scalecube
* transport's listening port.
*
* @param externalPort external port
* @return new {@code ClusterConfig} instance
*/
public ClusterConfig externalPort(Integer externalPort) {
ClusterConfig c = clone();
c.externalPort = externalPort;
return c;
}

/**
* Applies {@link TransportConfig} settings.
*
Expand Down Expand Up @@ -316,8 +305,7 @@ public String toString() {
.add("metadataCodec=" + metadataCodec)
.add("memberId='" + memberId + "'")
.add("memberAlias='" + memberAlias + "'")
.add("externalHost='" + externalHost + "'")
.add("externalPort=" + externalPort)
.add("externalHosts=" + externalHosts)
.add("transportConfig=" + transportConfig)
.add("failureDetectorConfig=" + failureDetectorConfig)
.add("gossipConfig=" + gossipConfig)
Expand Down
49 changes: 29 additions & 20 deletions cluster-api/src/main/java/io/scalecube/cluster/Member.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.StringJoiner;
import java.util.UUID;
Expand All @@ -20,24 +22,24 @@ public final class Member implements Externalizable {

private String id;
private String alias;
private Address address;
private List<Address> addresses;
private String namespace;

public Member() {}

/**
* Constructor.
*
* @param id member id; not null
* @param id member id
* @param alias member alias (optional)
* @param address member address; not null
* @param namespace namespace; not null
* @param addresses member addresses
* @param namespace namespace
*/
public Member(String id, String alias, Address address, String namespace) {
this.id = Objects.requireNonNull(id, "member id");
this.alias = alias; // optional
this.address = Objects.requireNonNull(address, "member address");
this.namespace = Objects.requireNonNull(namespace, "member namespace");
public Member(String id, String alias, List<Address> addresses, String namespace) {
this.id = Objects.requireNonNull(id, "id");
this.alias = alias;
this.addresses = Objects.requireNonNull(addresses, "addresses");
this.namespace = Objects.requireNonNull(namespace, "namespace");
}

/**
Expand Down Expand Up @@ -70,14 +72,14 @@ public String namespace() {
}

/**
* Returns cluster member address, an address on which this cluster member listens connections
* from other cluster members.
* Returns cluster member addresses, those are addresses on which this cluster member listens
* connections from other cluster members.
*
* @see io.scalecube.cluster.transport.api.TransportConfig#port(int)
* @return member address
*/
public Address address() {
return address;
public List<Address> addresses() {
return addresses;
}

@Override
Expand All @@ -90,13 +92,13 @@ public boolean equals(Object that) {
}
Member member = (Member) that;
return Objects.equals(id, member.id)
&& Objects.equals(address, member.address)
&& Objects.equals(addresses, member.addresses)
&& Objects.equals(namespace, member.namespace);
}

@Override
public int hashCode() {
return Objects.hash(id, address, namespace);
return Objects.hash(id, addresses, namespace);
}

@Override
Expand All @@ -110,7 +112,10 @@ public void writeExternal(ObjectOutput out) throws IOException {
out.writeUTF(alias);
}
// address
out.writeUTF(address.toString());
out.writeInt(addresses.size());
for (Address address : addresses) {
out.writeUTF(address.toString());
}
// namespace
out.writeUTF(namespace);
}
Expand All @@ -124,8 +129,12 @@ public void readExternal(ObjectInput in) throws IOException {
if (aliasNotNull) {
alias = in.readUTF();
}
// address
address = Address.from(in.readUTF());
// addresses
final int addressesSize = in.readInt();
addresses = new ArrayList<>(addressesSize);
for (int i = 0; i < addressesSize; i++) {
addresses.add(Address.from(in.readUTF()));
}
// namespace
this.namespace = in.readUTF();
}
Expand All @@ -143,9 +152,9 @@ private static String stringifyId(String id) {
public String toString() {
StringJoiner stringJoiner = new StringJoiner(":");
if (alias == null) {
return stringJoiner.add(namespace).add(stringifyId(id) + "@" + address).toString();
return stringJoiner.add(namespace).add(stringifyId(id)).toString();
} else {
return stringJoiner.add(namespace).add(alias).add(stringifyId(id) + "@" + address).toString();
return stringJoiner.add(namespace).add(alias).add(stringifyId(id)).toString();
}
}
}
39 changes: 25 additions & 14 deletions cluster/src/main/java/io/scalecube/cluster/ClusterImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,10 @@
import io.scalecube.utils.ServiceLoaderUtil;
import java.io.Serializable;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
Expand Down Expand Up @@ -243,7 +245,9 @@ private Mono<Cluster> doStart0() {
localMember = createLocalMember(boundTransport.address());
transport = new SenderAwareTransport(boundTransport, localMember.address());

scheduler = Schedulers.newSingle("sc-cluster-" + localMember.address().port(), true);
final String name =
"sc-cluster-" + Integer.toHexString(System.identityHashCode(this));
scheduler = Schedulers.newSingle(name, true);

failureDetector =
new FailureDetectorImpl(
Expand Down Expand Up @@ -369,19 +373,26 @@ private Flux<MembershipEvent> listenMembership() {
* @return local cluster member with cluster address and cluster member id
*/
private Member createLocalMember(Address address) {
int port = Optional.ofNullable(config.externalPort()).orElse(address.port());

// calculate local member cluster address
Address memberAddress =
Optional.ofNullable(config.externalHost())
.map(host -> Address.create(host, port))
.orElseGet(() -> Address.create(address.host(), port));

return new Member(
config.memberId() != null ? config.memberId() : UUID.randomUUID().toString(),
config.memberAlias(),
memberAddress,
config.membershipConfig().namespace());
final int port = address.port();
final List<Address> memberAddresses = new ArrayList<>();

// First address comes as "fair" listen address
memberAddresses.add(address);

// Tail goes as externalHosts, if the exist
final List<String> externalHosts = config.externalHosts();
if (externalHosts != null) {
for (String externalHost : externalHosts) {
memberAddresses.add(Address.create(externalHost, port));
}
}

final String memberId =
config.memberId() != null ? config.memberId() : UUID.randomUUID().toString();
final String memberAlias = config.memberAlias();
final String namespace = config.membershipConfig().namespace();

return new Member(memberId, memberAlias, memberAddresses, namespace);
}

@Override
Expand Down
11 changes: 5 additions & 6 deletions cluster/src/test/java/io/scalecube/cluster/ClusterTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ public void testJoinLocalhostIgnoredWithOverride() throws InterruptedException {

// Start seed node
Cluster seedNode =
new ClusterImpl(new ClusterConfig().externalHost("localhost").externalPort(7878))
new ClusterImpl(new ClusterConfig().externalHosts("localhost"))
.transport(opts -> opts.port(7878).connectTimeout(CONNECT_TIMEOUT))
.membership(opts -> opts.seedMembers(addresses))
.transportFactory(TcpTransportFactory::new)
Expand Down Expand Up @@ -590,14 +590,13 @@ private void shutdown(List<Cluster> nodes) {

@Test
public void testExplicitLocalMemberId() {
ClusterConfig config = ClusterConfig.defaultConfig()
.memberId("test-member");
ClusterConfig config = ClusterConfig.defaultConfig().memberId("test-member");

ClusterImpl cluster = null;
try {
cluster = (ClusterImpl) new ClusterImpl(config)
.transportFactory(TcpTransportFactory::new)
.startAwait();
cluster =
(ClusterImpl)
new ClusterImpl(config).transportFactory(TcpTransportFactory::new).startAwait();

assertEquals("test-member", cluster.member().id());
} finally {
Expand Down

0 comments on commit 884c42e

Please sign in to comment.