16
16
import org .knowm .xchange .binance .BinanceAdapters ;
17
17
import org .knowm .xchange .binance .dto .marketdata .BinanceOrderbook ;
18
18
import org .knowm .xchange .binance .dto .marketdata .BinanceTicker24h ;
19
+ import org .knowm .xchange .binance .service .BinanceMarketDataService ;
19
20
import org .knowm .xchange .currency .CurrencyPair ;
20
21
import org .knowm .xchange .dto .Order .OrderType ;
21
22
import org .knowm .xchange .dto .marketdata .OrderBook ;
27
28
import org .slf4j .LoggerFactory ;
28
29
29
30
import java .io .IOException ;
30
- import java .util .ArrayList ;
31
31
import java .util .Date ;
32
32
import java .util .HashMap ;
33
33
import java .util .Map ;
34
+ import java .util .concurrent .atomic .AtomicLong ;
34
35
35
36
import static info .bitrich .xchangestream .binance .dto .BaseBinanceWebSocketTransaction .BinanceWebSocketTypes .DEPTH_UPDATE ;
36
37
import static info .bitrich .xchangestream .binance .dto .BaseBinanceWebSocketTransaction .BinanceWebSocketTypes .TICKER_24_HR ;
@@ -40,16 +41,17 @@ public class BinanceStreamingMarketDataService implements StreamingMarketDataSer
40
41
private static final Logger LOG = LoggerFactory .getLogger (BinanceStreamingMarketDataService .class );
41
42
42
43
private final BinanceStreamingService service ;
43
- private final Map <CurrencyPair , OrderBook > orderbooks = new HashMap <>();
44
+ private final Map <CurrencyPair , OrderbookSubscription > orderbooks = new HashMap <>();
44
45
45
46
private final Map <CurrencyPair , Observable <BinanceTicker24h >> tickerSubscriptions = new HashMap <>();
46
47
private final Map <CurrencyPair , Observable <OrderBook >> orderbookSubscriptions = new HashMap <>();
47
48
private final Map <CurrencyPair , Observable <BinanceRawTrade >> tradeSubscriptions = new HashMap <>();
48
49
private final ObjectMapper mapper = new ObjectMapper ();
50
+ private final BinanceMarketDataService marketDataService ;
49
51
50
-
51
- public BinanceStreamingMarketDataService (BinanceStreamingService service ) {
52
+ public BinanceStreamingMarketDataService (BinanceStreamingService service , BinanceMarketDataService marketDataService ) {
52
53
this .service = service ;
54
+ this .marketDataService = marketDataService ;
53
55
mapper .configure (DeserializationFeature .FAIL_ON_UNKNOWN_PROPERTIES , false );
54
56
}
55
57
@@ -126,34 +128,131 @@ private Observable<BinanceTicker24h> rawTickerStream(CurrencyPair currencyPair)
126
128
.map (transaction -> transaction .getData ().getTicker ());
127
129
}
128
130
131
+ private final class OrderbookSubscription {
132
+ long snapshotlastUpdateId ;
133
+ AtomicLong lastUpdateId = new AtomicLong (0L );
134
+ OrderBook orderBook ;
135
+ Observable <BinanceWebsocketTransaction <DepthBinanceWebSocketTransaction >> stream ;
136
+ AtomicLong lastSyncTime = new AtomicLong (0L );
137
+
138
+ void invalidateSnapshot () {
139
+ snapshotlastUpdateId = 0L ;
140
+ }
141
+
142
+ void initSnapshotIfInvalid (CurrencyPair currencyPair ) {
143
+
144
+ if (snapshotlastUpdateId != 0L )
145
+ return ;
146
+
147
+ // Don't attempt reconnects too often to avoid bans. 3 seconds will do it.
148
+ long now = System .currentTimeMillis ();
149
+ if (now - lastSyncTime .get () < 3000 ) {
150
+ return ;
151
+ }
152
+
153
+ try {
154
+ LOG .info ("Fetching initial orderbook snapshot for {} " , currencyPair );
155
+ BinanceOrderbook book = marketDataService .getBinanceOrderbook (currencyPair , 1000 );
156
+ snapshotlastUpdateId = book .lastUpdateId ;
157
+ lastUpdateId .set (book .lastUpdateId );
158
+ orderBook = BinanceMarketDataService .convertOrderBook (book , currencyPair );
159
+ } catch (Throwable e ) {
160
+ LOG .error ("Failed to fetch initial order book for " + currencyPair , e );
161
+ snapshotlastUpdateId = 0L ;
162
+ lastUpdateId .set (0L );
163
+ orderBook = null ;
164
+ }
165
+ lastSyncTime .set (now );
166
+ }
167
+ }
168
+
169
+ private OrderbookSubscription connectOrderBook (CurrencyPair currencyPair ) {
170
+ OrderbookSubscription subscription = new OrderbookSubscription ();
171
+
172
+ // 1. Open a stream to wss://stream.binance.com:9443/ws/bnbbtc@depth
173
+ // 2. Buffer the events you receive from the stream.
174
+ subscription .stream = service .subscribeChannel (channelFromCurrency (currencyPair , "depth" ))
175
+ .map ((JsonNode s ) -> depthTransaction (s .toString ()))
176
+ .filter (transaction ->
177
+ transaction .getData ().getCurrencyPair ().equals (currencyPair ) &&
178
+ transaction .getData ().getEventType () == DEPTH_UPDATE );
179
+
180
+
181
+ return subscription ;
182
+ }
183
+
129
184
private Observable <OrderBook > orderBookStream (CurrencyPair currencyPair ) {
130
- return service .subscribeChannel (channelFromCurrency (currencyPair , "depth" ))
131
- .map ((JsonNode s ) -> depthTransaction (s .toString ()))
132
- .filter (transaction ->
133
- transaction .getData ().getCurrencyPair ().equals (currencyPair ) &&
134
- transaction .getData ().getEventType () == DEPTH_UPDATE )
135
- .map (transaction -> {
136
- DepthBinanceWebSocketTransaction depth = transaction .getData ();
185
+ OrderbookSubscription subscription = orderbooks .computeIfAbsent (currencyPair , pair -> connectOrderBook (pair ));
186
+
187
+ return subscription .stream
188
+
189
+ // 3. Get a depth snapshot from https://www.binance.com/api/v1/depth?symbol=BNBBTC&limit=1000
190
+ // (we do this if we don't already have one or we've invalidated a previous one)
191
+ .doOnNext (transaction -> subscription .initSnapshotIfInvalid (currencyPair ))
192
+
193
+ // If we failed, don't return anything. Just keep trying until it works
194
+ .filter (transaction -> subscription .snapshotlastUpdateId > 0L )
195
+
196
+ .map (BinanceWebsocketTransaction ::getData )
197
+
198
+ // 4. Drop any event where u is <= lastUpdateId in the snapshot
199
+ .filter (depth -> depth .getLastUpdateId () > subscription .snapshotlastUpdateId )
200
+
201
+ // 5. The first processed should have U <= lastUpdateId+1 AND u >= lastUpdateId+1
202
+ .filter (depth -> {
203
+ long lastUpdateId = subscription .lastUpdateId .get ();
204
+ if (lastUpdateId == 0L ) {
205
+ return depth .getFirstUpdateId () <= lastUpdateId + 1 &&
206
+ depth .getLastUpdateId () >= lastUpdateId + 1 ;
207
+ } else {
208
+ return true ;
209
+ }
210
+ })
137
211
138
- OrderBook currentOrderBook = orderbooks .computeIfAbsent (currencyPair , orderBook ->
139
- new OrderBook (null , new ArrayList <>(), new ArrayList <>()));
212
+ // 6. While listening to the stream, each new event's U should be equal to the previous event's u+1
213
+ .filter (depth -> {
214
+ long lastUpdateId = subscription .lastUpdateId .get ();
215
+ boolean result ;
216
+ if (lastUpdateId == 0L ) {
217
+ result = true ;
218
+ } else {
219
+ result = depth .getFirstUpdateId () == lastUpdateId + 1 ;
220
+ }
221
+ if (result ) {
222
+ subscription .lastUpdateId .set (depth .getLastUpdateId ());
223
+ } else {
224
+ // If not, we re-sync. This will commonly occur a few times when starting up, since
225
+ // given update ids 1,2,3,4,5,6,7,8,9, Binance may sometimes return a snapshot
226
+ // as of 5, but update events covering 1-3, 4-6 and 7-9. We can't apply the 4-6
227
+ // update event without double-counting 5, and we can't apply the 7-9 update without
228
+ // missing 6. The only thing we can do is to keep requesting a fresh snapshot until
229
+ // we get to a situation where the snapshot and an update event precisely line up.
230
+ LOG .info ("Orderbook snapshot for {} out of date (last={}, U={}, u={}). This is normal. Re-syncing." , currencyPair , lastUpdateId , depth .getFirstUpdateId (), depth .getLastUpdateId ());
231
+ subscription .invalidateSnapshot ();
232
+ }
233
+ return result ;
234
+ })
140
235
236
+ // 7. The data in each event is the absolute quantity for a price level
237
+ // 8. If the quantity is 0, remove the price level
238
+ // 9. Receiving an event that removes a price level that is not in your local order book can happen and is normal.
239
+ .map (depth -> {
141
240
BinanceOrderbook ob = depth .getOrderBook ();
142
- ob .bids .forEach ((key , value ) -> currentOrderBook .update (new OrderBookUpdate (
241
+ ob .bids .forEach ((key , value ) -> subscription . orderBook .update (new OrderBookUpdate (
143
242
OrderType .BID ,
144
243
null ,
145
244
currencyPair ,
146
245
key ,
147
246
depth .getEventTime (),
148
247
value )));
149
- ob .asks .forEach ((key , value ) -> currentOrderBook .update (new OrderBookUpdate (
248
+ ob .asks .forEach ((key , value ) -> subscription . orderBook .update (new OrderBookUpdate (
150
249
OrderType .ASK ,
151
250
null ,
152
251
currencyPair ,
153
252
key ,
154
253
depth .getEventTime (),
155
254
value )));
156
- return currentOrderBook ;
255
+ return subscription . orderBook ;
157
256
});
158
257
}
159
258
0 commit comments