Skip to content

Commit

Permalink
Merge pull request #702 from scalecube/update/new-configuration-metho…
Browse files Browse the repository at this point in the history
…ds-in-sc-discovery

Added more ScalecubeServiceDiscovery config fucntions
  • Loading branch information
artem-v authored Jan 18, 2020
2 parents c6454eb + 16918ad commit ff6abf3
Show file tree
Hide file tree
Showing 21 changed files with 183 additions and 185 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,15 +37,15 @@ public void beforeAll() {
.transport(RSocketServiceTransport::new)
.startAwait();

Address seedAddress = seed.discovery().address();
final Address seedAddress = seed.discovery().address();

node =
Microservices.builder()
.metrics(registry())
.discovery(
serviceEndpoint ->
new ScalecubeServiceDiscovery(serviceEndpoint)
.options(opts -> opts.membership(cfg -> cfg.seedMembers(seedAddress))))
endpoint ->
new ScalecubeServiceDiscovery(endpoint)
.membership(cfg -> cfg.seedMembers(seedAddress)))
.transport(RSocketServiceTransport::new)
.services(services)
.startAwait();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,11 @@
import io.scalecube.cluster.ClusterConfig;
import io.scalecube.cluster.ClusterImpl;
import io.scalecube.cluster.ClusterMessageHandler;
import io.scalecube.cluster.fdetector.FailureDetectorConfig;
import io.scalecube.cluster.gossip.GossipConfig;
import io.scalecube.cluster.membership.MembershipConfig;
import io.scalecube.cluster.membership.MembershipEvent;
import io.scalecube.cluster.transport.api.TransportConfig;
import io.scalecube.net.Address;
import io.scalecube.services.ServiceEndpoint;
import io.scalecube.services.discovery.api.ServiceDiscovery;
Expand Down Expand Up @@ -32,10 +36,8 @@ public final class ScalecubeServiceDiscovery implements ServiceDiscovery {
private static final Logger LOGGER =
LoggerFactory.getLogger("io.scalecube.services.discovery.ServiceDiscovery");

private final ServiceEndpoint serviceEndpoint;

private ServiceEndpoint serviceEndpoint;
private ClusterConfig clusterConfig;

private Cluster cluster;

private final DirectProcessor<ServiceDiscoveryEvent> subject = DirectProcessor.create();
Expand Down Expand Up @@ -63,9 +65,9 @@ private ScalecubeServiceDiscovery(ScalecubeServiceDiscovery other) {
}

/**
* Setter for {@code ClusterConfig.Builder} options.
* Setter for {@code ClusterConfig} options.
*
* @param opts ClusterConfig options builder
* @param opts options operator
* @return new instance of {@code ScalecubeServiceDiscovery}
*/
public ScalecubeServiceDiscovery options(UnaryOperator<ClusterConfig> opts) {
Expand All @@ -74,6 +76,46 @@ public ScalecubeServiceDiscovery options(UnaryOperator<ClusterConfig> opts) {
return d;
}

/**
* Setter for {@code TransportConfig} options.
*
* @param opts options operator
* @return new instance of {@code ScalecubeServiceDiscovery}
*/
public ScalecubeServiceDiscovery transport(UnaryOperator<TransportConfig> opts) {
return options(cfg -> cfg.transport(opts));
}

/**
* Setter for {@code MembershipConfig} options.
*
* @param opts options operator
* @return new instance of {@code ScalecubeServiceDiscovery}
*/
public ScalecubeServiceDiscovery membership(UnaryOperator<MembershipConfig> opts) {
return options(cfg -> cfg.membership(opts));
}

/**
* Setter for {@code GossipConfig} options.
*
* @param opts options operator
* @return new instance of {@code ScalecubeServiceDiscovery}
*/
public ScalecubeServiceDiscovery gossip(UnaryOperator<GossipConfig> opts) {
return options(cfg -> cfg.gossip(opts));
}

/**
* Setter for {@code FailureDetectorConfig} options.
*
* @param opts options operator
* @return new instance of {@code ScalecubeServiceDiscovery}
*/
public ScalecubeServiceDiscovery failureDetector(UnaryOperator<FailureDetectorConfig> opts) {
return options(cfg -> cfg.failureDetector(opts));
}

@Override
public Address address() {
return cluster.address();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -219,21 +219,19 @@ public static ServiceEndpoint newServiceEndpoint() {
private Mono<ServiceDiscovery> newServiceDiscovery(
Address seedAddress, MetadataCodec metadataCodec) {
return Mono.fromCallable(
() -> {
ServiceEndpoint serviceEndpoint = newServiceEndpoint();
return new ScalecubeServiceDiscovery(serviceEndpoint)
.options(opts -> opts.metadataCodec(metadataCodec))
.options(opts -> opts.gossip(cfg -> GOSSIP_CONFIG))
.options(opts -> opts.membership(cfg -> MEMBERSHIP_CONFIG))
.options(opts -> opts.membership(cfg -> cfg.seedMembers(seedAddress)));
});
() ->
new ScalecubeServiceDiscovery(newServiceEndpoint())
.options(opts -> opts.metadataCodec(metadataCodec))
.gossip(cfg -> GOSSIP_CONFIG)
.membership(cfg -> MEMBERSHIP_CONFIG)
.membership(cfg -> cfg.seedMembers(seedAddress)));
}

private Address startSeed(MetadataCodec metadataCodec) {
return new ScalecubeServiceDiscovery(newServiceEndpoint())
.options(opts -> opts.metadataCodec(metadataCodec))
.options(opts -> opts.gossip(cfg -> GOSSIP_CONFIG))
.options(opts -> opts.membership(cfg -> MEMBERSHIP_CONFIG))
.gossip(cfg -> GOSSIP_CONFIG)
.membership(cfg -> MEMBERSHIP_CONFIG)
.start()
.block()
.address();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,8 @@ public class ExamplesRunner {
* Main method of gateway runner.
*
* @param args program arguments
* @throws Exception exception thrown
*/
public static void main(String[] args) throws Exception {
public static void main(String[] args) {
ConfigRegistry configRegistry = ConfigBootstrap.configRegistry();

Config config =
Expand All @@ -53,7 +52,7 @@ public static void main(String[] args) throws Exception {
LOGGER.info("Number of worker threads: " + numOfThreads);

Microservices.builder()
.discovery(serviceEndpoint -> serviceDiscovery(serviceEndpoint, config))
.discovery(endpoint -> serviceDiscovery(endpoint, config))
.transport(
() ->
new RSocketServiceTransport()
Expand All @@ -77,14 +76,10 @@ public static void main(String[] args) throws Exception {
.block();
}

private static ServiceDiscovery serviceDiscovery(ServiceEndpoint serviceEndpoint, Config config) {
return new ScalecubeServiceDiscovery(serviceEndpoint)
.options(
opts ->
opts.membership(cfg1 -> cfg1.seedMembers(config.seedAddresses()))
.transport(cfg1 -> cfg1.port(config.discoveryPort()))
.memberHost(config.memberHost())
.memberPort(config.memberPort()));
private static ServiceDiscovery serviceDiscovery(ServiceEndpoint endpoint, Config config) {
return new ScalecubeServiceDiscovery(endpoint)
.membership(cfg -> cfg.seedMembers(config.seedAddresses()))
.transport(cfg -> cfg.port(config.discoveryPort()));
}

public static class Config {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
package io.scalecube.services.examples.exceptions;

import io.scalecube.net.Address;
import io.scalecube.services.Microservices;
import io.scalecube.services.ServiceEndpoint;
import io.scalecube.services.ServiceInfo;
import io.scalecube.services.discovery.ScalecubeServiceDiscovery;
import io.scalecube.services.discovery.api.ServiceDiscovery;
import io.scalecube.services.transport.rsocket.RSocketServiceTransport;
import java.util.Collections;

Expand All @@ -30,9 +29,14 @@ public static void main(String[] args) throws InterruptedException {

System.err.println("ms1 started: " + ms1.serviceAddress());

final Address address1 = ms1.discovery().address();

Microservices ms2 =
Microservices.builder()
.discovery(serviceEndpoint -> serviceDiscovery(serviceEndpoint, ms1))
.discovery(
endpoint ->
new ScalecubeServiceDiscovery(endpoint)
.membership(cfg -> cfg.seedMembers(address1)))
.transport(RSocketServiceTransport::new)
.services(
call -> {
Expand Down Expand Up @@ -63,10 +67,4 @@ public static void main(String[] args) throws InterruptedException {

Thread.currentThread().join();
}

private static ServiceDiscovery serviceDiscovery(
ServiceEndpoint serviceEndpoint, Microservices ms1) {
return new ScalecubeServiceDiscovery(serviceEndpoint)
.options(opts -> opts.membership(cfg -> cfg.seedMembers(ms1.discovery().address())));
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package io.scalecube.services.examples.helloworld;

import io.scalecube.net.Address;
import io.scalecube.services.Microservices;
import io.scalecube.services.discovery.ScalecubeServiceDiscovery;
import io.scalecube.services.examples.helloworld.service.GreetingServiceImpl;
Expand Down Expand Up @@ -29,16 +30,15 @@ public static void main(String[] args) {
.transport(RSocketServiceTransport::new)
.startAwait();

final Address seedAddress = seed.discovery().address();

// Construct a ScaleCube node which joins the cluster hosting the Greeting Service
Microservices ms =
Microservices.builder()
.discovery(
serviceEndpoint ->
new ScalecubeServiceDiscovery(serviceEndpoint)
.options(
opts ->
opts.membership(
cfg -> cfg.seedMembers(seed.discovery().address()))))
endpoint ->
new ScalecubeServiceDiscovery(endpoint)
.membership(cfg -> cfg.seedMembers(seedAddress)))
.transport(RSocketServiceTransport::new)
.services(new GreetingServiceImpl())
.startAwait();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
package io.scalecube.services.examples.helloworld;

import io.scalecube.net.Address;
import io.scalecube.services.Microservices;
import io.scalecube.services.ServiceCall;
import io.scalecube.services.ServiceEndpoint;
import io.scalecube.services.api.ServiceMessage;
import io.scalecube.services.discovery.ScalecubeServiceDiscovery;
import io.scalecube.services.discovery.api.ServiceDiscovery;
import io.scalecube.services.examples.helloworld.service.GreetingServiceImpl;
import io.scalecube.services.examples.helloworld.service.api.Greeting;
import io.scalecube.services.transport.rsocket.RSocketServiceTransport;
Expand Down Expand Up @@ -40,9 +39,14 @@ public static void main(String[] args) {
.startAwait();

// Construct a ScaleCube node which joins the cluster hosting the Greeting Service
final Address seedAddress = seed.discovery().address();

Microservices ms =
Microservices.builder()
.discovery(serviceEndpoint -> serviceDiscovery(serviceEndpoint, seed))
.discovery(
endpoint ->
new ScalecubeServiceDiscovery(endpoint)
.membership(cfg -> cfg.seedMembers(seedAddress)))
.transport(RSocketServiceTransport::new)
.services(new GreetingServiceImpl())
.startAwait();
Expand All @@ -68,10 +72,4 @@ public static void main(String[] args) {
seed.onShutdown().block();
ms.onShutdown().block();
}

private static ServiceDiscovery serviceDiscovery(
ServiceEndpoint serviceEndpoint, Microservices seed) {
return new ScalecubeServiceDiscovery(serviceEndpoint)
.options(opts -> opts.membership(cfg -> cfg.seedMembers(seed.discovery().address())));
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package io.scalecube.services.examples.helloworld;

import io.scalecube.net.Address;
import io.scalecube.services.Microservices;
import io.scalecube.services.discovery.ScalecubeServiceDiscovery;
import io.scalecube.services.examples.helloworld.service.BidiGreetingImpl;
Expand Down Expand Up @@ -30,16 +31,15 @@ public static void main(String[] args) {
.transport(RSocketServiceTransport::new)
.startAwait();

final Address seedAddress = seed.discovery().address();

// Construct a ScaleCube node which joins the cluster hosting the Greeting Service
Microservices ms =
Microservices.builder()
.discovery(
serviceEndpoint ->
new ScalecubeServiceDiscovery(serviceEndpoint)
.options(
opts ->
opts.membership(
cfg -> cfg.seedMembers(seed.discovery().address()))))
endpoint ->
new ScalecubeServiceDiscovery(endpoint)
.membership(cfg -> cfg.seedMembers(seedAddress)))
.transport(RSocketServiceTransport::new)
.services(new BidiGreetingImpl())
.startAwait();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,7 @@

import io.scalecube.net.Address;
import io.scalecube.services.Microservices;
import io.scalecube.services.ServiceEndpoint;
import io.scalecube.services.discovery.ScalecubeServiceDiscovery;
import io.scalecube.services.discovery.api.ServiceDiscovery;
import io.scalecube.services.examples.orderbook.service.DefaultMarketDataService;
import io.scalecube.services.examples.orderbook.service.OrderBookSnapshoot;
import io.scalecube.services.examples.orderbook.service.OrderRequest;
Expand Down Expand Up @@ -40,10 +38,14 @@ public static void main(String[] args) throws InterruptedException {
.transport(RSocketServiceTransport::new)
.startAwait();

final Address gatewayAddress = gateway.discovery().address();

Microservices ms =
Microservices.builder()
.discovery(
serviceEndpoint -> serviceDiscovery(serviceEndpoint, gateway.discovery().address()))
endpoint ->
new ScalecubeServiceDiscovery(endpoint)
.membership(cfg -> cfg.seedMembers(gatewayAddress)))
.transport(RSocketServiceTransport::new)
.services(new DefaultMarketDataService())
.startAwait();
Expand All @@ -64,7 +66,7 @@ public static void main(String[] args) throws InterruptedException {
new Order(
new PriceLevel(Side.BUY, RANDOM.nextInt(10) + 1), // prices
System.currentTimeMillis(),
Long.valueOf(RANDOM.nextInt(110) + 1 + "")), // units
Long.parseLong(RANDOM.nextInt(110) + 1 + "")), // units
INSTRUMENT))
.block();
} else {
Expand All @@ -74,7 +76,7 @@ public static void main(String[] args) throws InterruptedException {
new Order(
new PriceLevel(Side.SELL, RANDOM.nextInt(10) + 1), // prices
System.currentTimeMillis(),
Long.valueOf(RANDOM.nextInt(70) + 1 + "")), // units
Long.parseLong(RANDOM.nextInt(70) + 1 + "")), // units
INSTRUMENT))
.block();
}
Expand All @@ -89,12 +91,6 @@ public static void main(String[] args) throws InterruptedException {
Thread.currentThread().join();
}

private static ServiceDiscovery serviceDiscovery(
ServiceEndpoint serviceEndpoint, Address address) {
return new ScalecubeServiceDiscovery(serviceEndpoint)
.options(opts -> opts.membership(cfg -> cfg.seedMembers(address)));
}

private static void print(OrderBookSnapshoot snapshot) {

System.out.println("====== Asks ========");
Expand Down
Loading

0 comments on commit ff6abf3

Please sign in to comment.