[Stream]Multiple NettyStreamingService error and (re)connection handling #3537

Closed
mdvx opened this issue May 25, 2020 · 13 comments
Comments

@mdvx
Contributor

mdvx commented May 25, 2020

Right now, I can't implement

class GeminiStreamingExchange {
  @Override
  public Observable<Throwable> reconnectFailure() {
    return streamingService.subscribeReconnectFailure();
  }

  @Override
  public Observable<Object> connectionSuccess() {
    return streamingService.subscribeConnectionSuccess();
  }
}

because Gemini uses several GeminiProductStreamingService instances (which derive from [Json]NettyStreamingService) instead of a single JsonNettyStreamingService:
private Map<CurrencyPair, GeminiProductStreamingService> productStreamingServices;

This also applies to other services that have two JsonNettyStreamingService instances (one for market data, one for private data).

Any thoughts on how we can aggregate these events across multiple instances?

Originally posted by @mdvx in #3533 (comment)

@earce
Collaborator

earce commented May 25, 2020

The way I see it, at a high level, Gemini is a little different to implement this for because, rather than connecting to a base URL such as wss://api.gemini.com/v1/marketdata/, the URL you connect to has the pair you care about baked into it (wss://api.gemini.com/v1/marketdata/btcusd), and you do not subscribe to individual channels later on. This breaks away from the paradigm I observe to be common throughout the codebase.

To compensate for this, the codebase has GeminiProductStreamingService.java in addition to GeminiStreamingService.java to split this up.

One way to do this might be to have an overloaded method, protected abstract Completable openConnection(final String url), in ConnectableService.java that takes a URL and passes it into the connect function of this class. It would need some invasive changes, but that is one possible way to try this. It would also help with any exchanges that pop up with a similar methodology.

The other way would be something along the lines of exposing the objects you need from GeminiProductStreamingService.java and aggregating your events that way.

@badgerwithagun
Collaborator

> The way I see it, at a high level, Gemini is a little different to implement this for because, rather than connecting to a base URL such as wss://api.gemini.com/v1/marketdata/, the URL you connect to has the pair you care about baked into it (wss://api.gemini.com/v1/marketdata/btcusd), and you do not subscribe to individual channels later on. This breaks away from the paradigm I observe to be common throughout the codebase.

Binance does the same thing.

@badgerwithagun
Collaborator

There was some discussion about this when the private streaming services were added. It's really not hard to make the API emit signals for both (just Observable.merge the two streams from the two streaming services) but you then get a situation where you get two alerts on connection and two on disconnection, so how can you tell what's connected and what's not?

I think the API should probably be more like:

public Observable<StreamState> state();

where

enum StreamState {
  DISCONNECTED,
  PARTIALLY_CONNECTED,
  CONNECTED
}

Then the implementation can do something like this:

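// Latest known state of each service; assumed DISCONNECTED until a state is reported.
AtomicReference<StreamState> service1State = new AtomicReference<>(StreamState.DISCONNECTED);
AtomicReference<StreamState> service2State = new AtomicReference<>(StreamState.DISCONNECTED);
// Re-evaluate the combined state whenever either service reports a change.
return Observable.merge(
    service1.state().doOnNext(service1State::set),
    service2.state().doOnNext(service2State::set))
  .map(__ -> combineStates(service1State.get(), service2State.get()));

Here combineStates returns CONNECTED if both are connected, DISCONNECTED if both are disconnected, and PARTIALLY_CONNECTED otherwise.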

Obviously this would change a bit if you have N streams, but the basic idea stands and the API works in all cases.

I'm sure we could come up with endless variations on this theme.
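
One such variation for N streams, as a minimal sketch only: it keeps the latest state per service in a map (which would fit something like Gemini's per-pair services) and recomputes the aggregate on every update. StreamStateAggregator, register, update and combine are hypothetical names, not part of the existing API, and the sketch deliberately leaves out RxJava so that it stands alone.

```java
import java.util.LinkedHashMap;
import java.util.Map;

enum StreamState { DISCONNECTED, PARTIALLY_CONNECTED, CONNECTED }

/** Hypothetical helper: tracks the latest state per service and derives one aggregate. */
final class StreamStateAggregator<K> {
  private final Map<K, StreamState> latest = new LinkedHashMap<>();

  /** Registers a service; it counts as DISCONNECTED until it reports otherwise. */
  synchronized void register(K key) {
    latest.putIfAbsent(key, StreamState.DISCONNECTED);
  }

  /** Records a new state for one service and returns the recomputed aggregate. */
  synchronized StreamState update(K key, StreamState state) {
    latest.put(key, state);
    return combine();
  }

  /** CONNECTED if all are connected, DISCONNECTED if all are disconnected, else partial. */
  synchronized StreamState combine() {
    if (latest.isEmpty()) {
      return StreamState.DISCONNECTED; // assumption: no services means nothing is connected
    }
    boolean allConnected =
        latest.values().stream().allMatch(s -> s == StreamState.CONNECTED);
    boolean allDisconnected =
        latest.values().stream().allMatch(s -> s == StreamState.DISCONNECTED);
    if (allConnected) {
      return StreamState.CONNECTED;
    }
    return allDisconnected ? StreamState.DISCONNECTED : StreamState.PARTIALLY_CONNECTED;
  }
}
```

In an Rx pipeline, update could be called from each service's doOnNext, with the returned value emitted downstream, in the same way as the two-service version above.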

@mdvx
Contributor Author

mdvx commented May 25, 2020

I like the way this is going. Can we now extend this to cover re-connections?

| T | A | B | Result |
|---|---|---|---|
| 0 | Connecting | | DISCONNECTED |
| 1 | Connected | | CONNECTED |
| 2 | Connected | Connecting | PARTIALLY_CONNECTED |
| 3 | Connected | Connected | CONNECTED |
| 4 | Disconnected | Connected | PARTIALLY_CONNECTED |
| 5 | Disconnected | Disconnected | DISCONNECTED |
| 6 | Connecting | Disconnected | DISCONNECTED |
| 7 | Connected | Disconnected | PARTIALLY_CONNECTED |
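
The table above could be expressed as a small pure function, sketched here under two assumptions: that a row with only one state listed means the other connection has not been created yet and is not counted, and that a connection that is still connecting counts the same as a disconnected one. ReconnectStates, Link and aggregate are hypothetical names used only for illustration.

```java
import java.util.Arrays;

final class ReconnectStates {
  /** Per-connection lifecycle, as in the A and B columns. */
  enum Link { CONNECTING, CONNECTED, DISCONNECTED }

  /** Aggregate reported to the client, as in the Result column. */
  enum StreamState { DISCONNECTED, PARTIALLY_CONNECTED, CONNECTED }

  /**
   * Aggregates the links that currently exist. Assumption: a link that has
   * not been created yet is simply not passed in.
   */
  static StreamState aggregate(Link... links) {
    long connected = Arrays.stream(links).filter(l -> l == Link.CONNECTED).count();
    if (connected == 0) {
      return StreamState.DISCONNECTED; // nothing usable yet, even if something is connecting
    }
    return connected == links.length
        ? StreamState.CONNECTED
        : StreamState.PARTIALLY_CONNECTED;
  }
}
```

For example, ReconnectStates.aggregate(Link.CONNECTED, Link.CONNECTING) corresponds to row 2 and yields PARTIALLY_CONNECTED.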

@mdvx
Contributor Author

mdvx commented May 25, 2020

And maybe add READY, for once the client's re-initialization after connecting has completed?

enum StreamState { DISCONNECTED, PARTIALLY_CONNECTED, CONNECTED, READY }

@earce
Collaborator

earce commented May 25, 2020

@badgerwithagun this is a pretty elegant solution. @mdvx, what would be the difference in application state between CONNECTED and READY?

@mdvx
Contributor Author

mdvx commented May 25, 2020

Just that some post-connect code had run. In truth, it could run before the state goes to CONNECTED.

@mdvx
Contributor Author

mdvx commented May 25, 2020

Except the post-connect code might only run if it is connected. In my case, I would rebuild my caches here, so this code should return a Completable.
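
One possible reading of that sequence, as a rough sketch only: move to CONNECTED first, run the post-connect hook, and move to READY only if the hook completes while the link is still connected. To keep the example free of dependencies it uses java.util.concurrent.CompletableFuture<Void> as a stand-in for RxJava's Completable; PostConnectSketch and its methods are hypothetical names, not existing API.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

final class PostConnectSketch {
  enum State { DISCONNECTED, CONNECTED, READY }

  private volatile State state = State.DISCONNECTED;

  State state() {
    return state;
  }

  /**
   * Marks the link CONNECTED, runs the hook (e.g. a cache rebuild), and moves
   * to READY only if the hook succeeds and the link has not dropped meanwhile.
   * The returned future completes with the state reached; a hook failure
   * leaves the state at CONNECTED rather than failing the future.
   */
  CompletableFuture<State> onConnected(Supplier<CompletableFuture<Void>> postConnectHook) {
    state = State.CONNECTED;
    return postConnectHook.get().handle((ignored, error) -> {
      if (error == null && state == State.CONNECTED) {
        state = State.READY;
      }
      return state;
    });
  }

  void onDisconnected() {
    state = State.DISCONNECTED;
  }
}
```

Whether a separate READY state is worth having, compared with running the hook as part of reaching CONNECTED, is exactly the open question in this thread; the sketch only shows what the extra state would track.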

@mdvx
Contributor Author

mdvx commented May 25, 2020

Maybe there are two READY states: one per connection, and one for the aggregate, ALL_READY?

@earce
Collaborator

earce commented May 25, 2020

Yeah, if it is really necessary to have a state that distinguishes "connected" from "connected and post-connect hooks have run", it makes sense, but if that isn't necessary I would err on the side of fewer states.

I would imagine you either just transition to CONNECTED and run your post-connect hooks then, or CONNECTED tells you that you have connected and already run the post-connect hooks. I would think you would first go to the connected state and then run the post-connect hooks.

Why would you want READY vs ALL_READY?

@mdvx
Contributor Author

mdvx commented May 25, 2020

ALL_READY is exchange-level: it just means all the connections are READY, and none are still DISCONNECTED (or connecting, or unready).

@earce
Collaborator

earce commented May 25, 2020

In this case, what state would put us in READY but not in ALL_READY?

@mdvx
Contributor Author

mdvx commented May 25, 2020 via email

@mdvx mdvx closed this as completed Oct 24, 2020

3 participants