Reimplementing Reactive Extensions (Rx) (Part 1: Introduction)

After exploring the Reactive Extensions (Rx) and trying some examples from "Curing the asynchronous blues with the Reactive Extensions for .NET", I thought about how I could implement Rx myself to cover some special cases. Moreover, I had read Jon Skeet's famous blog series "Reimplementing LINQ to Objects", so I started to reimplement the Reactive Extensions and put the project on CodePlex (SimpleRX). All the code examples I write about can be found in the SimpleRX source code.

In this first post I will give an overview of the structure of my implementation. In the following posts I will show the implementation of the individual Rx commands. I will start with some simple commands like Return, Range, Repeat, Never and Throw. Then I will write about some time-based commands like GenerateFromTime, Buffer, Delay, Timer, GenerateWithTime, Throttle and DistinctUntilChanged. After that I will describe the FromEvent and FromAsyncPattern commands. In my last post I will explain some special commands like Amb and Zip.

Overview of the SimpleRX project

The most important parts of the implementation are the two classes that implement the IObserver<T> interface (the Observer<T> class) and the IObservable<T> interface (the BaseSubject<T> class). These two classes are the base of the observer pattern. The IObserver<T> interface has three methods. The OnNext method is called when the next item is available. The OnError method is called if an error occurred while generating an item, and the OnCompleted method is called when the production of items has ended.

One very important thing when using Rx is that it is push based. That means you register for the production of some values or items, and you get called when a value is available. IEnumerable and IEnumerator, in contrast, are pull based: you drive the iteration yourself and ask for the next item. Here is the implementation of the Observer<T> class, followed further below by the BaseSubject<T> class. The Observer<T> class implements the three methods of the IObserver<T> interface and has three overloaded constructors to compose the observer from callbacks; any callback that is not passed in defaults to an empty action.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;

namespace SimpleRX
{
    public class Observer<T> : IObserver<T>
    {
        private Action<T> OnNext_ = p => { };
        private Action OnCompleted_ = () => { };
        private Action<Exception> OnError_ = ex => { };

        public Observer(Action<T> onNext, Action<Exception> onError, Action onCompleted)
        {
            this.OnNext_ = onNext;
            this.OnError_ = onError;
            this.OnCompleted_ = onCompleted;
        }

        public Observer(Action<T> onNext, Action<Exception> onError)
        {
            this.OnNext_ = onNext;
            this.OnError_ = onError;
        }

        public Observer(Action<T> onNext)
        {
            this.OnNext_ = onNext;
        }

        public void OnCompleted()
        {
            OnCompleted_();
        }

        public void OnError(Exception error)
        {
            OnError_(error);
        }

        public void OnNext(T value)
        {
            OnNext_(value);
        }
    }
}
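
To make the push/pull difference described above more concrete, here is a minimal sketch. It is not part of SimpleRX: CallbackObserver<T> is a compact stand-in for the Observer<T> class, PushVsPullDemo is a hypothetical helper, and the "producer" in Push is just a loop standing in for a subject.

```csharp
using System;
using System.Collections.Generic;

namespace SimpleRxSketch
{
    // Compact stand-in for Observer<T>: an observer built from callbacks.
    public class CallbackObserver<T> : IObserver<T>
    {
        private readonly Action<T> onNext;
        private readonly Action<Exception> onError;
        private readonly Action onCompleted;

        public CallbackObserver(Action<T> onNext,
                                Action<Exception> onError = null,
                                Action onCompleted = null)
        {
            // Missing callbacks fall back to empty actions.
            this.onNext = onNext ?? (_ => { });
            this.onError = onError ?? (_ => { });
            this.onCompleted = onCompleted ?? (() => { });
        }

        public void OnNext(T value) { onNext(value); }
        public void OnError(Exception error) { onError(error); }
        public void OnCompleted() { onCompleted(); }
    }

    public static class PushVsPullDemo
    {
        // Pull: the consumer drives the loop and asks for each item.
        public static List<string> Pull(IEnumerable<int> source)
        {
            var log = new List<string>();
            using (IEnumerator<int> e = source.GetEnumerator())
            {
                while (e.MoveNext())
                {
                    log.Add("pulled " + e.Current);
                }
            }
            log.Add("done");
            return log;
        }

        // Push: the producer drives the loop and calls the observer.
        public static List<string> Push(IEnumerable<int> source)
        {
            var log = new List<string>();
            IObserver<int> observer = new CallbackObserver<int>(
                value => log.Add("pushed " + value),
                ex => log.Add("error " + ex.Message),
                () => log.Add("done"));

            // Hypothetical producer: a plain loop standing in for a subject.
            foreach (int value in source)
            {
                observer.OnNext(value);
            }
            observer.OnCompleted();
            return log;
        }
    }
}
```

Both methods see the same items in the same order. The difference is who is in control: in Pull the consumer calls MoveNext, in Push the producer calls OnNext and the consumer only reacts.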

The BaseSubject<T> class implements the IObservable<T> interface and is the entry point for the subscription. If a client wants to subscribe to a source that produces values, it passes in an object that implements the IObserver<T> interface. Whenever an item is produced, an error occurs or the production has completed, the client gets called through this interface. The BaseSubject<T> class is abstract because it produces no items itself; its subclasses produce the items (for example the ReturnSubject or GenerateSubject class). The Scheduler property allows the subject to use different schedulers (for example a TaskScheduler) to produce the items. The ColdSubscribe method is used if one subject wants to subscribe to another subject without starting the production of items: it only registers the observer and does not call Execute. The DisposeObject<T> class implements the IDisposable interface, and by calling Dispose a subscriber can stop its subscription.

public abstract class BaseSubject<T> : IObservable<T>
{
    public List<IObserver<T>> observers;
    private IScheduler scheduler = new TaskScheduler();
    public bool running;

    public IScheduler Scheduler
    {
        get { return scheduler; }
        set { this.scheduler = value; }
    }

    public abstract void Execute();

    protected virtual void NotifyObservers(T value)
    {
        foreach (var item in observers)
        {
            item.OnNext(value);
        }     
    }        

    protected virtual void NotifyErrorObservers(Exception exception)
    {
        foreach (var item in observers)
        {
            item.OnError(exception);
        }
    }

    protected virtual void NotifyCompleteObservers()
    {
        foreach (var item in observers)
        {
            item.OnCompleted();
        }     
    }

    public BaseSubject()
    {
        observers = new List<IObserver<T>>();
    }

    public IDisposable Subscribe(IObserver<T> observer)
    {
        IDisposable disposable = new DisposeObject<T>(observers, this, observer);
        observers.Add(observer);
        if (!running)
        {
            this.running = true;
            Execute();
        }
        return disposable;
    }

    public IDisposable ColdSubscribe(IObserver<T> observer)
    {
        IDisposable disposable = new DisposeObject<T>(observers, this, observer);
        observers.Add(observer);
        return disposable;
    }
}
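
The following sketch shows how the pieces described above could fit together in a reduced form. It is an illustration under assumptions, not the SimpleRX code: MiniSubject<T> leaves out the scheduler and runs Execute synchronously, Unsubscriber<T> is a hypothetical stand-in for DisposeObject<T> (whose source is not shown in this post), and ReturnSketch<T> and ListObserver<T> are made-up names for a one-value producer and a logging observer.

```csharp
using System;
using System.Collections.Generic;

namespace SimpleRxSketch
{
    // Hypothetical stand-in for DisposeObject<T>: removes one observer on Dispose.
    public sealed class Unsubscriber<T> : IDisposable
    {
        private readonly List<IObserver<T>> observers;
        private readonly IObserver<T> observer;

        public Unsubscriber(List<IObserver<T>> observers, IObserver<T> observer)
        {
            this.observers = observers;
            this.observer = observer;
        }

        public void Dispose() { observers.Remove(observer); }
    }

    // Reduced base class: no scheduler, Execute runs synchronously.
    public abstract class MiniSubject<T> : IObservable<T>
    {
        protected readonly List<IObserver<T>> observers = new List<IObserver<T>>();
        private bool running;

        protected abstract void Execute();

        // Registers the observer and starts production on the first call.
        public IDisposable Subscribe(IObserver<T> observer)
        {
            observers.Add(observer);
            if (!running)
            {
                running = true;
                Execute();
            }
            return new Unsubscriber<T>(observers, observer);
        }

        // Registers the observer only; production is not started.
        public IDisposable ColdSubscribe(IObserver<T> observer)
        {
            observers.Add(observer);
            return new Unsubscriber<T>(observers, observer);
        }
    }

    // Hypothetical producer: pushes a single value, then completes.
    public sealed class ReturnSketch<T> : MiniSubject<T>
    {
        private readonly T value;
        public ReturnSketch(T value) { this.value = value; }

        protected override void Execute()
        {
            // Iterate over a copy so an observer may unsubscribe in a callback.
            foreach (var o in observers.ToArray()) o.OnNext(value);
            foreach (var o in observers.ToArray()) o.OnCompleted();
        }
    }

    // Observer that records every call it receives.
    public sealed class ListObserver<T> : IObserver<T>
    {
        public readonly List<string> Log = new List<string>();
        public void OnNext(T value) { Log.Add("next " + value); }
        public void OnError(Exception error) { Log.Add("error " + error.Message); }
        public void OnCompleted() { Log.Add("completed"); }
    }
}
```

With this sketch, calling ColdSubscribe on a new ReturnSketch<int>(42) leaves the observer's log empty, and the first Subscribe call afterwards delivers the value and the completion to both observers. An observer whose Unsubscriber<T> was disposed before that point receives nothing.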

Another interesting class is the Subject class. A subject implements both IObservable<T> and IObserver<T>. That means you can subscribe to a Subject and also call OnNext, OnError and OnCompleted on it yourself. So you control the creation of items: whenever you call OnNext, the observers that subscribed to your Subject can react to the new item. This is the implementation of the Subject class. As you can see, it implements IObservable<T>, IObserver<T> and IDisposable, and it is a child class of BaseSubject<T>.

public class Subject<T> : BaseSubject<T>, IObservable<T>, IObserver<T>, IDisposable
{
    private Action<T> OnNext_ = p => { };
    private Action OnCompleted_ = () => { };
    private Action<Exception> OnError_ = ex => { };

    public Subject(Action<T> onNext, Action<Exception> onError, Action onCompleted)
    {
        this.OnNext_ = onNext;
        this.OnError_ = onError;
        this.OnCompleted_ = onCompleted;
    }

    public Subject(Action<T> onNext, Action<Exception> onError)
    {
        this.OnNext_ = onNext;
        this.OnError_ = onError;
    }

    public Subject(Action<T> onNext)
    {
        this.OnNext_ = onNext;            
    }

    public Subject()
    {

    }

    public void OnCompleted()
    {
        NotifyCompleteObservers();
    }

    public void OnError(Exception error)
    {
        NotifyErrorObservers(error);
    }

    public void OnNext(T value)
    {
        NotifyObservers(value);
    }       

    public void Dispose()
    {
        NotifyCompleteObservers();
    }

    public override void Execute()
    {

    }
}
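
Here is a small usage sketch of the subject idea. To keep it self-contained it does not use the Subject<T> class above but a reduced stand-in: ManualSubject<T>, RecordingObserver<T> and SubjectDemo are hypothetical names, and the unsubscribe logic is an assumption about how a subscription could be removed again.

```csharp
using System;
using System.Collections.Generic;

namespace SimpleRxSketch
{
    // Reduced subject: forwards every call it receives to its subscribers.
    public sealed class ManualSubject<T> : IObservable<T>, IObserver<T>
    {
        private readonly List<IObserver<T>> observers = new List<IObserver<T>>();

        public IDisposable Subscribe(IObserver<T> observer)
        {
            observers.Add(observer);
            return new Subscription(() => observers.Remove(observer));
        }

        public void OnNext(T value)
        {
            foreach (var o in observers.ToArray()) o.OnNext(value);
        }

        public void OnError(Exception error)
        {
            foreach (var o in observers.ToArray()) o.OnError(error);
        }

        public void OnCompleted()
        {
            foreach (var o in observers.ToArray()) o.OnCompleted();
        }

        // Runs the given action once the subscriber disposes the subscription.
        private sealed class Subscription : IDisposable
        {
            private readonly Action onDispose;
            public Subscription(Action onDispose) { this.onDispose = onDispose; }
            public void Dispose() { onDispose(); }
        }
    }

    // Observer that records every call it receives.
    public sealed class RecordingObserver<T> : IObserver<T>
    {
        public readonly List<string> Log = new List<string>();
        public void OnNext(T value) { Log.Add("next " + value); }
        public void OnError(Exception error) { Log.Add("error " + error.Message); }
        public void OnCompleted() { Log.Add("completed"); }
    }

    public static class SubjectDemo
    {
        public static List<string> Run()
        {
            var subject = new ManualSubject<int>();
            var observer = new RecordingObserver<int>();

            IDisposable subscription = subject.Subscribe(observer);
            subject.OnNext(1);       // delivered
            subject.OnNext(2);       // delivered
            subscription.Dispose();  // observer is removed
            subject.OnNext(3);       // not delivered
            subject.OnCompleted();   // not delivered
            return observer.Log;
        }
    }
}
```

The caller of OnNext decides when items are created, and the subject only relays them. After Dispose the observer is no longer in the list, so SubjectDemo.Run() records only the first two items.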

Because the internal implementation of a subject is different, the Subjects class is needed. The Subjects class works like the Observable class, but for subjects. In my next post I will describe the Observer and the Subjects classes and their many different methods.
