Skip to content

Commit

Permalink
Merge pull request #716 from scalecube/make-jackson-optinal
Browse files Browse the repository at this point in the history
Add functionality for custom codecs installation
  • Loading branch information
segabriel authored Apr 25, 2020
2 parents f174477 + dbfd686 commit ae37734
Show file tree
Hide file tree
Showing 9 changed files with 144 additions and 39 deletions.
20 changes: 20 additions & 0 deletions services-api/src/main/java/io/scalecube/services/ServiceCall.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import java.lang.reflect.Proxy;
import java.lang.reflect.Type;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.Callable;
Expand Down Expand Up @@ -48,6 +49,7 @@ public class ServiceCall {
// no-op
};
private Map<String, String> credentials = Collections.emptyMap();
private String contentType;

/** Default constructor. */
public ServiceCall() {}
Expand All @@ -58,6 +60,9 @@ private ServiceCall(ServiceCall other) {
this.serviceRegistry = other.serviceRegistry;
this.router = other.router;
this.errorMapper = other.errorMapper;
this.contentType = other.contentType;
this.requestReleaser = other.requestReleaser;
this.credentials = new HashMap<>(other.credentials);
}

/**
Expand Down Expand Up @@ -156,6 +161,18 @@ public ServiceCall credentials(Map<String, String> credentials) {
return target;
}

/**
* Creates new {@link ServiceCall}'s definition with a given content type.
*
* @param contentType content type.
* @return new {@link ServiceCall} instance.
*/
public ServiceCall contentType(String contentType) {
ServiceCall target = new ServiceCall(this);
target.contentType = contentType;
return target;
}

/**
* Issues fire-and-forget request.
*
Expand Down Expand Up @@ -404,6 +421,7 @@ public Object invoke(Object proxy, Method method, Object[] params) {
case REQUEST_CHANNEL:
// this is REQUEST_CHANNEL so it means params[0] must
// be a publisher - its safe to cast.
//noinspection rawtypes
return serviceCall
.requestBidirectional(
Flux.from((Publisher) request)
Expand Down Expand Up @@ -434,13 +452,15 @@ private ServiceMessage toServiceMessage(MethodInfo methodInfo, Object request) {
return ServiceMessage.from((ServiceMessage) request)
.qualifier(methodInfo.serviceName(), methodInfo.methodName())
.headers(credentials)
.dataFormatIfAbsent(contentType)
.build();
}

return ServiceMessage.builder()
.qualifier(methodInfo.serviceName(), methodInfo.methodName())
.headers(credentials)
.data(request)
.dataFormatIfAbsent(contentType)
.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,21 @@ public Builder dataFormat(String dataFormat) {
return this;
}

/**
* Setter for header {@link #HEADER_DATA_FORMAT}. Does nothing if input {@code dataFormat} is
* null or {@code headers} already contains value for {@link #HEADER_DATA_FORMAT}.
*
* @param dataFormat data format, optional
* @return self
*/
public Builder dataFormatIfAbsent(String dataFormat) {
if (dataFormat == null) {
return this;
}
headers.putIfAbsent(HEADER_DATA_FORMAT, dataFormat);
return this;
}

private Map<String, String> headers() {
return this.headers;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ public Mono<ServiceMessage> invokeOne(ServiceMessage message, Consumer<Object> r
return authenticate(message)
.doOnError(th -> applyRequestReleaser(message, requestReleaser))
.flatMap(principal -> Mono.from(invoke(toRequest(message), principal)))
.map(this::toResponse)
.map(response -> toResponse(response, message.dataFormat()))
.onErrorResume(throwable -> Mono.just(errorMapper.toMessage(throwable)));
}

Expand All @@ -82,7 +82,7 @@ public Flux<ServiceMessage> invokeMany(ServiceMessage message, Consumer<Object>
return authenticate(message)
.doOnError(th -> applyRequestReleaser(message, requestReleaser))
.flatMapMany(principal -> Flux.from(invoke(toRequest(message), principal)))
.map(this::toResponse)
.map(response -> toResponse(response, message.dataFormat()))
.onErrorResume(throwable -> Flux.just(errorMapper.toMessage(throwable)));
}

Expand All @@ -104,8 +104,8 @@ public Flux<ServiceMessage> invokeBidirectional(
principal ->
messages
.map(this::toRequest)
.transform(request -> invoke(request, principal))))
.map(this::toResponse)
.transform(request -> invoke(request, principal)))
.map(response -> toResponse(response, first.get().dataFormat())))
.onErrorResume(throwable -> Flux.just(errorMapper.toMessage(throwable)));
}

Expand Down Expand Up @@ -187,10 +187,19 @@ private Object toRequest(ServiceMessage message) {
return methodInfo.isRequestTypeServiceMessage() ? request : request.data();
}

private ServiceMessage toResponse(Object response) {
return (response instanceof ServiceMessage)
? (ServiceMessage) response
: ServiceMessage.builder().qualifier(methodInfo.qualifier()).data(response).build();
private ServiceMessage toResponse(Object response, String dataFormat) {
if (response instanceof ServiceMessage) {
ServiceMessage message = (ServiceMessage) response;
if (dataFormat != null && !dataFormat.equals(message.dataFormat())) {
return ServiceMessage.from(message).dataFormat(dataFormat).build();
}
return message;
}
return ServiceMessage.builder()
.qualifier(methodInfo.qualifier())
.data(response)
.dataFormatIfAbsent(dataFormat)
.build();
}

private void applyRequestReleaser(ServiceMessage request, Consumer<Object> requestReleaser) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import java.io.ByteArrayOutputStream;
import java.math.BigDecimal;
import java.time.LocalDateTime;
import java.util.Collections;
import java.util.UUID;

public class SmCodecBenchmarkState extends BenchmarkState<SmCodecBenchmarkState> {
Expand All @@ -41,7 +42,8 @@ public SmCodecBenchmarkState(

@Override
protected void beforeAll() {
this.serviceMessageCodec = new ServiceMessageCodec(headersCodec);
this.serviceMessageCodec =
new ServiceMessageCodec(headersCodec, Collections.singletonList(dataCodec));
this.serviceMessage = generateServiceMessage(generateData());
this.payloadMessage = generatePayload(serviceMessage);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
import java.util.Map.Entry;

/** Simple binary codec for headers service message. */
public class DefaultHeadersCodec implements HeadersCodec {
public final class DefaultHeadersCodec implements HeadersCodec {

/**
* {@inheritDoc}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,8 @@ public final class ServiceMessageCodec {
private static final Logger LOGGER = LoggerFactory.getLogger(ServiceMessageCodec.class);

private final HeadersCodec headersCodec;

private final Map<String, DataCodec> dataCodecs;

/** Message codec with default Headers/Data Codecs. */
public ServiceMessageCodec() {
this(null, null);
}

/**
* Create instance from headersCodec and set of DataCodec.
*
Expand Down Expand Up @@ -68,17 +62,6 @@ public ServiceMessageCodec(
}
}

/**
* Construct messageCodec with custom headers codec and default data codecs.
*
* @param headersCodec headers codec
* @deprecated use {@link ServiceMessageCodec#ServiceMessageCodec(HeadersCodec, Collection)}
*/
@Deprecated
public ServiceMessageCodec(HeadersCodec headersCodec) {
this(headersCodec, null);
}

/**
* Encode a message, transform it to T.
*
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package io.scalecube.services.examples.codecs;

import io.scalecube.net.Address;
import io.scalecube.services.Microservices;
import io.scalecube.services.discovery.ScalecubeServiceDiscovery;
import io.scalecube.services.examples.helloworld.service.GreetingServiceImpl;
import io.scalecube.services.examples.helloworld.service.api.GreetingsService;
import io.scalecube.services.transport.rsocket.RSocketServiceTransport;

public class Example1 {

public static final String CONTENT_TYPE = "application/protostuff";

/**
* Start the example.
*
* @param args ignored
*/
public static void main(String[] args) {
// ScaleCube Node node with no members
Microservices seed =
Microservices.builder()
.discovery(ScalecubeServiceDiscovery::new)
.transport(RSocketServiceTransport::new)
.contentType(CONTENT_TYPE) // need to send with non-default data format
.startAwait();

final Address seedAddress = seed.discovery().address();

// Construct a ScaleCube node which joins the cluster hosting the Greeting Service
Microservices ms =
Microservices.builder()
.discovery(
endpoint ->
new ScalecubeServiceDiscovery(endpoint)
.membership(cfg -> cfg.seedMembers(seedAddress)))
.transport(RSocketServiceTransport::new)
.services(new GreetingServiceImpl())
.startAwait();

// Create service proxy
GreetingsService service = seed.call().api(GreetingsService.class);

// Execute the services and subscribe to service events
service.sayHello("joe").subscribe(consumer -> System.out.println(consumer.message()));

seed.onShutdown().block();
ms.onShutdown().block();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,14 @@
import io.netty.util.concurrent.Future;
import io.scalecube.services.api.ServiceMessage;
import io.scalecube.services.transport.api.ClientTransport;
import io.scalecube.services.transport.api.DataCodec;
import io.scalecube.services.transport.api.HeadersCodec;
import io.scalecube.services.transport.api.ReferenceCountUtil;
import io.scalecube.services.transport.api.ServerTransport;
import io.scalecube.services.transport.api.ServiceMessageCodec;
import io.scalecube.services.transport.api.ServiceTransport;
import java.net.InetSocketAddress;
import java.util.Collection;
import java.util.concurrent.ThreadFactory;
import java.util.function.Function;
import reactor.core.publisher.Flux;
Expand All @@ -27,18 +29,16 @@
/** RSocket service transport. */
public class RSocketServiceTransport implements ServiceTransport {

private static final HeadersCodec HEADERS_CODEC = HeadersCodec.getInstance("application/json");
private static final int NUM_OF_WORKERS = Runtime.getRuntime().availableProcessors();

static {
Hooks.onNextDropped(
obj ->
ReferenceCountUtil.safestRelease(
obj instanceof ServiceMessage ? ((ServiceMessage) obj).data() : obj));
}

private int numOfWorkers = NUM_OF_WORKERS;
private HeadersCodec headersCodec = HEADERS_CODEC;
private int numOfWorkers = Runtime.getRuntime().availableProcessors();
private HeadersCodec headersCodec;
private Collection<DataCodec> dataCodecs;
private Function<LoopResources, TcpServer> tcpServerProvider = defaultTcpServerProvider();
private Function<LoopResources, TcpClient> tcpClientProvider = defaultTcpClientProvider();

Expand All @@ -58,13 +58,16 @@ public RSocketServiceTransport() {}
private RSocketServiceTransport(RSocketServiceTransport other) {
this.numOfWorkers = other.numOfWorkers;
this.headersCodec = other.headersCodec;
this.dataCodecs = other.dataCodecs;
this.eventLoopGroup = other.eventLoopGroup;
this.clientLoopResources = other.clientLoopResources;
this.serverLoopResources = other.serverLoopResources;
this.tcpServerProvider = other.tcpServerProvider;
this.tcpClientProvider = other.tcpClientProvider;
}

/**
* Sets a worker threads number.
* Setter for {@code numOfWorkers}.
*
* @param numOfWorkers number of worker threads
* @return new {@code RSocketServiceTransport} instance
Expand All @@ -76,7 +79,7 @@ public RSocketServiceTransport numOfWorkers(int numOfWorkers) {
}

/**
* Sets a {@code HeadersCodec}.
* Setter for {@code headersCodec}.
*
* @param headersCodec headers codec
* @return new {@code RSocketServiceTransport} instance
Expand All @@ -88,7 +91,19 @@ public RSocketServiceTransport headersCodec(HeadersCodec headersCodec) {
}

/**
* Sets a provider function for custom {@code TcpServer}.
* Setter for {@code dataCodecs}.
*
* @param dataCodecs set of data codecs
* @return new {@code RSocketServiceTransport} instance
*/
public RSocketServiceTransport dataCodecs(Collection<DataCodec> dataCodecs) {
RSocketServiceTransport rst = new RSocketServiceTransport(this);
rst.dataCodecs = dataCodecs;
return rst;
}

/**
* Setter for {@code tcpServerProvider}.
*
* @param factory {@code TcpServer} provider function
* @return new {@code RSocketServiceTransport} instance
Expand All @@ -100,7 +115,7 @@ public RSocketServiceTransport tcpServer(Function<LoopResources, TcpServer> fact
}

/**
* Sets a provider function for custom {@code TcpClient}.
* Setter for {@code tcpClientProvider}.
*
* @param factory {@code TcpClient} provider function
* @return new {@code RSocketServiceTransport} instance
Expand All @@ -119,7 +134,8 @@ public RSocketServiceTransport tcpClient(Function<LoopResources, TcpClient> fact
@Override
public ClientTransport clientTransport() {
return new RSocketClientTransport(
new ServiceMessageCodec(headersCodec), tcpClientProvider.apply(clientLoopResources));
new ServiceMessageCodec(headersCodec, dataCodecs),
tcpClientProvider.apply(clientLoopResources));
}

/**
Expand All @@ -130,7 +146,8 @@ public ClientTransport clientTransport() {
@Override
public ServerTransport serverTransport() {
return new RSocketServerTransport(
new ServiceMessageCodec(headersCodec), tcpServerProvider.apply(serverLoopResources));
new ServiceMessageCodec(headersCodec, dataCodecs),
tcpServerProvider.apply(serverLoopResources));
}

@Override
Expand Down Expand Up @@ -162,7 +179,7 @@ private EventLoopGroup newEventLoopGroup() {
}

private Mono<Void> shutdownEventLoopGroup() {
//noinspection unchecked
//noinspection unchecked,rawtypes
return Mono.defer(() -> FutureMono.from((Future) eventLoopGroup.shutdownGracefully()));
}

Expand Down
Loading

0 comments on commit ae37734

Please sign in to comment.