Skip to content
This repository has been archived by the owner on Jun 8, 2020. It is now read-only.

[Binance] Orderbook initial snapshot #227

Merged
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import info.bitrich.xchangestream.core.StreamingMarketDataService;
import io.reactivex.Completable;
import org.knowm.xchange.binance.BinanceExchange;
import org.knowm.xchange.binance.service.BinanceMarketDataService;
import org.knowm.xchange.currency.CurrencyPair;

import java.util.List;
Expand Down Expand Up @@ -38,7 +39,7 @@ public Completable connect(ProductSubscription... args) {

ProductSubscription subscriptions = args[0];
streamingService = createStreamingService(subscriptions);
streamingMarketDataService = new BinanceStreamingMarketDataService(streamingService);
streamingMarketDataService = new BinanceStreamingMarketDataService(streamingService, (BinanceMarketDataService) marketDataService);
return streamingService.connect()
.doOnComplete(() -> streamingMarketDataService.openSubscriptions(subscriptions));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import org.knowm.xchange.binance.BinanceAdapters;
import org.knowm.xchange.binance.dto.marketdata.BinanceOrderbook;
import org.knowm.xchange.binance.dto.marketdata.BinanceTicker24h;
import org.knowm.xchange.binance.service.BinanceMarketDataService;
import org.knowm.xchange.currency.CurrencyPair;
import org.knowm.xchange.dto.Order.OrderType;
import org.knowm.xchange.dto.marketdata.OrderBook;
Expand All @@ -27,10 +28,10 @@
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

import static info.bitrich.xchangestream.binance.dto.BaseBinanceWebSocketTransaction.BinanceWebSocketTypes.DEPTH_UPDATE;
import static info.bitrich.xchangestream.binance.dto.BaseBinanceWebSocketTransaction.BinanceWebSocketTypes.TICKER_24_HR;
Expand All @@ -40,16 +41,17 @@ public class BinanceStreamingMarketDataService implements StreamingMarketDataSer
private static final Logger LOG = LoggerFactory.getLogger(BinanceStreamingMarketDataService.class);

private final BinanceStreamingService service;
private final Map<CurrencyPair, OrderBook> orderbooks = new HashMap<>();
private final Map<CurrencyPair, OrderbookSubscription> orderbooks = new HashMap<>();

private final Map<CurrencyPair, Observable<BinanceTicker24h>> tickerSubscriptions = new HashMap<>();
private final Map<CurrencyPair, Observable<OrderBook>> orderbookSubscriptions = new HashMap<>();
private final Map<CurrencyPair, Observable<BinanceRawTrade>> tradeSubscriptions = new HashMap<>();
private final ObjectMapper mapper = new ObjectMapper();
private final BinanceMarketDataService marketDataService;


public BinanceStreamingMarketDataService(BinanceStreamingService service) {
public BinanceStreamingMarketDataService(BinanceStreamingService service, BinanceMarketDataService marketDataService) {
this.service = service;
this.marketDataService = marketDataService;
mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
}

Expand Down Expand Up @@ -126,34 +128,131 @@ private Observable<BinanceTicker24h> rawTickerStream(CurrencyPair currencyPair)
.map(transaction -> transaction.getData().getTicker());
}

private final class OrderbookSubscription {
long snapshotlastUpdateId;
AtomicLong lastUpdateId = new AtomicLong(0L);
OrderBook orderBook;
Observable<BinanceWebsocketTransaction<DepthBinanceWebSocketTransaction>> stream;
AtomicLong lastSyncTime = new AtomicLong(0L);

void invalidateSnapshot() {
snapshotlastUpdateId = 0L;
}

void initSnapshotIfInvalid(CurrencyPair currencyPair) {

if (snapshotlastUpdateId != 0L)
return;

// Don't attempt reconnects too often to avoid bans. 3 seconds will do it.
long now = System.currentTimeMillis();
if (now - lastSyncTime.get() < 3000) {
return;
}

try {
LOG.info("Fetching initial orderbook snapshot for {} ", currencyPair);
BinanceOrderbook book = marketDataService.getBinanceOrderbook(currencyPair, 1000);
snapshotlastUpdateId = book.lastUpdateId;
lastUpdateId.set(book.lastUpdateId);
orderBook = BinanceMarketDataService.convertOrderBook(book, currencyPair);
} catch (Throwable e) {
LOG.error("Failed to fetch initial order book for " + currencyPair, e);
snapshotlastUpdateId = 0L;
lastUpdateId.set(0L);
orderBook = null;
}
lastSyncTime.set(now);
}
}

private OrderbookSubscription connectOrderBook(CurrencyPair currencyPair) {
OrderbookSubscription subscription = new OrderbookSubscription();

// 1. Open a stream to wss://stream.binance.com:9443/ws/bnbbtc@depth
// 2. Buffer the events you receive from the stream.
subscription.stream = service.subscribeChannel(channelFromCurrency(currencyPair, "depth"))
.map((JsonNode s) -> depthTransaction(s.toString()))
.filter(transaction ->
transaction.getData().getCurrencyPair().equals(currencyPair) &&
transaction.getData().getEventType() == DEPTH_UPDATE);


return subscription;
}

private Observable<OrderBook> orderBookStream(CurrencyPair currencyPair) {
return service.subscribeChannel(channelFromCurrency(currencyPair, "depth"))
.map((JsonNode s) -> depthTransaction(s.toString()))
.filter(transaction ->
transaction.getData().getCurrencyPair().equals(currencyPair) &&
transaction.getData().getEventType() == DEPTH_UPDATE)
.map(transaction -> {
DepthBinanceWebSocketTransaction depth = transaction.getData();
OrderbookSubscription subscription = orderbooks.computeIfAbsent(currencyPair, pair -> connectOrderBook(pair));

return subscription.stream

// 3. Get a depth snapshot from https://www.binance.com/api/v1/depth?symbol=BNBBTC&limit=1000
// (we do this if we don't already have one or we've invalidated a previous one)
.doOnNext(transaction -> subscription.initSnapshotIfInvalid(currencyPair))

// If we failed, don't return anything. Just keep trying until it works
.filter(transaction -> subscription.snapshotlastUpdateId > 0L)

.map(BinanceWebsocketTransaction::getData)

// 4. Drop any event where u is <= lastUpdateId in the snapshot
.filter(depth -> depth.getLastUpdateId() > subscription.snapshotlastUpdateId)

// 5. The first processed should have U <= lastUpdateId+1 AND u >= lastUpdateId+1
.filter(depth -> {
long lastUpdateId = subscription.lastUpdateId.get();
if (lastUpdateId == 0L) {
return depth.getFirstUpdateId() <= lastUpdateId + 1 &&
depth.getLastUpdateId() >= lastUpdateId + 1;
} else {
return true;
}
})

OrderBook currentOrderBook = orderbooks.computeIfAbsent(currencyPair, orderBook ->
new OrderBook(null, new ArrayList<>(), new ArrayList<>()));
// 6. While listening to the stream, each new event's U should be equal to the previous event's u+1
.filter(depth -> {
long lastUpdateId = subscription.lastUpdateId.get();
boolean result;
if (lastUpdateId == 0L) {
result = true;
} else {
result = depth.getFirstUpdateId() == lastUpdateId + 1;
}
if (result) {
subscription.lastUpdateId.set(depth.getLastUpdateId());
} else {
// If not, we re-sync. This will commonly occur a few times when starting up, since
// given update ids 1,2,3,4,5,6,7,8,9, Binance may sometimes return a snapshot
// as of 5, but update events covering 1-3, 4-6 and 7-9. We can't apply the 4-6
// update event without double-counting 5, and we can't apply the 7-9 update without
// missing 6. The only thing we can do is to keep requesting a fresh snapshot until
// we get to a situation where the snapshot and an update event precisely line up.
LOG.info("Orderbook snapshot for {} out of date (last={}, U={}, u={}). This is normal. Re-syncing.", currencyPair, lastUpdateId, depth.getFirstUpdateId(), depth.getLastUpdateId());
subscription.invalidateSnapshot();
}
return result;
})

// 7. The data in each event is the absolute quantity for a price level
// 8. If the quantity is 0, remove the price level
// 9. Receiving an event that removes a price level that is not in your local order book can happen and is normal.
.map(depth -> {
BinanceOrderbook ob = depth.getOrderBook();
ob.bids.forEach((key, value) -> currentOrderBook.update(new OrderBookUpdate(
ob.bids.forEach((key, value) -> subscription.orderBook.update(new OrderBookUpdate(
OrderType.BID,
null,
currencyPair,
key,
depth.getEventTime(),
value)));
ob.asks.forEach((key, value) -> currentOrderBook.update(new OrderBookUpdate(
ob.asks.forEach((key, value) -> subscription.orderBook.update(new OrderBookUpdate(
OrderType.ASK,
null,
currencyPair,
key,
depth.getEventTime(),
value)));
return currentOrderBook;
return subscription.orderBook;
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,20 +8,33 @@
public class DepthBinanceWebSocketTransaction extends ProductBinanceWebSocketTransaction {

private final BinanceOrderbook orderBook;
private final long lastUpdateId;
private final long firstUpdateId;

public DepthBinanceWebSocketTransaction(
@JsonProperty("e") String eventType,
@JsonProperty("E") String eventTime,
@JsonProperty("s") String symbol,
@JsonProperty("U") long firstUpdateId,
@JsonProperty("u") long lastUpdateId,
@JsonProperty("b") List<Object[]> _bids,
@JsonProperty("a") List<Object[]> _asks
) {
super(eventType, eventTime, symbol);
this.firstUpdateId = firstUpdateId;
this.lastUpdateId = lastUpdateId;
orderBook = new BinanceOrderbook(lastUpdateId, _bids, _asks);
}

public BinanceOrderbook getOrderBook() {
return orderBook;
}

public long getFirstUpdateId() {
return firstUpdateId;
}

public long getLastUpdateId() {
return lastUpdateId;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,18 @@
import info.bitrich.xchangestream.core.StreamingExchange;
import info.bitrich.xchangestream.core.StreamingExchangeFactory;
import org.knowm.xchange.currency.CurrencyPair;
import org.knowm.xchange.dto.trade.LimitOrder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;
import io.reactivex.disposables.Disposable;

/**
* Created by Lukas Zaoralek on 15.11.17.
*/
public class BinanceManualExample {
private static final Logger LOG = LoggerFactory.getLogger(BinanceManualExample.class);

public static void main(String[] args) {
public static void main(String[] args) throws InterruptedException {
StreamingExchange exchange = StreamingExchangeFactory.INSTANCE.createExchange(BinanceStreamingExchange.class.getName());

ProductSubscription subscription = ProductSubscription.create()
Expand All @@ -28,22 +27,46 @@ public static void main(String[] args) {

exchange.connect(subscription).blockingAwait();

exchange.getStreamingMarketDataService()
Disposable tickers = exchange.getStreamingMarketDataService()
.getTicker(CurrencyPair.ETH_BTC)
.subscribe(ticker -> {
LOG.info("Ticker: {}", ticker);
}, throwable -> LOG.error("ERROR in getting ticker: ", throwable));

exchange.getStreamingMarketDataService()
.getOrderBook(CurrencyPair.LTC_BTC)
.subscribe(orderBook -> {
LOG.info("Order Book: {}", orderBook);
}, throwable -> LOG.error("ERROR in getting order book: ", throwable));

exchange.getStreamingMarketDataService()
Disposable trades = exchange.getStreamingMarketDataService()
.getTrades(CurrencyPair.BTC_USDT)
.subscribe(trade -> {
LOG.info("Trade: {}", trade);
});

Disposable orderbooks = orderbooks(exchange, "one");
Thread.sleep(5000);

Disposable orderbooks2 = orderbooks(exchange, "two");
Thread.sleep(10000);

tickers.dispose();
trades.dispose();
orderbooks.dispose();
orderbooks2.dispose();
exchange.disconnect().blockingAwait();

}

private static Disposable orderbooks(StreamingExchange exchange, String identifier) {
return exchange.getStreamingMarketDataService()
.getOrderBook(CurrencyPair.LTC_BTC)
.subscribe(orderBook -> {
LOG.info(
"Order Book ({}): askDepth={} ask={} askSize={} bidDepth={}. bid={}, bidSize={}",
identifier,
orderBook.getAsks().size(),
orderBook.getAsks().get(0).getLimitPrice(),
orderBook.getAsks().get(0).getRemainingAmount(),
orderBook.getBids().size(),
orderBook.getBids().get(0).getLimitPrice(),
orderBook.getBids().get(0).getRemainingAmount()
);
}, throwable -> LOG.error("ERROR in getting order book: ", throwable));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package info.bitrich.xchangestream.binance;

import org.knowm.xchange.ExchangeSpecification;
import info.bitrich.xchangestream.binance.BinanceStreamingExchange;
import info.bitrich.xchangestream.core.ProductSubscription;
import info.bitrich.xchangestream.core.StreamingExchange;
import info.bitrich.xchangestream.core.StreamingExchangeFactory;

/**
* This is a useful test for profiling behaviour of the orderbook stream under load.
* Run this with a profiler to ensure that processing is efficient and free of memory leaks
*/
public class BinanceOrderbookHighVolumeExample {

public static void main(String[] args) throws InterruptedException {
final ExchangeSpecification exchangeSpecification = new ExchangeSpecification(BinanceStreamingExchange.class);
exchangeSpecification.setShouldLoadRemoteMetaData(true);
StreamingExchange exchange = StreamingExchangeFactory.INSTANCE.createExchange(exchangeSpecification);
ProductSubscription subscription = exchange.getExchangeSymbols().stream().limit(50)
.reduce(ProductSubscription.create(), ProductSubscription.ProductSubscriptionBuilder::addOrderbook,
(productSubscriptionBuilder, productSubscriptionBuilder2) -> {
throw new UnsupportedOperationException();
})
.build();
exchange.connect(subscription).blockingAwait();
Thread.sleep(Long.MAX_VALUE);
}

}