Skip to content
This repository has been archived by the owner on Jun 21, 2023. It is now read-only.

Commit

Permalink
Issue#376 (#182)
Browse files Browse the repository at this point in the history
* Refactored .send()  on websocket session
  • Loading branch information
artem-v authored Aug 20, 2021
1 parent 7c33679 commit 8222276
Show file tree
Hide file tree
Showing 7 changed files with 364 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -188,28 +188,28 @@ private void onRequest(WebsocketGatewaySession session, ServiceMessage request,
final Flux<ServiceMessage> serviceStream = serviceCall.requestMany(request);

Disposable disposable =
Optional.ofNullable(request.header(RATE_LIMIT_FIELD))
.map(Integer::valueOf)
.map(serviceStream::limitRate)
.orElse(serviceStream)
.map(
response -> {
boolean isErrorResponse = false;
if (response.isError()) {
receivedError.set(true);
isErrorResponse = true;
}
return newResponseMessage(sid, response, isErrorResponse);
})
.flatMap(session::send)
session
.send(
Optional.ofNullable(request.header(RATE_LIMIT_FIELD))
.map(Integer::valueOf)
.map(serviceStream::limitRate)
.orElse(serviceStream)
.map(
response -> {
boolean isErrorResponse = response.isError();
if (isErrorResponse) {
receivedError.set(true);
}
return newResponseMessage(sid, response, isErrorResponse);
}))
.doOnError(th -> ReferenceCountUtil.safestRelease(request.data()))
.doOnError(
th ->
session
.send(toErrorResponse(errorMapper, request, th))
.contextWrite(context)
.subscribe())
.doOnComplete(
.doOnTerminate(
() -> {
if (!receivedError.get()) {
session
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,18 +86,38 @@ public Flux<ByteBuf> receive() {
* @return mono void
*/
public Mono<Void> send(ServiceMessage response) {
return Mono.deferContextual(
context -> {
final TextWebSocketFrame frame = new TextWebSocketFrame(codec.encode(response));
gatewayHandler.onResponse(this, frame.content(), response, (Context) context);
// send with publisher (defer buffer cleanup to netty)
return outbound
.sendObject(frame)
.then()
.doOnError(th -> gatewayHandler.onError(this, th, (Context) context));
});
}

/**
* Method to send normal response.
*
* @param messages messages
* @return mono void
*/
public Mono<Void> send(Flux<ServiceMessage> messages) {
return Mono.deferContextual(
context -> {
// send with publisher (defer buffer cleanup to netty)
return outbound
.sendObject(
Mono.just(response)
.map(codec::encode)
.map(TextWebSocketFrame::new)
.doOnNext(
frame ->
gatewayHandler.onResponse(
this, frame.content(), response, (Context) context)),
messages.map(
response -> {
final TextWebSocketFrame frame =
new TextWebSocketFrame(codec.encode(response));
gatewayHandler.onResponse(
this, frame.content(), response, (Context) context);
return frame;
}),
f -> true)
.then()
.doOnError(th -> gatewayHandler.onError(this, th, (Context) context));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package io.scalecube.services.gateway.websocket;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.CoreSubscriber;

public class CancelledSubscriber implements CoreSubscriber {

private static final Logger LOGGER = LoggerFactory.getLogger(CancelledSubscriber.class);

public static final CancelledSubscriber INSTANCE = new CancelledSubscriber();

private CancelledSubscriber() {
// Do not instantiate
}

@Override
public void onSubscribe(org.reactivestreams.Subscription s) {
// no-op
}

@Override
public void onNext(Object o) {
LOGGER.warn("Received ({}) which will be dropped immediately due cancelled aeron inbound", o);
}

@Override
public void onError(Throwable t) {
// no-op
}

@Override
public void onComplete() {
// no-op
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,176 @@
package io.scalecube.services.gateway.websocket;

import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import org.reactivestreams.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.CoreSubscriber;
import reactor.core.Exceptions;
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Operators;

public final class ReactiveAdapter extends BaseSubscriber<Object> implements ReactiveOperator {

private static final Logger LOGGER = LoggerFactory.getLogger(ReactiveAdapter.class);

private static final AtomicLongFieldUpdater<ReactiveAdapter> REQUESTED =
AtomicLongFieldUpdater.newUpdater(ReactiveAdapter.class, "requested");

@SuppressWarnings("rawtypes")
private static final AtomicReferenceFieldUpdater<ReactiveAdapter, CoreSubscriber>
DESTINATION_SUBSCRIBER =
AtomicReferenceFieldUpdater.newUpdater(
ReactiveAdapter.class, CoreSubscriber.class, "destinationSubscriber");

private final FluxReceive inbound = new FluxReceive();

private volatile long requested;
private volatile boolean fastPath;
private long produced;
private volatile CoreSubscriber<? super Object> destinationSubscriber;
private Throwable lastError;

@Override
public boolean isDisposed() {
return destinationSubscriber == CancelledSubscriber.INSTANCE;
}

@Override
public void dispose(Throwable throwable) {
Subscription upstream = upstream();
if (upstream != null) {
upstream.cancel();
}
CoreSubscriber<?> destination =
DESTINATION_SUBSCRIBER.getAndSet(this, CancelledSubscriber.INSTANCE);
if (destination != null) {
destination.onError(throwable);
}
}

@Override
public void dispose() {
inbound.cancel();
}

public Flux<Object> receive() {
return inbound;
}

@Override
public void lastError(Throwable throwable) {
lastError = throwable;
}

@Override
public Throwable lastError() {
return lastError;
}

@Override
public void tryNext(Object Object) {
if (!isDisposed()) {
destinationSubscriber.onNext(Object);
} else {
LOGGER.warn("[tryNext] reactiveAdapter is disposed, dropping : " + Object);
}
}

@Override
public boolean isFastPath() {
return fastPath;
}

@Override
public void commitProduced() {
if (produced > 0) {
Operators.produced(REQUESTED, this, produced);
produced = 0;
}
}

@Override
public long incrementProduced() {
return ++produced;
}

@Override
public long requested(long limit) {
return Math.min(requested, limit);
}

@Override
protected void hookOnSubscribe(Subscription subscription) {
subscription.request(requested);
}

@Override
protected void hookOnNext(Object Object) {
tryNext(Object);
}

@Override
protected void hookOnComplete() {
dispose();
}

@Override
protected void hookOnError(Throwable throwable) {
dispose(throwable);
}

@Override
protected void hookOnCancel() {
dispose();
}

class FluxReceive extends Flux<Object> implements Subscription {

@Override
public void request(long n) {
Subscription upstream = upstream();
if (upstream != null) {
upstream.request(n);
}
if (fastPath) {
return;
}
if (n == Long.MAX_VALUE) {
fastPath = true;
requested = Long.MAX_VALUE;
return;
}
Operators.addCap(REQUESTED, ReactiveAdapter.this, n);
}

@Override
public void cancel() {
Subscription upstream = upstream();
if (upstream != null) {
upstream.cancel();
}
CoreSubscriber<?> destination =
DESTINATION_SUBSCRIBER.getAndSet(ReactiveAdapter.this, CancelledSubscriber.INSTANCE);
if (destination != null) {
destination.onComplete();
}
}

@Override
public void subscribe(CoreSubscriber<? super Object> destinationSubscriber) {
boolean result =
DESTINATION_SUBSCRIBER.compareAndSet(ReactiveAdapter.this, null, destinationSubscriber);
if (result) {
destinationSubscriber.onSubscribe(this);
} else {
Operators.error(
destinationSubscriber,
isDisposed()
? Exceptions.failWithCancel()
: Exceptions.duplicateOnSubscribeException());
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package io.scalecube.services.gateway.websocket;

import reactor.core.Disposable;

public interface ReactiveOperator extends Disposable {

void dispose(Throwable throwable);

void lastError(Throwable throwable);

Throwable lastError();

void tryNext(Object fragment);

boolean isFastPath();

void commitProduced();

long incrementProduced();

long requested(long limit);
}
Loading

0 comments on commit 8222276

Please sign in to comment.