reimplementing reactive extensions (rx) (part 2: Observable, Subjects, ObservableExtensions and BaseSubject)

In part 1 I introduced SimpleRX. This second post gives an overview of the Observable class, the ObservableExtensions class and the concrete implementations of the abstract BaseSubject<T> class. The Observable class contains all the calls for producing values. For example, the Observable.Return<T>(T value) call delivers the passed-in value to every observer. For every call on the Observable class there is a subject class, derived from BaseSubject<T>, that produces the concrete values.

//http://msdn.microsoft.com/en-us/library/hh212048(v=vs.103).aspx
public static class Observable
{
    public static IObservable<T> Empty<T>()
    {
        return new EmptySubject<T>();
    }

    public static IObservable<T> Return<T>(T p)
    {
        return new ReturnSubject<T>(p);
    }

    public static IObservable<T> Return<T>(T[] p)
    {
        return new ReturnSubjectArray<T>(p);
    }
    ...

The ObservableExtensions class contains some useful extension methods. With the ToObservable methods it is possible to convert a single value, an array or an IEnumerable<T> to an IObservable<T>. The Subscribe methods create an Observer<T> for an IObservable<T> implementation from the onNext, onError and onCompleted actions that are passed in.

namespace CustomReactiveExtension
{
    public static class ObservableExtensions
    {
        public static IObservable<T> ToObservable<T>(this T[] array)
        {
            return new ReturnSubjectArray<T>(array);
        }

        public static IObservable<T> ToObservable<T>(this IEnumerable<T> collection)
        {
            return new ReturnSubjectCollection<T>(collection);
        }

        public static IObservable<T> ToObservable<T>(this T value)
        {
            return new ReturnSubject<T>(value);
        }

        public static IDisposable Subscribe<T>(this IObservable<T> value, Action<T> onNext, Action<Exception> onError, Action onCompleted)
        {
            Observer<T> observer = new Observer<T>(onNext, onError, onCompleted);
            IDisposable subject = value.Subscribe(observer);
            return subject;
        }

        public static IDisposable Subscribe<T>(this IObservable<T> value, Action<T> onNext, Action<Exception> onError)
        {
            Observer<T> observer = new Observer<T>(onNext, onError);
            IDisposable subject = value.Subscribe(observer);
            return subject;
        }       

        public static IDisposable Subscribe<T>(this IObservable<T> value, Action<T> onNext)
        {
            Observer<T> observer = new Observer<T>(onNext);
            IDisposable subject = value.Subscribe(observer);
            return subject;
        }
    }
}
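To make the Subscribe extension methods more tangible, here is a minimal, self-contained sketch of how such a call could look from the caller's side. It does not use the SimpleRX classes: SingleValueSketch<T>, ActionObserver<T> and SketchExtensions are hypothetical stand-ins (for ReturnSubject<T>, Observer<T> and ObservableExtensions) built only on the IObservable<T> and IObserver<T> interfaces from the System namespace.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for ReturnSubject<T>: pushes one value, then completes.
public class SingleValueSketch<T> : IObservable<T>
{
    private readonly T value;
    public SingleValueSketch(T value) { this.value = value; }

    public IDisposable Subscribe(IObserver<T> observer)
    {
        observer.OnNext(value);
        observer.OnCompleted();
        return new NoopDisposable();
    }

    private sealed class NoopDisposable : IDisposable
    {
        public void Dispose() { }
    }
}

// Hypothetical stand-in for Observer<T>: wraps three delegates in an IObserver<T>.
public class ActionObserver<T> : IObserver<T>
{
    private readonly Action<T> onNext;
    private readonly Action<Exception> onError;
    private readonly Action onCompleted;

    public ActionObserver(Action<T> onNext, Action<Exception> onError, Action onCompleted)
    {
        this.onNext = onNext;
        this.onError = onError;
        this.onCompleted = onCompleted;
    }

    public void OnNext(T value) { onNext(value); }
    public void OnError(Exception error) { onError(error); }
    public void OnCompleted() { onCompleted(); }
}

public static class SketchExtensions
{
    // Same shape as the three-argument Subscribe extension shown above.
    public static IDisposable Subscribe<T>(this IObservable<T> source,
        Action<T> onNext, Action<Exception> onError, Action onCompleted)
    {
        return source.Subscribe(new ActionObserver<T>(onNext, onError, onCompleted));
    }
}

public static class SubscribeDemo
{
    public static List<string> Run()
    {
        var log = new List<string>();
        IObservable<int> source = new SingleValueSketch<int>(42);
        IDisposable subscription = source.Subscribe(
            v => log.Add("next: " + v),
            ex => log.Add("error: " + ex.Message),
            () => log.Add("completed"));
        subscription.Dispose();
        return log;
    }

    public static void Main()
    {
        foreach (var line in Run()) Console.WriteLine(line);
    }
}
```

The caller only supplies lambdas; the extension method wraps them in an observer object, so no IObserver<T> class has to be written by hand for each subscription.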

For each method in the Observable class a subject object is created. All these subjects are derived from BaseSubject<T>. The BaseSubject<T> class holds a list of observers that are notified by the concrete implementation of BaseSubject<T>. To do so, the concrete implementation uses the NotifyObservers, NotifyErrorObservers and NotifyCompleteObservers methods. With the Subscribe and ColdSubscribe methods, observers register themselves to get notified when an item is produced, an error occurs or the production of items is completed. The difference between the two is that Subscribe also starts the production of items: as you can see, Subscribe calls the abstract method Execute, unless the subject is already running. From that point on items are produced and the observers are notified. ColdSubscribe only adds the observer to the observers collection; the production of items does not start, because Execute is not called.

public abstract class BaseSubject<T> : IObservable<T>
{
    public List<IObserver<T>> observers;
    private IScheduler scheduler = new TaskScheduler();
    public bool running;

    public IScheduler Scheduler
    {
        get { return scheduler; }
        set { this.scheduler = value; }
    }

    public abstract void Execute();

    protected virtual void NotifyObservers(T value)
    {
        foreach (var item in observers)
        {
            item.OnNext(value);
        }     
    }        

    protected virtual void NotifyErrorObservers(Exception exception)
    {
        foreach (var item in observers)
        {
            item.OnError(exception);
        }
    }

    protected virtual void NotifyCompleteObservers()
    {
        foreach (var item in observers)
        {
            item.OnCompleted();
        }     
    }

    public BaseSubject()
    {
        observers = new List<IObserver<T>>();
    }

    public IDisposable Subscribe(IObserver<T> observer)
    {
        IDisposable disposable = new DisposeObject<T>(observers, this, observer);
        observers.Add(observer);
        if (!running)
        {
            this.running = true;
            Execute();
        }
        return disposable;
    }

    public IDisposable ColdSubscribe(IObserver<T> observer)
    {
        IDisposable disposable = new DisposeObject<T>(observers, this, observer);
        observers.Add(observer);
        return disposable;
    }
}
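The difference between Subscribe and ColdSubscribe can be sketched with a stripped-down version of the class. This is only an illustration under simplifying assumptions: MiniSubject<T> and RangeSketch are hypothetical names, observers are plain Action<T> delegates, there is no scheduler and no unsubscription, and Execute runs synchronously.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical, simplified stand-in for BaseSubject<T>.
public abstract class MiniSubject<T>
{
    private readonly List<Action<T>> observers = new List<Action<T>>();
    private bool running;

    protected abstract void Execute();

    protected void NotifyObservers(T value)
    {
        foreach (var observer in observers) observer(value);
    }

    // Registers the observer and starts production on the first call.
    public void Subscribe(Action<T> observer)
    {
        observers.Add(observer);
        if (!running)
        {
            running = true;
            Execute();
        }
    }

    // Registers the observer only; Execute is not called.
    public void ColdSubscribe(Action<T> observer)
    {
        observers.Add(observer);
    }
}

// Hypothetical concrete subject that produces a range of integers.
public class RangeSketch : MiniSubject<int>
{
    private readonly int start;
    private readonly int count;

    public RangeSketch(int start, int count)
    {
        this.start = start;
        this.count = count;
    }

    protected override void Execute()
    {
        for (int i = 0; i < count; i++) NotifyObservers(start + i);
    }
}

public static class ColdSubscribeDemo
{
    public static List<string> Run()
    {
        var log = new List<string>();
        var subject = new RangeSketch(1, 2);
        subject.ColdSubscribe(v => log.Add("cold: " + v)); // nothing is produced yet
        log.Add("after ColdSubscribe");
        subject.Subscribe(v => log.Add("hot: " + v));      // production starts here
        return log;
    }

    public static void Main()
    {
        // Prints: after ColdSubscribe, cold: 1, hot: 1, cold: 2, hot: 2
        foreach (var line in Run()) Console.WriteLine(line);
    }
}
```

The cold observer is registered first but receives nothing until another caller uses Subscribe; after that both observers see every item.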

This post concludes the introduction to the structure of SimpleRX. In the next posts I will show how to use SimpleRX and explain the different Observable methods.

SimpleRx: (part1) (part2) (part3) (part4) (part5) (part6) (part7) (part8)


reimplementing reactive extensions (rx) (part 1: Introduction)

After exploring the Reactive Extensions (Rx) and trying some examples from "Curing the asynchronous blues with the Reactive Extensions for .NET", I thought about how I could implement Rx myself to cover some special cases. I had also read Jon Skeet's famous blog series "Reimplementing LINQ to Objects", and so I started to reimplement the Reactive Extensions and put the project on CodePlex (SimpleRX). All the code examples I write about can be found in the SimpleRX source code.

In this first post I give an overview of the structure of my implementation. In the following posts I will show the implementation of the individual Rx commands. I will start with some simple commands like Return, Range, Repeat, Never and Throw. Then I will write about some time-based commands like GenerateFromTime, Buffer, Delay, Timer, GenerateWithTime, Throttle and DistinctUntilChanged. After that I will describe the FromEvent and FromAsyncPattern commands. In my last post I will explain some special commands like Amb and Zip.

Overview of the SimpleRX project:

The most important parts of the implementation are the IObserver<T> interface (implemented by the Observer<T> class) and the IObservable<T> interface (implemented by the BaseSubject<T> class). These two classes are the base of the observer pattern. The IObserver<T> interface has three methods. OnNext is called when the next item is available. OnError is called when an error occurred while generating an item, and OnCompleted is called when the production of items has ended. One very important thing about Rx is that it is push based: you register for the production of some values or items, and you get called when a value is available. IEnumerable and IEnumerator are pull based: you drive the iteration yourself to get the next item. Here is the implementation of the Observer<T> class and the BaseSubject<T> class. The Observer<T> class implements the three methods of the IObserver<T> interface and has three overloaded constructors to compose the observer.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;

namespace SimpleRX
{
    public class Observer<T> : IObserver<T>
    {
        private Action<T> OnNext_ = p => { };
        private Action OnCompleted_ = () => { };
        private Action<Exception> OnError_ = ex => { };

        public Observer(Action<T> onNext, Action<Exception> onError, Action onCompleted)
        {
            this.OnNext_ = onNext;
            this.OnError_ = onError;
            this.OnCompleted_ = onCompleted;
        }

        public Observer(Action<T> onNext, Action<Exception> onError)
        {
            this.OnNext_ = onNext;
            this.OnError_ = onError;
        }

        public Observer(Action<T> onNext)
        {
            this.OnNext_ = onNext;
        }

        public void OnCompleted()
        {
            OnCompleted_();
        }

        public void OnError(Exception error)
        {
            OnError_(error);
        }

        public void OnNext(T value)
        {
            OnNext_(value);
        }
    }
}
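The push/pull distinction described above can be shown side by side in a few lines. This is a minimal sketch that does not depend on SimpleRX; PushPullDemo and its methods are hypothetical names, and the "producer" in the push case is simply a delegate that calls back into the consumer.

```csharp
using System;
using System.Collections.Generic;

public static class PushPullDemo
{
    // Pull: the consumer drives the iteration by calling MoveNext itself.
    public static List<int> Pull(IEnumerable<int> source)
    {
        var received = new List<int>();
        using (IEnumerator<int> enumerator = source.GetEnumerator())
        {
            while (enumerator.MoveNext())
            {
                received.Add(enumerator.Current);
            }
        }
        return received;
    }

    // Push: the consumer hands over a callback and the producer decides
    // when to call it.
    public static List<int> Push(Action<Action<int>> producer)
    {
        var received = new List<int>();
        producer(value => received.Add(value));
        return received;
    }

    public static void Main()
    {
        List<int> pulled = Pull(new[] { 1, 2, 3 });
        List<int> pushed = Push(onNext => { onNext(1); onNext(2); onNext(3); });
        Console.WriteLine(string.Join(",", pulled)); // 1,2,3
        Console.WriteLine(string.Join(",", pushed)); // 1,2,3
    }
}
```

Both variants deliver the same items; what changes is who is in control of the timing, the consumer (pull) or the producer (push).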

The BaseSubject<T> class implements the IObservable<T> interface and is the entry point for subscriptions. A client that wants to subscribe to a source of values passes in an object that implements the IObserver<T> interface; when an item is produced, an error occurs or the production has completed, the client is called through this interface. The BaseSubject<T> class is abstract because it produces no items itself; its subclasses produce the items (for example the ReturnSubject or GenerateSubject class). The Scheduler property allows the subject to use different schedulers (for example a TaskScheduler) to produce the items. The ColdSubscribe method is used when one subject wants to subscribe to another subject without starting the production of items in the subject it subscribes to. The DisposeObject<T> class implements the IDisposable interface; by calling Dispose a subscriber can end its subscription.

public abstract class BaseSubject<T> : IObservable<T>
{
    public List<IObserver<T>> observers;
    private IScheduler scheduler = new TaskScheduler();
    public bool running;

    public IScheduler Scheduler
    {
        get { return scheduler; }
        set { this.scheduler = value; }
    }

    public abstract void Execute();

    protected virtual void NotifyObservers(T value)
    {
        foreach (var item in observers)
        {
            item.OnNext(value);
        }     
    }        

    protected virtual void NotifyErrorObservers(Exception exception)
    {
        foreach (var item in observers)
        {
            item.OnError(exception);
        }
    }

    protected virtual void NotifyCompleteObservers()
    {
        foreach (var item in observers)
        {
            item.OnCompleted();
        }     
    }

    public BaseSubject()
    {
        observers = new List<IObserver<T>>();
    }

    public IDisposable Subscribe(IObserver<T> observer)
    {
        IDisposable disposable = new DisposeObject<T>(observers, this, observer);
        observers.Add(observer);
        if (!running)
        {
            this.running = true;
            Execute();
        }
        return disposable;
    }

    public IDisposable ColdSubscribe(IObserver<T> observer)
    {
        IDisposable disposable = new DisposeObject<T>(observers, this, observer);
        observers.Add(observer);
        return disposable;
    }
}

Another interesting class is the Subject<T> class. A subject implements both IObservable<T> and IObserver<T>, which means you can subscribe to a subject and also call OnNext, OnError and OnCompleted on it yourself. You control the creation of items: when you call OnNext, the observers that subscribed to your subject can react to the new item. This is the implementation of the Subject<T> class. As you can see, it implements IObservable<T>, IObserver<T> and IDisposable, and it is a child class of BaseSubject<T>.

public class Subject<T> : BaseSubject<T>, IObservable<T>, IObserver<T>, IDisposable
{
    private Action<T> OnNext_ = p => { };
    private Action OnCompleted_ = () => { };
    private Action<Exception> OnError_ = ex => { };

    public Subject(Action<T> onNext, Action<Exception> onError, Action onCompleted)
    {
        this.OnNext_ = onNext;
        this.OnError_ = onError;
        this.OnCompleted_ = onCompleted;
    }

    public Subject(Action<T> onNext, Action<Exception> onError)
    {
        this.OnNext_ = onNext;
        this.OnError_ = onError;
    }

    public Subject(Action<T> onNext)
    {
        this.OnNext_ = onNext;            
    }

    public Subject()
    {

    }

    public void OnCompleted()
    {
        NotifyCompleteObservers();
    }

    public void OnError(Exception error)
    {
        NotifyErrorObservers(error);
    }

    public void OnNext(T value)
    {
        NotifyObservers(value);
    }       

    public void Dispose()
    {
        NotifyCompleteObservers();
    }

    public override void Execute()
    {

    }
}
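How a subject is driven from the outside can be sketched as follows. This is not the SimpleRX Subject<T>; MiniHotSubject<T> and NamedObserver are hypothetical, simplified types that only rely on the System interfaces, and unsubscription is handled by a small private helper instead of DisposeObject<T>.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical, simplified subject: observable and observer at the same time.
public class MiniHotSubject<T> : IObservable<T>, IObserver<T>
{
    private readonly List<IObserver<T>> observers = new List<IObserver<T>>();

    public IDisposable Subscribe(IObserver<T> observer)
    {
        observers.Add(observer);
        return new Unsubscriber(observers, observer);
    }

    // Iterate over a copy so that observers may unsubscribe while notified.
    public void OnNext(T value)
    {
        foreach (var o in observers.ToArray()) o.OnNext(value);
    }

    public void OnError(Exception error)
    {
        foreach (var o in observers.ToArray()) o.OnError(error);
    }

    public void OnCompleted()
    {
        foreach (var o in observers.ToArray()) o.OnCompleted();
    }

    private sealed class Unsubscriber : IDisposable
    {
        private readonly List<IObserver<T>> list;
        private readonly IObserver<T> observer;

        public Unsubscriber(List<IObserver<T>> list, IObserver<T> observer)
        {
            this.list = list;
            this.observer = observer;
        }

        public void Dispose() { list.Remove(observer); }
    }
}

// Hypothetical observer that records what it receives under a name.
public class NamedObserver : IObserver<int>
{
    private readonly string name;
    private readonly List<string> log;

    public NamedObserver(string name, List<string> log)
    {
        this.name = name;
        this.log = log;
    }

    public void OnNext(int value) { log.Add(name + ": " + value); }
    public void OnError(Exception error) { log.Add(name + ": error"); }
    public void OnCompleted() { log.Add(name + ": completed"); }
}

public static class SubjectDemo
{
    public static List<string> Run()
    {
        var log = new List<string>();
        var subject = new MiniHotSubject<int>();
        IDisposable a = subject.Subscribe(new NamedObserver("a", log));
        subject.OnNext(1);                  // only "a" is listening
        subject.Subscribe(new NamedObserver("b", log));
        subject.OnNext(2);                  // "a" and "b" are listening
        a.Dispose();                        // "a" unsubscribes
        subject.OnNext(3);                  // only "b" is listening
        subject.OnCompleted();
        return log;
    }

    public static void Main()
    {
        // Prints: a: 1, a: 2, b: 2, b: 3, b: completed
        foreach (var line in Run()) Console.WriteLine(line);
    }
}
```

Because the caller decides when OnNext is invoked, an observer that subscribes late simply misses the earlier items.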

Because the internal implementation of a subject is different, the Subjects class is needed. The Subjects class works like the Observable class, but for subjects. In my next post I will describe the Observable and the Subjects classes and their many different methods.



async in the .net framework 4.5

The developer preview of the .NET Framework 4.5 includes the new async feature. Microsoft introduced two new keywords to make asynchronous programming much easier. These two new keywords are:

  • async
  • await

It is only possible to use await in a method that is marked with the async keyword. In this simple example the usage of async and await is shown.

 public async void SimpleAsync()
 {
      WebClient webClient = new WebClient();
      await webClient.DownloadStringTaskAsync("http://www.orf.at");
 }

This example shows how the async pattern works. First you have to declare the method as async. Only in a method marked as async is it possible to use the await keyword.
When execution reaches the await keyword, the asynchronous operation is started and, if it has not already completed, control returns immediately to the calling method. When the asynchronous operation has finished, the rest of the method is executed. In effect the compiler creates a continuation for you, much like a ContinueWith task, and puts all statements that follow the await into it. To do this the compiler generates a state machine, similar to the state machine it generates when you use the yield statement. The following code comes out of ILSpy and shows what the compiler generates.

public void SimpleAsync()
{
    Program.d__0 d__0 = new Program.d__0(0);
    d__0.4__this = this; 
    d__0.t__MoveNextDelegate = new Action(d__0.MoveNext);
    d__0.$builder = AsyncVoidMethodBuilder.Create();
    d__0.MoveNext();
}

As we can see, the compiler generates a special class to handle the async execution.

private sealed class d__0
{
    private int 1__state;
    private bool $__disposing;
    public AsyncVoidMethodBuilder $builder;
    public Action t__MoveNextDelegate;
    public Program 4__this;
    private TaskAwaiter<string> t__$await2;
    [DebuggerHidden]
    public d__0(int 1__state)
    {
        this.1__state = 1__state;
    }
    public void MoveNext()
    {
        AsyncVoidMethodBuilder asyncVoidMethodBuilder;
        try
        {
            int num = this.1__state;
            if (num != 1)
            {
                if (this.1__state == -1)
                {
                    return;
                }
                this.t__$await2 = AsyncCtpThreadingExtensions.GetAwaiter<string>(AsyncCtpExtensions.DownloadStringTaskAsync(new WebClient(), new Uri("http://www.weather.gov")));
                if (!this.t__$await2.IsCompleted)
                {
                    this.1__state = 1;
                    this.t__$await2.OnCompleted(this.t__MoveNextDelegate);
                    return;
                }
            }
            else
            {
                this.1__state = 0;
            }
            string result = this.t__$await2.GetResult();
            this.t__$await2 = default(TaskAwaiter<string>);
            this.4__this.WriteLinePageTitle(result);
        }
        catch (Exception arg_AB_0)
        {
            Exception exception = arg_AB_0;
            this.1__state = -1;
            asyncVoidMethodBuilder = this.$builder;
            asyncVoidMethodBuilder.SetException(exception);
            return;
        }
        this.1__state = -1;
        asyncVoidMethodBuilder = this.$builder;
        asyncVoidMethodBuilder.SetResult();
    }
}

So the compiler generates all the important asynchronous code. To execute an asynchronous operation you only need to declare a method as async and tell the compiler which operation you want to wait for: put the await keyword before this operation and the compiler does the rest.
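The idea that the code after an await behaves like a continuation can be sketched by writing the same logic twice, once with await and once with an explicit ContinueWith. This is a minimal illustration, not the code the compiler emits: FetchAsync is a hypothetical stand-in for a download that uses Task.Delay instead of the network, and the methods return Task<int> rather than void so that the result can be observed.

```csharp
using System;
using System.Threading.Tasks;

public static class ContinuationDemo
{
    // Hypothetical stand-in for a slow download; no network access involved.
    private static Task<string> FetchAsync()
    {
        return Task.Delay(10).ContinueWith(_ => "hello");
    }

    // Version with await: the statement after the await runs once
    // FetchAsync has completed.
    public static async Task<int> WithAwait()
    {
        string page = await FetchAsync();
        return page.Length;
    }

    // Hand-written version: the code after the await is moved into
    // an explicit continuation.
    public static Task<int> WithContinueWith()
    {
        return FetchAsync().ContinueWith(t => t.Result.Length);
    }

    public static void Main()
    {
        // Blocking on Result is acceptable in this small console sketch.
        Console.WriteLine(WithAwait().Result);        // 5
        Console.WriteLine(WithContinueWith().Result); // 5
    }
}
```

The two versions are not identical in every detail (for example, await takes the current synchronization context into account and unwraps exceptions differently), but for simple cases they compute the same result.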


Programming with the TPL (task parallel library)

This is the presentation and the code examples I used for my talk about the TPL, which I gave for the .NET user group South Austria.

presentation

code examples