Concatenation of EventStreams #1

Open
cornerman opened this issue May 24, 2018 · 9 comments

@cornerman

Can we concatenate multiple EventStreams?

The use case is that I have two streams that should emit elements after each other:

val e1: EventStream[A] = ???
val e2: EventStream[A] = ???

// All elements of e2 should come only after e1 has finished emitting items.
// `concat` is the operator being asked for here; it does not exist yet.
val combined = e1 concat e2

In the readme, I saw that there is no concept of closing Observables. So, I was wondering whether this is possible?

@raquo
Owner

raquo commented May 24, 2018

In the absence of a built-in observable completion feature, you could simulate it, though in a rather ugly way.

You could write custom logic using an EventBus that adds e2 as a source when some condition is met (e.g. e1 emits something like Done on "completion" after emitting Value(v)). Or, if not an EventBus, I guess you could use the merge / sample / filter operators.

Implementing a completion feature, while not a priority for now, is not out of the question. What are the real life use cases for concatenation?
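To make the sentinel idea above concrete, here is a minimal sketch. It deliberately does not use Airstream's API: `ToyStream`, `Signal`, `Value`, `Done` and `concat` are all hypothetical names, and `ToyStream` is a toy callback-based stream that stands in for an EventStream / EventBus. The assumption is that e1 signals "completion" by emitting `Done`, and that e2 is only listened to from that point on.

```scala
object ConcatSketch {
  // Hypothetical sentinel protocol: a stream "completes" by emitting Done.
  sealed trait Signal[+A]
  final case class Value[+A](value: A) extends Signal[A]
  case object Done extends Signal[Nothing]

  // Toy push-based stream (NOT Airstream's EventStream): observers are plain callbacks.
  final class ToyStream[A] {
    private var observers = Vector.empty[A => Unit]
    def subscribe(f: A => Unit): Unit = observers :+= f
    def emit(a: A): Unit = observers.foreach(f => f(a))
  }

  // Forward e1's values until it emits Done; only then start listening to e2.
  def concat[A](e1: ToyStream[Signal[A]], e2: ToyStream[Signal[A]]): ToyStream[Signal[A]] = {
    val out = new ToyStream[Signal[A]]
    var switched = false
    e1.subscribe {
      case v @ Value(_) => if (!switched) out.emit(v)
      case Done =>
        if (!switched) {
          switched = true
          e2.subscribe(s => out.emit(s)) // e2 is added as a source only now
        }
    }
    out
  }
}
```

With this sketch, anything e2 emits before e1's `Done` is dropped, because nothing is subscribed to e2 yet. Emitting `Value(99)` on e2, then `Value(1)` and `Done` on e1, then `Value(2)` on e2 yields `Value(1), Value(2)` on the combined stream.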

@cornerman
Author

Let's say I have a server-client scenario. In my client I have an event stream coming in from a remote API. The events are mapped into a state which renders the UI. Furthermore, I have some initialization logic in my app, which is also modeled as events:

val apiEvents: EventStream[ApiEvent] = ???
val localEvents: EventStream[ApiEvent] = EventStream.fromSeq(...) 

Then I would like to combine these so that the localEvents come before the apiEvents. In this case, a function like apiEvents.startWith(Seq(...)) would even suffice (as in Monix). But what if localEvents comes in asynchronously or as a stream?

Another use case I had is observables derived from futures. A future can be converted into an observable that eventually emits a single value. I then wanted to combine these while keeping the order.

@raquo
Owner

raquo commented May 24, 2018

So... do you want the apiEvents stream to be started (e.g. some ajax request triggered) when localEvents completes?

Or does apiEvents emit regardless of localEvents, and you just want to filter out events that come before localEvents completes?

Re: Creating observables from futures and flattening of observables of futures – that will be the next thing I implement in Airstream, regardless of completion / concat concepts.

@cornerman
Author

Here, I was thinking that apiEvents and localEvents are both directly started. Then I want to prepend all local events before the api events.

> Re: Creating observables from futures and flattening of observables of futures – that will be the next thing I implement in Airstream, regardless of completion / concat concepts.

Nice!

@raquo
Owner

raquo commented May 25, 2018

Ok so it sounds like you want it to behave more or less like rxjs concat, which starts the next stream if and when the previous one completes.

Another way to possibly accomplish that is the flatten operator that we already have in Airstream. You could make a meta stream that emits a stream from which you want to get events, then flatten it.

I'm not sure what the streams / events actually represent in your case, but I'm hoping the integration with futures that I mentioned will solve that issue.

@cornerman
Author

> Another way to possibly accomplish that is the flatten operator that we already have in Airstream. You could make a meta stream that emits a stream from which you want to get events, then flatten it.

Indeed, that could work in this case. I will try it out and let you know. Thank you!

@cornerman
Author

Though, from reading the code, I got the impression that flatten works like switchMap in Monix, meaning that each new value in the outer observable switches the flattened observable to the newly emitted EventStream. For my example, I would then need to take care of when to switch from localEvents to apiEvents myself.
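For readers unfamiliar with switching semantics, the following is a rough sketch of what "flatten works like switchMap" means. It is a toy model, not Airstream's implementation: `ToyStream` and `switchFlatten` are hypothetical names, and stale inner subscriptions are simply ignored rather than removed.

```scala
object SwitchSketch {
  // Toy push-based stream (NOT Airstream's EventStream): observers are plain callbacks.
  final class ToyStream[A] {
    private var observers = Vector.empty[A => Unit]
    def subscribe(f: A => Unit): Unit = observers :+= f
    def emit(a: A): Unit = observers.foreach(f => f(a))
  }

  // The output follows only the most recently emitted inner stream.
  def switchFlatten[A](outer: ToyStream[ToyStream[A]]): ToyStream[A] = {
    val out = new ToyStream[A]
    var current = 0 // generation counter identifying the latest inner stream
    outer.subscribe { inner =>
      current += 1
      val mine = current
      // Events from an inner stream are forwarded only while it is the latest one.
      inner.subscribe(a => if (mine == current) out.emit(a))
    }
    out
  }
}
```

In this model, if the outer stream emits `a` and later `b`, events from `a` stop reaching the output as soon as `b` arrives. Nothing waits for `a` to "finish", which is why the moment of switching has to be chosen manually.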

@raquo
Owner

raquo commented May 26, 2018

You're correct that it works like switchMap. There is still no "stream completion" feature in Airstream, so yes, you'll need to manually trigger the switch somehow. Or – again – alternatively you can emit a "sentinel value" event that the consuming stream can interpret as a "stream completed" event. For now there's no pretty solution to this I'm afraid.


For anyone else having similar problems with lacking operators, keep in mind that Airstream is very forgiving of impurity – all our stuff is shared execution, that is, the stream will run exactly once regardless of how many subscribers it has (as long as it's at least one). So it's relatively easy to create custom operators that keep internal state.
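As an illustration of a custom operator that keeps internal state, here is a hedged sketch of the "both streams are already running, but local events must come first" case discussed earlier in the thread. It is written against a toy stream model rather than Airstream itself: `ToyStream`, `Signal`, `Value`, `Done` and `concatBuffered` are hypothetical names, and the first stream is assumed to emit a `Done` sentinel when it is finished.

```scala
import scala.collection.mutable

object BufferedConcatSketch {
  // Hypothetical sentinel protocol for the first stream.
  sealed trait Signal[+A]
  final case class Value[+A](value: A) extends Signal[A]
  case object Done extends Signal[Nothing]

  // Toy push-based stream (NOT Airstream's EventStream): observers are plain callbacks.
  final class ToyStream[A] {
    private var observers = Vector.empty[A => Unit]
    def subscribe(f: A => Unit): Unit = observers :+= f
    def emit(a: A): Unit = observers.foreach(f => f(a))
  }

  // Emits first's values immediately; holds back second's values until first emits Done.
  def concatBuffered[A](first: ToyStream[Signal[A]], second: ToyStream[A]): ToyStream[A] = {
    val out = new ToyStream[A]
    var firstDone = false             // internal state: has the sentinel been seen?
    val pending = mutable.Queue.empty[A] // internal state: buffered events from second
    first.subscribe {
      case Value(v) => if (!firstDone) out.emit(v)
      case Done =>
        if (!firstDone) {
          firstDone = true
          while (pending.nonEmpty) out.emit(pending.dequeue()) // flush in arrival order
        }
    }
    second.subscribe { a =>
      if (firstDone) out.emit(a) else { pending.enqueue(a); () }
    }
    out
  }
}
```

Unlike an operator that subscribes to the second stream late, this one loses no events, at the cost of an unbounded buffer if the first stream never emits `Done`.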

@cornerman
Author

Thanks for the detailed response! I think I can work with a sentinel value for now. It should even be possible to make this generic enough to be reusable for my project :)

raquo added the "enhancement" and "question" labels on Jul 9, 2018
raquo mentioned this issue on Sep 9, 2018
raquo added the "design" label and removed the "enhancement" and "question" labels on May 5, 2021