Skip to content

Commit

Permalink
Add operators to combine or merge multiple Observables at once
Browse files Browse the repository at this point in the history
  • Loading branch information
LukaJCB committed Nov 28, 2016
1 parent ad0959a commit 18fe357
Show file tree
Hide file tree
Showing 3 changed files with 143 additions and 4 deletions.
2 changes: 1 addition & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ name := "RxScala.js"

normalizedName := "rxscala-js"

version := "0.9.2"
version := "0.10.0"

organization := "com.github.lukajcb"

Expand Down
139 changes: 136 additions & 3 deletions src/main/scala/rxscalajs/Observable.scala
Original file line number Diff line number Diff line change
Expand Up @@ -643,8 +643,9 @@ class Observable[+T] protected[rxscalajs](val inner: ObservableFacade[T]) {
}

/**
* Combines two observables, emitting a pair of the latest values of each of
* the source observables each time an event is received from one of the source observables.
* Combines two observables, emitting some type `R` specified in the function `selector`,
* each time an event is received from one of the source observables, where the aggregation
* is defined by the given function.
*
*
* <img width="640" height="410" src="http://reactivex.io/documentation/operators/images/withLatestFrom.png" alt="" />
Expand All @@ -662,7 +663,57 @@ class Observable[+T] protected[rxscalajs](val inner: ObservableFacade[T]) {
}

/**
* Combines two observables, emitting a pair of the latest values of each of
* Combines three observables, emitting some type `R` specified in the function `selector`,
* each time an event is received from one of the source observables, where the aggregation
* is defined by the given function.
*
* <img width="640" height="410" src="http://reactivex.io/documentation/operators/images/withLatestFrom.png" alt="" />
*
* @param first
* an Observable to be combined
* @param second
* an Observable to be combined
* @param selector
* The function that is used combine the emissions of the three observables.
*
* @return An Observable that combines the source Observables according to the function selector.
*/
def combineLatestWith[U,V,R](first: Observable[U], second: Observable[V])
(selector: (T, U, V) => R): Observable[R] = {
combineLatest(first)
.combineLatestWith(second)((tu, v) => selector(tu._1, tu._2, v))

}


/**
* Combines four observables, emitting some type `R` specified in the function `selector`,
* each time an event is received from one of the source observables, where the aggregation
* is defined by the given function.
*
* <img width="640" height="410" src="http://reactivex.io/documentation/operators/images/withLatestFrom.png" alt="" />
*
* @param first
* an Observable to be combined
* @param second
* an Observable to be combined
* @param third
* an Observable to be combined
* @param selector
* The function that is used combine the emissions of the four observables.
*
* @return An Observable that combines the source Observables according to the function selector.
*/
def combineLatestWith[U,V,W,R](first: Observable[U], second: Observable[V], third: Observable[W])
(selector: (T, U, V, W) => R): Observable[R] = {
combineLatest(first)
.combineLatestWith(second)((tu, v) => (tu._1, tu._2, v))
.combineLatestWith(third)((tuv, w) => selector(tuv._1, tuv._2, tuv._3, w))

}

/**
* Combines two observables, emitting a combination of the latest values of each of
* the source observables each time an event is received from one of the source observables.
*
* <img width="640" height="410" src="http://reactivex.io/documentation/operators/images/combineLatest.png" alt="" />
Expand All @@ -676,6 +727,46 @@ class Observable[+T] protected[rxscalajs](val inner: ObservableFacade[T]) {
combineLatestWith(that)((t, u) => (t, u))
}

/**
* Combines four observables, emitting a tuple of the latest values of each of
* the source observables each time an event is received from one of the source observables.
*
* <img width="640" height="410" src="http://reactivex.io/documentation/operators/images/combineLatest.png" alt="" />
*
* @param first
* an Observable to be combined
* @param second
* an Observable to be combined
* @param third
* an Observable to be combined
*
* @return An Observable that combines the source Observables
*/
def combineLatest[U,V](first: Observable[U], second: Observable[V]): Observable[(T, U, V)] = {
combineLatestWith(first, second)((t, u, v) => (t, u, v))
}

/**
* Combines four observables, emitting a tuple of the latest values of each of
* the source observables each time an event is received from one of the source observables.
*
* <img width="640" height="410" src="http://reactivex.io/documentation/operators/images/combineLatest.png" alt="" />
*
* @param first
* an Observable to be combined
* @param second
* an Observable to be combined
* @param third
* an Observable to be combined
*
* @return An Observable that combines the source Observables
*/
def combineLatest[U,V,W](first: Observable[U], second: Observable[V], third: Observable[W]): Observable[(T, U, V, W)] = {
combineLatestWith(first, second, third)((t, u, v, w) => (t, u, v, w))
}



/**
* Returns an Observable that first emits the items emitted by `this`, and then the items emitted
* by `other`.
Expand Down Expand Up @@ -1403,6 +1494,48 @@ class Observable[+T] protected[rxscalajs](val inner: ObservableFacade[T]) {
new Observable(inner.merge(that))
}

/**
* Flattens three Observables into one Observable, without any transformation.
*
* <img width="640" height="380" src="https://raw.githubusercontent.com/wiki/ReactiveX/RxJava/images/rx-operators/merge.png" alt="" />
*
* You can combine items emitted by three Observables so that they act like a single
* Observable by using the `merge` method.
*
* @param first
* an Observable to be merged
* @param second
* an Observable to be merged
*
* @return an Observable that emits items from all Observables until
* one emits `onError` or all Observables emit `onCompleted`.
*/
def merge[R >: T](first: Observable[R], second: Observable[R]): Observable[R] = {
new Observable(inner.merge(first).merge(second))
}

/**
* Flattens four Observables into one Observable, without any transformation.
*
* <img width="640" height="380" src="https://raw.githubusercontent.com/wiki/ReactiveX/RxJava/images/rx-operators/merge.png" alt="" />
*
* You can combine items emitted by four Observables so that they act like a single
* Observable by using the `merge` method.
*
* @param first
* an Observable to be merged
* @param second
* an Observable to be merged
* @param third
* an Observable to be merged
*
* @return an Observable that emits items from all Observables until
* one emits `onError` or all Observables emit `onCompleted`.
*/
def merge[R >: T](first: Observable[R], second: Observable[R], third: Observable[R]): Observable[R] = {
new Observable(inner.merge(first).merge(second).merge(third))
}

/**
* Flattens the sequence of Observables emitted by `this` into one Observable, without any
* transformation.
Expand Down
6 changes: 6 additions & 0 deletions src/test/scala/rxscalajs/ObservableTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -307,6 +307,12 @@ object ObservableTest extends TestSuite {
obs.combineLatest(intervalObs).subscribe(unit)
obs.combineLatestWith(intervalObs)((n, n2) => n + n2).subscribe(unit)
}
'CombineLatestMultiple {
obs.combineLatest(intervalObs, hoObs).subscribe(unit)
obs.combineLatestWith(intervalObs, hoObs)((n, n2, n3) => n + n2).subscribe(unit)
obs.combineLatest(intervalObs, hoObs, notiObs).subscribe(unit)
obs.combineLatestWith(intervalObs, hoObs, notiObs)((n, n2, n3, n4) => n + n2).subscribe(unit)
}
'Concat {
obs.concat(intervalObs).subscribe(unit)
}
Expand Down

0 comments on commit 18fe357

Please sign in to comment.