Skip to content

Commit

Permalink
add comments
Browse files Browse the repository at this point in the history
  • Loading branch information
MaksTuev committed Mar 4, 2017
1 parent 1653fc3 commit e74be29
Show file tree
Hide file tree
Showing 5 changed files with 64 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,22 @@
import io.reactivex.CompletableObserver;
import io.reactivex.CompletableOperator;
import io.reactivex.Observable;
import io.reactivex.ObservableOperator;
import io.reactivex.Observer;
import io.reactivex.disposables.Disposable;
import io.reactivex.internal.disposables.ArrayCompositeDisposable;
import io.reactivex.internal.disposables.DisposableHelper;


/**
* This operator freezes Completable events (onError, onComplete) when freeze selector emits true,
* and unfreeze it after freeze selector emits false.
* If freeze selector does not emit any elements, all events would be frozen
*
* Completable after this operator can emit event in different threads
* You should pass this operator in method {@link io.reactivex.Completable#lift(CompletableOperator)}
* for apply it
*/
public class CompletableOperatorFreeze implements CompletableOperator {

private final Observable<Boolean> freezeSelector;
Expand All @@ -19,9 +30,7 @@ public CompletableOperatorFreeze(Observable<Boolean> freezeSelector) {

@Override
public CompletableObserver apply(CompletableObserver child) throws Exception {
return new FreezeObserver(
child,
freezeSelector);
return new FreezeObserver(child, freezeSelector);
}

private static final class FreezeObserver implements CompletableObserver {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

import io.reactivex.FlowableOperator;
import io.reactivex.Observable;
import io.reactivex.ObservableOperator;
import io.reactivex.Observer;
import io.reactivex.disposables.Disposable;
import io.reactivex.exceptions.Exceptions;
Expand All @@ -18,6 +19,19 @@
import io.reactivex.internal.subscriptions.SubscriptionHelper;
import io.reactivex.subscribers.SerializedSubscriber;

/**
* This operator freezes Flowable events (onNext, onError, onComplete) when freeze selector emits true,
* and unfreeze it after freeze selector emits false.
* If freeze selector does not emit any elements, all events would be frozen
* If you want reduce num of elements in freeze buffer, you can define replaceFrozenEventPredicate.
* When Observable frozen and source observable emits normal (onNext) event, before it is added to
* the end of buffer, it compare with all already buffered events using replaceFrozenEventPredicate,
* and if replaceFrozenEventPredicate return true, buffered element would be removed.
*
* Flowable after this operator can emit event in different threads
* You should pass this operator in method {@link io.reactivex.Flowable#lift(FlowableOperator)}
* for apply it
*/
public class FlowableOperatorFreeze<T> implements FlowableOperator<T, T> {

private final Observable<Boolean> freezeSelector;
Expand Down
18 changes: 15 additions & 3 deletions ferro-rx/src/main/java/com/agna/ferro/rx/MaybeOperatorFreeze.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,25 @@
import io.reactivex.MaybeObserver;
import io.reactivex.MaybeOperator;
import io.reactivex.Observable;
import io.reactivex.ObservableOperator;
import io.reactivex.Observer;
import io.reactivex.disposables.Disposable;
import io.reactivex.internal.disposables.ArrayCompositeDisposable;
import io.reactivex.internal.disposables.DisposableHelper;

/**
* This operator freezes Maybe events (onSuccess, onError, onComplete) when freeze selector emits true,
* and unfreeze it after freeze selector emits false.
* If freeze selector does not emit any elements, all events would be frozen
* If you want reduce num of elements in freeze buffer, you can define replaceFrozenEventPredicate.
* When Observable frozen and source observable emits normal (onNext) event, before it is added to
* the end of buffer, it compare with all already buffered events using replaceFrozenEventPredicate,
* and if replaceFrozenEventPredicate return true, buffered element would be removed.
*
* Maybe after this operator can emit event in different threads
* You should pass this operator in method {@link io.reactivex.Maybe#lift(MaybeOperator)}
* for apply it
*/
public class MaybeOperatorFreeze<T> implements MaybeOperator<T, T> {


Expand All @@ -20,9 +34,7 @@ public MaybeOperatorFreeze(Observable<Boolean> freezeSelector) {

@Override
public MaybeObserver<? super T> apply(MaybeObserver<? super T> child) throws Exception {
return new FreezeObserver<>(
child,
freezeSelector);
return new FreezeObserver<>(child, freezeSelector);
}

private static final class FreezeObserver<T> implements MaybeObserver<T> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,27 @@
import io.reactivex.Observable;
import io.reactivex.ObservableOperator;
import io.reactivex.Observer;
import io.reactivex.SingleOperator;
import io.reactivex.disposables.Disposable;
import io.reactivex.exceptions.Exceptions;
import io.reactivex.functions.BiFunction;
import io.reactivex.internal.disposables.ArrayCompositeDisposable;
import io.reactivex.internal.disposables.DisposableHelper;
import io.reactivex.observers.SerializedObserver;

/**
* This operator freezes Observable events (onNext, onError, onComplete) when freeze selector emits true,
* and unfreeze it after freeze selector emits false.
* If freeze selector does not emit any elements, all events would be frozen
* If you want reduce num of elements in freeze buffer, you can define replaceFrozenEventPredicate.
* When Observable frozen and source observable emits normal (onNext) event, before it is added to
* the end of buffer, it compare with all already buffered events using replaceFrozenEventPredicate,
* and if replaceFrozenEventPredicate return true, buffered element would be removed.
*
* Observable after this operator can emit event in different threads
* You should pass this operator in method {@link io.reactivex.Observable#lift(ObservableOperator)}
* for apply it
*/
public class ObservableOperatorFreeze<T> implements ObservableOperator<T, T> {

private final Observable<Boolean> freezeSelector;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,14 @@
import io.reactivex.internal.disposables.ArrayCompositeDisposable;
import io.reactivex.internal.disposables.DisposableHelper;

/**
* This operator freezes Single events (onError, onSuccess) when freeze selector emits true,
* and unfreeze it after freeze selector emits false.
* If freeze selector does not emit any elements, all events would be frozen
*
* Single after this operator can emit event in different threads
* You should pass this operator in method {@link io.reactivex.Single#lift(SingleOperator)} for apply it
*/
public class SingleOperatorFreeze<T> implements SingleOperator<T, T> {

private final Observable<Boolean> freezeSelector;
Expand All @@ -19,9 +27,7 @@ public SingleOperatorFreeze(Observable<Boolean> freezeSelector) {

@Override
public SingleObserver<? super T> apply(SingleObserver<? super T> child) throws Exception {
return new FreezeObserver<>(
child,
freezeSelector);
return new FreezeObserver<>(child, freezeSelector);
}

private static final class FreezeObserver<T> implements SingleObserver<T> {
Expand Down

0 comments on commit e74be29

Please sign in to comment.