From afa47ffbc6e177ba6f81dafa38d0e096a920abda Mon Sep 17 00:00:00 2001 From: Graham Crockford Date: Sat, 6 Oct 2018 14:46:40 +0100 Subject: [PATCH 1/8] My stab at the binance order book problem --- .../binance/BinanceStreamingExchange.java | 3 +- .../BinanceStreamingMarketDataService.java | 112 +++++++++++++++--- .../dto/DepthBinanceWebSocketTransaction.java | 13 ++ 3 files changed, 111 insertions(+), 17 deletions(-) diff --git a/xchange-binance/src/main/java/info/bitrich/xchangestream/binance/BinanceStreamingExchange.java b/xchange-binance/src/main/java/info/bitrich/xchangestream/binance/BinanceStreamingExchange.java index bc8720f8f..db11df507 100644 --- a/xchange-binance/src/main/java/info/bitrich/xchangestream/binance/BinanceStreamingExchange.java +++ b/xchange-binance/src/main/java/info/bitrich/xchangestream/binance/BinanceStreamingExchange.java @@ -5,6 +5,7 @@ import info.bitrich.xchangestream.core.StreamingMarketDataService; import io.reactivex.Completable; import org.knowm.xchange.binance.BinanceExchange; +import org.knowm.xchange.binance.service.BinanceMarketDataService; import org.knowm.xchange.currency.CurrencyPair; import java.util.List; @@ -38,7 +39,7 @@ public Completable connect(ProductSubscription... args) { ProductSubscription subscriptions = args[0]; streamingService = createStreamingService(subscriptions); - streamingMarketDataService = new BinanceStreamingMarketDataService(streamingService); + streamingMarketDataService = new BinanceStreamingMarketDataService(streamingService, (BinanceMarketDataService) marketDataService); return streamingService.connect() .doOnComplete(() -> streamingMarketDataService.openSubscriptions(subscriptions)); } diff --git a/xchange-binance/src/main/java/info/bitrich/xchangestream/binance/BinanceStreamingMarketDataService.java b/xchange-binance/src/main/java/info/bitrich/xchangestream/binance/BinanceStreamingMarketDataService.java index 8b7fe4176..0949e4ee5 100644 --- a/xchange-binance/src/main/java/info/bitrich/xchangestream/binance/BinanceStreamingMarketDataService.java +++ b/xchange-binance/src/main/java/info/bitrich/xchangestream/binance/BinanceStreamingMarketDataService.java @@ -1,7 +1,6 @@ package info.bitrich.xchangestream.binance; import com.fasterxml.jackson.core.type.TypeReference; -import com.fasterxml.jackson.databind.DeserializationFeature; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import info.bitrich.xchangestream.binance.dto.BinanceRawTrade; @@ -12,10 +11,14 @@ import info.bitrich.xchangestream.core.ProductSubscription; import info.bitrich.xchangestream.core.StreamingMarketDataService; import io.reactivex.Observable; +import io.reactivex.disposables.Disposable; import io.reactivex.functions.Consumer; +import io.reactivex.observables.ConnectableObservable; + import org.knowm.xchange.binance.BinanceAdapters; import org.knowm.xchange.binance.dto.marketdata.BinanceOrderbook; import org.knowm.xchange.binance.dto.marketdata.BinanceTicker24h; +import org.knowm.xchange.binance.service.BinanceMarketDataService; import org.knowm.xchange.currency.CurrencyPair; import org.knowm.xchange.dto.Order.OrderType; import org.knowm.xchange.dto.marketdata.OrderBook; @@ -31,6 +34,7 @@ import java.util.Date; import java.util.HashMap; import java.util.Map; +import java.util.concurrent.atomic.AtomicLong; import static info.bitrich.xchangestream.binance.dto.BaseBinanceWebSocketTransaction.BinanceWebSocketTypes.DEPTH_UPDATE; import static info.bitrich.xchangestream.binance.dto.BaseBinanceWebSocketTransaction.BinanceWebSocketTypes.TICKER_24_HR; @@ -40,16 +44,17 @@ public class BinanceStreamingMarketDataService implements StreamingMarketDataSer private static final Logger LOG = LoggerFactory.getLogger(BinanceStreamingMarketDataService.class); private final BinanceStreamingService service; - private final Map orderbooks = new HashMap<>(); + private final Map orderbooks = new HashMap<>(); private final Map> tickerSubscriptions = new HashMap<>(); private final Map> orderbookSubscriptions = new HashMap<>(); private final Map> tradeSubscriptions = new HashMap<>(); private final ObjectMapper mapper = new ObjectMapper(); + private final BinanceMarketDataService marketDataService; - - public BinanceStreamingMarketDataService(BinanceStreamingService service) { + public BinanceStreamingMarketDataService(BinanceStreamingService service, BinanceMarketDataService marketDataService) { this.service = service; + this.marketDataService = marketDataService; mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); } @@ -126,34 +131,109 @@ private Observable rawTickerStream(CurrencyPair currencyPair) .map(transaction -> transaction.getData().getTicker()); } + private static final class OrderbookSubscription { + long snapshotlastUpdateId = 0L; + AtomicLong lastUpdateId = new AtomicLong(0L); + OrderBook orderBook; + ConnectableObservable> stream; + Disposable disposable; + } + + private OrderbookSubscription initialOrderBook(CurrencyPair currencyPair) { + OrderbookSubscription subscription = new OrderbookSubscription(); + + // 1. Open a stream to wss://stream.binance.com:9443/ws/bnbbtc@depth + subscription.stream = service.subscribeChannel(channelFromCurrency(currencyPair, "depth")) + .map((JsonNode s) -> depthTransaction(s.toString())) + .filter(transaction -> + transaction.getData().getCurrencyPair().equals(currencyPair) && + transaction.getData().getEventType() == DEPTH_UPDATE) + + // 2.Buffer the events you receive from the stream + .replay(); + subscription.disposable = subscription.stream.connect(); + + // 3. Get a depth snapshot from https://www.binance.com/api/v1/depth?symbol=BNBBTC&limit=1000 + setSnapshot(currencyPair, subscription); + return subscription; + } + + private void setSnapshot(CurrencyPair currencyPair, OrderbookSubscription subscription) { + try { + LOG.info("Fetching initial orderbook snapshot for {} ", currencyPair); + BinanceOrderbook book = marketDataService.getBinanceOrderbook(currencyPair, 1000); + subscription.snapshotlastUpdateId = book.lastUpdateId; + subscription.lastUpdateId.set(book.lastUpdateId); + subscription.orderBook = BinanceMarketDataService.convertOrderBook(book, currencyPair); + } catch (IOException e) { + LOG.error("Failed to fetch initial order book for " + currencyPair); + subscription.orderBook = new OrderBook(null, new ArrayList<>(), new ArrayList<>()); + } + } + private Observable orderBookStream(CurrencyPair currencyPair) { - return service.subscribeChannel(channelFromCurrency(currencyPair, "depth")) - .map((JsonNode s) -> depthTransaction(s.toString())) - .filter(transaction -> - transaction.getData().getCurrencyPair().equals(currencyPair) && - transaction.getData().getEventType() == DEPTH_UPDATE) - .map(transaction -> { - DepthBinanceWebSocketTransaction depth = transaction.getData(); + OrderbookSubscription subscription = orderbooks.computeIfAbsent(currencyPair, pair -> initialOrderBook(pair)); + + return subscription.stream + .doOnComplete(() -> { + subscription.disposable.dispose(); + orderbooks.remove(currencyPair); + }) + .map(BinanceWebsocketTransaction::getData) + + // 4. Drop any event where u is <= lastUpdateId in the snapshot + .filter(depth -> depth.getLastUpdateId() > subscription.snapshotlastUpdateId) + + // 5. The first processed should have U <= lastUpdateId+1 AND u >= lastUpdateId+1 + .filter(depth -> { + long lastUpdateId = subscription.lastUpdateId.get(); + if (lastUpdateId == 0L) { + return depth.getFirstUpdateId() <= lastUpdateId + 1 && + depth.getLastUpdateId() >= lastUpdateId + 1; + } else { + return true; + } + }) - OrderBook currentOrderBook = orderbooks.computeIfAbsent(currencyPair, orderBook -> - new OrderBook(null, new ArrayList<>(), new ArrayList<>())); + // 6. While listening to the stream, each new event's U should be equal to the previous event's u+1 + .filter(depth -> { + long lastUpdateId = subscription.lastUpdateId.get(); + boolean result; + if (lastUpdateId == 0L) { + result = true; + } else { + result = depth.getFirstUpdateId() == lastUpdateId + 1; + } + if (result) { + subscription.lastUpdateId.set(depth.getLastUpdateId()); + } else { + // If not, we re-sync + LOG.info("Orderbook snapshot for {} out of date (last={}, U={}, u={}) {} ", currencyPair, lastUpdateId, depth.getFirstUpdateId(), depth.getLastUpdateId()); + setSnapshot(currencyPair, subscription); + } + return result; + }) + // 7. The data in each event is the absolute quantity for a price level + // 8. If the quantity is 0, remove the price level + // 9. Receiving an event that removes a price level that is not in your local order book can happen and is normal. + .map(depth -> { BinanceOrderbook ob = depth.getOrderBook(); - ob.bids.forEach((key, value) -> currentOrderBook.update(new OrderBookUpdate( + ob.bids.forEach((key, value) -> subscription.orderBook.update(new OrderBookUpdate( OrderType.BID, null, currencyPair, key, depth.getEventTime(), value))); - ob.asks.forEach((key, value) -> currentOrderBook.update(new OrderBookUpdate( + ob.asks.forEach((key, value) -> subscription.orderBook.update(new OrderBookUpdate( OrderType.ASK, null, currencyPair, key, depth.getEventTime(), value))); - return currentOrderBook; + return subscription.orderBook; }); } diff --git a/xchange-binance/src/main/java/info/bitrich/xchangestream/binance/dto/DepthBinanceWebSocketTransaction.java b/xchange-binance/src/main/java/info/bitrich/xchangestream/binance/dto/DepthBinanceWebSocketTransaction.java index 58049dec4..cd4b049be 100644 --- a/xchange-binance/src/main/java/info/bitrich/xchangestream/binance/dto/DepthBinanceWebSocketTransaction.java +++ b/xchange-binance/src/main/java/info/bitrich/xchangestream/binance/dto/DepthBinanceWebSocketTransaction.java @@ -8,20 +8,33 @@ public class DepthBinanceWebSocketTransaction extends ProductBinanceWebSocketTransaction { private final BinanceOrderbook orderBook; + private final long lastUpdateId; + private final long firstUpdateId; public DepthBinanceWebSocketTransaction( @JsonProperty("e") String eventType, @JsonProperty("E") String eventTime, @JsonProperty("s") String symbol, + @JsonProperty("U") long firstUpdateId, @JsonProperty("u") long lastUpdateId, @JsonProperty("b") List _bids, @JsonProperty("a") List _asks ) { super(eventType, eventTime, symbol); + this.firstUpdateId = firstUpdateId; + this.lastUpdateId = lastUpdateId; orderBook = new BinanceOrderbook(lastUpdateId, _bids, _asks); } public BinanceOrderbook getOrderBook() { return orderBook; } + + public long getFirstUpdateId() { + return firstUpdateId; + } + + public long getLastUpdateId() { + return lastUpdateId; + } } From ddc9b5e175f83fa48cbb8b5c2a85e4e0e2bc0df6 Mon Sep 17 00:00:00 2001 From: Graham Crockford Date: Sat, 6 Oct 2018 15:00:57 +0100 Subject: [PATCH 2/8] Fix dodgy error message --- .../binance/BinanceStreamingMarketDataService.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/xchange-binance/src/main/java/info/bitrich/xchangestream/binance/BinanceStreamingMarketDataService.java b/xchange-binance/src/main/java/info/bitrich/xchangestream/binance/BinanceStreamingMarketDataService.java index 0949e4ee5..081ab6ba5 100644 --- a/xchange-binance/src/main/java/info/bitrich/xchangestream/binance/BinanceStreamingMarketDataService.java +++ b/xchange-binance/src/main/java/info/bitrich/xchangestream/binance/BinanceStreamingMarketDataService.java @@ -208,7 +208,7 @@ private Observable orderBookStream(CurrencyPair currencyPair) { subscription.lastUpdateId.set(depth.getLastUpdateId()); } else { // If not, we re-sync - LOG.info("Orderbook snapshot for {} out of date (last={}, U={}, u={}) {} ", currencyPair, lastUpdateId, depth.getFirstUpdateId(), depth.getLastUpdateId()); + LOG.info("Orderbook snapshot for {} out of date (last={}, U={}, u={})", currencyPair, lastUpdateId, depth.getFirstUpdateId(), depth.getLastUpdateId()); setSnapshot(currencyPair, subscription); } return result; From 12312442a7042d359945a64248166c5f3a965192 Mon Sep 17 00:00:00 2001 From: Graham Crockford Date: Sat, 6 Oct 2018 15:06:10 +0100 Subject: [PATCH 3/8] Safer error handling --- .../binance/BinanceStreamingMarketDataService.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/xchange-binance/src/main/java/info/bitrich/xchangestream/binance/BinanceStreamingMarketDataService.java b/xchange-binance/src/main/java/info/bitrich/xchangestream/binance/BinanceStreamingMarketDataService.java index 081ab6ba5..fdad2cd1a 100644 --- a/xchange-binance/src/main/java/info/bitrich/xchangestream/binance/BinanceStreamingMarketDataService.java +++ b/xchange-binance/src/main/java/info/bitrich/xchangestream/binance/BinanceStreamingMarketDataService.java @@ -165,8 +165,8 @@ private void setSnapshot(CurrencyPair currencyPair, OrderbookSubscription subscr subscription.snapshotlastUpdateId = book.lastUpdateId; subscription.lastUpdateId.set(book.lastUpdateId); subscription.orderBook = BinanceMarketDataService.convertOrderBook(book, currencyPair); - } catch (IOException e) { - LOG.error("Failed to fetch initial order book for " + currencyPair); + } catch (Throwable e) { + LOG.error("Failed to fetch initial order book for " + currencyPair, e); subscription.orderBook = new OrderBook(null, new ArrayList<>(), new ArrayList<>()); } } From d38cb5aca6354e88afb300bcf8d974065ad1e0a8 Mon Sep 17 00:00:00 2001 From: Graham Crockford Date: Sat, 6 Oct 2018 15:29:34 +0100 Subject: [PATCH 4/8] Avoid repeated attempts to resync the orderbook causing bans --- .../BinanceStreamingMarketDataService.java | 17 ++++++++++++++++- 1 file changed, 16 insertions(+), 1 deletion(-) diff --git a/xchange-binance/src/main/java/info/bitrich/xchangestream/binance/BinanceStreamingMarketDataService.java b/xchange-binance/src/main/java/info/bitrich/xchangestream/binance/BinanceStreamingMarketDataService.java index fdad2cd1a..e261db635 100644 --- a/xchange-binance/src/main/java/info/bitrich/xchangestream/binance/BinanceStreamingMarketDataService.java +++ b/xchange-binance/src/main/java/info/bitrich/xchangestream/binance/BinanceStreamingMarketDataService.java @@ -132,11 +132,12 @@ private Observable rawTickerStream(CurrencyPair currencyPair) } private static final class OrderbookSubscription { - long snapshotlastUpdateId = 0L; + long snapshotlastUpdateId; AtomicLong lastUpdateId = new AtomicLong(0L); OrderBook orderBook; ConnectableObservable> stream; Disposable disposable; + AtomicLong lastSyncTime = new AtomicLong(0L); } private OrderbookSubscription initialOrderBook(CurrencyPair currencyPair) { @@ -159,6 +160,19 @@ private OrderbookSubscription initialOrderBook(CurrencyPair currencyPair) { } private void setSnapshot(CurrencyPair currencyPair, OrderbookSubscription subscription) { + + // Don't attempt reconnects too often to avoid bans. 3 seconds will do it. + long now = System.currentTimeMillis(); + long lastSync = subscription.lastSyncTime.get(); + if (now - lastSync < 3000) { + try { + Thread.sleep(3000 - (now - lastSync)); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } + } + try { LOG.info("Fetching initial orderbook snapshot for {} ", currencyPair); BinanceOrderbook book = marketDataService.getBinanceOrderbook(currencyPair, 1000); @@ -169,6 +183,7 @@ private void setSnapshot(CurrencyPair currencyPair, OrderbookSubscription subscr LOG.error("Failed to fetch initial order book for " + currencyPair, e); subscription.orderBook = new OrderBook(null, new ArrayList<>(), new ArrayList<>()); } + subscription.lastSyncTime.set(now); } private Observable orderBookStream(CurrencyPair currencyPair) { From 0efd0aa497aff37008ab804b1c58f99de53b29d0 Mon Sep 17 00:00:00 2001 From: Graham Crockford Date: Mon, 8 Oct 2018 15:20:39 +0100 Subject: [PATCH 5/8] Fix missing import on version create for develop branch merge, bump Xchange dependency to 4.3.9, needed for acceess to convert order book --- pom.xml | 2 +- .../binance/BinanceStreamingMarketDataService.java | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index 2d3587750..3250b768e 100644 --- a/pom.xml +++ b/pom.xml @@ -80,7 +80,7 @@ - 4.3.5 + 4.3.9 UTF-8 UTF-8 1.8 diff --git a/xchange-binance/src/main/java/info/bitrich/xchangestream/binance/BinanceStreamingMarketDataService.java b/xchange-binance/src/main/java/info/bitrich/xchangestream/binance/BinanceStreamingMarketDataService.java index e261db635..7bfef708c 100644 --- a/xchange-binance/src/main/java/info/bitrich/xchangestream/binance/BinanceStreamingMarketDataService.java +++ b/xchange-binance/src/main/java/info/bitrich/xchangestream/binance/BinanceStreamingMarketDataService.java @@ -1,6 +1,7 @@ package info.bitrich.xchangestream.binance; import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.DeserializationFeature; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import info.bitrich.xchangestream.binance.dto.BinanceRawTrade; From 9d1f546b6c93293d7eead97ea4f54b71f54b5b52 Mon Sep 17 00:00:00 2001 From: Graham Crockford Date: Mon, 8 Oct 2018 16:40:51 +0100 Subject: [PATCH 6/8] Did some detailed testing of re-sync/setup/shutdown/backpressure conditions and simplified the cleanup logic. Also added some commentary here since the way this is working, and why it's working that way, isn't massively obvious. There is definitely a thread leak here (BinanceManualExample never completes) which could explain some memory leaks I'm seeing, but will tackle that separately. --- .../BinanceStreamingMarketDataService.java | 21 ++++----- .../binance/BinanceManualExample.java | 45 ++++++++++++++----- 2 files changed, 45 insertions(+), 21 deletions(-) diff --git a/xchange-binance/src/main/java/info/bitrich/xchangestream/binance/BinanceStreamingMarketDataService.java b/xchange-binance/src/main/java/info/bitrich/xchangestream/binance/BinanceStreamingMarketDataService.java index 7bfef708c..4bc98bb34 100644 --- a/xchange-binance/src/main/java/info/bitrich/xchangestream/binance/BinanceStreamingMarketDataService.java +++ b/xchange-binance/src/main/java/info/bitrich/xchangestream/binance/BinanceStreamingMarketDataService.java @@ -12,7 +12,6 @@ import info.bitrich.xchangestream.core.ProductSubscription; import info.bitrich.xchangestream.core.StreamingMarketDataService; import io.reactivex.Observable; -import io.reactivex.disposables.Disposable; import io.reactivex.functions.Consumer; import io.reactivex.observables.ConnectableObservable; @@ -137,7 +136,6 @@ private static final class OrderbookSubscription { AtomicLong lastUpdateId = new AtomicLong(0L); OrderBook orderBook; ConnectableObservable> stream; - Disposable disposable; AtomicLong lastSyncTime = new AtomicLong(0L); } @@ -151,9 +149,11 @@ private OrderbookSubscription initialOrderBook(CurrencyPair currencyPair) { transaction.getData().getCurrencyPair().equals(currencyPair) && transaction.getData().getEventType() == DEPTH_UPDATE) - // 2.Buffer the events you receive from the stream + // 2.Buffer the events you receive from the stream. + // This is solely to allow room for us to periodically fetch a fresh snapshot + // in the event that binance sends events out of sequence or skips events. .replay(); - subscription.disposable = subscription.stream.connect(); + subscription.stream.connect(); // 3. Get a depth snapshot from https://www.binance.com/api/v1/depth?symbol=BNBBTC&limit=1000 setSnapshot(currencyPair, subscription); @@ -191,10 +191,6 @@ private Observable orderBookStream(CurrencyPair currencyPair) { OrderbookSubscription subscription = orderbooks.computeIfAbsent(currencyPair, pair -> initialOrderBook(pair)); return subscription.stream - .doOnComplete(() -> { - subscription.disposable.dispose(); - orderbooks.remove(currencyPair); - }) .map(BinanceWebsocketTransaction::getData) // 4. Drop any event where u is <= lastUpdateId in the snapshot @@ -223,8 +219,13 @@ private Observable orderBookStream(CurrencyPair currencyPair) { if (result) { subscription.lastUpdateId.set(depth.getLastUpdateId()); } else { - // If not, we re-sync - LOG.info("Orderbook snapshot for {} out of date (last={}, U={}, u={})", currencyPair, lastUpdateId, depth.getFirstUpdateId(), depth.getLastUpdateId()); + // If not, we re-sync. This will commonly occur a few times when starting up, since + // given update ids 1,2,3,4,5,6,7,8,9, Binance may sometimes return a snapshot + // as of 5, but update events covering 1-3, 4-6 and 7-9. We can't apply the 4-6 + // update event without double-counting 5, and we can't apply the 7-9 update without + // missing 6. The only thing we can do is to keep requesting a fresh snapshot until + // we get to a situation where the snapshot and an update event precisely line up. + LOG.info("Orderbook snapshot for {} out of date (last={}, U={}, u={}). This is normal. Re-syncing.", currencyPair, lastUpdateId, depth.getFirstUpdateId(), depth.getLastUpdateId()); setSnapshot(currencyPair, subscription); } return result; diff --git a/xchange-binance/src/test/java/info/bitrich/xchangestream/binance/BinanceManualExample.java b/xchange-binance/src/test/java/info/bitrich/xchangestream/binance/BinanceManualExample.java index a3e190e0b..2e91cb3bc 100644 --- a/xchange-binance/src/test/java/info/bitrich/xchangestream/binance/BinanceManualExample.java +++ b/xchange-binance/src/test/java/info/bitrich/xchangestream/binance/BinanceManualExample.java @@ -4,11 +4,10 @@ import info.bitrich.xchangestream.core.StreamingExchange; import info.bitrich.xchangestream.core.StreamingExchangeFactory; import org.knowm.xchange.currency.CurrencyPair; -import org.knowm.xchange.dto.trade.LimitOrder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.List; +import io.reactivex.disposables.Disposable; /** * Created by Lukas Zaoralek on 15.11.17. @@ -16,7 +15,7 @@ public class BinanceManualExample { private static final Logger LOG = LoggerFactory.getLogger(BinanceManualExample.class); - public static void main(String[] args) { + public static void main(String[] args) throws InterruptedException { StreamingExchange exchange = StreamingExchangeFactory.INSTANCE.createExchange(BinanceStreamingExchange.class.getName()); ProductSubscription subscription = ProductSubscription.create() @@ -28,22 +27,46 @@ public static void main(String[] args) { exchange.connect(subscription).blockingAwait(); - exchange.getStreamingMarketDataService() + Disposable tickers = exchange.getStreamingMarketDataService() .getTicker(CurrencyPair.ETH_BTC) .subscribe(ticker -> { LOG.info("Ticker: {}", ticker); }, throwable -> LOG.error("ERROR in getting ticker: ", throwable)); - exchange.getStreamingMarketDataService() - .getOrderBook(CurrencyPair.LTC_BTC) - .subscribe(orderBook -> { - LOG.info("Order Book: {}", orderBook); - }, throwable -> LOG.error("ERROR in getting order book: ", throwable)); - - exchange.getStreamingMarketDataService() + Disposable trades = exchange.getStreamingMarketDataService() .getTrades(CurrencyPair.BTC_USDT) .subscribe(trade -> { LOG.info("Trade: {}", trade); }); + + Disposable orderbooks = orderbooks(exchange, "one"); + Thread.sleep(5000); + + Disposable orderbooks2 = orderbooks(exchange, "two"); + Thread.sleep(10000); + + tickers.dispose(); + trades.dispose(); + orderbooks.dispose(); + orderbooks2.dispose(); + exchange.disconnect().blockingAwait(); + + } + + private static Disposable orderbooks(StreamingExchange exchange, String identifier) { + return exchange.getStreamingMarketDataService() + .getOrderBook(CurrencyPair.LTC_BTC) + .subscribe(orderBook -> { + LOG.info( + "Order Book ({}): askDepth={} ask={} askSize={} bidDepth={}. bid={}, bidSize={}", + identifier, + orderBook.getAsks().size(), + orderBook.getAsks().get(0).getLimitPrice(), + orderBook.getAsks().get(0).getRemainingAmount(), + orderBook.getBids().size(), + orderBook.getBids().get(0).getLimitPrice(), + orderBook.getBids().get(0).getRemainingAmount() + ); + }, throwable -> LOG.error("ERROR in getting order book: ", throwable)); } } From 890eba9387033c59963c34b4910afb2bd4e7ee94 Mon Sep 17 00:00:00 2001 From: Graham Crockford Date: Mon, 8 Oct 2018 18:24:46 +0100 Subject: [PATCH 7/8] Improve behaviour when the initial snapshot fails. Instead of serving up partial snapshots, we'll just keep trying to create the initial snapshot, ignoring any updates, until the snapshot is created successfully. Means we can guarantee all the updates contain correct, full order books. --- .../BinanceStreamingMarketDataService.java | 72 ++++++++++--------- 1 file changed, 39 insertions(+), 33 deletions(-) diff --git a/xchange-binance/src/main/java/info/bitrich/xchangestream/binance/BinanceStreamingMarketDataService.java b/xchange-binance/src/main/java/info/bitrich/xchangestream/binance/BinanceStreamingMarketDataService.java index 4bc98bb34..b2c5acd75 100644 --- a/xchange-binance/src/main/java/info/bitrich/xchangestream/binance/BinanceStreamingMarketDataService.java +++ b/xchange-binance/src/main/java/info/bitrich/xchangestream/binance/BinanceStreamingMarketDataService.java @@ -131,15 +131,45 @@ private Observable rawTickerStream(CurrencyPair currencyPair) .map(transaction -> transaction.getData().getTicker()); } - private static final class OrderbookSubscription { + private final class OrderbookSubscription { long snapshotlastUpdateId; AtomicLong lastUpdateId = new AtomicLong(0L); OrderBook orderBook; ConnectableObservable> stream; AtomicLong lastSyncTime = new AtomicLong(0L); + + void invalidateSnapshot() { + snapshotlastUpdateId = 0L; + } + + void initSnapshotIfInvalid(CurrencyPair currencyPair) { + + if (snapshotlastUpdateId != 0L) + return; + + // Don't attempt reconnects too often to avoid bans. 3 seconds will do it. + long now = System.currentTimeMillis(); + if (now - lastSyncTime.get() < 3000) { + return; + } + + try { + LOG.info("Fetching initial orderbook snapshot for {} ", currencyPair); + BinanceOrderbook book = marketDataService.getBinanceOrderbook(currencyPair, 1000); + snapshotlastUpdateId = book.lastUpdateId; + lastUpdateId.set(book.lastUpdateId); + orderBook = BinanceMarketDataService.convertOrderBook(book, currencyPair); + } catch (Throwable e) { + LOG.error("Failed to fetch initial order book for " + currencyPair, e); + snapshotlastUpdateId = 0L; + lastUpdateId.set(0L); + orderBook = new OrderBook(null, new ArrayList<>(), new ArrayList<>()); + } + lastSyncTime.set(now); + } } - private OrderbookSubscription initialOrderBook(CurrencyPair currencyPair) { + private OrderbookSubscription connectOrderBook(CurrencyPair currencyPair) { OrderbookSubscription subscription = new OrderbookSubscription(); // 1. Open a stream to wss://stream.binance.com:9443/ws/bnbbtc@depth @@ -155,42 +185,18 @@ private OrderbookSubscription initialOrderBook(CurrencyPair currencyPair) { .replay(); subscription.stream.connect(); - // 3. Get a depth snapshot from https://www.binance.com/api/v1/depth?symbol=BNBBTC&limit=1000 - setSnapshot(currencyPair, subscription); return subscription; } - private void setSnapshot(CurrencyPair currencyPair, OrderbookSubscription subscription) { - - // Don't attempt reconnects too often to avoid bans. 3 seconds will do it. - long now = System.currentTimeMillis(); - long lastSync = subscription.lastSyncTime.get(); - if (now - lastSync < 3000) { - try { - Thread.sleep(3000 - (now - lastSync)); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new RuntimeException(e); - } - } - - try { - LOG.info("Fetching initial orderbook snapshot for {} ", currencyPair); - BinanceOrderbook book = marketDataService.getBinanceOrderbook(currencyPair, 1000); - subscription.snapshotlastUpdateId = book.lastUpdateId; - subscription.lastUpdateId.set(book.lastUpdateId); - subscription.orderBook = BinanceMarketDataService.convertOrderBook(book, currencyPair); - } catch (Throwable e) { - LOG.error("Failed to fetch initial order book for " + currencyPair, e); - subscription.orderBook = new OrderBook(null, new ArrayList<>(), new ArrayList<>()); - } - subscription.lastSyncTime.set(now); - } - private Observable orderBookStream(CurrencyPair currencyPair) { - OrderbookSubscription subscription = orderbooks.computeIfAbsent(currencyPair, pair -> initialOrderBook(pair)); + OrderbookSubscription subscription = orderbooks.computeIfAbsent(currencyPair, pair -> connectOrderBook(pair)); return subscription.stream + + // 3. Get a depth snapshot from https://www.binance.com/api/v1/depth?symbol=BNBBTC&limit=1000 + // (we do this if we don't already have one or we've invalidated a previous one) + .doOnNext(transaction -> subscription.initSnapshotIfInvalid(currencyPair)) + .map(BinanceWebsocketTransaction::getData) // 4. Drop any event where u is <= lastUpdateId in the snapshot @@ -226,7 +232,7 @@ private Observable orderBookStream(CurrencyPair currencyPair) { // missing 6. The only thing we can do is to keep requesting a fresh snapshot until // we get to a situation where the snapshot and an update event precisely line up. LOG.info("Orderbook snapshot for {} out of date (last={}, U={}, u={}). This is normal. Re-syncing.", currencyPair, lastUpdateId, depth.getFirstUpdateId(), depth.getLastUpdateId()); - setSnapshot(currencyPair, subscription); + subscription.invalidateSnapshot(); } return result; }) From cc532a5e34b17c1ad721c13429d32629b1743a65 Mon Sep 17 00:00:00 2001 From: Graham Crockford Date: Mon, 15 Oct 2018 02:49:35 +0100 Subject: [PATCH 8/8] Fix memory leak --- .../BinanceStreamingMarketDataService.java | 18 +++++------- .../BinanceOrderbookHighVolumeExample.java | 29 +++++++++++++++++++ 2 files changed, 36 insertions(+), 11 deletions(-) create mode 100644 xchange-binance/src/test/java/info/bitrich/xchangestream/binance/BinanceOrderbookHighVolumeExample.java diff --git a/xchange-binance/src/main/java/info/bitrich/xchangestream/binance/BinanceStreamingMarketDataService.java b/xchange-binance/src/main/java/info/bitrich/xchangestream/binance/BinanceStreamingMarketDataService.java index b2c5acd75..80a08c106 100644 --- a/xchange-binance/src/main/java/info/bitrich/xchangestream/binance/BinanceStreamingMarketDataService.java +++ b/xchange-binance/src/main/java/info/bitrich/xchangestream/binance/BinanceStreamingMarketDataService.java @@ -13,8 +13,6 @@ import info.bitrich.xchangestream.core.StreamingMarketDataService; import io.reactivex.Observable; import io.reactivex.functions.Consumer; -import io.reactivex.observables.ConnectableObservable; - import org.knowm.xchange.binance.BinanceAdapters; import org.knowm.xchange.binance.dto.marketdata.BinanceOrderbook; import org.knowm.xchange.binance.dto.marketdata.BinanceTicker24h; @@ -30,7 +28,6 @@ import org.slf4j.LoggerFactory; import java.io.IOException; -import java.util.ArrayList; import java.util.Date; import java.util.HashMap; import java.util.Map; @@ -135,7 +132,7 @@ private final class OrderbookSubscription { long snapshotlastUpdateId; AtomicLong lastUpdateId = new AtomicLong(0L); OrderBook orderBook; - ConnectableObservable> stream; + Observable> stream; AtomicLong lastSyncTime = new AtomicLong(0L); void invalidateSnapshot() { @@ -163,7 +160,7 @@ void initSnapshotIfInvalid(CurrencyPair currencyPair) { LOG.error("Failed to fetch initial order book for " + currencyPair, e); snapshotlastUpdateId = 0L; lastUpdateId.set(0L); - orderBook = new OrderBook(null, new ArrayList<>(), new ArrayList<>()); + orderBook = null; } lastSyncTime.set(now); } @@ -173,17 +170,13 @@ private OrderbookSubscription connectOrderBook(CurrencyPair currencyPair) { OrderbookSubscription subscription = new OrderbookSubscription(); // 1. Open a stream to wss://stream.binance.com:9443/ws/bnbbtc@depth + // 2. Buffer the events you receive from the stream. subscription.stream = service.subscribeChannel(channelFromCurrency(currencyPair, "depth")) .map((JsonNode s) -> depthTransaction(s.toString())) .filter(transaction -> transaction.getData().getCurrencyPair().equals(currencyPair) && - transaction.getData().getEventType() == DEPTH_UPDATE) + transaction.getData().getEventType() == DEPTH_UPDATE); - // 2.Buffer the events you receive from the stream. - // This is solely to allow room for us to periodically fetch a fresh snapshot - // in the event that binance sends events out of sequence or skips events. - .replay(); - subscription.stream.connect(); return subscription; } @@ -197,6 +190,9 @@ private Observable orderBookStream(CurrencyPair currencyPair) { // (we do this if we don't already have one or we've invalidated a previous one) .doOnNext(transaction -> subscription.initSnapshotIfInvalid(currencyPair)) + // If we failed, don't return anything. Just keep trying until it works + .filter(transaction -> subscription.snapshotlastUpdateId > 0L) + .map(BinanceWebsocketTransaction::getData) // 4. Drop any event where u is <= lastUpdateId in the snapshot diff --git a/xchange-binance/src/test/java/info/bitrich/xchangestream/binance/BinanceOrderbookHighVolumeExample.java b/xchange-binance/src/test/java/info/bitrich/xchangestream/binance/BinanceOrderbookHighVolumeExample.java new file mode 100644 index 000000000..ba7d241eb --- /dev/null +++ b/xchange-binance/src/test/java/info/bitrich/xchangestream/binance/BinanceOrderbookHighVolumeExample.java @@ -0,0 +1,29 @@ +package info.bitrich.xchangestream.binance; + +import org.knowm.xchange.ExchangeSpecification; +import info.bitrich.xchangestream.binance.BinanceStreamingExchange; +import info.bitrich.xchangestream.core.ProductSubscription; +import info.bitrich.xchangestream.core.StreamingExchange; +import info.bitrich.xchangestream.core.StreamingExchangeFactory; + +/** + * This is a useful test for profiling behaviour of the orderbook stream under load. + * Run this with a profiler to ensure that processing is efficient and free of memory leaks + */ +public class BinanceOrderbookHighVolumeExample { + + public static void main(String[] args) throws InterruptedException { + final ExchangeSpecification exchangeSpecification = new ExchangeSpecification(BinanceStreamingExchange.class); + exchangeSpecification.setShouldLoadRemoteMetaData(true); + StreamingExchange exchange = StreamingExchangeFactory.INSTANCE.createExchange(exchangeSpecification); + ProductSubscription subscription = exchange.getExchangeSymbols().stream().limit(50) + .reduce(ProductSubscription.create(), ProductSubscription.ProductSubscriptionBuilder::addOrderbook, + (productSubscriptionBuilder, productSubscriptionBuilder2) -> { + throw new UnsupportedOperationException(); + }) + .build(); + exchange.connect(subscription).blockingAwait(); + Thread.sleep(Long.MAX_VALUE); + } + +} \ No newline at end of file