Skip to content

Commit

Permalink
Merge pull request #3533 from earce/develop
Browse files Browse the repository at this point in the history
[streaming] ping related cleanup, inspection cleanup
  • Loading branch information
badgerwithagun authored May 25, 2020
2 parents 12d9112 + 8989b79 commit f78f9cf
Show file tree
Hide file tree
Showing 5 changed files with 13 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@

import com.fasterxml.jackson.databind.JsonNode;
import info.bitrich.xchangestream.service.netty.JsonNettyStreamingService;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.http.websocketx.PingWebSocketFrame;
import java.io.IOException;
import org.knowm.xchange.currency.CurrencyPair;
import org.slf4j.Logger;
Expand All @@ -16,7 +14,12 @@ public class GeminiProductStreamingService extends JsonNettyStreamingService {
private final CurrencyPair currencyPair;

public GeminiProductStreamingService(String symbolUrl, CurrencyPair currencyPair) {
super(symbolUrl, Integer.MAX_VALUE, DEFAULT_CONNECTION_TIMEOUT, DEFAULT_RETRY_DURATION, 15);
super(
symbolUrl,
Integer.MAX_VALUE,
DEFAULT_CONNECTION_TIMEOUT,
DEFAULT_RETRY_DURATION,
DEFAULT_IDLE_TIMEOUT);
this.currencyPair = currencyPair;
}

Expand All @@ -39,9 +42,4 @@ public String getSubscribeMessage(String channelName, Object... args) throws IOE
public String getUnsubscribeMessage(String channelName) throws IOException {
return null;
}

@Override
protected void handleIdle(ChannelHandlerContext ctx) {
ctx.writeAndFlush(new PingWebSocketFrame());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,11 @@
import info.bitrich.xchangestream.service.netty.WebSocketClientHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.http.websocketx.WebSocketClientHandshaker;
import io.reactivex.Completable;
import io.reactivex.Observable;
import java.io.IOException;
import java.util.Collections;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -33,7 +30,7 @@ public class KrakenStreamingService extends JsonNettyStreamingService {
private static final Logger LOG = LoggerFactory.getLogger(KrakenStreamingService.class);
private static final String EVENT = "event";
private final Map<Integer, String> channels = new ConcurrentHashMap<>();
private ObjectMapper mapper = StreamingObjectMapperHelper.getObjectMapper();
private final ObjectMapper mapper = StreamingObjectMapperHelper.getObjectMapper();
private final boolean isPrivate;

private final Map<Integer, String> subscriptionRequestMap = new ConcurrentHashMap<>();
Expand All @@ -57,6 +54,9 @@ protected void handleMessage(JsonNode message) {
KrakenEventType krakenEvent;
if (event != null && (krakenEvent = KrakenEventType.getEvent(event.textValue())) != null) {
switch (krakenEvent) {
case pingStatus:
LOG.info("PingStatus received: {}", message);
break;
case pong:
LOG.debug("Pong received");
break;
Expand Down Expand Up @@ -210,22 +210,6 @@ protected WebSocketClientHandler getWebSocketClientHandler(
return new KrakenWebSocketClientHandler(handshaker, handler);
}

@Override
protected Completable openConnection() {

KrakenSubscriptionMessage ping =
new KrakenSubscriptionMessage(null, KrakenEventType.ping, null, null);

subscribeConnectionSuccess()
.subscribe(
o ->
Observable.interval(30, TimeUnit.SECONDS)
.takeWhile(t -> isSocketOpen())
.subscribe(t -> sendObjectMessage(ping)));

return super.openConnection();
}

private WebSocketClientHandler.WebSocketMessageHandler channelInactiveHandler = null;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ public enum KrakenEventType {
unsubscribe,
systemStatus,
subscriptionStatus,
pingStatus,
ping,
pong,
error;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,7 @@
import com.fasterxml.jackson.databind.JsonNode;
import info.bitrich.xchangestream.lgo.dto.LgoSubscription;
import info.bitrich.xchangestream.service.netty.JsonNettyStreamingService;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.http.DefaultHttpHeaders;
import io.netty.handler.codec.http.websocketx.PingWebSocketFrame;
import java.io.IOException;
import java.time.Duration;
import org.knowm.xchange.lgo.service.LgoSignatureService;
Expand All @@ -23,11 +21,6 @@ public class LgoStreamingService extends JsonNettyStreamingService {
this.signatureService = signatureService;
}

@Override
protected void handleIdle(ChannelHandlerContext ctx) {
ctx.writeAndFlush(new PingWebSocketFrame());
}

@Override
protected String getChannelNameFromMessage(JsonNode message) {
String channel = message.get("channel").asText();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,8 +163,7 @@ protected Completable openConnection() {
if (ssl) {
SslContextBuilder sslContextBuilder = SslContextBuilder.forClient();
if (acceptAllCertificates) {
sslContextBuilder =
sslContextBuilder.trustManager(InsecureTrustManagerFactory.INSTANCE);
sslContextBuilder.trustManager(InsecureTrustManagerFactory.INSTANCE);
}
sslCtx = sslContextBuilder.build();
} else {
Expand Down Expand Up @@ -237,9 +236,7 @@ protected void initChannel(SocketChannel ch) {
webSocketChannel
.disconnect()
.addListener(
x -> {
completable.onError(handshakeFuture.cause());
});
x -> completable.onError(handshakeFuture.cause()));
}
});
} else {
Expand Down

0 comments on commit f78f9cf

Please sign in to comment.