reimplementing reactive extensions (rx) (part 2: Observable, Subjects, ObservableExtensions and BaseSubject)

In part1 i started with the introduction of SimpleRX, in this second post  i will give an overview of the Observable, ObservableExtensions and the concret implementations of the abstract BaseSubject<T> class.  The Observable class contains all the calls for producing values. For example the Observable.Return<T>(T value) call returns the overtaken value to every observer. For every call at the Observable<T> class exists on Subject<T> object that produces the concrete values.

//http://msdn.microsoft.com/en-us/library/hh212048(v=vs.103).aspx
public static class Observable
{
    public static IObservable<T> Empty<T>()
    {
        return new EmptySubject<T>();
    }

    public static IObservable<T> Return<T>(T p)
    {
        return new ReturnSubject<T>(p);
    }

    public static IObservable<T> Return<T>(T[] p)
    {
        return new ReturnSubjectArray<T>(p);
    }
    ...

The ObservableExtensions class has some useful Extension methodes in it. With the ToObservable method it is possible to convert a single value an array or an IEnumerable to an IObservable. With the Subscribe methodes an Observer is created for an IObservable implementation, with overtakes the onNext, onError and onCompleted actions.

namespace CustomReactiveExtension
{
    public static class ObservableExtensions
    {
        public static IObservable<T> ToObservable<T>(this T[] array)
        {
            return new ReturnSubjectArray<T>(array);
        }

        public static IObservable<T> ToObservable<T>(this IEnumerable<T> collection)
        {
            return new ReturnSubjectCollection<T>(collection);
        }

        public static IObservable<T> ToObservable<T>(this T value)
        {
            return new ReturnSubject<T>(value);
        }

        public static IDisposable Subscribe<T>(this IObservable<T> value, Action<T> onNext, Action<Exception> onError, Action onCompleted)
        {
            Observer<T> observer = new Observer<T>(onNext, onError, onCompleted);
            IDisposable subject = value.Subscribe(observer);
            return subject;
        }

        public static IDisposable Subscribe<T>(this IObservable<T> value, Action<T> onNext, Action<Exception> onError)
        {
            Observer<T> observer = new Observer<T>(onNext, onError);
            IDisposable subject = value.Subscribe(observer);
            return subject;
        }       

        public static IDisposable Subscribe<T>(this IObservable<T> value, Action<T> onNext)
        {
            Observer<T> observer = new Observer<T>(onNext);
            IDisposable subject = value.Subscribe(observer);
            return subject;
        }
    }
}

For each method in the Observable class an subject object is generated. All these subjects are derived from BaseSubject<T>. The BaseSubject<T> class holds a list of Observers that are notified from the concrete implementation of BaseSubject<T>. To do so the concrete implementation uses the NotifyObservers, NotifyErrorObservers and NotifyCompleteObservers methodes. With the Subscribe and ColdSubscribe method Observers can register themselfes to  get notified if an Item is produced an error occured or the creation of items is completed. The difference between Subscribe and ColdSubscribe is that if Subscribe is called the subject starts creating items. As you see the abstract methode Execute is called in Subscribe. That means that the production of items starts here and the Observers become notified. In ColdSubcribe the Observer is added to the Observers collection, but the production of items does not start, because execute is not called.

public abstract class BaseSubject<T> : IObservable<T>
{
    public List<IObserver<T>> observers;
    private IScheduler scheduler = new TaskScheduler();
    public bool running;

    public IScheduler Scheduler
    {
        get { return scheduler; }
        set { this.scheduler = value; }
    }

    public abstract void Execute();

    protected virtual void NotifyObservers(T value)
    {
        foreach (var item in observers)
        {
            item.OnNext(value);
        }     
    }        

    protected virtual void NotifyErrorObservers(Exception exception)
    {
        foreach (var item in observers)
        {
            item.OnError(exception);
        }
    }

    protected virtual void NotifyCompleteObservers()
    {
        foreach (var item in observers)
        {
            item.OnCompleted();
        }     
    }

    public BaseSubject()
    {
        observers = new List<IObserver<T>>();
    }

    public IDisposable Subscribe(IObserver<T> observer)
    {
        IDisposable disposable = new DisposeObject<T>(observers, this, observer);
        observers.Add(observer);
        if (!running)
        {
            this.running = true;
            Execute();
        }
        return disposable;
    }

    public IDisposable ColdSubscribe(IObserver<T> observer)
    {
        IDisposable disposalble = new DisposeObject<T>(observers, this, observer);
        observers.Add(observer);
        return disposalble;
    }
}

With this post the introduction of the SimpleRX structure ends. In the next posts i will show the use of SimpleRX and explain the different Observable methodes.

SimpleRx: (part1) (part2) (part3) (part4) (part5) (part6) (part7) (part8)

Advertisements


If you have a note or a question please write a comment.

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out / Change )

Twitter picture

You are commenting using your Twitter account. Log Out / Change )

Facebook photo

You are commenting using your Facebook account. Log Out / Change )

Google+ photo

You are commenting using your Google+ account. Log Out / Change )

Connecting to %s