14
14
import info .bitrich .xchangestream .service .netty .StreamingObjectMapperHelper ;
15
15
import io .reactivex .Observable ;
16
16
import io .reactivex .functions .Consumer ;
17
+ import io .reactivex .observables .ConnectableObservable ;
18
+
17
19
import org .knowm .xchange .binance .BinanceAdapters ;
18
20
import org .knowm .xchange .binance .dto .marketdata .BinanceOrderbook ;
19
21
import org .knowm .xchange .binance .dto .marketdata .BinanceTicker24h ;
22
+ import org .knowm .xchange .binance .service .BinanceMarketDataService ;
20
23
import org .knowm .xchange .currency .CurrencyPair ;
21
24
import org .knowm .xchange .dto .Order .OrderType ;
22
25
import org .knowm .xchange .dto .marketdata .OrderBook ;
32
35
import java .util .Date ;
33
36
import java .util .HashMap ;
34
37
import java .util .Map ;
38
+ import java .util .concurrent .atomic .AtomicLong ;
35
39
36
40
import static info .bitrich .xchangestream .binance .dto .BaseBinanceWebSocketTransaction .BinanceWebSocketTypes .DEPTH_UPDATE ;
37
41
import static info .bitrich .xchangestream .binance .dto .BaseBinanceWebSocketTransaction .BinanceWebSocketTypes .TICKER_24_HR ;
@@ -41,16 +45,18 @@ public class BinanceStreamingMarketDataService implements StreamingMarketDataSer
41
45
private static final Logger LOG = LoggerFactory .getLogger (BinanceStreamingMarketDataService .class );
42
46
43
47
private final BinanceStreamingService service ;
44
- private final Map <CurrencyPair , OrderBook > orderbooks = new HashMap <>();
48
+ private final Map <CurrencyPair , OrderbookSubscription > orderbooks = new HashMap <>();
45
49
46
50
private final Map <CurrencyPair , Observable <BinanceTicker24h >> tickerSubscriptions = new HashMap <>();
47
51
private final Map <CurrencyPair , Observable <OrderBook >> orderbookSubscriptions = new HashMap <>();
48
52
private final Map <CurrencyPair , Observable <BinanceRawTrade >> tradeSubscriptions = new HashMap <>();
49
53
private final ObjectMapper mapper = StreamingObjectMapperHelper .getObjectMapper ();
54
+ private final BinanceMarketDataService marketDataService ;
50
55
51
-
52
- public BinanceStreamingMarketDataService (BinanceStreamingService service ) {
56
+ public BinanceStreamingMarketDataService (BinanceStreamingService service , BinanceMarketDataService marketDataService ) {
53
57
this .service = service ;
58
+ this .marketDataService = marketDataService ;
59
+ mapper .configure (DeserializationFeature .FAIL_ON_UNKNOWN_PROPERTIES , false );
54
60
}
55
61
56
62
@ Override
@@ -126,34 +132,132 @@ private Observable<BinanceTicker24h> rawTickerStream(CurrencyPair currencyPair)
126
132
.map (transaction -> transaction .getData ().getTicker ());
127
133
}
128
134
129
- private Observable <OrderBook > orderBookStream (CurrencyPair currencyPair ) {
130
- return service .subscribeChannel (channelFromCurrency (currencyPair , "depth" ))
135
+ private final class OrderbookSubscription {
136
+ long snapshotlastUpdateId ;
137
+ AtomicLong lastUpdateId = new AtomicLong (0L );
138
+ OrderBook orderBook ;
139
+ ConnectableObservable <BinanceWebsocketTransaction <DepthBinanceWebSocketTransaction >> stream ;
140
+ AtomicLong lastSyncTime = new AtomicLong (0L );
141
+
142
+ void invalidateSnapshot () {
143
+ snapshotlastUpdateId = 0L ;
144
+ }
145
+
146
+ void initSnapshotIfInvalid (CurrencyPair currencyPair ) {
147
+
148
+ if (snapshotlastUpdateId != 0L )
149
+ return ;
150
+
151
+ // Don't attempt reconnects too often to avoid bans. 3 seconds will do it.
152
+ long now = System .currentTimeMillis ();
153
+ if (now - lastSyncTime .get () < 3000 ) {
154
+ return ;
155
+ }
156
+
157
+ try {
158
+ LOG .info ("Fetching initial orderbook snapshot for {} " , currencyPair );
159
+ BinanceOrderbook book = marketDataService .getBinanceOrderbook (currencyPair , 1000 );
160
+ snapshotlastUpdateId = book .lastUpdateId ;
161
+ lastUpdateId .set (book .lastUpdateId );
162
+ orderBook = BinanceMarketDataService .convertOrderBook (book , currencyPair );
163
+ } catch (Throwable e ) {
164
+ LOG .error ("Failed to fetch initial order book for " + currencyPair , e );
165
+ snapshotlastUpdateId = 0L ;
166
+ lastUpdateId .set (0L );
167
+ orderBook = new OrderBook (null , new ArrayList <>(), new ArrayList <>());
168
+ }
169
+ lastSyncTime .set (now );
170
+ }
171
+ }
172
+
173
+ private OrderbookSubscription connectOrderBook (CurrencyPair currencyPair ) {
174
+ OrderbookSubscription subscription = new OrderbookSubscription ();
175
+
176
+ // 1. Open a stream to wss://stream.binance.com:9443/ws/bnbbtc@depth
177
+ subscription .stream = service .subscribeChannel (channelFromCurrency (currencyPair , "depth" ))
131
178
.map ((JsonNode s ) -> depthTransaction (s .toString ()))
132
179
.filter (transaction ->
133
180
transaction .getData ().getCurrencyPair ().equals (currencyPair ) &&
134
181
transaction .getData ().getEventType () == DEPTH_UPDATE )
135
- .map (transaction -> {
136
- DepthBinanceWebSocketTransaction depth = transaction .getData ();
137
182
138
- OrderBook currentOrderBook = orderbooks .computeIfAbsent (currencyPair , orderBook ->
139
- new OrderBook (null , new ArrayList <>(), new ArrayList <>()));
183
+ // 2.Buffer the events you receive from the stream.
184
+ // This is solely to allow room for us to periodically fetch a fresh snapshot
185
+ // in the event that binance sends events out of sequence or skips events.
186
+ .replay ();
187
+ subscription .stream .connect ();
188
+
189
+ return subscription ;
190
+ }
191
+
192
+ private Observable <OrderBook > orderBookStream (CurrencyPair currencyPair ) {
193
+ OrderbookSubscription subscription = orderbooks .computeIfAbsent (currencyPair , pair -> connectOrderBook (pair ));
194
+
195
+ return subscription .stream
196
+
197
+ // 3. Get a depth snapshot from https://www.binance.com/api/v1/depth?symbol=BNBBTC&limit=1000
198
+ // (we do this if we don't already have one or we've invalidated a previous one)
199
+ .doOnNext (transaction -> subscription .initSnapshotIfInvalid (currencyPair ))
200
+
201
+ .map (BinanceWebsocketTransaction ::getData )
202
+
203
+ // 4. Drop any event where u is <= lastUpdateId in the snapshot
204
+ .filter (depth -> depth .getLastUpdateId () > subscription .snapshotlastUpdateId )
205
+
206
+ // 5. The first processed should have U <= lastUpdateId+1 AND u >= lastUpdateId+1
207
+ .filter (depth -> {
208
+ long lastUpdateId = subscription .lastUpdateId .get ();
209
+ if (lastUpdateId == 0L ) {
210
+ return depth .getFirstUpdateId () <= lastUpdateId + 1 &&
211
+ depth .getLastUpdateId () >= lastUpdateId + 1 ;
212
+ } else {
213
+ return true ;
214
+ }
215
+ })
216
+
217
+ // 6. While listening to the stream, each new event's U should be equal to the previous event's u+1
218
+ .filter (depth -> {
219
+ long lastUpdateId = subscription .lastUpdateId .get ();
220
+ boolean result ;
221
+ if (lastUpdateId == 0L ) {
222
+ result = true ;
223
+ } else {
224
+ result = depth .getFirstUpdateId () == lastUpdateId + 1 ;
225
+ }
226
+ if (result ) {
227
+ subscription .lastUpdateId .set (depth .getLastUpdateId ());
228
+ } else {
229
+ // If not, we re-sync. This will commonly occur a few times when starting up, since
230
+ // given update ids 1,2,3,4,5,6,7,8,9, Binance may sometimes return a snapshot
231
+ // as of 5, but update events covering 1-3, 4-6 and 7-9. We can't apply the 4-6
232
+ // update event without double-counting 5, and we can't apply the 7-9 update without
233
+ // missing 6. The only thing we can do is to keep requesting a fresh snapshot until
234
+ // we get to a situation where the snapshot and an update event precisely line up.
235
+ LOG .info ("Orderbook snapshot for {} out of date (last={}, U={}, u={}). This is normal. Re-syncing." , currencyPair , lastUpdateId , depth .getFirstUpdateId (), depth .getLastUpdateId ());
236
+ subscription .invalidateSnapshot ();
237
+ }
238
+ return result ;
239
+ })
140
240
241
+ // 7. The data in each event is the absolute quantity for a price level
242
+ // 8. If the quantity is 0, remove the price level
243
+ // 9. Receiving an event that removes a price level that is not in your local order book can happen and is normal.
244
+ .map (depth -> {
141
245
BinanceOrderbook ob = depth .getOrderBook ();
142
- ob .bids .forEach ((key , value ) -> currentOrderBook .update (new OrderBookUpdate (
246
+ ob .bids .forEach ((key , value ) -> subscription . orderBook .update (new OrderBookUpdate (
143
247
OrderType .BID ,
144
248
null ,
145
249
currencyPair ,
146
250
key ,
147
251
depth .getEventTime (),
148
252
value )));
149
- ob .asks .forEach ((key , value ) -> currentOrderBook .update (new OrderBookUpdate (
253
+ ob .asks .forEach ((key , value ) -> subscription . orderBook .update (new OrderBookUpdate (
150
254
OrderType .ASK ,
151
255
null ,
152
256
currencyPair ,
153
257
key ,
154
258
depth .getEventTime (),
155
259
value )));
156
- return currentOrderBook ;
260
+ return subscription . orderBook ;
157
261
});
158
262
}
159
263
0 commit comments