The new kid on the block: IObservable

The Microsoft .NET Framework 4.0 introduces a pair of interfaces strongly focused on the publish/subscribe pattern (commonly used for push-based notifications): the generic interfaces IObservable<T> and IObserver<T>. IObservable<T> is implemented by the publisher, and IObserver<T> by the subscriber. The two roles are also known as provider and observer. Just keep in mind what each of them is supposed to do and you’ll be just fine. The contracts of both interfaces are declared as follows:

public interface IObserver<in T>
{
    void OnCompleted();
    void OnError(Exception error);
    void OnNext(T value);
}

public interface IObservable<out T>
{
    IDisposable Subscribe(IObserver<T> observer);
}
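
To make the call order concrete, here is a minimal sketch of the two contracts working together. CountdownSource and RecordingObserver are hypothetical names made up for this illustration (they are not framework types); the source simply pushes three numbers to whoever subscribes and then signals completion.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical provider: pushes 3, 2, 1 to each subscriber, then completes.
class CountdownSource : IObservable<int>
{
    public IDisposable Subscribe(IObserver<int> observer)
    {
        for (int i = 3; i >= 1; i--)
        {
            observer.OnNext(i);        // push one value to the observer
        }
        observer.OnCompleted();        // signal that no more values follow
        return new NoopDisposable();   // nothing left to unsubscribe from
    }

    private sealed class NoopDisposable : IDisposable
    {
        public void Dispose() { }
    }
}

// Hypothetical observer: records every notification it receives.
class RecordingObserver : IObserver<int>
{
    public readonly List<string> Log = new List<string>();

    public void OnNext(int value) { Log.Add("next:" + value); }
    public void OnError(Exception error) { Log.Add("error:" + error.Message); }
    public void OnCompleted() { Log.Add("completed"); }
}
```

Calling new CountdownSource().Subscribe(observer) with a RecordingObserver leaves next:3, next:2, next:1 and completed in its Log, in that order.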


Please, observe me!

The main job of an IObservable is to provide elements to its observers (subscribers). An observable (the publisher) exposes just one method, Subscribe, which an observer calls to register itself. Each time the observable pushes an object, it arrives in the observer’s OnNext method. OnCompleted signals that no more values will follow, and OnError reports a failure. Let’s jump into an example with a basic class Cyclist:

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;

class Cyclist
{
    private readonly string _name;
    private long _bestTime;
    private long _lastTime;
    private TimeSpan _totalTime;

    public Cyclist(string name)
    {
        _name = name;
        _bestTime = long.MaxValue;
        _totalTime = new TimeSpan();
    }

    public TimeSpan TotalTime
    {
        get
        {
            return _totalTime;
        }
    }

    // Updates all our statistics; roundtime is in ticks (100 ns units)
    public void RegisterRound(long roundtime)
    {
        // Saving the last round time
        _lastTime = roundtime;

        // Is the last round the best time?
        _bestTime = Math.Min(roundtime, _bestTime);

        // Total time
        _totalTime += new TimeSpan(roundtime);
    }

    public override string ToString()
    {
        return string.Format("{0} \n- TotalTime: {3}\n- LastTime: {1}\n- BestTime: {2}\n",
            this._name, new TimeSpan(this._lastTime), new TimeSpan(this._bestTime),
            this._totalTime);
    }
}

In addition to this class we’ll have a class that describes a race (I suppose cyclists just riding laps isn’t a very common scenario, but it’s for a greater purpose).

class Race
{
    private int _totalRounds;
    private int _currentRound;
    private List<Cyclist> _cyclists;

    public Race(int totalRounds)
    {
        _cyclists = new List<Cyclist>();
        _totalRounds = totalRounds;
    }

    public int TotalRounds
    {
        get { return _totalRounds; }
    }

    public int CurrentRound
    {
        get { return _currentRound; }
        set { _currentRound = value; }
    }

    public List<Cyclist> Cyclists
    {
        get { return _cyclists; }
        set { _cyclists = value; }
    }

}

Of course, our goal is to monitor (or ‘observe’) this race. Let’s say our race is called SuperRace. SuperRace contains a Race object, which in turn holds a generic list of all participants. And what’s a race without some observing journalists? We’ll add two of those as well (named CNN and BBC). SuperRace is the observable, and the journalists are its observers. Our SuperRace is defined as follows:

class SuperRace : IObservable<Race>
{
    List<IObserver<Race>> display = new List<IObserver<Race>>();
    Race race = new Race(40);

    public void Start()
    {
        race.Cyclists = new List<Cyclist>
        {
            new Cyclist("George Shields"),
            new Cyclist("Edward Begay"),
            new Cyclist("Craig Foy")
        };

        new Thread(() =>
        {
            Running();
        }).Start(); // 3.. 2.. 1.. GO!!! 

    }

    private void Running()
    {
        Random rnd = new Random();

        // Round loop
        for (int round = 0; round < race.TotalRounds; round++)
        {

            foreach (var c in race.Cyclists)
            {
                c.RegisterRound(rnd.Next(1000000000, int.MaxValue));
            }

            // Increasing num of rounds done
            race.CurrentRound++;

            // Notify the observers
            foreach (var d in display)
            {
                d.OnNext(race);
            }

            // Wait for a second
            Thread.Sleep(1000);
        }

        // Race completed: notify the observers
        foreach (var d in display)
        {
            d.OnCompleted();
        }

    }

    public IDisposable Subscribe(IObserver<Race> observer)
    {
        // New observer
        display.Add(observer);
        return new Disposable(() => display.Remove(observer));
    }

    public class Disposable : IDisposable
    {
        private Action _disposeAction;

        public Disposable(Action dispose)
        {
            _disposeAction = dispose;
        }

        public void Dispose()
        {
            _disposeAction();

        }
    }
}

Start() creates three cyclists who are going to participate in the race. It then creates a new thread and the race starts. The thread runs a method called Running(), which holds the racing timeline. The total number of rounds is set to 40, so for 40 rounds all three cyclists accumulate elapsed time, and after each lap all the observers are notified. RaceMonitor (shown in the next section) displays the race results sorted by total time in ascending order. Before the next lap, the thread sleeps for one second to add some excitement to the simulated race ;-)
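
One thing SuperRace glosses over: Running() walks the observer list on a background thread while Subscribe and the returned Disposable can modify that same list from another thread, and OnError is never called. The sketch below shows one possible way to handle both, assuming a lock plus a snapshot copy of the list is acceptable. SafePublisher<T>, Publish, Fail, Complete and ListObserver<T> are hypothetical names invented for this illustration, not framework types.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper (not part of the .NET Framework).
class SafePublisher<T> : IObservable<T>
{
    private readonly object _gate = new object();
    private readonly List<IObserver<T>> _observers = new List<IObserver<T>>();

    public IDisposable Subscribe(IObserver<T> observer)
    {
        lock (_gate) { _observers.Add(observer); }
        return new Unsubscriber(this, observer);
    }

    // Copy the list under the lock, then notify outside the lock, so
    // observers may subscribe or unsubscribe while a notification runs.
    private IObserver<T>[] Snapshot()
    {
        lock (_gate) { return _observers.ToArray(); }
    }

    public void Publish(T value)
    {
        foreach (var o in Snapshot()) o.OnNext(value);
    }

    public void Fail(Exception error)
    {
        foreach (var o in Snapshot()) o.OnError(error);
    }

    public void Complete()
    {
        foreach (var o in Snapshot()) o.OnCompleted();
    }

    private sealed class Unsubscriber : IDisposable
    {
        private readonly SafePublisher<T> _owner;
        private readonly IObserver<T> _observer;

        public Unsubscriber(SafePublisher<T> owner, IObserver<T> observer)
        {
            _owner = owner;
            _observer = observer;
        }

        public void Dispose()
        {
            lock (_owner._gate) { _owner._observers.Remove(_observer); }
        }
    }
}

// Hypothetical observer, used only to show the helper in action.
class ListObserver<T> : IObserver<T>
{
    public readonly List<string> Events = new List<string>();

    public void OnNext(T value) { Events.Add("next:" + value); }
    public void OnError(Exception error) { Events.Add("error:" + error.Message); }
    public void OnCompleted() { Events.Add("completed"); }
}
```

With something like this in place, the round loop in Running() could be wrapped in a try/catch that calls Fail(ex) when a lap goes wrong, so the journalists hear about failures through OnError instead of the thread dying silently.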

Let me observe you

So we have the first part of our story (the IObservable) all finished up. On the other side of the IObservable we have the IObserver (you know, the journalists). We’ll describe these journalists with a ‘RaceMonitor’ class, since they are monitoring the race. The class is pretty straightforward:

class RaceMonitor : IObserver<Race>
{
    public void OnCompleted()
    {
        Console.WriteLine("FINISH!");
    }

    public void OnError(Exception error)
    {
        Console.WriteLine(error.Message);
    }

    public void OnNext(Race value)
    {
        Console.Clear();
        Console.WriteLine("Race Round {0}", value.CurrentRound);

        var cyclists = from cyclist in value.Cyclists
                        orderby cyclist.TotalTime
                        select cyclist;

        foreach (var cyclist in cyclists)
        {
            Console.WriteLine(cyclist.ToString());
        }

    }
}

The observers will be represented by this RaceMonitor class and will subscribe to our SuperRace through the Subscribe method, called from the Main method:

class Program
{
    static void Main(string[] args)
    {
        SuperRace superRace = new SuperRace();

        // Our observers
        RaceMonitor cnn = new RaceMonitor();
        RaceMonitor bbc = new RaceMonitor();

        // Subscribe the observers to the race
        superRace.Subscribe(cnn);
        superRace.Subscribe(bbc);

        // Let race start
        superRace.Start();

        Console.Read();
    }
}

And of course we just run the application to try it out. The lap times are random, so the outcome varies from run to run, but in my run Craig Foy wins the race!

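
One detail Main skips over: the IDisposable returned by Subscribe is thrown away, so neither journalist can ever leave the race. Here is a small sketch of how that return value can be used to unsubscribe. Ticker, RoundCounter and UnsubscribeDemo are hypothetical stand-ins written for this example, so that it stays self-contained; Ticker follows the same Action-based disposable idea as SuperRace.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for SuperRace: keeps a list of observers and
// hands back a token that removes the observer again.
class Ticker : IObservable<int>
{
    private readonly List<IObserver<int>> _observers = new List<IObserver<int>>();

    public IDisposable Subscribe(IObserver<int> observer)
    {
        _observers.Add(observer);
        return new Subscription(() => _observers.Remove(observer));
    }

    public void Tick(int round)
    {
        // ToArray() copies the list so it may change while we iterate
        foreach (var o in _observers.ToArray()) o.OnNext(round);
    }

    private sealed class Subscription : IDisposable
    {
        private readonly Action _onDispose;
        public Subscription(Action onDispose) { _onDispose = onDispose; }
        public void Dispose() { _onDispose(); }
    }
}

// Hypothetical observer that counts the rounds it has seen.
class RoundCounter : IObserver<int>
{
    public int Rounds;

    public void OnNext(int value) { Rounds++; }
    public void OnError(Exception error) { }
    public void OnCompleted() { }
}

static class UnsubscribeDemo
{
    // Returns the number of rounds seen by CNN and BBC, in that order.
    public static int[] Run()
    {
        var ticker = new Ticker();
        var cnn = new RoundCounter();
        var bbc = new RoundCounter();

        IDisposable cnnSubscription = ticker.Subscribe(cnn);
        ticker.Subscribe(bbc);

        ticker.Tick(1);              // both journalists see round 1
        cnnSubscription.Dispose();   // CNN leaves the race early
        ticker.Tick(2);              // only BBC sees round 2

        return new[] { cnn.Rounds, bbc.Rounds };
    }
}
```

After Dispose is called, CNN stops receiving rounds while BBC keeps getting them: Run() reports one round for CNN and two for BBC.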

Conclusion

As you can see, IObservable and IObserver provide an easy way to create publisher/subscriber based programs. Stay tuned for a post about the duality between IObservable and IEnumerable. When dealing with these patterns one will most likely use Reactive Extensions (Rx), but more on that subject later.

Many thanks to Bart De Smet for providing some great insights about this topic.

18. August 2010 by Jeroen Verhulst