Skip to content

Commit

Permalink
Removed RetryNonSerializedEmitFailureHandler
Browse files Browse the repository at this point in the history
  • Loading branch information
artem-v committed Sep 24, 2024
1 parent eda1057 commit d574611
Show file tree
Hide file tree
Showing 9 changed files with 31 additions and 56 deletions.
12 changes: 3 additions & 9 deletions pom.xml
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>

<parent>
Expand Down Expand Up @@ -58,7 +60,6 @@

<properties>
<scalecube-cluster.version>2.6.17</scalecube-cluster.version>
<scalecube-commons.version>1.0.24</scalecube-commons.version>
<scalecube-security.version>1.0.32</scalecube-security.version>

<reactor.version>2020.0.32</reactor.version>
Expand Down Expand Up @@ -93,13 +94,6 @@

<dependencyManagement>
<dependencies>
<!-- Scalecube commons -->
<dependency>
<groupId>io.scalecube</groupId>
<artifactId>scalecube-commons</artifactId>
<version>${scalecube-commons.version}</version>
</dependency>

<!-- Scalecube security tokens -->
<dependency>
<groupId>io.scalecube</groupId>
Expand Down
13 changes: 3 additions & 10 deletions services-api/pom.xml
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

<modelVersion>4.0.0</modelVersion>

Expand All @@ -11,20 +13,11 @@

<artifactId>scalecube-services-api</artifactId>

<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>

<dependencies>
<dependency>
<groupId>io.scalecube</groupId>
<artifactId>scalecube-commons</artifactId>
</dependency>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
</dependency>

<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
package io.scalecube.services.discovery;

import static io.scalecube.reactor.RetryNonSerializedEmitFailureHandler.RETRY_NON_SERIALIZED;
import static io.scalecube.services.discovery.api.ServiceDiscoveryEvent.newEndpointAdded;
import static io.scalecube.services.discovery.api.ServiceDiscoveryEvent.newEndpointLeaving;
import static io.scalecube.services.discovery.api.ServiceDiscoveryEvent.newEndpointRemoved;
import static reactor.core.publisher.Sinks.EmitFailureHandler.busyLooping;

import io.scalecube.cluster.Cluster;
import io.scalecube.cluster.ClusterConfig;
Expand All @@ -19,6 +19,7 @@
import io.scalecube.services.discovery.api.ServiceDiscovery;
import io.scalecube.services.discovery.api.ServiceDiscoveryEvent;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.StringJoiner;
import java.util.function.UnaryOperator;
import org.slf4j.Logger;
Expand Down Expand Up @@ -99,7 +100,7 @@ public Flux<ServiceDiscoveryEvent> listen() {

@Override
public void shutdown() {
sink.emitComplete(RETRY_NON_SERIALIZED);
sink.emitComplete(busyLooping(Duration.ofSeconds(3)));
if (cluster != null) {
cluster.shutdown();
}
Expand All @@ -117,7 +118,7 @@ private void onMembershipEvent(MembershipEvent membershipEvent) {
}

LOGGER.debug("Publish discoveryEvent: {}", discoveryEvent);
sink.emitNext(discoveryEvent, RETRY_NON_SERIALIZED);
sink.emitNext(discoveryEvent, busyLooping(Duration.ofSeconds(3)));
}

private ServiceDiscoveryEvent toServiceDiscoveryEvent(MembershipEvent membershipEvent) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
package io.scalecube.services.gateway.transport.http;

import static io.scalecube.reactor.RetryNonSerializedEmitFailureHandler.RETRY_NON_SERIALIZED;
import static reactor.core.publisher.Sinks.EmitFailureHandler.busyLooping;

import io.netty.buffer.ByteBuf;
import io.scalecube.services.api.ServiceMessage;
import io.scalecube.services.api.ServiceMessage.Builder;
import io.scalecube.services.gateway.transport.GatewayClient;
import io.scalecube.services.gateway.transport.GatewayClientCodec;
import io.scalecube.services.gateway.transport.GatewayClientSettings;
import java.time.Duration;
import java.util.function.BiFunction;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
Expand Down Expand Up @@ -87,7 +88,7 @@ private HttpGatewayClient(
close
.asMono()
.then(doClose())
.doFinally(s -> onClose.emitEmpty(RETRY_NON_SERIALIZED))
.doFinally(s -> onClose.emitEmpty(busyLooping(Duration.ofSeconds(3))))
.doOnTerminate(() -> LOGGER.info("Closed HttpGatewayClient resources"))
.subscribe(null, ex -> LOGGER.warn("Exception occurred on HttpGatewayClient close: " + ex));
}
Expand Down Expand Up @@ -128,7 +129,7 @@ public Flux<ServiceMessage> requestChannel(Flux<ServiceMessage> requests) {

@Override
public void close() {
close.emitEmpty(RETRY_NON_SERIALIZED);
close.emitEmpty(busyLooping(Duration.ofSeconds(3)));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package io.scalecube.services.gateway.transport.websocket;

import static io.scalecube.reactor.RetryNonSerializedEmitFailureHandler.RETRY_NON_SERIALIZED;
import static reactor.core.publisher.Sinks.EmitFailureHandler.busyLooping;

import io.netty.buffer.ByteBuf;
import io.netty.handler.codec.http.websocketx.PingWebSocketFrame;
Expand Down Expand Up @@ -101,7 +101,7 @@ private WebsocketGatewayClient(
close
.asMono()
.then(doClose())
.doFinally(s -> onClose.emitEmpty(RETRY_NON_SERIALIZED))
.doFinally(s -> onClose.emitEmpty(busyLooping(Duration.ofSeconds(3))))
.doOnTerminate(() -> LOGGER.info("Closed client"))
.subscribe(null, ex -> LOGGER.warn("Failed to close client, cause: " + ex));
}
Expand Down Expand Up @@ -143,7 +143,7 @@ public Flux<ServiceMessage> requestChannel(Flux<ServiceMessage> requests) {

@Override
public void close() {
close.emitEmpty(RETRY_NON_SERIALIZED);
close.emitEmpty(busyLooping(Duration.ofSeconds(3)));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package io.scalecube.services.gateway.transport.websocket;

import static io.scalecube.reactor.RetryNonSerializedEmitFailureHandler.RETRY_NON_SERIALIZED;
import static reactor.core.publisher.Sinks.EmitFailureHandler.busyLooping;

import io.netty.buffer.ByteBuf;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
Expand All @@ -9,6 +9,7 @@
import io.scalecube.services.gateway.ReferenceCountUtil;
import io.scalecube.services.gateway.transport.GatewayClientCodec;
import java.nio.channels.ClosedChannelException;
import java.time.Duration;
import java.util.Map;
import java.util.StringJoiner;
import org.jctools.maps.NonBlockingHashMapLong;
Expand Down Expand Up @@ -187,29 +188,29 @@ private void handleResponse(ServiceMessage response, Object processor) {
private static void emitNext(Object processor, ServiceMessage message) {
if (processor instanceof One) {
//noinspection unchecked
((One<ServiceMessage>) processor).emitValue(message, RETRY_NON_SERIALIZED);
((One<ServiceMessage>) processor).emitValue(message, busyLooping(Duration.ofSeconds(3)));
}
if (processor instanceof Many) {
//noinspection unchecked
((Many<ServiceMessage>) processor).emitNext(message, RETRY_NON_SERIALIZED);
((Many<ServiceMessage>) processor).emitNext(message, busyLooping(Duration.ofSeconds(3)));
}
}

private static void emitComplete(Object processor) {
if (processor instanceof One) {
((One<?>) processor).emitEmpty(RETRY_NON_SERIALIZED);
((One<?>) processor).emitEmpty(busyLooping(Duration.ofSeconds(3)));
}
if (processor instanceof Many) {
((Many<?>) processor).emitComplete(RETRY_NON_SERIALIZED);
((Many<?>) processor).emitComplete(busyLooping(Duration.ofSeconds(3)));
}
}

private static void emitError(Object processor, Exception e) {
if (processor instanceof One) {
((One<?>) processor).emitError(e, RETRY_NON_SERIALIZED);
((One<?>) processor).emitError(e, busyLooping(Duration.ofSeconds(3)));
}
if (processor instanceof Many) {
((Many<?>) processor).emitError(e, RETRY_NON_SERIALIZED);
((Many<?>) processor).emitError(e, busyLooping(Duration.ofSeconds(3)));
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package io.scalecube.services.transport.rsocket;

import io.scalecube.utils.MaskUtil;
import java.io.Externalizable;
import java.io.IOException;
import java.io.ObjectInput;
Expand All @@ -10,7 +9,6 @@
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.StringJoiner;

public final class ConnectionSetup implements Externalizable {

Expand Down Expand Up @@ -45,13 +43,6 @@ public boolean hasCredentials() {
return !credentials.isEmpty();
}

@Override
public String toString() {
return new StringJoiner(", ", ConnectionSetup.class.getSimpleName() + "[", "]")
.add("credentials=" + MaskUtil.mask(credentials))
.toString();
}

@Override
public void writeExternal(ObjectOutput out) throws IOException {
// credentials
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
import io.scalecube.services.transport.api.ClientTransport;
import io.scalecube.services.transport.api.DataCodec;
import io.scalecube.services.transport.api.HeadersCodec;
import io.scalecube.utils.MaskUtil;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
Expand Down Expand Up @@ -86,12 +85,6 @@ private Mono<Map<String, String>> getCredentials(ServiceReference serviceReferen
return credentialsSupplier
.apply(serviceReference)
.switchIfEmpty(Mono.just(Collections.emptyMap()))
.doOnSuccess(
creds ->
LOGGER.debug(
"[credentialsSupplier] Got credentials ({}) for service: {}",
MaskUtil.mask(creds),
serviceReference))
.doOnError(
ex ->
LOGGER.error(
Expand Down
11 changes: 6 additions & 5 deletions services/src/main/java/io/scalecube/services/Microservices.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package io.scalecube.services;

import static io.scalecube.reactor.RetryNonSerializedEmitFailureHandler.RETRY_NON_SERIALIZED;
import static reactor.core.publisher.Sinks.EmitFailureHandler.busyLooping;

import io.scalecube.net.Address;
import io.scalecube.services.api.ServiceMessage;
Expand All @@ -26,6 +26,7 @@
import io.scalecube.services.transport.api.ServiceTransport;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
Expand Down Expand Up @@ -155,7 +156,7 @@ private Microservices(Builder builder) {
shutdown
.asMono()
.then(doShutdown())
.doFinally(s -> onShutdown.emitEmpty(RETRY_NON_SERIALIZED))
.doFinally(s -> onShutdown.emitEmpty(busyLooping(Duration.ofSeconds(3))))
.subscribe(
null, ex -> LOGGER.warn("[{}][doShutdown] Exception occurred: {}", id, ex.toString()));
}
Expand Down Expand Up @@ -313,7 +314,7 @@ public Flux<ServiceDiscoveryEvent> listenDiscovery() {
public Mono<Void> shutdown() {
return Mono.defer(
() -> {
shutdown.emitEmpty(RETRY_NON_SERIALIZED);
shutdown.emitEmpty(busyLooping(Duration.ofSeconds(3)));
return onShutdown.asMono();
});
}
Expand Down Expand Up @@ -564,7 +565,7 @@ private Mono<Void> startListen() {
.subscribeOn(scheduler)
.publishOn(scheduler)
.doOnNext(event -> onDiscoveryEvent(microservices, event))
.doOnNext(event -> sink.emitNext(event, RETRY_NON_SERIALIZED))
.doOnNext(event -> sink.emitNext(event, busyLooping(Duration.ofSeconds(3))))
.subscribe());

return Mono.fromRunnable(serviceDiscovery::start)
Expand Down Expand Up @@ -601,7 +602,7 @@ private void onDiscoveryEvent(Microservices microservices, ServiceDiscoveryEvent
public void close() {
disposables.dispose();

sink.emitComplete(RETRY_NON_SERIALIZED);
sink.emitComplete(busyLooping(Duration.ofSeconds(3)));

try {
if (serviceDiscovery != null) {
Expand Down

0 comments on commit d574611

Please sign in to comment.