Skip to content

Commit

Permalink
Merge pull request #763 from scalecube/update-cluster
Browse files Browse the repository at this point in the history
Added test, updated pom, added null validation to ServiceEndpoint
  • Loading branch information
artem-v authored Jun 9, 2020
2 parents b111898 + ce4c1cb commit bf27bbf
Show file tree
Hide file tree
Showing 6 changed files with 140 additions and 53 deletions.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@

<properties>
<jackson.version>2.11.0</jackson.version>
<scalecube-cluster.version>2.5.0-RC2</scalecube-cluster.version>
<scalecube-cluster.version>2.5.0</scalecube-cluster.version>
<scalecube-commons.version>1.0.4</scalecube-commons.version>
<scalecube-benchmarks.version>1.2.2</scalecube-benchmarks.version>
<scalecube-config.version>0.4.3</scalecube-config.version>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public ServiceEndpoint() {}

private ServiceEndpoint(Builder builder) {
this.id = Objects.requireNonNull(builder.id, "ServiceEndpoint.id is required");
this.address = builder.address;
this.address = Objects.requireNonNull(builder.address, "ServiceEndpoint.address is required");
this.contentTypes = Collections.unmodifiableSet(new HashSet<>(builder.contentTypes));
this.tags = Collections.unmodifiableMap(new HashMap<>(builder.tags));
this.serviceRegistrations =
Expand Down Expand Up @@ -96,11 +96,7 @@ public void writeExternal(ObjectOutput out) throws IOException {
out.writeUTF(id);

// address
boolean addressExists = address != null;
out.writeBoolean(addressExists);
if (addressExists) {
out.writeUTF(address.toString());
}
out.writeUTF(address.toString());

// contentTypes
out.writeInt(contentTypes.size());
Expand Down Expand Up @@ -128,10 +124,7 @@ public void readExternal(ObjectInput in) throws IOException, ClassNotFoundExcept
id = in.readUTF();

// address
boolean addressExists = in.readBoolean();
if (addressExists) {
address = Address.from(in.readUTF());
}
address = Address.from(in.readUTF());

// contentTypes
int contentTypesSize = in.readInt();
Expand Down Expand Up @@ -163,41 +156,55 @@ public void readExternal(ObjectInput in) throws IOException, ClassNotFoundExcept
public static class Builder {

private String id;
private Address address;
private Address address = Address.NULL_ADDRESS;
private Set<String> contentTypes = Collections.emptySet();
private Map<String, String> tags = Collections.emptyMap();
private Collection<ServiceRegistration> serviceRegistrations = new ArrayList<>();

private Builder() {}

public Builder id(String id) {
this.id = id;
this.id = Objects.requireNonNull(id, "id");
return this;
}

public Builder address(Address address) {
this.address = address;
this.address = Objects.requireNonNull(address, "address");
return this;
}

public Builder contentTypes(Set<String> contentTypes) {
this.contentTypes = contentTypes;
this.contentTypes = Objects.requireNonNull(contentTypes, "contentTypes");
return this;
}

public Builder tags(Map<String, String> tags) {
this.tags = tags;
this.tags = Objects.requireNonNull(tags, "tags");
return this;
}

/**
* Adds {@code serviceRegistrations} to collection of {@code serviceRegistrations}.
*
* @param serviceRegistrations serviceRegistrations
* @return this builder
*/
public Builder appendServiceRegistrations(
Collection<ServiceRegistration> serviceRegistrations) {
this.serviceRegistrations.addAll(serviceRegistrations);
this.serviceRegistrations.addAll(
Objects.requireNonNull(serviceRegistrations, "serviceRegistrations"));
return this;
}

/**
* Setter for {@code serviceRegistrations}.
*
* @param serviceRegistrations serviceRegistrations
* @return this builder
*/
public Builder serviceRegistrations(Collection<ServiceRegistration> serviceRegistrations) {
this.serviceRegistrations = serviceRegistrations;
this.serviceRegistrations =
Objects.requireNonNull(serviceRegistrations, "serviceRegistrations");
return this;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package io.scalecube.services.discovery.api;

import io.scalecube.services.ServiceEndpoint;
import java.util.Optional;
import java.util.Objects;
import java.util.StringJoiner;

public class ServiceDiscoveryEvent {
Expand All @@ -18,12 +18,13 @@ public enum Type {
/**
* Constructor.
*
* @param type type
* @param serviceEndpoint service endpoint
* @param type type; not null
* @param serviceEndpoint service endpoint; not null
*/
private ServiceDiscoveryEvent(Type type, ServiceEndpoint serviceEndpoint) {
this.type = type;
this.serviceEndpoint = serviceEndpoint;
this.type = Objects.requireNonNull(type, "ServiceDiscoveryEvent: type");
this.serviceEndpoint =
Objects.requireNonNull(serviceEndpoint, "ServiceDiscoveryEvent: serviceEndpoint");
}

public static ServiceDiscoveryEvent newEndpointAdded(ServiceEndpoint serviceEndpoint) {
Expand Down Expand Up @@ -62,9 +63,7 @@ public boolean isEndpointRemoved() {
public String toString() {
return new StringJoiner(", ", ServiceDiscoveryEvent.class.getSimpleName() + "[", "]")
.add("type=" + type)
.add(
"serviceEndpoint="
+ Optional.ofNullable(serviceEndpoint).map(ServiceEndpoint::id).orElse(null))
.add("ServiceEndpoint.id='" + serviceEndpoint.id() + "'")
.toString();
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
package io.scalecube.services.discovery;

import static io.scalecube.services.discovery.api.ServiceDiscoveryEvent.newEndpointAdded;
import static io.scalecube.services.discovery.api.ServiceDiscoveryEvent.newEndpointLeaving;
import static io.scalecube.services.discovery.api.ServiceDiscoveryEvent.newEndpointRemoved;

import io.scalecube.cluster.Cluster;
import io.scalecube.cluster.ClusterConfig;
import io.scalecube.cluster.ClusterImpl;
Expand All @@ -16,6 +20,7 @@
import java.lang.management.ManagementFactory;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Objects;
import java.util.StringJoiner;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.UnaryOperator;
Expand All @@ -35,7 +40,8 @@ public final class ScalecubeServiceDiscovery implements ServiceDiscovery {

private static final Logger LOGGER = LoggerFactory.getLogger(ServiceDiscovery.class);

private ServiceEndpoint serviceEndpoint;
private final ServiceEndpoint serviceEndpoint;

private ClusterConfig clusterConfig;
private Cluster cluster;

Expand All @@ -48,8 +54,11 @@ public final class ScalecubeServiceDiscovery implements ServiceDiscovery {
* @param serviceEndpoint service endpoint
*/
public ScalecubeServiceDiscovery(ServiceEndpoint serviceEndpoint) {
this.serviceEndpoint = serviceEndpoint;
this.clusterConfig = ClusterConfig.defaultLanConfig().metadata(serviceEndpoint);
this.serviceEndpoint = Objects.requireNonNull(serviceEndpoint, "serviceEndpoint");
this.clusterConfig =
ClusterConfig.defaultLanConfig()
.memberIdGenerator(serviceEndpoint::id)
.metadata(serviceEndpoint);
}

/**
Expand Down Expand Up @@ -177,7 +186,7 @@ private void onMembershipEvent(MembershipEvent membershipEvent) {
ServiceDiscoveryEvent discoveryEvent = toServiceDiscoveryEvent(membershipEvent);
if (discoveryEvent == null) {
LOGGER.warn(
"Not publishing discoveryEvent, discoveryEvent is null, membershipEvent: {}",
"DiscoveryEvent is null, cannot publish it (corresponding membershipEvent: {})",
membershipEvent);
return;
}
Expand All @@ -192,18 +201,13 @@ private ServiceDiscoveryEvent toServiceDiscoveryEvent(MembershipEvent membership
ServiceDiscoveryEvent discoveryEvent = null;

if (membershipEvent.isAdded() && membershipEvent.newMetadata() != null) {
discoveryEvent =
ServiceDiscoveryEvent.newEndpointAdded(decodeMetadata(membershipEvent.newMetadata()));
discoveryEvent = newEndpointAdded(decodeMetadata(membershipEvent.newMetadata()));
}

if (membershipEvent.isRemoved() && membershipEvent.oldMetadata() != null) {
discoveryEvent =
ServiceDiscoveryEvent.newEndpointRemoved(decodeMetadata(membershipEvent.oldMetadata()));
discoveryEvent = newEndpointRemoved(decodeMetadata(membershipEvent.oldMetadata()));
}

if (membershipEvent.isLeaving() && membershipEvent.newMetadata() != null) {
discoveryEvent =
ServiceDiscoveryEvent.newEndpointLeaving(decodeMetadata(membershipEvent.newMetadata()));
discoveryEvent = newEndpointLeaving(decodeMetadata(membershipEvent.newMetadata()));
}

return discoveryEvent;
Expand Down
64 changes: 49 additions & 15 deletions services/src/main/java/io/scalecube/services/Microservices.java
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,9 @@ public String id() {
}

private static String generateId() {
return Long.toHexString(UUID.randomUUID().getMostSignificantBits() & Long.MAX_VALUE);
UUID uuid = UUID.randomUUID();
return Long.toHexString(uuid.getMostSignificantBits())
+ Long.toHexString(uuid.getLeastSignificantBits());
}

@Override
Expand Down Expand Up @@ -205,6 +207,11 @@ private Mono<Microservices> start() {
.map(ServiceInfo::serviceInstance)
.collect(Collectors.toList());

if (transportBootstrap == ServiceTransportBootstrap.NULL_INSTANCE
&& !serviceInstances.isEmpty()) {
LOGGER.warn("[{}] ServiceTransport is not set", this.id());
}

return discoveryBootstrap
.createInstance(serviceEndpointBuilder.build())
.publishOn(scheduler)
Expand Down Expand Up @@ -312,13 +319,13 @@ private Mono<Void> processBeforeDestroy() {
public static final class Builder {

private Map<String, String> tags = new HashMap<>();
private List<ServiceProvider> serviceProviders = new ArrayList<>();
private final List<ServiceProvider> serviceProviders = new ArrayList<>();
private ServiceRegistry serviceRegistry = new ServiceRegistryImpl();
private ServiceMethodRegistry methodRegistry = new ServiceMethodRegistryImpl();
private Authenticator<Object> authenticator = new DelegatingAuthenticator();
private ServiceDiscoveryBootstrap discoveryBootstrap = new ServiceDiscoveryBootstrap();
private ServiceTransportBootstrap transportBootstrap = new ServiceTransportBootstrap();
private GatewayBootstrap gatewayBootstrap = new GatewayBootstrap();
private final GatewayBootstrap gatewayBootstrap = new GatewayBootstrap();
private ServiceProviderErrorMapper errorMapper = DefaultErrorMapper.INSTANCE;
private ServiceMessageDataDecoder dataDecoder =
Optional.ofNullable(ServiceMessageDataDecoder.INSTANCE)
Expand Down Expand Up @@ -487,33 +494,28 @@ public <A, T> Builder defaultPrincipalMapper(

public static class ServiceDiscoveryBootstrap {

public static final Function<ServiceEndpoint, ServiceDiscovery> NULL_FACTORY = i -> null;

private final Function<ServiceEndpoint, ServiceDiscovery> factory;

private ServiceDiscovery discovery;
private Disposable disposable;

private ServiceDiscoveryBootstrap() {
this(NULL_FACTORY);
this(NullServiceDiscovery::new);
}

private ServiceDiscoveryBootstrap(Function<ServiceEndpoint, ServiceDiscovery> factory) {
this.factory = factory;
}

private Mono<ServiceDiscovery> createInstance(ServiceEndpoint serviceEndpoint) {
return factory == NULL_FACTORY
? Mono.empty()
: Mono.defer(() -> Mono.just(discovery = factory.apply(serviceEndpoint)));
return Mono.fromCallable(() -> discovery = factory.apply(serviceEndpoint));
}

private Mono<ServiceDiscovery> startListen(Microservices microservices) {
return Mono.defer(
() -> {
if (discovery == null) {
LOGGER.info("[{}] ServiceDiscovery not set", microservices.id());
return Mono.empty();
if (discovery instanceof NullServiceDiscovery) {
return Mono.just(discovery);
}

disposable =
Expand Down Expand Up @@ -561,6 +563,40 @@ private Mono<Void> shutdown() {
}
}

private static class NullServiceDiscovery implements ServiceDiscovery {

private final ServiceEndpoint serviceEndpoint;

private NullServiceDiscovery(ServiceEndpoint serviceEndpoint) {
this.serviceEndpoint = serviceEndpoint;
}

@Override
public Address address() {
return Address.NULL_ADDRESS;
}

@Override
public ServiceEndpoint serviceEndpoint() {
return serviceEndpoint;
}

@Override
public Flux<ServiceDiscoveryEvent> listenDiscovery() {
return Flux.never();
}

@Override
public Mono<ServiceDiscovery> start() {
return Mono.just(this);
}

@Override
public Mono<Void> shutdown() {
return Mono.empty();
}
}

private static class GatewayBootstrap {

private final List<Function<GatewayOptions, Gateway>> factories = new ArrayList<>();
Expand Down Expand Up @@ -623,14 +659,13 @@ public static class ServiceTransportBootstrap {

public static final Supplier<ServiceTransport> NULL_SUPPLIER = () -> null;
public static final ServiceTransportBootstrap NULL_INSTANCE = new ServiceTransportBootstrap();
public static final Address NULL_ADDRESS = Address.create("0.0.0.0", 0);

private final Supplier<ServiceTransport> transportSupplier;

private ServiceTransport serviceTransport;
private ClientTransport clientTransport;
private ServerTransport serverTransport;
private Address transportAddress = NULL_ADDRESS;
private Address transportAddress = Address.NULL_ADDRESS;

public ServiceTransportBootstrap() {
this(NULL_SUPPLIER);
Expand All @@ -643,7 +678,6 @@ public ServiceTransportBootstrap(Supplier<ServiceTransport> transportSupplier) {
private Mono<ServiceTransportBootstrap> start(Microservices microservices) {
if (transportSupplier == NULL_SUPPLIER
|| (serviceTransport = transportSupplier.get()) == null) {
LOGGER.info("[{}] ServiceTransport not set", microservices.id());
return Mono.just(NULL_INSTANCE);
}

Expand Down
Loading

0 comments on commit bf27bbf

Please sign in to comment.