diff --git a/xchange-binance/src/main/java/info/bitrich/xchangestream/binance/BinanceStreamingExchange.java b/xchange-binance/src/main/java/info/bitrich/xchangestream/binance/BinanceStreamingExchange.java index bc8720f8f..db11df507 100644 --- a/xchange-binance/src/main/java/info/bitrich/xchangestream/binance/BinanceStreamingExchange.java +++ b/xchange-binance/src/main/java/info/bitrich/xchangestream/binance/BinanceStreamingExchange.java @@ -5,6 +5,7 @@ import info.bitrich.xchangestream.core.StreamingMarketDataService; import io.reactivex.Completable; import org.knowm.xchange.binance.BinanceExchange; +import org.knowm.xchange.binance.service.BinanceMarketDataService; import org.knowm.xchange.currency.CurrencyPair; import java.util.List; @@ -38,7 +39,7 @@ public Completable connect(ProductSubscription... args) { ProductSubscription subscriptions = args[0]; streamingService = createStreamingService(subscriptions); - streamingMarketDataService = new BinanceStreamingMarketDataService(streamingService); + streamingMarketDataService = new BinanceStreamingMarketDataService(streamingService, (BinanceMarketDataService) marketDataService); return streamingService.connect() .doOnComplete(() -> streamingMarketDataService.openSubscriptions(subscriptions)); } diff --git a/xchange-binance/src/main/java/info/bitrich/xchangestream/binance/BinanceStreamingMarketDataService.java b/xchange-binance/src/main/java/info/bitrich/xchangestream/binance/BinanceStreamingMarketDataService.java index 8b7fe4176..80a08c106 100644 --- a/xchange-binance/src/main/java/info/bitrich/xchangestream/binance/BinanceStreamingMarketDataService.java +++ b/xchange-binance/src/main/java/info/bitrich/xchangestream/binance/BinanceStreamingMarketDataService.java @@ -16,6 +16,7 @@ import org.knowm.xchange.binance.BinanceAdapters; import org.knowm.xchange.binance.dto.marketdata.BinanceOrderbook; import org.knowm.xchange.binance.dto.marketdata.BinanceTicker24h; +import org.knowm.xchange.binance.service.BinanceMarketDataService; import org.knowm.xchange.currency.CurrencyPair; import org.knowm.xchange.dto.Order.OrderType; import org.knowm.xchange.dto.marketdata.OrderBook; @@ -27,10 +28,10 @@ import org.slf4j.LoggerFactory; import java.io.IOException; -import java.util.ArrayList; import java.util.Date; import java.util.HashMap; import java.util.Map; +import java.util.concurrent.atomic.AtomicLong; import static info.bitrich.xchangestream.binance.dto.BaseBinanceWebSocketTransaction.BinanceWebSocketTypes.DEPTH_UPDATE; import static info.bitrich.xchangestream.binance.dto.BaseBinanceWebSocketTransaction.BinanceWebSocketTypes.TICKER_24_HR; @@ -40,16 +41,17 @@ public class BinanceStreamingMarketDataService implements StreamingMarketDataSer private static final Logger LOG = LoggerFactory.getLogger(BinanceStreamingMarketDataService.class); private final BinanceStreamingService service; - private final Map orderbooks = new HashMap<>(); + private final Map orderbooks = new HashMap<>(); private final Map> tickerSubscriptions = new HashMap<>(); private final Map> orderbookSubscriptions = new HashMap<>(); private final Map> tradeSubscriptions = new HashMap<>(); private final ObjectMapper mapper = new ObjectMapper(); + private final BinanceMarketDataService marketDataService; - - public BinanceStreamingMarketDataService(BinanceStreamingService service) { + public BinanceStreamingMarketDataService(BinanceStreamingService service, BinanceMarketDataService marketDataService) { this.service = service; + this.marketDataService = marketDataService; mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); } @@ -126,34 +128,131 @@ private Observable rawTickerStream(CurrencyPair currencyPair) .map(transaction -> transaction.getData().getTicker()); } + private final class OrderbookSubscription { + long snapshotlastUpdateId; + AtomicLong lastUpdateId = new AtomicLong(0L); + OrderBook orderBook; + Observable> stream; + AtomicLong lastSyncTime = new AtomicLong(0L); + + void invalidateSnapshot() { + snapshotlastUpdateId = 0L; + } + + void initSnapshotIfInvalid(CurrencyPair currencyPair) { + + if (snapshotlastUpdateId != 0L) + return; + + // Don't attempt reconnects too often to avoid bans. 3 seconds will do it. + long now = System.currentTimeMillis(); + if (now - lastSyncTime.get() < 3000) { + return; + } + + try { + LOG.info("Fetching initial orderbook snapshot for {} ", currencyPair); + BinanceOrderbook book = marketDataService.getBinanceOrderbook(currencyPair, 1000); + snapshotlastUpdateId = book.lastUpdateId; + lastUpdateId.set(book.lastUpdateId); + orderBook = BinanceMarketDataService.convertOrderBook(book, currencyPair); + } catch (Throwable e) { + LOG.error("Failed to fetch initial order book for " + currencyPair, e); + snapshotlastUpdateId = 0L; + lastUpdateId.set(0L); + orderBook = null; + } + lastSyncTime.set(now); + } + } + + private OrderbookSubscription connectOrderBook(CurrencyPair currencyPair) { + OrderbookSubscription subscription = new OrderbookSubscription(); + + // 1. Open a stream to wss://stream.binance.com:9443/ws/bnbbtc@depth + // 2. Buffer the events you receive from the stream. + subscription.stream = service.subscribeChannel(channelFromCurrency(currencyPair, "depth")) + .map((JsonNode s) -> depthTransaction(s.toString())) + .filter(transaction -> + transaction.getData().getCurrencyPair().equals(currencyPair) && + transaction.getData().getEventType() == DEPTH_UPDATE); + + + return subscription; + } + private Observable orderBookStream(CurrencyPair currencyPair) { - return service.subscribeChannel(channelFromCurrency(currencyPair, "depth")) - .map((JsonNode s) -> depthTransaction(s.toString())) - .filter(transaction -> - transaction.getData().getCurrencyPair().equals(currencyPair) && - transaction.getData().getEventType() == DEPTH_UPDATE) - .map(transaction -> { - DepthBinanceWebSocketTransaction depth = transaction.getData(); + OrderbookSubscription subscription = orderbooks.computeIfAbsent(currencyPair, pair -> connectOrderBook(pair)); + + return subscription.stream + + // 3. Get a depth snapshot from https://www.binance.com/api/v1/depth?symbol=BNBBTC&limit=1000 + // (we do this if we don't already have one or we've invalidated a previous one) + .doOnNext(transaction -> subscription.initSnapshotIfInvalid(currencyPair)) + + // If we failed, don't return anything. Just keep trying until it works + .filter(transaction -> subscription.snapshotlastUpdateId > 0L) + + .map(BinanceWebsocketTransaction::getData) + + // 4. Drop any event where u is <= lastUpdateId in the snapshot + .filter(depth -> depth.getLastUpdateId() > subscription.snapshotlastUpdateId) + + // 5. The first processed should have U <= lastUpdateId+1 AND u >= lastUpdateId+1 + .filter(depth -> { + long lastUpdateId = subscription.lastUpdateId.get(); + if (lastUpdateId == 0L) { + return depth.getFirstUpdateId() <= lastUpdateId + 1 && + depth.getLastUpdateId() >= lastUpdateId + 1; + } else { + return true; + } + }) - OrderBook currentOrderBook = orderbooks.computeIfAbsent(currencyPair, orderBook -> - new OrderBook(null, new ArrayList<>(), new ArrayList<>())); + // 6. While listening to the stream, each new event's U should be equal to the previous event's u+1 + .filter(depth -> { + long lastUpdateId = subscription.lastUpdateId.get(); + boolean result; + if (lastUpdateId == 0L) { + result = true; + } else { + result = depth.getFirstUpdateId() == lastUpdateId + 1; + } + if (result) { + subscription.lastUpdateId.set(depth.getLastUpdateId()); + } else { + // If not, we re-sync. This will commonly occur a few times when starting up, since + // given update ids 1,2,3,4,5,6,7,8,9, Binance may sometimes return a snapshot + // as of 5, but update events covering 1-3, 4-6 and 7-9. We can't apply the 4-6 + // update event without double-counting 5, and we can't apply the 7-9 update without + // missing 6. The only thing we can do is to keep requesting a fresh snapshot until + // we get to a situation where the snapshot and an update event precisely line up. + LOG.info("Orderbook snapshot for {} out of date (last={}, U={}, u={}). This is normal. Re-syncing.", currencyPair, lastUpdateId, depth.getFirstUpdateId(), depth.getLastUpdateId()); + subscription.invalidateSnapshot(); + } + return result; + }) + // 7. The data in each event is the absolute quantity for a price level + // 8. If the quantity is 0, remove the price level + // 9. Receiving an event that removes a price level that is not in your local order book can happen and is normal. + .map(depth -> { BinanceOrderbook ob = depth.getOrderBook(); - ob.bids.forEach((key, value) -> currentOrderBook.update(new OrderBookUpdate( + ob.bids.forEach((key, value) -> subscription.orderBook.update(new OrderBookUpdate( OrderType.BID, null, currencyPair, key, depth.getEventTime(), value))); - ob.asks.forEach((key, value) -> currentOrderBook.update(new OrderBookUpdate( + ob.asks.forEach((key, value) -> subscription.orderBook.update(new OrderBookUpdate( OrderType.ASK, null, currencyPair, key, depth.getEventTime(), value))); - return currentOrderBook; + return subscription.orderBook; }); } diff --git a/xchange-binance/src/main/java/info/bitrich/xchangestream/binance/dto/DepthBinanceWebSocketTransaction.java b/xchange-binance/src/main/java/info/bitrich/xchangestream/binance/dto/DepthBinanceWebSocketTransaction.java index 58049dec4..cd4b049be 100644 --- a/xchange-binance/src/main/java/info/bitrich/xchangestream/binance/dto/DepthBinanceWebSocketTransaction.java +++ b/xchange-binance/src/main/java/info/bitrich/xchangestream/binance/dto/DepthBinanceWebSocketTransaction.java @@ -8,20 +8,33 @@ public class DepthBinanceWebSocketTransaction extends ProductBinanceWebSocketTransaction { private final BinanceOrderbook orderBook; + private final long lastUpdateId; + private final long firstUpdateId; public DepthBinanceWebSocketTransaction( @JsonProperty("e") String eventType, @JsonProperty("E") String eventTime, @JsonProperty("s") String symbol, + @JsonProperty("U") long firstUpdateId, @JsonProperty("u") long lastUpdateId, @JsonProperty("b") List _bids, @JsonProperty("a") List _asks ) { super(eventType, eventTime, symbol); + this.firstUpdateId = firstUpdateId; + this.lastUpdateId = lastUpdateId; orderBook = new BinanceOrderbook(lastUpdateId, _bids, _asks); } public BinanceOrderbook getOrderBook() { return orderBook; } + + public long getFirstUpdateId() { + return firstUpdateId; + } + + public long getLastUpdateId() { + return lastUpdateId; + } } diff --git a/xchange-binance/src/test/java/info/bitrich/xchangestream/binance/BinanceManualExample.java b/xchange-binance/src/test/java/info/bitrich/xchangestream/binance/BinanceManualExample.java index a3e190e0b..2e91cb3bc 100644 --- a/xchange-binance/src/test/java/info/bitrich/xchangestream/binance/BinanceManualExample.java +++ b/xchange-binance/src/test/java/info/bitrich/xchangestream/binance/BinanceManualExample.java @@ -4,11 +4,10 @@ import info.bitrich.xchangestream.core.StreamingExchange; import info.bitrich.xchangestream.core.StreamingExchangeFactory; import org.knowm.xchange.currency.CurrencyPair; -import org.knowm.xchange.dto.trade.LimitOrder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.List; +import io.reactivex.disposables.Disposable; /** * Created by Lukas Zaoralek on 15.11.17. @@ -16,7 +15,7 @@ public class BinanceManualExample { private static final Logger LOG = LoggerFactory.getLogger(BinanceManualExample.class); - public static void main(String[] args) { + public static void main(String[] args) throws InterruptedException { StreamingExchange exchange = StreamingExchangeFactory.INSTANCE.createExchange(BinanceStreamingExchange.class.getName()); ProductSubscription subscription = ProductSubscription.create() @@ -28,22 +27,46 @@ public static void main(String[] args) { exchange.connect(subscription).blockingAwait(); - exchange.getStreamingMarketDataService() + Disposable tickers = exchange.getStreamingMarketDataService() .getTicker(CurrencyPair.ETH_BTC) .subscribe(ticker -> { LOG.info("Ticker: {}", ticker); }, throwable -> LOG.error("ERROR in getting ticker: ", throwable)); - exchange.getStreamingMarketDataService() - .getOrderBook(CurrencyPair.LTC_BTC) - .subscribe(orderBook -> { - LOG.info("Order Book: {}", orderBook); - }, throwable -> LOG.error("ERROR in getting order book: ", throwable)); - - exchange.getStreamingMarketDataService() + Disposable trades = exchange.getStreamingMarketDataService() .getTrades(CurrencyPair.BTC_USDT) .subscribe(trade -> { LOG.info("Trade: {}", trade); }); + + Disposable orderbooks = orderbooks(exchange, "one"); + Thread.sleep(5000); + + Disposable orderbooks2 = orderbooks(exchange, "two"); + Thread.sleep(10000); + + tickers.dispose(); + trades.dispose(); + orderbooks.dispose(); + orderbooks2.dispose(); + exchange.disconnect().blockingAwait(); + + } + + private static Disposable orderbooks(StreamingExchange exchange, String identifier) { + return exchange.getStreamingMarketDataService() + .getOrderBook(CurrencyPair.LTC_BTC) + .subscribe(orderBook -> { + LOG.info( + "Order Book ({}): askDepth={} ask={} askSize={} bidDepth={}. bid={}, bidSize={}", + identifier, + orderBook.getAsks().size(), + orderBook.getAsks().get(0).getLimitPrice(), + orderBook.getAsks().get(0).getRemainingAmount(), + orderBook.getBids().size(), + orderBook.getBids().get(0).getLimitPrice(), + orderBook.getBids().get(0).getRemainingAmount() + ); + }, throwable -> LOG.error("ERROR in getting order book: ", throwable)); } } diff --git a/xchange-binance/src/test/java/info/bitrich/xchangestream/binance/BinanceOrderbookHighVolumeExample.java b/xchange-binance/src/test/java/info/bitrich/xchangestream/binance/BinanceOrderbookHighVolumeExample.java new file mode 100644 index 000000000..ba7d241eb --- /dev/null +++ b/xchange-binance/src/test/java/info/bitrich/xchangestream/binance/BinanceOrderbookHighVolumeExample.java @@ -0,0 +1,29 @@ +package info.bitrich.xchangestream.binance; + +import org.knowm.xchange.ExchangeSpecification; +import info.bitrich.xchangestream.binance.BinanceStreamingExchange; +import info.bitrich.xchangestream.core.ProductSubscription; +import info.bitrich.xchangestream.core.StreamingExchange; +import info.bitrich.xchangestream.core.StreamingExchangeFactory; + +/** + * This is a useful test for profiling behaviour of the orderbook stream under load. + * Run this with a profiler to ensure that processing is efficient and free of memory leaks + */ +public class BinanceOrderbookHighVolumeExample { + + public static void main(String[] args) throws InterruptedException { + final ExchangeSpecification exchangeSpecification = new ExchangeSpecification(BinanceStreamingExchange.class); + exchangeSpecification.setShouldLoadRemoteMetaData(true); + StreamingExchange exchange = StreamingExchangeFactory.INSTANCE.createExchange(exchangeSpecification); + ProductSubscription subscription = exchange.getExchangeSymbols().stream().limit(50) + .reduce(ProductSubscription.create(), ProductSubscription.ProductSubscriptionBuilder::addOrderbook, + (productSubscriptionBuilder, productSubscriptionBuilder2) -> { + throw new UnsupportedOperationException(); + }) + .build(); + exchange.connect(subscription).blockingAwait(); + Thread.sleep(Long.MAX_VALUE); + } + +} \ No newline at end of file