Reactive Extensions

In my last few posts I’ve talked about learning F#, the .NET functional programming language. I like it, but I can’t use it in a production environment for a couple of reasons:
1. Part of the project is WPF and, as I mentioned in a previous post, the WPF support isn’t there.
2. It wouldn’t be maintainable. There wouldn’t be enough people in my organization prepared to learn F# in order to support it.

Rx

I eventually came across Reactive Extensions, also called Rx. A good learning resource is www.introtorx.com. Although new to me, it’s been around for a few years. It’s a library that implements an Observer/Subscriber pattern. Observables emit data and Observers consume it via Subscribe methods. It’s like an event model but adds:

  • Advanced event consumption, e.g.
    • Sampling (fetching data each time period)
    • Buffering (accumulating data and responding only after some time period)
    • Throttling (fetching data only after some quiet period has elapsed)
    • Filtering (consuming only events that match some pattern)
  • Threading, e.g. observing and subscribing on multiple threads or on specified threads

Supposedly some of the threading functionality has been superseded by Tasks and async/await, but I like that Rx is more distributed while Tasks are still procedural.

Reactive also adds some functional sequence methods like Scan and its event filtering is like the pattern matching found in functional conditionals. It’s also functional in the sense that your observer is getting input and (presumably) producing output, like a function, though unlike a proper function an observer can have state.

Creating Observables

Creating an observer is easy. IObserver has three methods, OnNext, OnCompleted, and OnError. Simply implement the interface and handle the data in OnNext. Creating an observable yourself is a bit more tricky.

Events

One way is to create an observable out of a plain old .NET event.
Let’s say you have some event:

    public event EventHandler<DateTime> MyEvent;

You can create an observable like this:

    IObservable<DateTime> eventObservable = Observable.FromEventPattern<EventHandler<DateTime>,DateTime>(
        handler => m.MyEvent += handler,
        handler => m.MyEvent -= handler
        ).Select( eventPattern => eventPattern.EventArgs );

Observable Collections

The ReactiveUI library (search for reactiveui in NuGet) adds a ReactiveList with observable events built in. This shows how you’d listen to when items are added:

    ReactiveList<String> list = new ReactiveList<string>();
    list.ItemsAdded.Subscribe( s => Console.WriteLine( "Added {0}", s ) );

Note: You have to subscribe to one of the list’s events like ItemsAdded or ItemsRemoved. You can subscribe to the list itself, but I don’t know what events it produces.

Note also: The NuGet package is out of date as of the time of writing this post. If you just install it from NuGet you’ll get the error:

Warning 1 Reference to type ‘Splat.IEnableLogger’ claims it is defined in ‘[…]\packages\Splat.1.0.0\lib\Net45\Splat.dll’, but it could not be found c:\Projects\ReactiveDemo\packages\reactiveui-core.6.5.0\lib\Net45\ReactiveUI.dll ReactiveDemo
To fix this you have to update its dependent library, Splat, to the latest version.

Create your own Observable

If you’re going all-in on Rx and want to create your own IObservable without wrapping some other technology, there’s a way.
introtorx.com advises against implementing IObservable yourself and advises using Observable.Create instead. Here’s an example of a base class that handles using Create to provide an IObservable.

    /// <summary>
    /// Adds <see cref="IObservable{T>"/> support to a class.
    /// </summary>
    /// <typeparam name="T"></typeparam>
    public abstract class ObservableBase<T> {

        #region IObservable

        /// <summary>
        /// The list of observers.
        /// </summary>
        private readonly List<IObserver<T>> _observers = new List<IObserver<T>>();

        /// <summary>
        /// Creates an observable using <see cref="Observable.Create{TResult}(Func{System.IObserver{TResult},System.Action})"/>.
        /// </summary>
        public IObservable<T> AsObservable {
            get {
                return Observable.Create( (Func<IObserver<T>, IDisposable>)Subscribe );
            }
        }

        /// <summary>
        /// Pushes value to all observers.
        /// </summary>
        /// <param name="value"></param>
        protected void Emit( T value ) {
            _observers.ForEach( observer => observer.OnNext( value ) );
        }

        /// <summary>
        /// Used to add observers.
        /// </summary>
        /// <param name="observer"></param>
        /// <returns></returns>
        public IDisposable Subscribe( IObserver<T> observer ) {
            _observers.Add( observer );
            return Disposable.Empty;
        }

        #endregion
    }

Note that it doesn’t implement IObservable directly but provides an AsObservable method that makes use of Observable.Create.

Chaining Observables

Given its relationship to functional programming, it seems natural to want to chain Observables together so that one Observer is the next class’ Observable. After all, you’ll often use the result of one function as input to the next.

At any rate, using code like the above, this is how you might create a class that is both Observer and Observable:

    /// <summary>
    /// An class that is both Observer and Observable.
    /// </summary>
    /// <typeparam name="TIn"></typeparam>
    /// <typeparam name="TOut"></typeparam>
    public abstract class ChainedObserver<TIn,TOut> : IObserver<TIn> {
        #region IObservable

        /// <summary>
        /// The lsit of observers.
        /// </summary>
        private readonly List<IObserver<TOut>> _observers = new List<IObserver<TOut>>();

        /// <summary>
        /// Creates an observable using <see cref="Observable.Create{TResult}(Func{System.IObserver{TResult},System.Action})"/>.
        /// </summary>
        public IObservable<TOut> AsObservable {
            get {
                return Observable.Create( (Func<IObserver<TOut>, IDisposable>)Subscribe );
            }
        }

        /// <summary>
        /// Pushes value to all observers.
        /// </summary>
        /// <param name="value"></param>
        protected void Emit( TOut value ) {
            _observers.ForEach( observer => observer.OnNext( value ) );
        }

        /// <summary>
        /// Used to add observers.
        /// </summary>
        /// <param name="observer"></param>
        /// <returns></returns>
        public IDisposable Subscribe( IObserver<TOut> observer ) {
            _observers.Add( observer );
            return Disposable.Empty;
        }

        #endregion

        #region IObserver

        /// <summary>
        /// <see cref="IObserver{T}.OnCompleted"/>
        /// </summary>
        virtual public void OnCompleted() {
        }

        /// <summary>
        /// <see cref="IObserver{T}.OnError"/>
        /// </summary>
        /// <param name="error"></param>
        virtual public void OnError( Exception error ) {
        }

        /// <summary>
        /// <see cref="IObserver{T}.OnNext"/>
        /// </summary>
        /// <param name="value"></param>
        virtual public void OnNext( TIn value ) {
        }

        #endregion
    }

As before, it doesn’t implement IObservable directly but provides an AsObservable method that makes use of Observable.Create.

Leave a Reply

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out / Change )

Twitter picture

You are commenting using your Twitter account. Log Out / Change )

Facebook photo

You are commenting using your Facebook account. Log Out / Change )

Google+ photo

You are commenting using your Google+ account. Log Out / Change )

Connecting to %s