From 83231f7371e9aa39fb60b2c29aad66f308a4c7e3 Mon Sep 17 00:00:00 2001 From: Pavel Chertalev Date: Thu, 12 Mar 2020 17:59:19 +0300 Subject: [PATCH] #502 batch kraken resubscribing - formatting --- .../KrakenStreamingMarketDataService.java | 7 +- .../kraken/KrakenStreamingService.java | 507 +++++++++--------- .../service/netty/NettyStreamingService.java | 2 + 3 files changed, 262 insertions(+), 254 deletions(-) diff --git a/xchange-stream-kraken/src/main/java/info/bitrich/xchangestream/kraken/KrakenStreamingMarketDataService.java b/xchange-stream-kraken/src/main/java/info/bitrich/xchangestream/kraken/KrakenStreamingMarketDataService.java index 9fb5311ed..fd49066da 100644 --- a/xchange-stream-kraken/src/main/java/info/bitrich/xchangestream/kraken/KrakenStreamingMarketDataService.java +++ b/xchange-stream-kraken/src/main/java/info/bitrich/xchangestream/kraken/KrakenStreamingMarketDataService.java @@ -145,8 +145,11 @@ public Observable subscribe(String channelName, int maxItems, Integer dept } private String getChannelName( - KrakenSubscriptionName subscriptionName, CurrencyPair currencyPair) { - String pair = KrakenAdapters.adaptCurrencyPair(currencyPair.base.getCurrencyCode() + currencyPair.counter.getCurrencyCode()).toString(); + KrakenSubscriptionName subscriptionName, CurrencyPair currencyPair) { + String pair = + KrakenAdapters.adaptCurrencyPair( + currencyPair.base.getCurrencyCode() + currencyPair.counter.getCurrencyCode()) + .toString(); return subscriptionName + KRAKEN_CHANNEL_DELIMITER + pair; } diff --git a/xchange-stream-kraken/src/main/java/info/bitrich/xchangestream/kraken/KrakenStreamingService.java b/xchange-stream-kraken/src/main/java/info/bitrich/xchangestream/kraken/KrakenStreamingService.java index e25f5b780..58bec015b 100644 --- a/xchange-stream-kraken/src/main/java/info/bitrich/xchangestream/kraken/KrakenStreamingService.java +++ b/xchange-stream-kraken/src/main/java/info/bitrich/xchangestream/kraken/KrakenStreamingService.java @@ -36,280 +36,283 @@ import static info.bitrich.xchangestream.kraken.KrakenStreamingMarketDataService.KRAKEN_CHANNEL_DELIMITER; import static info.bitrich.xchangestream.kraken.dto.enums.KrakenEventType.subscribe; -/** - * @author makarid, pchertalev - */ +/** @author makarid, pchertalev */ public class KrakenStreamingService extends JsonNettyStreamingService { - private static final Logger LOG = LoggerFactory.getLogger(KrakenStreamingService.class); - private static final String EVENT = "event"; - private final Map channelIds = new ConcurrentHashMap<>(); - private ObjectMapper mapper = StreamingObjectMapperHelper.getObjectMapper(); - private final boolean isPrivate; - - private final Map> subscriptionRequestMap = new ConcurrentHashMap<>(); - - public KrakenStreamingService(boolean isPrivate, String uri) { - super(uri, Integer.MAX_VALUE); - this.isPrivate = isPrivate; - } - - @Override - public boolean processArrayMassageSeparately() { - return false; - } - - @Override - protected void handleMessage(JsonNode message) { - String channelName = getChannel(message); - - try { - JsonNode event = message.get(EVENT); - KrakenEventType krakenEvent; - if (event != null && (krakenEvent = KrakenEventType.getEvent(event.textValue())) != null) { - switch (krakenEvent) { - case pong: - LOG.debug("Pong received"); - break; - case heartbeat: - LOG.debug("Heartbeat received"); - break; - case systemStatus: - KrakenSystemStatus systemStatus = mapper.treeToValue(message, KrakenSystemStatus.class); - LOG.info("System status: {}", systemStatus); - break; - case subscriptionStatus: - KrakenSubscriptionStatusMessage statusMessage = - mapper.treeToValue(message, KrakenSubscriptionStatusMessage.class); - Integer reqid = statusMessage.getReqid(); - String currencyPair = KrakenAdapters.adaptCurrencyPair(statusMessage.getPair().replace("/", "")).toString(); - if (!isPrivate && reqid != null) { - Set channelsList = subscriptionRequestMap.get(reqid); - channelName = statusMessage.getKrakenSubscriptionConfig().getName() + KRAKEN_CHANNEL_DELIMITER + currencyPair; - channelsList.remove(channelName); - if (channelsList.isEmpty()) { - subscriptionRequestMap.remove(reqid); - } - } - switch (statusMessage.getStatus()) { - case subscribed: - LOG.info("Channel {} has been subscribed", channelName); - if (statusMessage.getChannelID() != null) { - channelIds.put(statusMessage.getChannelID(), channelName); - } - break; - case unsubscribed: - LOG.info("Channel {} has been unsubscribed", channelName); - channelIds.remove(statusMessage.getChannelID()); - break; - case error: - LOG.error( - "Channel {} has been failed: {}", channelName, statusMessage.getErrorMessage()); - } - break; - case error: - LOG.error( - "Error received: {}", - message.has("errorMessage") - ? message.get("errorMessage").asText() - : message.toString()); - break; - default: - LOG.warn("Unexpected event type has been received: {}", krakenEvent); + private static final Logger LOG = LoggerFactory.getLogger(KrakenStreamingService.class); + private static final String EVENT = "event"; + private final Map channelIds = new ConcurrentHashMap<>(); + private ObjectMapper mapper = StreamingObjectMapperHelper.getObjectMapper(); + private final boolean isPrivate; + + private final Map> subscriptionRequestMap = new ConcurrentHashMap<>(); + + public KrakenStreamingService(boolean isPrivate, String uri) { + super(uri, Integer.MAX_VALUE); + this.isPrivate = isPrivate; + } + + @Override + public boolean processArrayMassageSeparately() { + return false; + } + + @Override + protected void handleMessage(JsonNode message) { + String channelName = getChannel(message); + + try { + JsonNode event = message.get(EVENT); + KrakenEventType krakenEvent; + if (event != null && (krakenEvent = KrakenEventType.getEvent(event.textValue())) != null) { + switch (krakenEvent) { + case pong: + LOG.debug("Pong received"); + break; + case heartbeat: + LOG.debug("Heartbeat received"); + break; + case systemStatus: + KrakenSystemStatus systemStatus = mapper.treeToValue(message, KrakenSystemStatus.class); + LOG.info("System status: {}", systemStatus); + break; + case subscriptionStatus: + KrakenSubscriptionStatusMessage statusMessage = + mapper.treeToValue(message, KrakenSubscriptionStatusMessage.class); + Integer reqid = statusMessage.getReqid(); + String currencyPair = + KrakenAdapters.adaptCurrencyPair(statusMessage.getPair().replace("/", "")) + .toString(); + if (!isPrivate && reqid != null) { + Set channelsList = subscriptionRequestMap.get(reqid); + channelName = + statusMessage.getKrakenSubscriptionConfig().getName() + + KRAKEN_CHANNEL_DELIMITER + + currencyPair; + channelsList.remove(channelName); + if (channelsList.isEmpty()) { + subscriptionRequestMap.remove(reqid); + } + } + switch (statusMessage.getStatus()) { + case subscribed: + LOG.info("Channel {} has been subscribed", channelName); + if (statusMessage.getChannelID() != null) { + channelIds.put(statusMessage.getChannelID(), channelName); } - return; + break; + case unsubscribed: + LOG.info("Channel {} has been unsubscribed", channelName); + channelIds.remove(statusMessage.getChannelID()); + break; + case error: + LOG.error( + "Channel {} has been failed: {}", channelName, statusMessage.getErrorMessage()); } - } catch (JsonProcessingException e) { - LOG.error("Error reading message: {}", e.getMessage(), e); - } - - if (!message.isArray() || channelName == null) { - LOG.error("Unknown message: {}", message.toString()); - return; + break; + case error: + LOG.error( + "Error received: {}", + message.has("errorMessage") + ? message.get("errorMessage").asText() + : message.toString()); + break; + default: + LOG.warn("Unexpected event type has been received: {}", krakenEvent); } - - super.handleMessage(message); + return; + } + } catch (JsonProcessingException e) { + LOG.error("Error reading message: {}", e.getMessage(), e); } - @Override - protected String getChannelNameFromMessage(JsonNode message) throws IOException { - String channelName = null; - if (message.has("channelID")) { - channelName = channelIds.get(message.get("channelID").asInt()); - } - if (message.has("channelName")) { - channelName = message.get("channelName").asText(); - } - - if (message.isArray()) { - if (message.get(0).isInt()) { - channelName = channelIds.get(message.get(0).asInt()); - } - if (message.get(1).isTextual()) { - channelName = message.get(1).asText(); - } - } - - if (LOG.isDebugEnabled()) { - LOG.debug("ChannelName {}", StringUtils.isBlank(channelName) ? "not defined" : channelName); - } - return channelName; + if (!message.isArray() || channelName == null) { + LOG.error("Unknown message: {}", message.toString()); + return; } - @Override - public String getSubscribeMessage(String channelName, Object... args) throws IOException { - int reqID = Math.abs(UUID.randomUUID().hashCode()); - String[] channelData = - channelName.split(KRAKEN_CHANNEL_DELIMITER); - KrakenSubscriptionName subscriptionName = KrakenSubscriptionName.valueOf(channelData[0]); - - if (isPrivate) { - String token = (String) args[0]; - - KrakenSubscriptionMessage subscriptionMessage = - new KrakenSubscriptionMessage( - reqID, subscribe, null, new KrakenSubscriptionConfig(subscriptionName, null, token)); - - return objectMapper.writeValueAsString(subscriptionMessage); - } else { - String pair = channelData[1]; - - Integer depth = null; - if (args.length > 0 && args[0] != null) { - depth = (Integer) args[0]; - } - subscriptionRequestMap.put(reqID, Sets.newHashSet(channelName)); + super.handleMessage(message); + } - KrakenSubscriptionMessage subscriptionMessage = - new KrakenSubscriptionMessage( - reqID, - subscribe, - Collections.singletonList(pair), - new KrakenSubscriptionConfig(subscriptionName, depth, null)); - return objectMapper.writeValueAsString(subscriptionMessage); - } + @Override + protected String getChannelNameFromMessage(JsonNode message) throws IOException { + String channelName = null; + if (message.has("channelID")) { + channelName = channelIds.get(message.get("channelID").asInt()); + } + if (message.has("channelName")) { + channelName = message.get("channelName").asText(); } - @Override - public String getUnsubscribeMessage(String channelName) throws IOException { - int reqID = Math.abs(UUID.randomUUID().hashCode()); - String[] channelData = channelName.split(KRAKEN_CHANNEL_DELIMITER); - KrakenSubscriptionName subscriptionName = KrakenSubscriptionName.valueOf(channelData[0]); + if (message.isArray()) { + if (message.get(0).isInt()) { + channelName = channelIds.get(message.get(0).asInt()); + } + if (message.get(1).isTextual()) { + channelName = message.get(1).asText(); + } + } - if (isPrivate) { - KrakenSubscriptionMessage subscriptionMessage = - new KrakenSubscriptionMessage( - reqID, - KrakenEventType.unsubscribe, - null, - new KrakenSubscriptionConfig(subscriptionName, null, null)); - return objectMapper.writeValueAsString(subscriptionMessage); - } else { - String pair = channelData[1]; - - subscriptionRequestMap.put(reqID, Sets.newHashSet(channelName)); - KrakenSubscriptionMessage subscriptionMessage = - new KrakenSubscriptionMessage( - reqID, - KrakenEventType.unsubscribe, - Collections.singletonList(pair), - new KrakenSubscriptionConfig(subscriptionName)); - return objectMapper.writeValueAsString(subscriptionMessage); - } + if (LOG.isDebugEnabled()) { + LOG.debug("ChannelName {}", StringUtils.isBlank(channelName) ? "not defined" : channelName); + } + return channelName; + } + + @Override + public String getSubscribeMessage(String channelName, Object... args) throws IOException { + int reqID = Math.abs(UUID.randomUUID().hashCode()); + String[] channelData = channelName.split(KRAKEN_CHANNEL_DELIMITER); + KrakenSubscriptionName subscriptionName = KrakenSubscriptionName.valueOf(channelData[0]); + + if (isPrivate) { + String token = (String) args[0]; + + KrakenSubscriptionMessage subscriptionMessage = + new KrakenSubscriptionMessage( + reqID, subscribe, null, new KrakenSubscriptionConfig(subscriptionName, null, token)); + + return objectMapper.writeValueAsString(subscriptionMessage); + } else { + String pair = channelData[1]; + + Integer depth = null; + if (args.length > 0 && args[0] != null) { + depth = (Integer) args[0]; + } + subscriptionRequestMap.put(reqID, Sets.newHashSet(channelName)); + + KrakenSubscriptionMessage subscriptionMessage = + new KrakenSubscriptionMessage( + reqID, + subscribe, + Collections.singletonList(pair), + new KrakenSubscriptionConfig(subscriptionName, depth, null)); + return objectMapper.writeValueAsString(subscriptionMessage); + } + } + + @Override + public String getUnsubscribeMessage(String channelName) throws IOException { + int reqID = Math.abs(UUID.randomUUID().hashCode()); + String[] channelData = channelName.split(KRAKEN_CHANNEL_DELIMITER); + KrakenSubscriptionName subscriptionName = KrakenSubscriptionName.valueOf(channelData[0]); + + if (isPrivate) { + KrakenSubscriptionMessage subscriptionMessage = + new KrakenSubscriptionMessage( + reqID, + KrakenEventType.unsubscribe, + null, + new KrakenSubscriptionConfig(subscriptionName, null, null)); + return objectMapper.writeValueAsString(subscriptionMessage); + } else { + String pair = channelData[1]; + + subscriptionRequestMap.put(reqID, Sets.newHashSet(channelName)); + KrakenSubscriptionMessage subscriptionMessage = + new KrakenSubscriptionMessage( + reqID, + KrakenEventType.unsubscribe, + Collections.singletonList(pair), + new KrakenSubscriptionConfig(subscriptionName)); + return objectMapper.writeValueAsString(subscriptionMessage); + } + } + + @Override + protected WebSocketClientHandler getWebSocketClientHandler( + WebSocketClientHandshaker handshaker, + WebSocketClientHandler.WebSocketMessageHandler handler) { + LOG.info("Registering KrakenWebSocketClientHandler"); + return new KrakenWebSocketClientHandler(handshaker, handler); + } + + @Override + protected Completable openConnection() { + + KrakenSubscriptionMessage ping = + new KrakenSubscriptionMessage(null, KrakenEventType.ping, null, null); + + subscribeConnectionSuccess() + .subscribe( + o -> + Observable.interval(30, TimeUnit.SECONDS) + .takeWhile(t -> isSocketOpen()) + .subscribe(t -> sendObjectMessage(ping))); + + return super.openConnection(); + } + + private WebSocketClientHandler.WebSocketMessageHandler channelInactiveHandler = null; + + /** + * Custom client handler in order to execute an external, user-provided handler on channel events. + * This is useful because it seems Kraken unexpectedly closes the web socket connection. + */ + class KrakenWebSocketClientHandler extends NettyWebSocketClientHandler { + + public KrakenWebSocketClientHandler( + WebSocketClientHandshaker handshaker, WebSocketMessageHandler handler) { + super(handshaker, handler); } @Override - protected WebSocketClientHandler getWebSocketClientHandler( - WebSocketClientHandshaker handshaker, - WebSocketClientHandler.WebSocketMessageHandler handler) { - LOG.info("Registering KrakenWebSocketClientHandler"); - return new KrakenWebSocketClientHandler(handshaker, handler); + public void channelActive(ChannelHandlerContext ctx) { + super.channelActive(ctx); } @Override - protected Completable openConnection() { - - KrakenSubscriptionMessage ping = - new KrakenSubscriptionMessage(null, KrakenEventType.ping, null, null); - - subscribeConnectionSuccess() - .subscribe( - o -> - Observable.interval(30, TimeUnit.SECONDS) - .takeWhile(t -> isSocketOpen()) - .subscribe(t -> sendObjectMessage(ping))); - - return super.openConnection(); + public void channelInactive(ChannelHandlerContext ctx) { + super.channelInactive(ctx); + if (channelInactiveHandler != null) { + channelInactiveHandler.onMessage("WebSocket Client disconnected!"); + } } + } + + @Override + public void resubscribeChannels() { + if (isPrivate) { + super.resubscribeChannels(); + } else { + subscriptionRequestMap.clear(); + channelIds.clear(); + HashMap messages = new HashMap<>(); + for (Map.Entry entry : super.channels.entrySet()) { + + String[] channelData = entry.getKey().split(KRAKEN_CHANNEL_DELIMITER); + KrakenSubscriptionName subscriptionName = KrakenSubscriptionName.valueOf(channelData[0]); - private WebSocketClientHandler.WebSocketMessageHandler channelInactiveHandler = null; - - /** - * Custom client handler in order to execute an external, user-provided handler on channel events. - * This is useful because it seems Kraken unexpectedly closes the web socket connection. - */ - class KrakenWebSocketClientHandler extends NettyWebSocketClientHandler { - - public KrakenWebSocketClientHandler( - WebSocketClientHandshaker handshaker, WebSocketMessageHandler handler) { - super(handshaker, handler); - } - - @Override - public void channelActive(ChannelHandlerContext ctx) { - super.channelActive(ctx); - } - - @Override - public void channelInactive(ChannelHandlerContext ctx) { - super.channelInactive(ctx); - if (channelInactiveHandler != null) { - channelInactiveHandler.onMessage("WebSocket Client disconnected!"); - } + String pair = channelData[1]; + Object[] args = entry.getValue().getArgs(); + Integer depth = null; + if (args.length > 0 && args[0] != null) { + depth = (Integer) args[0]; } - } - - @Override - public void resubscribeChannels() { - if (isPrivate) { - super.resubscribeChannels(); - } else { - subscriptionRequestMap.clear(); - channelIds.clear(); - HashMap messages = new HashMap<>(); - for (Map.Entry entry : super.channels.entrySet()) { - - String[] channelData = entry.getKey().split(KRAKEN_CHANNEL_DELIMITER); - KrakenSubscriptionName subscriptionName = KrakenSubscriptionName.valueOf(channelData[0]); - - String pair = channelData[1]; - Object[] args = entry.getValue().getArgs(); - Integer depth = null; - if (args.length > 0 && args[0] != null) { - depth = (Integer) args[0]; - } - - Integer finalDepth = depth; - KrakenSubscriptionMessage toSend = messages.computeIfAbsent( - subscriptionName + (depth == null ? "" : (KRAKEN_CHANNEL_DELIMITER + depth)), - d -> new KrakenSubscriptionMessage( - Math.abs(UUID.randomUUID().hashCode()), - subscribe, - new ArrayList<>(), - new KrakenSubscriptionConfig(subscriptionName, finalDepth, null) - ) - ); - toSend.getPairs().add(pair); - Set channelsSet = subscriptionRequestMap.computeIfAbsent(toSend.getReqid(), rid -> new HashSet<>()); - channelsSet.add(entry.getKey()); - } - for (KrakenSubscriptionMessage message : messages.values()) { - try { - sendMessage(objectMapper.writeValueAsString(message)); - } catch (IOException e) { - LOG.error("Failed to reconnect channel: {}", message.getPairs()); - } - } + Integer finalDepth = depth; + KrakenSubscriptionMessage toSend = + messages.computeIfAbsent( + subscriptionName + (depth == null ? "" : (KRAKEN_CHANNEL_DELIMITER + depth)), + d -> + new KrakenSubscriptionMessage( + Math.abs(UUID.randomUUID().hashCode()), + subscribe, + new ArrayList<>(), + new KrakenSubscriptionConfig(subscriptionName, finalDepth, null))); + toSend.getPairs().add(pair); + Set channelsSet = + subscriptionRequestMap.computeIfAbsent(toSend.getReqid(), rid -> new HashSet<>()); + channelsSet.add(entry.getKey()); + } + + for (KrakenSubscriptionMessage message : messages.values()) { + try { + sendMessage(objectMapper.writeValueAsString(message)); + } catch (IOException e) { + LOG.error("Failed to reconnect channel: {}", message.getPairs()); } + } } + } } diff --git a/xchange-stream-service-netty/src/main/java/info/bitrich/xchangestream/service/netty/NettyStreamingService.java b/xchange-stream-service-netty/src/main/java/info/bitrich/xchangestream/service/netty/NettyStreamingService.java index 7c5e5dfd5..6907cc9e2 100644 --- a/xchange-stream-service-netty/src/main/java/info/bitrich/xchangestream/service/netty/NettyStreamingService.java +++ b/xchange-stream-service-netty/src/main/java/info/bitrich/xchangestream/service/netty/NettyStreamingService.java @@ -70,9 +70,11 @@ public Subscription(ObservableEmitter emitter, String channelName, Object[] a this.channelName = channelName; this.args = args; } + public String getChannelName() { return channelName; } + public Object[] getArgs() { return args; }