diff --git a/pom.xml b/pom.xml index 6dc4f0f7c..455cee9cc 100644 --- a/pom.xml +++ b/pom.xml @@ -1,5 +1,7 @@ - + 4.0.0 @@ -58,7 +60,6 @@ 2.6.17 - 1.0.24 1.0.32 2020.0.32 @@ -93,13 +94,6 @@ - - - io.scalecube - scalecube-commons - ${scalecube-commons.version} - - io.scalecube diff --git a/services-api/pom.xml b/services-api/pom.xml index 2762a2590..ada0a3ab5 100644 --- a/services-api/pom.xml +++ b/services-api/pom.xml @@ -1,5 +1,7 @@ - + 4.0.0 @@ -11,20 +13,11 @@ scalecube-services-api - - UTF-8 - - - - io.scalecube - scalecube-commons - io.projectreactor reactor-core - org.slf4j slf4j-api diff --git a/services-discovery/src/main/java/io/scalecube/services/discovery/ScalecubeServiceDiscovery.java b/services-discovery/src/main/java/io/scalecube/services/discovery/ScalecubeServiceDiscovery.java index 6b060b881..ba1b4822c 100644 --- a/services-discovery/src/main/java/io/scalecube/services/discovery/ScalecubeServiceDiscovery.java +++ b/services-discovery/src/main/java/io/scalecube/services/discovery/ScalecubeServiceDiscovery.java @@ -1,9 +1,9 @@ package io.scalecube.services.discovery; -import static io.scalecube.reactor.RetryNonSerializedEmitFailureHandler.RETRY_NON_SERIALIZED; import static io.scalecube.services.discovery.api.ServiceDiscoveryEvent.newEndpointAdded; import static io.scalecube.services.discovery.api.ServiceDiscoveryEvent.newEndpointLeaving; import static io.scalecube.services.discovery.api.ServiceDiscoveryEvent.newEndpointRemoved; +import static reactor.core.publisher.Sinks.EmitFailureHandler.busyLooping; import io.scalecube.cluster.Cluster; import io.scalecube.cluster.ClusterConfig; @@ -19,6 +19,7 @@ import io.scalecube.services.discovery.api.ServiceDiscovery; import io.scalecube.services.discovery.api.ServiceDiscoveryEvent; import java.nio.ByteBuffer; +import java.time.Duration; import java.util.StringJoiner; import java.util.function.UnaryOperator; import org.slf4j.Logger; @@ -99,7 +100,7 @@ public Flux listen() { @Override public void shutdown() { - sink.emitComplete(RETRY_NON_SERIALIZED); + sink.emitComplete(busyLooping(Duration.ofSeconds(3))); if (cluster != null) { cluster.shutdown(); } @@ -117,7 +118,7 @@ private void onMembershipEvent(MembershipEvent membershipEvent) { } LOGGER.debug("Publish discoveryEvent: {}", discoveryEvent); - sink.emitNext(discoveryEvent, RETRY_NON_SERIALIZED); + sink.emitNext(discoveryEvent, busyLooping(Duration.ofSeconds(3))); } private ServiceDiscoveryEvent toServiceDiscoveryEvent(MembershipEvent membershipEvent) { diff --git a/services-gateway/src/main/java/io/scalecube/services/gateway/transport/http/HttpGatewayClient.java b/services-gateway/src/main/java/io/scalecube/services/gateway/transport/http/HttpGatewayClient.java index 3363a7691..5f827604b 100644 --- a/services-gateway/src/main/java/io/scalecube/services/gateway/transport/http/HttpGatewayClient.java +++ b/services-gateway/src/main/java/io/scalecube/services/gateway/transport/http/HttpGatewayClient.java @@ -1,6 +1,6 @@ package io.scalecube.services.gateway.transport.http; -import static io.scalecube.reactor.RetryNonSerializedEmitFailureHandler.RETRY_NON_SERIALIZED; +import static reactor.core.publisher.Sinks.EmitFailureHandler.busyLooping; import io.netty.buffer.ByteBuf; import io.scalecube.services.api.ServiceMessage; @@ -8,6 +8,7 @@ import io.scalecube.services.gateway.transport.GatewayClient; import io.scalecube.services.gateway.transport.GatewayClientCodec; import io.scalecube.services.gateway.transport.GatewayClientSettings; +import java.time.Duration; import java.util.function.BiFunction; import org.reactivestreams.Publisher; import org.slf4j.Logger; @@ -87,7 +88,7 @@ private HttpGatewayClient( close .asMono() .then(doClose()) - .doFinally(s -> onClose.emitEmpty(RETRY_NON_SERIALIZED)) + .doFinally(s -> onClose.emitEmpty(busyLooping(Duration.ofSeconds(3)))) .doOnTerminate(() -> LOGGER.info("Closed HttpGatewayClient resources")) .subscribe(null, ex -> LOGGER.warn("Exception occurred on HttpGatewayClient close: " + ex)); } @@ -128,7 +129,7 @@ public Flux requestChannel(Flux requests) { @Override public void close() { - close.emitEmpty(RETRY_NON_SERIALIZED); + close.emitEmpty(busyLooping(Duration.ofSeconds(3))); } @Override diff --git a/services-gateway/src/main/java/io/scalecube/services/gateway/transport/websocket/WebsocketGatewayClient.java b/services-gateway/src/main/java/io/scalecube/services/gateway/transport/websocket/WebsocketGatewayClient.java index 8f93d6418..ff6afab2f 100644 --- a/services-gateway/src/main/java/io/scalecube/services/gateway/transport/websocket/WebsocketGatewayClient.java +++ b/services-gateway/src/main/java/io/scalecube/services/gateway/transport/websocket/WebsocketGatewayClient.java @@ -1,6 +1,6 @@ package io.scalecube.services.gateway.transport.websocket; -import static io.scalecube.reactor.RetryNonSerializedEmitFailureHandler.RETRY_NON_SERIALIZED; +import static reactor.core.publisher.Sinks.EmitFailureHandler.busyLooping; import io.netty.buffer.ByteBuf; import io.netty.handler.codec.http.websocketx.PingWebSocketFrame; @@ -101,7 +101,7 @@ private WebsocketGatewayClient( close .asMono() .then(doClose()) - .doFinally(s -> onClose.emitEmpty(RETRY_NON_SERIALIZED)) + .doFinally(s -> onClose.emitEmpty(busyLooping(Duration.ofSeconds(3)))) .doOnTerminate(() -> LOGGER.info("Closed client")) .subscribe(null, ex -> LOGGER.warn("Failed to close client, cause: " + ex)); } @@ -143,7 +143,7 @@ public Flux requestChannel(Flux requests) { @Override public void close() { - close.emitEmpty(RETRY_NON_SERIALIZED); + close.emitEmpty(busyLooping(Duration.ofSeconds(3))); } @Override diff --git a/services-gateway/src/main/java/io/scalecube/services/gateway/transport/websocket/WebsocketGatewayClientSession.java b/services-gateway/src/main/java/io/scalecube/services/gateway/transport/websocket/WebsocketGatewayClientSession.java index eb0da1dd3..f75a7c9e6 100644 --- a/services-gateway/src/main/java/io/scalecube/services/gateway/transport/websocket/WebsocketGatewayClientSession.java +++ b/services-gateway/src/main/java/io/scalecube/services/gateway/transport/websocket/WebsocketGatewayClientSession.java @@ -1,6 +1,6 @@ package io.scalecube.services.gateway.transport.websocket; -import static io.scalecube.reactor.RetryNonSerializedEmitFailureHandler.RETRY_NON_SERIALIZED; +import static reactor.core.publisher.Sinks.EmitFailureHandler.busyLooping; import io.netty.buffer.ByteBuf; import io.netty.handler.codec.http.websocketx.TextWebSocketFrame; @@ -9,6 +9,7 @@ import io.scalecube.services.gateway.ReferenceCountUtil; import io.scalecube.services.gateway.transport.GatewayClientCodec; import java.nio.channels.ClosedChannelException; +import java.time.Duration; import java.util.Map; import java.util.StringJoiner; import org.jctools.maps.NonBlockingHashMapLong; @@ -187,29 +188,29 @@ private void handleResponse(ServiceMessage response, Object processor) { private static void emitNext(Object processor, ServiceMessage message) { if (processor instanceof One) { //noinspection unchecked - ((One) processor).emitValue(message, RETRY_NON_SERIALIZED); + ((One) processor).emitValue(message, busyLooping(Duration.ofSeconds(3))); } if (processor instanceof Many) { //noinspection unchecked - ((Many) processor).emitNext(message, RETRY_NON_SERIALIZED); + ((Many) processor).emitNext(message, busyLooping(Duration.ofSeconds(3))); } } private static void emitComplete(Object processor) { if (processor instanceof One) { - ((One) processor).emitEmpty(RETRY_NON_SERIALIZED); + ((One) processor).emitEmpty(busyLooping(Duration.ofSeconds(3))); } if (processor instanceof Many) { - ((Many) processor).emitComplete(RETRY_NON_SERIALIZED); + ((Many) processor).emitComplete(busyLooping(Duration.ofSeconds(3))); } } private static void emitError(Object processor, Exception e) { if (processor instanceof One) { - ((One) processor).emitError(e, RETRY_NON_SERIALIZED); + ((One) processor).emitError(e, busyLooping(Duration.ofSeconds(3))); } if (processor instanceof Many) { - ((Many) processor).emitError(e, RETRY_NON_SERIALIZED); + ((Many) processor).emitError(e, busyLooping(Duration.ofSeconds(3))); } } diff --git a/services-transport-parent/services-transport-rsocket/src/main/java/io/scalecube/services/transport/rsocket/ConnectionSetup.java b/services-transport-parent/services-transport-rsocket/src/main/java/io/scalecube/services/transport/rsocket/ConnectionSetup.java index d8e251e9a..fdc614cdc 100644 --- a/services-transport-parent/services-transport-rsocket/src/main/java/io/scalecube/services/transport/rsocket/ConnectionSetup.java +++ b/services-transport-parent/services-transport-rsocket/src/main/java/io/scalecube/services/transport/rsocket/ConnectionSetup.java @@ -1,6 +1,5 @@ package io.scalecube.services.transport.rsocket; -import io.scalecube.utils.MaskUtil; import java.io.Externalizable; import java.io.IOException; import java.io.ObjectInput; @@ -10,7 +9,6 @@ import java.util.Map; import java.util.Map.Entry; import java.util.Objects; -import java.util.StringJoiner; public final class ConnectionSetup implements Externalizable { @@ -45,13 +43,6 @@ public boolean hasCredentials() { return !credentials.isEmpty(); } - @Override - public String toString() { - return new StringJoiner(", ", ConnectionSetup.class.getSimpleName() + "[", "]") - .add("credentials=" + MaskUtil.mask(credentials)) - .toString(); - } - @Override public void writeExternal(ObjectOutput out) throws IOException { // credentials diff --git a/services-transport-parent/services-transport-rsocket/src/main/java/io/scalecube/services/transport/rsocket/RSocketClientTransport.java b/services-transport-parent/services-transport-rsocket/src/main/java/io/scalecube/services/transport/rsocket/RSocketClientTransport.java index 99e97f95d..82445e511 100644 --- a/services-transport-parent/services-transport-rsocket/src/main/java/io/scalecube/services/transport/rsocket/RSocketClientTransport.java +++ b/services-transport-parent/services-transport-rsocket/src/main/java/io/scalecube/services/transport/rsocket/RSocketClientTransport.java @@ -18,7 +18,6 @@ import io.scalecube.services.transport.api.ClientTransport; import io.scalecube.services.transport.api.DataCodec; import io.scalecube.services.transport.api.HeadersCodec; -import io.scalecube.utils.MaskUtil; import java.util.Collection; import java.util.Collections; import java.util.Map; @@ -86,12 +85,6 @@ private Mono> getCredentials(ServiceReference serviceReferen return credentialsSupplier .apply(serviceReference) .switchIfEmpty(Mono.just(Collections.emptyMap())) - .doOnSuccess( - creds -> - LOGGER.debug( - "[credentialsSupplier] Got credentials ({}) for service: {}", - MaskUtil.mask(creds), - serviceReference)) .doOnError( ex -> LOGGER.error( diff --git a/services/src/main/java/io/scalecube/services/Microservices.java b/services/src/main/java/io/scalecube/services/Microservices.java index ce808145e..f739f8540 100644 --- a/services/src/main/java/io/scalecube/services/Microservices.java +++ b/services/src/main/java/io/scalecube/services/Microservices.java @@ -1,6 +1,6 @@ package io.scalecube.services; -import static io.scalecube.reactor.RetryNonSerializedEmitFailureHandler.RETRY_NON_SERIALIZED; +import static reactor.core.publisher.Sinks.EmitFailureHandler.busyLooping; import io.scalecube.net.Address; import io.scalecube.services.api.ServiceMessage; @@ -26,6 +26,7 @@ import io.scalecube.services.transport.api.ServiceTransport; import java.net.InetAddress; import java.net.UnknownHostException; +import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -155,7 +156,7 @@ private Microservices(Builder builder) { shutdown .asMono() .then(doShutdown()) - .doFinally(s -> onShutdown.emitEmpty(RETRY_NON_SERIALIZED)) + .doFinally(s -> onShutdown.emitEmpty(busyLooping(Duration.ofSeconds(3)))) .subscribe( null, ex -> LOGGER.warn("[{}][doShutdown] Exception occurred: {}", id, ex.toString())); } @@ -313,7 +314,7 @@ public Flux listenDiscovery() { public Mono shutdown() { return Mono.defer( () -> { - shutdown.emitEmpty(RETRY_NON_SERIALIZED); + shutdown.emitEmpty(busyLooping(Duration.ofSeconds(3))); return onShutdown.asMono(); }); } @@ -564,7 +565,7 @@ private Mono startListen() { .subscribeOn(scheduler) .publishOn(scheduler) .doOnNext(event -> onDiscoveryEvent(microservices, event)) - .doOnNext(event -> sink.emitNext(event, RETRY_NON_SERIALIZED)) + .doOnNext(event -> sink.emitNext(event, busyLooping(Duration.ofSeconds(3)))) .subscribe()); return Mono.fromRunnable(serviceDiscovery::start) @@ -601,7 +602,7 @@ private void onDiscoveryEvent(Microservices microservices, ServiceDiscoveryEvent public void close() { disposables.dispose(); - sink.emitComplete(RETRY_NON_SERIALIZED); + sink.emitComplete(busyLooping(Duration.ofSeconds(3))); try { if (serviceDiscovery != null) {