-
Notifications
You must be signed in to change notification settings - Fork 217
#502 kraken resubscribe issue #551
base: develop
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,10 +1,9 @@ | ||
package info.bitrich.xchangestream.kraken; | ||
|
||
import static info.bitrich.xchangestream.kraken.dto.enums.KrakenEventType.subscribe; | ||
|
||
import com.fasterxml.jackson.core.JsonProcessingException; | ||
import com.fasterxml.jackson.databind.JsonNode; | ||
import com.fasterxml.jackson.databind.ObjectMapper; | ||
import com.google.common.collect.Sets; | ||
import info.bitrich.xchangestream.kraken.dto.KrakenSubscriptionConfig; | ||
import info.bitrich.xchangestream.kraken.dto.KrakenSubscriptionMessage; | ||
import info.bitrich.xchangestream.kraken.dto.KrakenSubscriptionStatusMessage; | ||
|
@@ -18,25 +17,34 @@ | |
import io.netty.handler.codec.http.websocketx.WebSocketClientHandshaker; | ||
import io.reactivex.Completable; | ||
import io.reactivex.Observable; | ||
import org.apache.commons.lang3.StringUtils; | ||
import org.knowm.xchange.kraken.KrakenAdapters; | ||
import org.slf4j.Logger; | ||
import org.slf4j.LoggerFactory; | ||
|
||
import java.io.IOException; | ||
import java.util.ArrayList; | ||
import java.util.Collections; | ||
import java.util.HashMap; | ||
import java.util.HashSet; | ||
import java.util.Map; | ||
import java.util.Set; | ||
import java.util.UUID; | ||
import java.util.concurrent.ConcurrentHashMap; | ||
import java.util.concurrent.TimeUnit; | ||
import org.apache.commons.lang3.StringUtils; | ||
import org.slf4j.Logger; | ||
import org.slf4j.LoggerFactory; | ||
|
||
import static info.bitrich.xchangestream.kraken.KrakenStreamingMarketDataService.KRAKEN_CHANNEL_DELIMITER; | ||
import static info.bitrich.xchangestream.kraken.dto.enums.KrakenEventType.subscribe; | ||
|
||
/** @author makarid, pchertalev */ | ||
public class KrakenStreamingService extends JsonNettyStreamingService { | ||
private static final Logger LOG = LoggerFactory.getLogger(KrakenStreamingService.class); | ||
private static final String EVENT = "event"; | ||
private final Map<Integer, String> channels = new ConcurrentHashMap<>(); | ||
private final Map<Integer, String> channelIds = new ConcurrentHashMap<>(); | ||
private ObjectMapper mapper = StreamingObjectMapperHelper.getObjectMapper(); | ||
private final boolean isPrivate; | ||
|
||
private final Map<Integer, String> subscriptionRequestMap = new ConcurrentHashMap<>(); | ||
private final Map<Integer, Set<String>> subscriptionRequestMap = new ConcurrentHashMap<>(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You may find guava synchronized multimaps deal with this better and more thread-safely (not convinced that the |
||
|
||
public KrakenStreamingService(boolean isPrivate, String uri) { | ||
super(uri, Integer.MAX_VALUE); | ||
|
@@ -71,19 +79,30 @@ protected void handleMessage(JsonNode message) { | |
KrakenSubscriptionStatusMessage statusMessage = | ||
mapper.treeToValue(message, KrakenSubscriptionStatusMessage.class); | ||
Integer reqid = statusMessage.getReqid(); | ||
if (!isPrivate && reqid != null) channelName = subscriptionRequestMap.remove(reqid); | ||
|
||
String currencyPair = | ||
KrakenAdapters.adaptCurrencyPair(statusMessage.getPair().replace("/", "")) | ||
.toString(); | ||
if (!isPrivate && reqid != null) { | ||
Set<String> channelsList = subscriptionRequestMap.get(reqid); | ||
channelName = | ||
statusMessage.getKrakenSubscriptionConfig().getName() | ||
+ KRAKEN_CHANNEL_DELIMITER | ||
+ currencyPair; | ||
channelsList.remove(channelName); | ||
if (channelsList.isEmpty()) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is the bit I'm not convinced is thread-safe. |
||
subscriptionRequestMap.remove(reqid); | ||
} | ||
} | ||
switch (statusMessage.getStatus()) { | ||
case subscribed: | ||
LOG.info("Channel {} has been subscribed", channelName); | ||
|
||
if (statusMessage.getChannelID() != null) | ||
channels.put(statusMessage.getChannelID(), channelName); | ||
|
||
if (statusMessage.getChannelID() != null) { | ||
channelIds.put(statusMessage.getChannelID(), channelName); | ||
} | ||
break; | ||
case unsubscribed: | ||
LOG.info("Channel {} has been unsubscribed", channelName); | ||
channels.remove(statusMessage.getChannelID()); | ||
channelIds.remove(statusMessage.getChannelID()); | ||
break; | ||
case error: | ||
LOG.error( | ||
|
@@ -118,15 +137,15 @@ protected void handleMessage(JsonNode message) { | |
protected String getChannelNameFromMessage(JsonNode message) throws IOException { | ||
String channelName = null; | ||
if (message.has("channelID")) { | ||
channelName = channels.get(message.get("channelID").asInt()); | ||
channelName = channelIds.get(message.get("channelID").asInt()); | ||
} | ||
if (message.has("channelName")) { | ||
channelName = message.get("channelName").asText(); | ||
} | ||
|
||
if (message.isArray()) { | ||
if (message.get(0).isInt()) { | ||
channelName = channels.get(message.get(0).asInt()); | ||
channelName = channelIds.get(message.get(0).asInt()); | ||
} | ||
if (message.get(1).isTextual()) { | ||
channelName = message.get(1).asText(); | ||
|
@@ -142,8 +161,7 @@ protected String getChannelNameFromMessage(JsonNode message) throws IOException | |
@Override | ||
public String getSubscribeMessage(String channelName, Object... args) throws IOException { | ||
int reqID = Math.abs(UUID.randomUUID().hashCode()); | ||
String[] channelData = | ||
channelName.split(KrakenStreamingMarketDataService.KRAKEN_CHANNEL_DELIMITER); | ||
String[] channelData = channelName.split(KRAKEN_CHANNEL_DELIMITER); | ||
KrakenSubscriptionName subscriptionName = KrakenSubscriptionName.valueOf(channelData[0]); | ||
|
||
if (isPrivate) { | ||
|
@@ -161,7 +179,7 @@ public String getSubscribeMessage(String channelName, Object... args) throws IOE | |
if (args.length > 0 && args[0] != null) { | ||
depth = (Integer) args[0]; | ||
} | ||
subscriptionRequestMap.put(reqID, channelName); | ||
subscriptionRequestMap.put(reqID, Sets.newHashSet(channelName)); | ||
|
||
KrakenSubscriptionMessage subscriptionMessage = | ||
new KrakenSubscriptionMessage( | ||
|
@@ -176,8 +194,7 @@ public String getSubscribeMessage(String channelName, Object... args) throws IOE | |
@Override | ||
public String getUnsubscribeMessage(String channelName) throws IOException { | ||
int reqID = Math.abs(UUID.randomUUID().hashCode()); | ||
String[] channelData = | ||
channelName.split(KrakenStreamingMarketDataService.KRAKEN_CHANNEL_DELIMITER); | ||
String[] channelData = channelName.split(KRAKEN_CHANNEL_DELIMITER); | ||
KrakenSubscriptionName subscriptionName = KrakenSubscriptionName.valueOf(channelData[0]); | ||
|
||
if (isPrivate) { | ||
|
@@ -191,7 +208,7 @@ public String getUnsubscribeMessage(String channelName) throws IOException { | |
} else { | ||
String pair = channelData[1]; | ||
|
||
subscriptionRequestMap.put(reqID, channelName); | ||
subscriptionRequestMap.put(reqID, Sets.newHashSet(channelName)); | ||
KrakenSubscriptionMessage subscriptionMessage = | ||
new KrakenSubscriptionMessage( | ||
reqID, | ||
|
@@ -252,4 +269,50 @@ public void channelInactive(ChannelHandlerContext ctx) { | |
} | ||
} | ||
} | ||
|
||
@Override | ||
public void resubscribeChannels() { | ||
if (isPrivate) { | ||
super.resubscribeChannels(); | ||
} else { | ||
subscriptionRequestMap.clear(); | ||
channelIds.clear(); | ||
HashMap<String, KrakenSubscriptionMessage> messages = new HashMap<>(); | ||
for (Map.Entry<String, Subscription> entry : super.channels.entrySet()) { | ||
|
||
String[] channelData = entry.getKey().split(KRAKEN_CHANNEL_DELIMITER); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: this would be easier to read if maybe you created a
|
||
KrakenSubscriptionName subscriptionName = KrakenSubscriptionName.valueOf(channelData[0]); | ||
|
||
String pair = channelData[1]; | ||
Object[] args = entry.getValue().getArgs(); | ||
Integer depth = null; | ||
if (args.length > 0 && args[0] != null) { | ||
depth = (Integer) args[0]; | ||
} | ||
|
||
Integer finalDepth = depth; | ||
KrakenSubscriptionMessage toSend = | ||
messages.computeIfAbsent( | ||
subscriptionName + (depth == null ? "" : (KRAKEN_CHANNEL_DELIMITER + depth)), | ||
d -> | ||
new KrakenSubscriptionMessage( | ||
Math.abs(UUID.randomUUID().hashCode()), | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This will probably collide quite often. Maybe safer to use a static |
||
subscribe, | ||
new ArrayList<>(), | ||
new KrakenSubscriptionConfig(subscriptionName, finalDepth, null))); | ||
toSend.getPairs().add(pair); | ||
Set<String> channelsSet = | ||
subscriptionRequestMap.computeIfAbsent(toSend.getReqid(), rid -> new HashSet<>()); | ||
channelsSet.add(entry.getKey()); | ||
} | ||
|
||
for (KrakenSubscriptionMessage message : messages.values()) { | ||
try { | ||
sendMessage(objectMapper.writeValueAsString(message)); | ||
} catch (IOException e) { | ||
LOG.error("Failed to reconnect channel: {}", message.getPairs()); | ||
} | ||
} | ||
} | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you run a maven build so the coveo-fmt plugin runs?
We've introduced that to avoid formatting issues like these creeping into PRs.
Thanks!