Skip to content

Commit 008a106

Browse files
committed
Made tests work
1 parent 3377e49 commit 008a106

File tree

10 files changed

+125
-34
lines changed

10 files changed

+125
-34
lines changed

services-api/src/main/java/io/scalecube/services/ServiceCall.java

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -284,7 +284,20 @@ public Flux<ServiceMessage> requestBidirectional(
284284
if (serviceRegistry != null
285285
&& (methodInvoker = serviceRegistry.lookupInvoker(request)) != null) {
286286
// local service
287-
return methodInvoker.invokeBidirectional(messages).map(this::throwIfError);
287+
return methodInvoker
288+
.invokeBidirectional(messages)
289+
.map(this::throwIfError)
290+
.contextWrite(
291+
context -> {
292+
if (context.hasKey(RequestContext.class)) {
293+
return context;
294+
} else {
295+
return new RequestContext(context)
296+
.headers(request.headers())
297+
.request(request)
298+
.principal(NULL_PRINCIPAL);
299+
}
300+
});
288301
} else {
289302
// remote service
290303
Objects.requireNonNull(transport, "[requestBidirectional] transport");

services-api/src/main/java/io/scalecube/services/methods/ServiceMethodInvoker.java

Lines changed: 60 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -152,20 +152,61 @@ public Flux<ServiceMessage> invokeMany(ServiceMessage message) {
152152
* @return flux of service messages
153153
*/
154154
public Flux<ServiceMessage> invokeBidirectional(Publisher<ServiceMessage> publisher) {
155-
return Flux.from(publisher)
156-
.switchOnFirst(
157-
(first, messages) -> {
158-
final var message = first.get();
159-
final var qualifier = message.qualifier();
160-
final var dataFormat = message.dataFormat();
155+
return RequestContext.deferContextual()
156+
.flatMapMany(
157+
context ->
158+
Flux.from(publisher)
159+
.switchOnFirst(
160+
(first, messages) -> {
161+
final var message = first.get();
162+
final var request = copyRequest(message);
163+
final var qualifier = message.qualifier();
164+
final var dataFormat = message.dataFormat();
161165

162-
return messages
163-
.map(this::toRequest)
164-
.transform(this::invokeRequest)
165-
.map(response -> toResponse(response, qualifier, dataFormat))
166-
.onErrorResume(ex -> Flux.just(errorMapper.toMessage(qualifier, ex)))
167-
.subscribeOn(methodInfo.scheduler());
168-
});
166+
return mapPrincipal(context)
167+
.flatMapMany(
168+
principal ->
169+
requestContext()
170+
.thenMany(
171+
Flux.defer(
172+
() ->
173+
messages
174+
.map(this::toRequest)
175+
.transform(this::invokeRequest)))
176+
.contextWrite(
177+
enhanceRequestContext(context, request, principal)))
178+
.doOnSubscribe(
179+
s -> {
180+
if (logger != null && logger.isDebugEnabled()) {
181+
logger.debug(
182+
"[{}][subscribe] request: {}",
183+
qualifier,
184+
toString(request));
185+
}
186+
})
187+
.doOnComplete(
188+
() -> {
189+
if (logger != null && logger.isDebugEnabled()) {
190+
logger.debug(
191+
"[{}][complete] request: {}",
192+
qualifier,
193+
toString(request));
194+
}
195+
})
196+
.doOnError(
197+
ex -> {
198+
if (logger != null) {
199+
logger.error(
200+
"[{}][error] request: {}",
201+
qualifier,
202+
toString(request),
203+
ex);
204+
}
205+
})
206+
.map(response -> toResponse(response, qualifier, dataFormat))
207+
.onErrorResume(ex -> Flux.just(errorMapper.toMessage(qualifier, ex)))
208+
.subscribeOn(methodInfo.scheduler());
209+
}));
169210
}
170211

171212
private Mono<RequestContext> requestContext() {
@@ -227,7 +268,12 @@ private Context enhanceRequestContext(
227268
}
228269

229270
private Object toRequest(ServiceMessage message) {
230-
final var request = dataDecoder.apply(message, methodInfo.requestType());
271+
final var request = dataDecoder.decodeData(message, methodInfo.requestType());
272+
return methodInfo.isRequestTypeServiceMessage() ? request : request.data();
273+
}
274+
275+
private Object copyRequest(ServiceMessage message) {
276+
final var request = dataDecoder.copyData(message, methodInfo.requestType());
231277
return methodInfo.isRequestTypeServiceMessage() ? request : request.data();
232278
}
233279

services-api/src/main/java/io/scalecube/services/transport/api/ServiceMessageDataDecoder.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,15 +2,16 @@
22

33
import io.scalecube.services.api.ServiceMessage;
44
import java.util.ServiceLoader;
5-
import java.util.function.BiFunction;
65
import java.util.stream.StreamSupport;
76

8-
@FunctionalInterface
9-
public interface ServiceMessageDataDecoder
10-
extends BiFunction<ServiceMessage, Class<?>, ServiceMessage> {
7+
public interface ServiceMessageDataDecoder {
118

129
ServiceMessageDataDecoder INSTANCE =
1310
StreamSupport.stream(ServiceLoader.load(ServiceMessageDataDecoder.class).spliterator(), false)
1411
.findFirst()
1512
.orElse(null);
13+
14+
ServiceMessage decodeData(ServiceMessage message, Class<?> dataType);
15+
16+
ServiceMessage copyData(ServiceMessage message, Class<?> dataType);
1617
}

services-api/src/test/java/io/scalecube/services/methods/ServiceMethodInvokerTest.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,18 @@ class ServiceMethodInvokerTest {
3636

3737
private static final PrincipalImpl PRINCIPAL = new PrincipalImpl("user", List.of("permission"));
3838

39-
private final ServiceMessageDataDecoder dataDecoder = (message, type) -> message;
39+
private final ServiceMessageDataDecoder dataDecoder =
40+
new ServiceMessageDataDecoder() {
41+
@Override
42+
public ServiceMessage decodeData(ServiceMessage message, Class<?> dataType) {
43+
return message;
44+
}
45+
46+
@Override
47+
public ServiceMessage copyData(ServiceMessage message, Class<?> dataType) {
48+
return message;
49+
}
50+
};
4051
private final PrincipalMapper principalMapper = context -> Mono.just(PRINCIPAL);
4152
private final StubService stubService = new StubServiceImpl();
4253

services-gateway/src/test/java/io/scalecube/services/gateway/http/HttpClientConnectionTest.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,6 @@ public void testCallRepeatedlyByInvalidAddress() {
100100

101101
private static ServiceCall serviceCall(Address address) {
102102
return new ServiceCall()
103-
.logger("serviceCall")
104103
.transport(HttpGatewayClientTransport.builder().address(address).build())
105104
.router(StaticAddressRouter.forService(address, "app-service").build());
106105
}

services-transport-parent/services-transport-rsocket/src/main/java/io/scalecube/services/transport/rsocket/RSocketClientChannel.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ public Mono<ServiceMessage> requestResponse(ServiceMessage message, Type respons
2727
return rsocket
2828
.flatMap(rsocket -> rsocket.requestResponse(toPayload(message)))
2929
.map(this::toMessage)
30-
.map(msg -> ServiceMessageCodec.decodeData(msg, responseType))
30+
.map(msg -> ServiceMessageCodec.decodeData(msg, responseType, false))
3131
.onErrorMap(RSocketClientChannel::mapConnectionAborted);
3232
}
3333

@@ -36,7 +36,7 @@ public Flux<ServiceMessage> requestStream(ServiceMessage message, Type responseT
3636
return rsocket
3737
.flatMapMany(rsocket -> rsocket.requestStream(toPayload(message)))
3838
.map(this::toMessage)
39-
.map(msg -> ServiceMessageCodec.decodeData(msg, responseType))
39+
.map(msg -> ServiceMessageCodec.decodeData(msg, responseType, false))
4040
.onErrorMap(RSocketClientChannel::mapConnectionAborted);
4141
}
4242

@@ -46,7 +46,7 @@ public Flux<ServiceMessage> requestChannel(
4646
return rsocket
4747
.flatMapMany(rsocket -> rsocket.requestChannel(Flux.from(publisher).map(this::toPayload)))
4848
.map(this::toMessage)
49-
.map(msg -> ServiceMessageCodec.decodeData(msg, responseType))
49+
.map(msg -> ServiceMessageCodec.decodeData(msg, responseType, false))
5050
.onErrorMap(RSocketClientChannel::mapConnectionAborted);
5151
}
5252

services-transport-parent/services-transport-rsocket/src/main/java/io/scalecube/services/transport/rsocket/ServiceMessageByteBufDataDecoder.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,12 @@
66
public class ServiceMessageByteBufDataDecoder implements ServiceMessageDataDecoder {
77

88
@Override
9-
public ServiceMessage apply(ServiceMessage message, Class<?> dataType) {
10-
return ServiceMessageCodec.decodeData(message, dataType);
9+
public ServiceMessage decodeData(ServiceMessage message, Class<?> dataType) {
10+
return ServiceMessageCodec.decodeData(message, dataType, false);
11+
}
12+
13+
@Override
14+
public ServiceMessage copyData(ServiceMessage message, Class<?> dataType) {
15+
return ServiceMessageCodec.decodeData(message, dataType, true);
1116
}
1217
}

services-transport-parent/services-transport-rsocket/src/main/java/io/scalecube/services/transport/rsocket/ServiceMessageCodec.java

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -157,13 +157,14 @@ public ServiceMessage decode(ByteBuf dataBuffer, ByteBuf headersBuffer)
157157
* Decode message.
158158
*
159159
* @param message the original message (with {@link ByteBuf} data)
160-
* @param dataType the type of the data.
160+
* @param dataType the type of the data
161+
* @param copyOnDecode whether to copy the buffer before decoding
161162
* @return a new Service message that upon {@link ServiceMessage#data()} returns the actual data
162163
* (of type data type)
163164
* @throws MessageCodecException when decode fails
164165
*/
165-
public static ServiceMessage decodeData(ServiceMessage message, Type dataType)
166-
throws MessageCodecException {
166+
public static ServiceMessage decodeData(
167+
ServiceMessage message, Type dataType, boolean copyOnDecode) throws MessageCodecException {
167168
if (dataType == null
168169
|| dataType == ByteBuf.class
169170
|| message.data() == null
@@ -181,7 +182,10 @@ public static ServiceMessage decodeData(ServiceMessage message, Type dataType)
181182
return ServiceMessage.from(message).data(bytes).build();
182183
}
183184

184-
try (ByteBufInputStream inputStream = new ByteBufInputStream(dataBuffer, true)) {
185+
try (ByteBufInputStream inputStream =
186+
copyOnDecode
187+
? new ByteBufInputStream(Unpooled.copiedBuffer(dataBuffer))
188+
: new ByteBufInputStream(dataBuffer, true)) {
185189
final var targetType = message.isError() ? ErrorData.class : dataType;
186190
final var dataCodec = DataCodec.getInstance(message.dataFormatOrDefault());
187191
final var decodedData = dataCodec.decode(inputStream, targetType);

services/src/main/java/io/scalecube/services/Microservices.java

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import static reactor.core.publisher.Sinks.EmitFailureHandler.busyLooping;
44

5+
import io.scalecube.services.api.ServiceMessage;
56
import io.scalecube.services.auth.PrincipalMapper;
67
import io.scalecube.services.auth.ServiceRolesProcessor;
78
import io.scalecube.services.discovery.api.ServiceDiscovery;
@@ -759,7 +760,18 @@ private Context conclude() {
759760
if (defaultDataDecoder == null) {
760761
defaultDataDecoder =
761762
Optional.ofNullable(ServiceMessageDataDecoder.INSTANCE)
762-
.orElse((message, dataType) -> message);
763+
.orElse(
764+
new ServiceMessageDataDecoder() {
765+
@Override
766+
public ServiceMessage decodeData(ServiceMessage message, Class<?> dataType) {
767+
return message;
768+
}
769+
770+
@Override
771+
public ServiceMessage copyData(ServiceMessage message, Class<?> dataType) {
772+
return message;
773+
}
774+
});
763775
}
764776

765777
if (name == null) {

services/src/test/java/io/scalecube/services/StreamingServiceTest.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
import io.scalecube.services.sut.QuoteService;
1212
import io.scalecube.services.sut.SimpleQuoteService;
1313
import io.scalecube.services.transport.rsocket.RSocketServiceTransport;
14-
import io.scalecube.services.transport.rsocket.ServiceMessageCodec;
14+
import io.scalecube.services.transport.rsocket.ServiceMessageByteBufDataDecoder;
1515
import io.scalecube.transport.netty.websocket.WebsocketTransportFactory;
1616
import java.time.Duration;
1717
import java.util.List;
@@ -37,7 +37,7 @@ public static void setup() {
3737
.transport(cfg -> cfg.transportFactory(new WebsocketTransportFactory()))
3838
.options(opts -> opts.metadata(serviceEndpoint)))
3939
.transport(RSocketServiceTransport::new)
40-
.defaultDataDecoder(ServiceMessageCodec::decodeData));
40+
.defaultDataDecoder(new ServiceMessageByteBufDataDecoder()));
4141

4242
final Address gatewayAddress = gateway.discoveryAddress();
4343

@@ -51,7 +51,7 @@ public static void setup() {
5151
.options(opts -> opts.metadata(endpoint))
5252
.membership(cfg -> cfg.seedMembers(gatewayAddress.toString())))
5353
.transport(RSocketServiceTransport::new)
54-
.defaultDataDecoder(ServiceMessageCodec::decodeData)
54+
.defaultDataDecoder(new ServiceMessageByteBufDataDecoder())
5555
.services(new SimpleQuoteService()));
5656
}
5757

0 commit comments

Comments
 (0)