Dogs Chasing Squirrels

A software development blog

Tag Archives: Functional programming

Reactive Extensions

In my last few posts I’ve talked about learning F#, the .NET functional programming language. I like it, but I can’t use it in a production environment for a couple of reasons:
1. Part of the project is WPF and, as I mentioned in a previous post, the WPF support isn’t there.
2. It wouldn’t be maintainable. There wouldn’t be enough people in my organization prepared to learn F# in order to support it.

Rx

I eventually came across Reactive Extensions, also called Rx. A good learning resource is www.introtorx.com. Although new to me, it’s been around for a few years. It’s a library that implements the Observer pattern: Observables emit data, and Observers consume it by subscribing. It’s like an event model but adds:

  • Advanced event consumption, e.g.
    • Sampling (taking the most recent value once per time period)
    • Buffering (accumulating values and delivering them as a batch after some time period)
    • Throttling (passing a value on only after some quiet period has elapsed)
    • Filtering (consuming only events that match some pattern)
  • Threading, e.g. observing and subscribing on multiple threads or on specified threads
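
As a rough illustration of the consumption operators listed above, here is a minimal sketch. It assumes the System.Reactive NuGet package; the 100 ms tick source, the window lengths, and the names `OperatorsDemo`, `ticks`, and `keystrokes` are made up for the example. Throttle is shown against a hand-fed Subject because a steady timer never goes quiet.

```csharp
using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading;

public static class OperatorsDemo
{
    public static void Main()
    {
        // Hypothetical source: emits 0, 1, 2, ... every 100 ms.
        // Each Subscribe below starts its own independent timer.
        IObservable<long> ticks = Observable.Interval(TimeSpan.FromMilliseconds(100));

        // Sampling: the most recent tick, once per second.
        IDisposable sampled = ticks
            .Sample(TimeSpan.FromSeconds(1))
            .Subscribe(t => Console.WriteLine("Sample: {0}", t));

        // Buffering: collect ticks and deliver them as a batch once per second.
        IDisposable buffered = ticks
            .Buffer(TimeSpan.FromSeconds(1))
            .Subscribe(batch => Console.WriteLine("Buffer: {0} ticks", batch.Count));

        // Filtering: only ticks that match a predicate.
        IDisposable filtered = ticks
            .Where(t => t % 10 == 0)
            .Subscribe(t => Console.WriteLine("Filter: {0}", t));

        // Throttling: a value is passed on only after 300 ms of silence.
        var keystrokes = new Subject<string>();
        IDisposable throttled = keystrokes
            .Throttle(TimeSpan.FromMilliseconds(300))
            .Subscribe(s => Console.WriteLine("Throttle: {0}", s));

        keystrokes.OnNext("r");
        keystrokes.OnNext("rx");   // arrives before the quiet period ends, so it replaces "r"
        Thread.Sleep(2500);        // let the timers run for a while

        // Disposing a subscription is how you stop listening.
        sampled.Dispose();
        buffered.Dispose();
        filtered.Dispose();
        throttled.Dispose();
    }
}
```

The exact interleaving of the printed lines depends on timer scheduling, so treat the output as indicative only.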

Supposedly some of the threading functionality has been superseded by Tasks and async/await, but I like that Rx is more distributed while Tasks are still procedural.

Reactive also adds some functional sequence methods like Scan, and its event filtering resembles the pattern matching found in functional conditionals. It’s also functional in the sense that your observer is getting input and (presumably) producing output, like a function, though unlike a proper function an observer can have state.
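
To make the Scan and filtering remarks concrete, here is a small sketch that keeps a running total of the odd numbers from 1 to 5. It assumes the System.Reactive NuGet package; `ScanDemo` and `RunningTotals` are names invented for the example.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

public static class ScanDemo
{
    // Running total of the odd numbers in 1..5.
    public static IList<int> RunningTotals()
    {
        return Observable.Range(1, 5)
            .Where(x => x % 2 == 1)             // keeps 1, 3, 5
            .Scan(0, (total, x) => total + x)   // emits each intermediate total: 1, 4, 9
            .ToList()                           // gather the whole sequence into one list
            .Wait();                            // block until the sequence completes
    }

    public static void Main()
    {
        Console.WriteLine(string.Join(", ", RunningTotals())); // prints "1, 4, 9"
    }
}
```

Unlike Aggregate, which yields only the final result, Scan emits every intermediate accumulator value, which is what makes it useful on a stream that may never end.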

Creating Observables

Creating an observer is easy. IObserver<T> has three methods: OnNext, OnCompleted, and OnError. Simply implement the interface and handle the data in OnNext. Creating an observable yourself is a bit trickier.
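
A minimal sketch of such an observer might look like this. `RecordingObserver` and `ObserverDemo` are hypothetical names, and `Observable.Range` (from the System.Reactive NuGet package) is used only as a convenient source to feed it.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

// Hypothetical observer that records everything it is told as a line of text.
public class RecordingObserver<T> : IObserver<T>
{
    public List<string> Log { get; } = new List<string>();

    public void OnNext(T value) { Log.Add("Next: " + value); }
    public void OnError(Exception error) { Log.Add("Error: " + error.Message); }
    public void OnCompleted() { Log.Add("Completed"); }
}

public static class ObserverDemo
{
    public static void Main()
    {
        var observer = new RecordingObserver<int>();

        // Range pushes 1, 2, 3 into OnNext and then calls OnCompleted.
        Observable.Range(1, 3).Subscribe(observer);

        foreach (var line in observer.Log)
        {
            Console.WriteLine(line);
        }
    }
}
```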

Events

One way is to create an observable out of a plain old .NET event.
Let’s say you have some event:

    public event EventHandler<DateTime> MyEvent;

You can create an observable like this:

    // m is the object that exposes MyEvent.
    IObservable<DateTime> eventObservable = Observable.FromEventPattern<EventHandler<DateTime>,DateTime>(
        handler => m.MyEvent += handler,
        handler => m.MyEvent -= handler
        ).Select( eventPattern => eventPattern.EventArgs );
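
For a version you can run end to end, here is a sketch that supplies a stand-in for `m`. The `Clock` class, its `Raise` method, and `EventDemo` are invented for the example; only the `FromEventPattern` call mirrors the snippet above. It assumes the System.Reactive NuGet package and a plain console application.

```csharp
using System;
using System.Reactive.Linq;

// Hypothetical event source standing in for `m`.
public class Clock
{
    public event EventHandler<DateTime> MyEvent;

    public void Raise(DateTime time)
    {
        MyEvent?.Invoke(this, time);
    }
}

public static class EventDemo
{
    // Returns how many events were seen while the subscription was alive.
    public static int CountWhileSubscribed()
    {
        var m = new Clock();
        IObservable<DateTime> eventObservable =
            Observable.FromEventPattern<EventHandler<DateTime>, DateTime>(
                handler => m.MyEvent += handler,
                handler => m.MyEvent -= handler)
            .Select(eventPattern => eventPattern.EventArgs);

        int seen = 0;
        IDisposable subscription = eventObservable.Subscribe(time => seen++);

        m.Raise(DateTime.Now);    // observed
        subscription.Dispose();   // detaches the handler from MyEvent
        m.Raise(DateTime.Now);    // no longer observed

        return seen;
    }

    public static void Main()
    {
        Console.WriteLine("Events seen: {0}", CountWhileSubscribed());
    }
}
```

The point of the two lambdas is that Rx attaches the handler when you subscribe and detaches it when you dispose the subscription, so you never touch `+=` and `-=` yourself.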

Observable Collections

The ReactiveUI library (search for reactiveui in NuGet) adds a ReactiveList with observable events built in. This shows how you’d listen to when items are added:

    ReactiveList<string> list = new ReactiveList<string>();
    list.ItemsAdded.Subscribe( s => Console.WriteLine( "Added {0}", s ) );

Note: You have to subscribe to one of the list’s events like ItemsAdded or ItemsRemoved. You can subscribe to the list itself, but I don’t know what events it produces.

Note also: The NuGet package is out of date as of the time of writing this post. If you just install it from NuGet you’ll get the error:

    Warning 1 Reference to type ‘Splat.IEnableLogger’ claims it is defined in ‘[…]\packages\Splat.1.0.0\lib\Net45\Splat.dll’, but it could not be found c:\Projects\ReactiveDemo\packages\reactiveui-core.6.5.0\lib\Net45\ReactiveUI.dll ReactiveDemo

To fix this you have to update its dependency, Splat, to the latest version.

Create your own Observable

If you’re going all-in on Rx and want to create your own IObservable without wrapping some other technology, there’s a way.
introtorx.com advises against implementing IObservable yourself and recommends using Observable.Create instead. Here’s an example of a base class that uses Create to provide an IObservable.

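    using System;
    using System.Collections.Generic;
    using System.Reactive.Disposables;
    using System.Reactive.Linq;

    /// <summary>
    /// Adds <see cref="IObservable{T}"/> support to a class.
    /// </summary>
    /// <typeparam name="T"></typeparam>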
    public abstract class ObservableBase<T> {

        #region IObservable

        /// <summary>
        /// The list of observers.
        /// </summary>
        private readonly List<IObserver<T>> _observers = new List<IObserver<T>>();

        /// <summary>
        /// Creates an observable using <see cref="Observable.Create{TResult}(Func{System.IObserver{TResult},System.IDisposable})"/>.
        /// </summary>
        public IObservable<T> AsObservable {
            get {
                return Observable.Create( (Func<IObserver<T>, IDisposable>)Subscribe );
            }
        }

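        /// <summary>
        /// Pushes value to all observers.
        /// </summary>
        /// <param name="value"></param>
        protected void Emit( T value ) {
            // Iterate over a copy so an observer can unsubscribe from inside OnNext.
            foreach( var observer in _observers.ToArray() ) {
                observer.OnNext( value );
            }
        }

        /// <summary>
        /// Used to add observers.
        /// </summary>
        /// <param name="observer"></param>
        /// <returns>A disposable that removes the observer when disposed.</returns>
        public IDisposable Subscribe( IObserver<T> observer ) {
            _observers.Add( observer );
            // Returning Disposable.Empty would leave the observer registered forever.
            return Disposable.Create( () => _observers.Remove( observer ) );
        }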

        #endregion
    }

Note that it doesn’t implement IObservable directly but provides an AsObservable property that makes use of Observable.Create.
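
For comparison, here is a minimal sketch of calling Observable.Create directly, without a base class. It assumes the System.Reactive NuGet package; `CreateDemo`, `Greetings`, and the values pushed are invented for the example.

```csharp
using System;
using System.Reactive.Disposables;
using System.Reactive.Linq;

public static class CreateDemo
{
    // Hypothetical observable that pushes three greetings and then completes.
    public static IObservable<string> Greetings()
    {
        return Observable.Create<string>(observer =>
        {
            // This delegate runs once for every subscriber.
            observer.OnNext("hello");
            observer.OnNext("bonjour");
            observer.OnNext("hallo");
            observer.OnCompleted();

            // Nothing to clean up here, so an empty disposable is enough.
            return Disposable.Empty;
        });
    }

    public static void Main()
    {
        Greetings().Subscribe(
            s => Console.WriteLine(s),
            () => Console.WriteLine("done"));
    }
}
```

The delegate passed to Create is the Subscribe implementation; the disposable it returns is what the subscriber later disposes to unsubscribe.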

Chaining Observables

Given its relationship to functional programming, it seems natural to want to chain Observables together, so that the class observing one stream is itself the Observable for the next. After all, you’ll often use the result of one function as input to the next.

At any rate, using code like the above, this is how you might create a class that is both Observer and Observable:

    /// <summary>
    /// A class that is both Observer and Observable.
    /// </summary>
    /// <typeparam name="TIn"></typeparam>
    /// <typeparam name="TOut"></typeparam>
    public abstract class ChainedObserver<TIn,TOut> : IObserver<TIn> {
        #region IObservable

        /// <summary>
        /// The list of observers.
        /// </summary>
        private readonly List<IObserver<TOut>> _observers = new List<IObserver<TOut>>();

        /// <summary>
        /// Creates an observable using <see cref="Observable.Create{TResult}(Func{System.IObserver{TResult},System.IDisposable})"/>.
        /// </summary>
        public IObservable<TOut> AsObservable {
            get {
                return Observable.Create( (Func<IObserver<TOut>, IDisposable>)Subscribe );
            }
        }

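        /// <summary>
        /// Pushes value to all observers.
        /// </summary>
        /// <param name="value"></param>
        protected void Emit( TOut value ) {
            // Iterate over a copy so an observer can unsubscribe from inside OnNext.
            foreach( var observer in _observers.ToArray() ) {
                observer.OnNext( value );
            }
        }

        /// <summary>
        /// Used to add observers.
        /// </summary>
        /// <param name="observer"></param>
        /// <returns>A disposable that removes the observer when disposed.</returns>
        public IDisposable Subscribe( IObserver<TOut> observer ) {
            _observers.Add( observer );
            // Returning Disposable.Empty would leave the observer registered forever.
            return Disposable.Create( () => _observers.Remove( observer ) );
        }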

        #endregion

        #region IObserver

        /// <summary>
        /// <see cref="IObserver{T}.OnCompleted"/>
        /// </summary>
        virtual public void OnCompleted() {
        }

        /// <summary>
        /// <see cref="IObserver{T}.OnError"/>
        /// </summary>
        /// <param name="error"></param>
        virtual public void OnError( Exception error ) {
        }

        /// <summary>
        /// <see cref="IObserver{T}.OnNext"/>
        /// </summary>
        /// <param name="value"></param>
        virtual public void OnNext( TIn value ) {
        }

        #endregion
    }

As before, it doesn’t implement IObservable directly but provides an AsObservable property that makes use of Observable.Create.
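
It may be worth noting that Rx already ships pieces that cover much of this: `Subject<T>` is both an `IObserver<T>` and an `IObservable<T>`, and operators such as Where and Select each observe the previous stage and are observable by the next. The sketch below is an alternative to the hand-written class above, not a drop-in replacement. It assumes the System.Reactive NuGet package; `ChainDemo` and the stages in the pipeline are invented for the example.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class ChainDemo
{
    public static List<string> Run()
    {
        var output = new List<string>();

        // A Subject<T> can be pushed into (observer side) and subscribed to (observable side).
        var source = new Subject<int>();

        // Each operator observes the stage before it and is observable by the next one.
        IObservable<string> pipeline = source
            .Where(x => x > 0)                // stage 1: drop non-positive values
            .Select(x => x * x)               // stage 2: int -> int
            .Select(x => "squared: " + x);    // stage 3: int -> string

        using (pipeline.Subscribe(s => output.Add(s)))
        {
            source.OnNext(-1);   // filtered out
            source.OnNext(2);
            source.OnNext(3);
        }

        return output;           // contains "squared: 4" and "squared: 9"
    }

    public static void Main()
    {
        foreach (var line in Run())
        {
            Console.WriteLine(line);
        }
    }
}
```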

Functional programming with F#

In an episode of the .NET Rocks podcast, guest Bryan Hunter describes his experience analyzing a large object-oriented code base. It’s a good listen. The problems with it were the same problems you see with all large O-O code bases, he says. But one module was a complete surprise because…there were no problems. The application had zero downtime in 4 years of operation. The team was even able to do “hot loading” to deploy updates while the application was running. What was the difference? This module was written in Erlang, a functional language.

[Cartoon: the life of a software engineer. I believe the original source is http://www.bonkersworld.net/building-software/]

Whenever you build an enterprise-scale project, even the bare minimum of libraries and patterns you need means that, despite your best intentions (or even because of them), your solution will be complex and tricky to maintain from the first moment. It’s a terrible life we lead where following best practices feels bad and wrong. Functional programming may be the answer.

I’ve begun to learn .NET’s own functional programming language, F#. I wouldn’t call the language mainstream, but it’s not a toy, either. It’s completely compatible with the rest of the .NET class library so one can make WPF or ASP.NET MVC applications with it. I hope that it will one day lead to a more satisfying experience developing enterprise applications.

Some F# learning resources: