Skip to content

Day 07 Rx.Net part 1

Kobi Hari edited this page Nov 4, 2020 · 1 revision

Day 07 - Reactive X - Part 1

Projects:

Introduction to Rx Introduction to Reactive X Observables Observers and Subjects
Practical Rx.Net A WPF Application shoing how to create service layer using reactive

Theory of Reactive Programing

  • We talked about Enumerable, Task and how Observable is a fusion of their ideas
  • We defined the Observable as an asynchronous object that pushes data (Like Task), but it pushes a stream of data (Like IEnumerable)
  • We defined a Stream as a timeline with that pushes events of type T.
    • Each event is called OnNext(T)
    • A stream may finish successfuly, which yields the event OnCompleted()
    • A stream may finish with an error, which yields the event OnError(Exception)
    • Streams may also be infinite.
    • Once either OnComplete or OnError are called, there will be no more events on that observer.
  • We defined the role of the Observer and Observable
    • Observer is any class that implements the three methods, so streams can push events onto it.
    • Observable is a class that implements a single method: Subscribe(Observer) which causes it to start sending events to the observer

Creating Observers

  • IObserver is an interface that you may inherit from. But this is, of course, awkward. To define a dedicated class for each situation where you want to respond to an observable
  • An easier possibility is to use Observer.Create in order to compose the 3 delegates into an object.
  • As we will see later on, even this possiblity is rarely used, as there are more convinient ways to create observers.

Creating Observables

  • As we have seen, observable is defined by the logic that it runs when the observer subscribes. An observer is simply a wrapper around a method that takes an observer and returns an IDisposable, and this method is called whenever an observer subscribes.
  • IObservable is an interface, so obviously you can implement it to create your own observable.
  • We have seen that this option is, too, rarely used and quite awkward.
  • We can instead use one of many Creator operators to construct an observable with specific behavior
    • Observable.Create: Creates an observable from a delegate that takes an observer
    • Observable.Return: Creates an observable that yields one value and then completes (a little like Task.FromResult)
    • Observable.Throw: Creates an observable that yields an error (a little like Task.FromException)
    • Observable.Never: Creates an infinite Observable that does not yield any event
    • Observable.Interval: Creates an observable that yields an increasing value every constant period
    • Observable.Timer: Creates an observable that returns 0 and completes after an amount of time
    • Observable.Range: Creates an Observable that returns a sequence of events between 2 numbers and then completes
    • Observable.Generate: Creates an observable that acts like a for loop, taking an initial value, an iteration method, a termination predicate, and a projection method. The observable yields all the values synchronously and then completes.
  • We have seen that we can create observables from other objects:
    • IEnumerable.ToObservable: Creates an observable that returns a collection of items synchronously and then completes
    • Observable.FromEventPattern: Creates an observable from an event, by subscribing the observers to the event.
    • Task.ToObservable: Creates a hot observable from a task.

Hot Vs Cold Observables

  • We have seen that an Observable is defined by the logic it runs when the observer subscribes.
  • We have inspected that observers that subscribed in different times, sometimes get a different sequence of events.
  • We derived that since the logic of the observable is encapsulated in the delegate that runs upon subscription, if there is no subscription, an observable may not do anything.
  • We defined that observables that only do something on subscriptions are called Cold Observables
  • We defined that observables that have logic that runs regardless of subscriptions are called Hot Observables

Subjects

  • We have seen that most observables created by the creation operators are Cold
  • Which means that they raise different events for different observers.
  • Sometimes we just need the observables to act as a normal event and probide the same notifications to all observers
  • Subject is a class that implements both IObserver and IObservable. They act as a hub between observers and another observable.
  • When observers subscribe to a subject, it stores them in a list
  • When OnNext, OnCompleted, and OnError is called on a subject, it calls the exact same method on all its subscribers - thus acting like a hub.

Behavior and Replay Subjects

  • There are subclasses of Subject that have slightly different behavior.
  • A BehaviorSubject remembers the last OnNext called on it and when a new observer subscribes, the subject sends it the last value instantly.
  • A ReplaySubject stores not just the last value, but the entire collection of values, and when a new observer subscribes it replays all the previous values.
  • We understood that BehaviorSubject is best suited to store a mutating value and report its changes, since observers get future modifications but they also get the current value.
  • We understood that ReplaySubject is best limited to a certain window of time, or buffer size, to avoid memory leaks.

Reactive Stateful Services

  • We have used BehaviorSubject to implement a service that holds the state of a counter.
  • We used AsObservable to create a proxy over the BehaviorSubject so that consumers can only subscribe to it, but not call OnNext
  • We exposed the data of the service only using reactive, so any consumer must get the original value as well as future notifications

Marble Diagram