This repository has been archived by the owner on Jun 8, 2020. It is now read-only.

[WORK IN PROGRESS] Generic authenticated stream API #274

Closed
8 tasks done
badgerwithagun opened this issue Jan 27, 2019 · 29 comments · Fixed by #340

Comments

@badgerwithagun
Collaborator

badgerwithagun commented Jan 27, 2019

Introduction

I intend to create StreamingTradeService and StreamingAccountService interfaces to accompany StreamingMarketDataService, mirroring XChange's core APIs.

These will deliver the following:

  • Balance updates
  • Authenticated (user) trades
  • Order updates/execution reports

This has been a long time coming, but I'm now working on it. I'll publish regular PRs to merge work into this repo as I go, along with status updates here and fork URLs for anyone who wants to use the work in progress.

Please post here if you have thoughts on the API or implementation approach.

Approach

PRs #273 (formerly #209), #267, #246 and #244 all have taken different approaches to user-authenticated streams. I am trying to standardise these around the approach of #246 and #273 to follow on from #160, which is:

  • If the same socket as the public stream can be "elevated" to an authenticated stream, it should be, rather than opening multiple streams
  • xchange-stream will open an authenticated stream (or elevate the public one) if there are API key details provided in the ExchangeSpecification
  • Supported exchanges will provide a "level 2", exchange-specific API, exposing the raw channel data on public methods. So, for example, BinanceStreamingTradeService will have a getExecutionReports() method which returns instances of the execution report data structure described in the Binance API docs.
  • Where possible, the exchanges will support the generic "level 1" APIs detailed below by combining, filtering and transforming the output of their level 2 APIs.
  • All data will be provided in a PublishSubject-like fashion. If you're subscribed to the Observable, you get the data. If you're not, you don't. Nothing will be buffered.
  • I will create new StreamingTradeService and StreamingAccountService interfaces and move the new methods from StreamingMarketDataService to bring the location of these methods in line with XChange.
  • The streams contain changes/deltas only
  • The streams have no guarantee that updates won't be missed, and no way to tell if they have
  • Therefore, to use the streams reliably, you should combine these streams with polling to make sure any local snapshots you create are re-synced regularly
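To make the "PublishSubject-like" rule concrete, here is a minimal plain-Java stand-in. It is hypothetical (the real services return RxJava Observables, and HotStream is not an xchange-stream type), but it shows the same semantics: a subscriber only sees items emitted while it is subscribed, and nothing is buffered or replayed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// Hypothetical stand-in for a PublishSubject-style "hot" stream: each item goes
// only to the subscribers registered at the moment it is emitted.
final class HotStream<T> {
    private final List<Consumer<T>> subscribers = new CopyOnWriteArrayList<>();

    /** Registers a subscriber and returns a handle that unsubscribes it. */
    Runnable subscribe(Consumer<T> subscriber) {
        subscribers.add(subscriber);
        return () -> subscribers.remove(subscriber);
    }

    /** Delivers the item to current subscribers, then forgets it (no buffering). */
    void emit(T item) {
        for (Consumer<T> s : subscribers) {
            s.accept(item);
        }
    }
}

final class HotStreamDemo {
    static List<String> run() {
        HotStream<String> balances = new HotStream<>();
        List<String> received = new ArrayList<>();
        balances.emit("BTC=1.0");                 // nobody subscribed: lost
        Runnable unsubscribe = balances.subscribe(received::add);
        balances.emit("BTC=0.9");                 // delivered
        unsubscribe.run();
        balances.emit("BTC=0.8");                 // unsubscribed: lost
        return received;
    }
}
```

Anything emitted before subscribing, or after unsubscribing, is simply gone, which is why the streams need to be paired with polling.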

API changes

The following new methods will be created:

| Service | Signature | Description | Notes |
|---|---|---|---|
| StreamingTradeService | Observable<UserTrade> getUserTrades(CurrencyPair currencyPair, Object... args) | This will return reports of the user's own trades. | Currently, for Coinbase Pro and Binance, we return these "mixed" with the public trades in getTrades(). This will be changed. If you need UserTrades mixed with your Trades, just Observable.merge() the two streams. This will be more flexible. |
| StreamingTradeService | Observable<Order> getOrderUpdates(CurrencyPair currencyPair, Object... args) | This will return updates to the user's open orders. | If you receive a status indicating that the order has been removed (e.g. CANCELED), you should assume you'll get no more updates. A status of NEW will indicate acceptance and appearance on the order book, and PENDING_NEW will indicate receipt but not confirmation. |
| StreamingAccountService | Observable<Balance> getBalanceUpdates(Currency currency, Object... args) | This will return balance updates for the specified currency. | None |

These will explicitly state that they do not guarantee delivery or sequence (see below for the discussion around this).
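A consumer of getOrderUpdates() might maintain a local view of open orders roughly as follows. This is a sketch only: OrderStatus, OrderUpdate and OpenOrderTracker are hypothetical local types (the enum names echo some of XChange's Order.OrderStatus values but this is not that type), and treating FILLED, REJECTED and EXPIRED as terminal alongside CANCELED is an assumption.

```java
import java.util.EnumSet;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical local enum; names mirror a subset of XChange's Order.OrderStatus.
enum OrderStatus { PENDING_NEW, NEW, PARTIALLY_FILLED, FILLED, CANCELED, REJECTED, EXPIRED }

// Hypothetical minimal order update: order id plus its latest status.
final class OrderUpdate {
    final String id;
    final OrderStatus status;
    OrderUpdate(String id, OrderStatus status) { this.id = id; this.status = status; }
}

final class OpenOrderTracker {
    // Statuses after which no further updates are expected (assumption).
    private static final Set<OrderStatus> TERMINAL =
        EnumSet.of(OrderStatus.FILLED, OrderStatus.CANCELED, OrderStatus.REJECTED, OrderStatus.EXPIRED);

    private final Map<String, OrderStatus> open = new HashMap<>();

    void apply(OrderUpdate u) {
        if (TERMINAL.contains(u.status)) {
            open.remove(u.id);            // order has left the book: stop tracking it
        } else {
            open.put(u.id, u.status);     // PENDING_NEW, NEW or PARTIALLY_FILLED
        }
    }

    /** Defensive copy of the orders currently believed to be open. */
    Map<String, OrderStatus> openOrders() { return new HashMap<>(open); }
}
```

Because delivery and sequence are not guaranteed, a view like this can drift and would need periodic re-syncing from the REST API.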

Status

Level 2 support

This is already in the mainline.

Level 1 support

PR #340 has been issued and should be considered ready for merge, although I'm still conducting some production testing.

@badgerwithagun
Collaborator Author

One challenge I am working on is how to make sure that the streams arrive in order without any exchange-specific reconciliation logic. Relying on a websocket feed for updates of snapshot data is difficult and risky, and every exchange has a different approach (e.g. Binance always supplies timestamps).

Any suggestions here are, obviously, very welcome.
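One generic idea, assuming each stream message carries a sequence number that increases by exactly one (not something every exchange provides), is a reorder buffer that holds early arrivals until the gap before them fills. ReorderBuffer is a hypothetical class, not part of xchange-stream:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// Hypothetical reorder buffer keyed by a per-stream sequence number that is
// assumed to increase by exactly one per message.
final class ReorderBuffer<T> {
    private final TreeMap<Long, T> pending = new TreeMap<>();
    private long nextExpected;

    ReorderBuffer(long firstExpected) { this.nextExpected = firstExpected; }

    /** Accepts a message and returns every message that can now be delivered in order. */
    List<T> offer(long seq, T msg) {
        List<T> ready = new ArrayList<>();
        if (seq < nextExpected) {
            return ready;                         // duplicate or stale: already delivered
        }
        pending.put(seq, msg);
        // Release the contiguous run starting at the next expected sequence number.
        while (!pending.isEmpty() && pending.firstKey() == nextExpected) {
            ready.add(pending.pollFirstEntry().getValue());
            nextExpected++;
        }
        return ready;
    }

    /** Number of messages held back waiting for a gap to fill. */
    int waiting() { return pending.size(); }
}
```

The weakness is visible in the sketch: if a message is genuinely lost, later messages wait in the buffer indefinitely, so a timeout-and-resync mechanism would still be needed on top.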

@badgerwithagun
Collaborator Author

@dozd, does this fit with the existing design?

I'd like to know we have agreement before making much more progress.

@dozd
Member

dozd commented Jan 28, 2019

I like the general ideas from the approach. These points all make good sense, good work!

badgerwithagun added a commit to badgerwithagun/xchange-stream that referenced this issue Jan 28, 2019
…ned in bitrich-info#274.

Coinbase Pro is a challenge here. It seems to need an initial snapshot. At the moment the implementation is bare-bones.
Need to start working on the ordering challenge. All three seem to provide a sequence number. This will be essential for maintaining any sort of snapshot.
Not done balances yet.
@badgerwithagun
Collaborator Author

Binance Level 2 support is now in! Just need Bitfinex support in and I'll publicise my fork where I'm building the Level 1 support on top (plus lots of additional L2 support...)

@badgerwithagun badgerwithagun self-assigned this Jan 29, 2019
@badgerwithagun
Collaborator Author

Bitfinex support now into develop. I've added a link to the work-in-progress branch above.

@dozd
Member

dozd commented Jan 30, 2019

I plan to make a release tomorrow (was busy on Tuesday). Should I wait or is it ok?

@badgerwithagun
Collaborator Author

The new L2 APIs aren't stable at the moment, but I don't think that's a good reason to delay resynchronization with XChange's release cycle. Do it :)

@badgerwithagun
Collaborator Author

Binance and Bitfinex balance updates now working. However, there is currently no ordering protection. That will arrive as I get Coinbase Pro working, which requires snapshot management to work at all.

@badgerwithagun
Collaborator Author

I think this is ready for a pull request. No changes to the level of functionality, but it is useful in its current state and I've been running it successfully in production for a couple of weeks now.

I'll keep the branch open and keep working towards reliable streams.

@badgerwithagun
Collaborator Author

Updated OP to describe how the approach has evolved and clearly indicate current status.

@wlk

wlk commented Feb 21, 2019

@badgerwithagun I have just started using the library, but this is something I'm very interested in having. I can help out with reviews or anything else if there's something specific you need help with (I have been using XChange for many months with different exchanges, but have only just started with xchange-stream, using Binance to start with).

@badgerwithagun
Copy link
Collaborator Author

@wlk thanks. I'll need a review when I publish the first PR.

One thing I really need, for all three exchanges, is a clear guide for how to manage sequencing and dropped messages. They all publish instructions on this, in different levels of detail. What I'd like is to have the information in one place so I can start designing a common pattern which can then be re-used as more exchanges are added.

It's a surprisingly difficult thing.

@badgerwithagun
Copy link
Collaborator Author

I've been working on other stuff recently, but hope to focus back on this soon. The branch linked above is working really well for what it does, but without ordering/dropped message protection it's not safe to trust it, so I don't want to submit a PR yet.

@garciapd
Copy link

Hi @badgerwithagun, I would be happy to help you finish this. I will take a look at the current code and think about a potential solution for the ordering/dropped-message protection. Let me know if you want to have a chat about that.

@badgerwithagun
Copy link
Collaborator Author

Please elaborate, @garciapd!

The big problem I have found is that none of the exchanges I have been analysing implement any sort of dropped-message protection. If I connect, then pull the internet cable, create an order, then reconnect, I can miss the message where the order was created and get no clue that anything was missed. Some, like Binance, provide enough clues that you've missed a message when you receive the message following it, but none seem to provide heartbeat messages which would tell you that you might have missed something in the meantime.
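The limitation can be seen in a minimal gap detector. Assuming, hypothetically, a per-stream sequence number that should increase by one per message (GapDetector is an illustrative name, not an existing type), a skipped message only becomes detectable once a later message arrives; if the stream simply goes quiet, nothing fires.

```java
// Hypothetical sketch: flags a gap when a message's sequence number jumps by
// more than one. It cannot detect silence, since it only runs when a message arrives.
final class GapDetector {
    private long lastSeq = -1;   // -1 means no message seen yet

    /** Returns true if one or more messages were skipped before this one. */
    boolean onMessage(long seq) {
        boolean gap = lastSeq >= 0 && seq > lastSeq + 1;
        if (seq > lastSeq) {
            lastSeq = seq;       // duplicates and stale numbers do not move it back
        }
        return gap;
    }
}
```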

@garciapd

garciapd commented Apr 5, 2019

Hi @badgerwithagun, I think there is no way to do that using only their websocket API (Bitfinex). We could provide a listener that notifies when the websocket is connected and when it is disconnected. Then we would know when the websocket was last disconnected and could keep track of all the orders placed while it wasn't connected. When the websocket is connected again we can notify the user, who will then know that from that point on they will start receiving updates. For the orders placed while the websocket was disconnected, they will have to use the REST endpoints.
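A rough plain-Java sketch of that listener idea follows. ConnectionStateListener, Outage and OutageRecorder are hypothetical names, not existing xchange-stream types; the recorder only collects the time windows during which updates may have been missed, so the caller knows which period to re-sync via REST.

```java
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;

// Hypothetical listener interface for socket connection state changes.
interface ConnectionStateListener {
    void onDisconnected(Instant at);
    void onConnected(Instant at);
}

// A window during which stream updates may have been missed.
final class Outage {
    final Instant from;
    final Instant to;
    Outage(Instant from, Instant to) { this.from = from; this.to = to; }
}

final class OutageRecorder implements ConnectionStateListener {
    private Instant disconnectedAt;                  // null while connected
    private final List<Outage> outages = new ArrayList<>();

    @Override
    public synchronized void onDisconnected(Instant at) {
        if (disconnectedAt == null) {
            disconnectedAt = at;                     // keep the earliest drop time
        }
    }

    @Override
    public synchronized void onConnected(Instant at) {
        if (disconnectedAt != null) {
            // The caller would now fetch orders changed in this window via REST.
            outages.add(new Outage(disconnectedAt, at));
            disconnectedAt = null;
        }
    }

    synchronized List<Outage> outages() { return new ArrayList<>(outages); }
}
```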

I found this ticket where somebody asks about the same thing; you may find it helpful too: https://github.com/bitfinexcom/bitfinex-api-node/issues/247

What do you think about providing a listener and letting the user re-sync specific orders?

@garciapd
Copy link

garciapd commented Apr 9, 2019

Hi @badgerwithagun, I did a draft implementation as a proposal for dealing with order updates internally. The idea is to keep track of every order placed, keep them cached, and when there hasn't been an update for a while, trigger an update via REST.
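The core of that idea might look like the following. IdleOrderCache is a hypothetical class, not the code in the linked commit; time is passed in explicitly as epoch milliseconds, and the REST call itself is left to the caller.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: remember when each order was last updated and report
// the ones that have been quiet for longer than a threshold.
final class IdleOrderCache {
    private final long idleThresholdMillis;
    private final Map<String, Long> lastUpdateMillis = new HashMap<>();

    IdleOrderCache(long idleThresholdMillis) { this.idleThresholdMillis = idleThresholdMillis; }

    /** Records a socket update (or a completed REST refresh) for an order. */
    void touch(String orderId, long nowMillis) { lastUpdateMillis.put(orderId, nowMillis); }

    /** Stops tracking an order, e.g. once it reaches a terminal status. */
    void forget(String orderId) { lastUpdateMillis.remove(orderId); }

    /** Orders that have gone quiet for longer than the threshold. */
    List<String> dueForRestRefresh(long nowMillis) {
        List<String> due = new ArrayList<>();
        for (Map.Entry<String, Long> e : lastUpdateMillis.entrySet()) {
            if (nowMillis - e.getValue() > idleThresholdMillis) {
                due.add(e.getKey());
            }
        }
        return due;
    }
}
```

On its own this does nothing about a REST response and a socket update racing each other, which is the problem raised in the next comment.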

garciapd@067cc66

Do you think that could be a good way?

@badgerwithagun
Copy link
Collaborator Author

@garciapd I was trying out similar ideas. It starts to get a bit messy though due to the possibility of race conditions between the REST call and any socket updates that might suddenly arrive while the call is occurring. These could result in us sending the older data last and the newer first. Binance has a solution for this in the form of a timestamp on all socket and REST responses which is coherent (see my branch badgerwithagun@1dd746c) but I can't extend this logic to all exchanges.
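Where both sources carry timestamps on a coherent clock, as described here for Binance, a "newest wins" merge sidesteps the race: a slow REST response carrying older state cannot overwrite a fresher socket update. A minimal sketch, with NewestWins as a hypothetical class and timestamps as plain longs:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical reconciler: socket messages and REST responses are both fed in
// with their exchange timestamps; arrival order does not matter.
final class NewestWins<V> {
    private static final class Stamped<T> {
        final long timestamp;
        final T value;
        Stamped(long timestamp, T value) { this.timestamp = timestamp; this.value = value; }
    }

    private final Map<String, Stamped<V>> state = new HashMap<>();

    /** Applies the value if it is newer than what we hold; returns whether it was applied. */
    synchronized boolean offer(String key, long timestamp, V value) {
        Stamped<V> current = state.get(key);
        if (current != null && current.timestamp >= timestamp) {
            return false;    // older or same age: keep what we already have
        }
        state.put(key, new Stamped<>(timestamp, value));
        return true;
    }

    synchronized V get(String key) {
        Stamped<V> s = state.get(key);
        return s == null ? null : s.value;
    }
}
```

The catch, as noted, is that this only works when every source exposes comparable timestamps, which most exchanges do not.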

@badgerwithagun
Copy link
Collaborator Author

Also, I have started to think that the getOrderChanges stream needs to simply change to getOpenOrders and return a collection - we have the same problem as with the order book stream where we sometimes need to send full snapshots (when we are unsure whether we are still in sync) and sometimes updates to that snapshot. By making the stream "updates only" we lose that capability.
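One way to express a stream that sometimes carries full snapshots and sometimes deltas is a small event hierarchy. This is a hypothetical sketch: OpenOrdersEvent and OpenOrdersView are not existing types, and orders are reduced to id-to-status strings.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical event type: either a full snapshot of open orders or one delta.
abstract class OpenOrdersEvent {
    private OpenOrdersEvent() {}   // only the nested subclasses below exist

    static final class Snapshot extends OpenOrdersEvent {
        final Map<String, String> orders;
        Snapshot(Map<String, String> orders) { this.orders = new HashMap<>(orders); }
    }

    static final class Delta extends OpenOrdersEvent {
        final String id;
        final String status;
        final boolean removed;
        Delta(String id, String status, boolean removed) {
            this.id = id; this.status = status; this.removed = removed;
        }
    }
}

// Consumer side: a snapshot replaces local state, a delta patches it.
final class OpenOrdersView {
    private final Map<String, String> orders = new HashMap<>();

    void apply(OpenOrdersEvent e) {
        if (e instanceof OpenOrdersEvent.Snapshot) {
            orders.clear();                                  // resync: discard everything
            orders.putAll(((OpenOrdersEvent.Snapshot) e).orders);
        } else {
            OpenOrdersEvent.Delta d = (OpenOrdersEvent.Delta) e;
            if (d.removed) {
                orders.remove(d.id);
            } else {
                orders.put(d.id, d.status);
            }
        }
    }

    Map<String, String> current() { return Collections.unmodifiableMap(new HashMap<>(orders)); }
}
```

A producer that suspects it has lost sync can emit a Snapshot, and the consumer recovers without special handling; an updates-only stream has no way to express that.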

@garciapd
Copy link

I agree, this kind of solution will become messy and bring potential edge cases. I think it's a good idea to change the name to getOpenOrders, and ideally the interval for fetching the full snapshots would be configurable.

@garciapd
Copy link

@badgerwithagun what is missing then for the PR?

@badgerwithagun
Copy link
Collaborator Author

badgerwithagun commented Apr 14, 2019

Here's my thinking.

I have already built a layer over the top of XChange and xchange-stream that combines polling with websockets as part of Orko. It's open source, so anyone can copy it (see MarketDataSubscriptionManager). This approach has the advantage that it always works, even for exchanges that don't have support in xchange-stream (by falling back to core XChange APIs). However, if websockets are available, it uses them to respond more quickly.

I wanted to improve on this though. I wanted to eliminate REST polling.

I am now starting to think it's not possible in a generic way without making xchange-stream too difficult to maintain. Binance is a dream come true (strangely!); it has all the right answers to these problems, but most other exchanges don't.

I think we could probably work out a way around these problems on an exchange-by-exchange basis, by combining idle detection, polling and lots of other tricks, but my fear is that the result will just be too complicated. It'll get broken by a bad PR pretty quickly (it's really, really hard to review xchange-stream PRs well, since no-one uses the same combination of exchanges, and it's impossible to write good tests without "official" stubs). Anyone that tries to implement this new behaviour for additional exchanges will probably get it wrong somehow.

You'll notice that MarketDataSubscriptionManager is pretty complex. Poll loops are a problem to support. You get errors from those requests constantly and have to decide which ones are problems and which are not. You need to handle backoff when there are errors so you don't spam the exchange and generate thousands of stacktraces during downtime. I'm constantly tweaking that class to deal with new problems exchanges throw at me.
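The backoff part can be sketched as capped exponential backoff. PollBackoff is a hypothetical helper, not code from Orko; jitter is omitted for clarity, and deciding which errors count as failures is left to the caller.

```java
// Hypothetical helper for a REST poll loop: each consecutive failure doubles
// the delay up to a cap, and any success resets it to the base interval.
final class PollBackoff {
    private final long baseMillis;
    private final long maxMillis;
    private int consecutiveFailures;

    PollBackoff(long baseMillis, long maxMillis) {
        this.baseMillis = baseMillis;
        this.maxMillis = maxMillis;
    }

    /** Delay before the next poll, given whether the last poll failed. */
    long nextDelayMillis(boolean lastPollFailed) {
        if (!lastPollFailed) {
            consecutiveFailures = 0;
            return baseMillis;
        }
        consecutiveFailures++;
        long delay = baseMillis;
        // Double once per failure, stopping at the cap to avoid overflow.
        for (int i = 0; i < consecutiveFailures && delay < maxMillis; i++) {
            delay *= 2;
        }
        return Math.min(delay, maxMillis);
    }
}
```

With a 1 s base and an 8 s cap, failures produce delays of 2 s, 4 s, 8 s, 8 s, ..., so an exchange outage costs a handful of requests (and stack traces) per minute rather than one per second.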

At their hearts, both XChange and xchange-stream are pretty simple wrappers around the exchange APIs, and should probably stay that way. Easy to write, easy to review, but as a result, not "clever".

All this is a long way of saying: maybe I've taken this as far as we should take it... LGTM. That means, remove these from the spec above:

  • Add initial snapshots on initial opening of a stream (using REST API if the exchange doesn't already send it)
  • Detect out-of-sequence messages from the server, filter these out and resynchronise by fetching and returning a fresh snapshot from the REST API
  • Manage the fact that Coinbase Pro only sends deltas for order status/balance updates by combining the above two techniques to derive a full in-memory snapshot.

And replace with:

  • The streams contain changes/deltas only
  • The streams have no guarantee that updates won't be missed, and no way to tell if they have
  • Therefore, to use the streams reliably, you should combine these streams with polling to make sure any local snapshots you create are re-synced regularly

Then LGTM.

This may not be ideal, but it means that xchange-stream remains a relatively simple API around the exchange APIs which is easy for developers to extend, and not a complex beast like Orko's MarketDataSubscriptionManager.

@garciapd
Copy link

I share your thoughts. I think both XChange and xchange-stream should be easy to use and provide a simple, clear API with no hidden tricks. Then each developer can use them the way they need. I think it is easier to know the limitations of the exchange than to have to dive into the code to check the tricks that have been used to deal with syncing and out-of-order messages (as every exchange is different).

It is nice to have smart libraries, but that could be done alongside the "standard API", e.g. by having both a BitfinexStreamExchange and a SmartBitfinexStreamExchange xD.

I vote to do the replacement of the description :)

Btw, nice work with Orko. I'm creating VirtuaBroker

@badgerwithagun
Collaborator Author

> Btw, nice work with Orko. I'm creating VirtuaBroker

Nice!

> I vote to do the replacement of the description :)

OK, next time I get a spare moment, I'll see if I can get the existing code to line up with what we've discussed. It should be easy enough - only Coinbase will require any extra thought.

@garciapd

I've been reading the Coinbase documentation and I can only confirm what you said hehe.

In order to get all the information about the orders, they suggest queuing the messages coming from the stream and filling them in using the REST endpoints. And there is no initial snapshot.
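That queue-then-fill pattern can be sketched in plain Java, assuming the REST snapshot reports the sequence number it reflects. SnapshotSync is a hypothetical name, not part of xchange-stream or the Coinbase Pro API: socket messages are queued until the snapshot has been applied, messages already reflected in it are discarded, and the rest are replayed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical sketch: buffer socket messages until a REST snapshot is in,
// then drop what the snapshot already covers and replay the remainder.
final class SnapshotSync<M> {
    private static final class Buffered<T> {
        final long seq;
        final T msg;
        Buffered(long seq, T msg) { this.seq = seq; this.msg = msg; }
    }

    private final Consumer<M> downstream;
    private final List<Buffered<M>> buffer = new ArrayList<>();
    private long snapshotSeq;
    private boolean live;            // false until the snapshot has been applied

    SnapshotSync(Consumer<M> downstream) { this.downstream = downstream; }

    /** Called for every socket message. */
    synchronized void onSocketMessage(long seq, M msg) {
        if (!live) {
            buffer.add(new Buffered<>(seq, msg));   // hold until the snapshot arrives
        } else if (seq > snapshotSeq) {
            downstream.accept(msg);
        }
    }

    /** Called once the REST snapshot (reflecting sequence {@code seq}) has been applied. */
    synchronized void onSnapshot(long seq) {
        snapshotSeq = seq;
        for (Buffered<M> b : buffer) {              // replayed in arrival order
            if (b.seq > snapshotSeq) {
                downstream.accept(b.msg);           // newer than the snapshot: keep
            }                                       // otherwise already included: drop
        }
        buffer.clear();
        live = true;
    }
}
```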

Maybe we could write an example (I can do it) showing how to combine the websocket and REST APIs in case someone needs the full information about their orders. But we can leave the getOrderChanges method as it is.

Likewise, to get a full snapshot of your orders before receiving the deltas, the snapshot can simply be fetched by calling getOpenOrders() on the trade service returned by org.knowm.xchange.Exchange.getTradeService(), right? Then you receive the deltas straight afterwards (and if someone needs to fill in an order with more information, as I said before, that can be done using the REST call).

Of course it would be ideal if all of this logic were handled by the library, but as we discussed before, that has implications: for instance, making calls to the REST endpoint can get you throttled and slows down the updates, and you may not always need all the information.

@badgerwithagun
Copy link
Collaborator Author

badgerwithagun commented Jun 17, 2019

I've woken back up and am going to get this tidied up finally and a PR created shortly.

@makarid
Contributor

makarid commented Jun 22, 2019

Thank you for starting this update @badgerwithagun!!! I will implement the Coinmate exchange once you open the PR.

@badgerwithagun
Collaborator Author

No problem.

I did a bit more work today. It's almost there :)

@badgerwithagun
Copy link
Collaborator Author

The code I am now testing has been opened as #340.

5 participants