Skip to content

Commit

Permalink
Merge pull request #228 from mickverm/patch/blockingsubscribeby
Browse files Browse the repository at this point in the history
Added Maybe, Single, Completable blockingSubscribeBy
  • Loading branch information
vpriscan authored Mar 21, 2020
2 parents 98b4fa2 + f41ec48 commit 10c3966
Show file tree
Hide file tree
Showing 7 changed files with 64 additions and 1 deletion.
3 changes: 3 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,9 @@ Learn more about building this project with JitPack [here](https://jitpack.io/#R
|Completable|subscribeBy()|Disposable|Allows named arguments to construct a CompletableObserver|
|Observable<T>|blockingSubscribeBy()|Unit|Allows named arguments to construct a blocking Observer|
|Flowable<T>|blockingSubscribeBy()|Unit|Allows named arguments to construct a blocking Subscriber|
|Single<T>|blockingSubscribeBy()|Unit|Allows named arguments to construct a blocking SingleObserver|
|Maybe<T>|blockingSubscribeBy()|Unit|Allows named arguments to construct a blocking MaybeObserver|
|Completable<T>|blockingSubscribeBy()|Unit|Allows named arguments to construct a blocking CompletableObserver|
|Disposable|addTo()|Disposable|Adds a `Disposable` to the specified `CompositeDisposable`|
|CompositeDisposable|plusAssign()|Disposable|Operator function to add a `Disposable` to this`CompositeDisposable`|

Expand Down
2 changes: 1 addition & 1 deletion build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ val examplesImplementation by configurations.getting {
}

dependencies {
api("io.reactivex.rxjava3:rxjava:3.0.0-RC9")
api("io.reactivex.rxjava3:rxjava:3.0.0")
implementation(kotlin("stdlib"))

testImplementation("org.funktionale:funktionale-partials:1.0.0-final")
Expand Down
28 changes: 28 additions & 0 deletions src/main/kotlin/io/reactivex/rxkotlin3/subscribers.kt
Original file line number Diff line number Diff line change
Expand Up @@ -107,3 +107,31 @@ fun <T : Any> Flowable<T>.blockingSubscribeBy(
onComplete: () -> Unit = onCompleteStub,
onNext: (T) -> Unit = onNextStub
): Unit = blockingSubscribe(onNext.asConsumer(), onError.asOnErrorConsumer(), onComplete.asOnCompleteAction())

/**
* Overloaded blockingSubscribe function that allows passing named parameters
*/
@SchedulerSupport(SchedulerSupport.NONE)
fun <T : Any> Maybe<T>.blockingSubscribeBy(
onError: (Throwable) -> Unit = onErrorStub,
onComplete: () -> Unit = onCompleteStub,
onSuccess: (T) -> Unit = onNextStub
) : Unit = blockingSubscribe(onSuccess.asConsumer(), onError.asOnErrorConsumer(), onComplete.asOnCompleteAction())

/**
* Overloaded blockingSubscribe function that allows passing named parameters
*/
@SchedulerSupport(SchedulerSupport.NONE)
fun <T : Any> Single<T>.blockingSubscribeBy(
onError: (Throwable) -> Unit = onErrorStub,
onSuccess: (T) -> Unit = onNextStub
) : Unit = blockingSubscribe(onSuccess.asConsumer(), onError.asOnErrorConsumer())

/**
* Overloaded blockingSubscribe function that allows passing named parameters
*/
@SchedulerSupport(SchedulerSupport.NONE)
fun Completable.blockingSubscribeBy(
onError: (Throwable) -> Unit = onErrorStub,
onComplete: () -> Unit = onCompleteStub
): Unit = blockingSubscribe(onComplete.asOnCompleteAction(), onError.asOnErrorConsumer())
10 changes: 10 additions & 0 deletions src/test/kotlin/io/reactivex/rxkotlin3/CompletableTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -77,4 +77,14 @@ class CompletableTest : KotlinTests() {
.subscribeBy(onError = {}) as LambdaConsumerIntrospection
Assert.assertTrue(disposable.hasCustomOnError())
}

@Test
fun testBlockingSubscribeBy() {
Completable.complete()
.blockingSubscribeBy {
a.received(Unit)
}
Mockito.verify(a, Mockito.times(1))
.received(Unit)
}
}
1 change: 1 addition & 0 deletions src/test/kotlin/io/reactivex/rxkotlin3/FlowableTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,7 @@ class FlowableTest {
}
Assert.assertTrue(first.get() == "Alpha")
}

@Test
fun testPairZip() {

Expand Down
11 changes: 11 additions & 0 deletions src/test/kotlin/io/reactivex/rxkotlin3/MaybeTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,17 @@ class MaybeTest {
Assert.assertTrue(disposable.hasCustomOnError())
}

@Test
fun testBlockingSubscribeBy() {
val first = AtomicReference<String>()

Maybe.just("Alpha")
.blockingSubscribeBy {
first.set(it)
}
Assert.assertTrue(first.get() == "Alpha")
}

@Test fun testConcatAll() {
(0 until 10)
.map { Maybe.just(it) }
Expand Down
10 changes: 10 additions & 0 deletions src/test/kotlin/io/reactivex/rxkotlin3/SingleTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,16 @@ class SingleTest : KotlinTests() {
Assert.assertTrue(disposable.hasCustomOnError())
}

@Test
fun testBlockingSubscribeBy() {
Single.just("Alpha")
.blockingSubscribeBy {
a.received(it)
}
verify(a, Mockito.times(1))
.received("Alpha")
}

@Test fun testConcatAll() {
(0 until 10)
.map { Single.just(it) }
Expand Down

0 comments on commit 10c3966

Please sign in to comment.