Reimplementing Reactive Extensions (Rx) (Part 3: Return, Repeat, Range and Throw)

Part 1 and part 2 of my blog series about reimplementing Reactive Extensions with SimpleRX focused on the basic building blocks of the SimpleRX implementation. Now I will show how SimpleRX is used and explain the different methods for generating observable sequences. The starting point for generating these sequences is always the Observable class. I will start with the simple generation methods.

  • Return<T>
  • Range<int>
  • Repeat<T>
  • Throw<T>

To explain Return, I will first show its use in a test case and then explain the implementation.

Return<T>

[TestMethod]
public void ReturnTest()
{
    IObservable<int> returnObject = Observable.Return(10);
    returnObject.Subscribe<int>(
        x => Assert.AreEqual(10, x), // expected value first, actual value second
        ex => Console.WriteLine("OnError {0}", ex.Message),
        () => Console.WriteLine("OnCompleted"));
}

This is a first, very simple example of using SimpleRX. The Observable class has a static method Return, which generates an IObservable sequence from a single value (in this case the integer value 10). Here is the implementation of the Return method.

public static IObservable<T> Return<T>(T p)
{
    return new ReturnSubject<T>(p);
}

As we can see, the Return method returns a ReturnSubject object, which is derived from BaseSubject.

public class ReturnSubject<T> : BaseSubject<T>
{
    private T value;

    public ReturnSubject(T value)
    {
        this.value = value;
    }

    public override void Execute()
    {
        NotifyObservers(value);
        NotifyCompleteObservers();            
    }
}

The ReturnSubject implementation is very simple. ReturnSubject stores the generic value, and when the Execute method runs, the observers are first notified with the value (NotifyObservers(value)) and then notified that the sequence has completed (NotifyCompleteObservers()).
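The notification order described above can also be tried without SimpleRX. The following is only a minimal sketch built on the .NET IObservable<T> and IObserver<T> interfaces. ReturnSketch and RecordingObserver are hypothetical names invented for this illustration, and the sketch notifies the observer directly in Subscribe because BaseSubject is not shown in this part.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for ReturnSubject<T>; not part of SimpleRX.
public sealed class ReturnSketch<T> : IObservable<T>
{
    private readonly T value;

    public ReturnSketch(T value) { this.value = value; }

    public IDisposable Subscribe(IObserver<T> observer)
    {
        observer.OnNext(value);   // first the single value
        observer.OnCompleted();   // then the end of the sequence
        return NoopDisposable.Instance;
    }

    // Nothing needs to be cleaned up once the sequence has completed.
    private sealed class NoopDisposable : IDisposable
    {
        public static readonly NoopDisposable Instance = new NoopDisposable();
        public void Dispose() { }
    }
}

// Records every notification so that the order can be inspected afterwards.
public sealed class RecordingObserver<T> : IObserver<T>
{
    public readonly List<string> Log = new List<string>();
    public void OnNext(T value) { Log.Add("OnNext " + value); }
    public void OnError(Exception error) { Log.Add("OnError " + error.Message); }
    public void OnCompleted() { Log.Add("OnCompleted"); }
}
```

Subscribing a RecordingObserver<int> to new ReturnSketch<int>(10) should leave exactly two entries in Log: the value, followed by the completion.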

Range<int>

[TestMethod]
public void RangeTest()
{
    IObservable<int> rangeObject = Observable.Range(10, 10);
    int expected = 10; // Range(10, 10) yields 10, 11, ..., 19
    rangeObject.Subscribe<int>(
        x => Assert.AreEqual(expected++, x),
        ex => Console.WriteLine("OnError {0}", ex.Message),
        () => Console.WriteLine("OnCompleted"));
}

Range generates an IObservable sequence of consecutive integers: it starts at the given value and produces n values, each one higher than the previous (Range(10, 10) yields 10 to 19). Here are the two overloads of the Range method. The second overload takes an IScheduler. With this scheduler the values can be generated on another thread. I will write about the implementation of schedulers in part 5 of this series.

public static IObservable<int> Range(int p, int xtime)
{
    return new RangeSubject(p, xtime);
}
public static IObservable<int> Range(int p, int xtime, IScheduler scheduler)
{
    return new RangeSubject(p, xtime, scheduler);
}

The Range method returns a RangeSubject object, which is derived from BaseSubject.

public class RangeSubject : BaseSubject<int>
{
    private int value;
    private int xtime;

    public RangeSubject(int value, int xtime)
    {
        this.value = value;
        this.xtime = xtime;
    }

    public RangeSubject(int value, int xtime, IScheduler scheduler)
    {
        this.value = value;
        this.xtime = xtime;
        this.Scheduler = scheduler;
    }

    public class ThreadExecuter
    {
        public int Value { get; set; }
        public int XTime { get; set; }
        public List<IObserver<int>> Observers { get; set; }
        public IScheduler Scheduler { get; set; }
        private int repeatCounter;

        public void Execute()
        {
            Action<int> action = null;
            action = (int value) =>
            {
                if (Observers.Count <= 0)
                    return;
                for (int i = 0; i < Observers.Count; i++)
                {
                    Observers[i].OnNext(value);
                }
                if (repeatCounter < XTime - 1)
                {
                    repeatCounter++;                        
                    Scheduler.Schedule(action, ++value);
                }
                else
                {
                    for (int i = 0; i < Observers.Count; i++)
                    {
                        Observers[i].OnCompleted();
                    }
                }
                return;
            };
            action(Value);
        }
    }

    public override void Execute()
    {
        ThreadExecuter threadExecuter = 
            new ThreadExecuter() { 
                Value = value, 
                Observers = observers,
                XTime = xtime, 
                Scheduler = this.Scheduler};
        Scheduler.Schedule(threadExecuter.Execute);
    }
}

The implementation of Range is a little more complicated than the Return implementation. We could implement Range with a simple loop that notifies the observers. The problem is that if we want the items to be generated on another thread (or task), we need the Scheduler.Schedule call: the action emits one value, reschedules itself for the next one and returns, so the executing thread gives control back after every value and other work can run in between.
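To make the difference to a plain loop visible, here is a small sketch of the self-rescheduling pattern. It does not use the SimpleRX IScheduler (which is covered later in the series). QueueSchedulerSketch is a hypothetical single-threaded scheduler invented for this illustration, and RangeSketch.Run is likewise only an assumed helper.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical single-threaded scheduler: queued actions run one after another.
public sealed class QueueSchedulerSketch
{
    private readonly Queue<Action> queue = new Queue<Action>();

    public void Schedule(Action action) { queue.Enqueue(action); }

    public void RunAll()
    {
        while (queue.Count > 0)
            queue.Dequeue()();
    }
}

public static class RangeSketch
{
    // Emits start, start + 1, ..., start + count - 1, one value per scheduled step.
    public static void Run(int start, int count, QueueSchedulerSketch scheduler,
                           Action<int> onNext, Action onCompleted)
    {
        int emitted = 0;
        Action step = null;
        step = () =>
        {
            if (emitted == count) { onCompleted(); return; }
            onNext(start + emitted);
            emitted++;
            scheduler.Schedule(step); // hand control back before the next value
        };
        scheduler.Schedule(step);
    }
}
```

Because every step goes back through the scheduler, an action that is queued right after Run gets its turn after the first value instead of having to wait for the whole range. With a plain for loop it would only run after the last value.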

Repeat<T>

[TestMethod]
public void RepeatTest()
{
    IObservable<int> repeatObject = Observable.Repeat(10);
    repeatObject.Subscribe<int>(
        x => Assert.AreEqual(10, x), // expected value first, actual value second
        ex => Console.WriteLine("OnError {0}", ex.Message),
        () => Console.WriteLine("OnCompleted"));
}

Repeat generates an IObservable sequence from a single value and keeps emitting that value until no observers are listening. Here are the three overloads of the Repeat method. The first starts when the first observer subscribes and ends when the last subscriber disposes. The second works like the first, but you have to pass in a scheduler. The third ends after a given number of repetitions.

public static IObservable<T> Repeat<T>(T value)
{
    return new RepeatSubject<T>(value);
}
public static IObservable<T> Repeat<T>(T value, IScheduler scheduler)
{
    return new RepeatSubject<T>(value, scheduler);
}

public static IObservable<T> Repeat<T>(T value, int repeatCount)
{
    return new RepeatSubject<T>(value, repeatCount);
}

The Repeat method returns a RepeatSubject object, which is derived from BaseSubject.

public class RepeatSubject<T> : BaseSubject<T>
{
    private T value;
    private int repeatCount;
    private bool repeatEndless = true;

    public class ThreadExecuter<T>
    {
        public T Value { get; set; }
        public int RepeatCount { get; set; }
        public bool RepeatEndless { get; set; }
        public List<IObserver<T>> Observers { get; set; }
        public IScheduler Scheduler { get; set; }

        public void Execute()
        {
            Action action = null;
            action = () =>
            {
                if (Observers.Count <= 0)
                    return;
                for(int i = 0; i < Observers.Count; i++)
                {
                    Observers[i].OnNext(Value);
                }
                if (!RepeatEndless)
                {
                    if (RepeatCount > 0)
                    {
                        RepeatCount--;
                        //Console.WriteLine("Repeat Number {0} - {1}", RepeatCount, Value);
                        Scheduler.Schedule(action);
                    }
                    else
                    {
                        for (int i = 0; i < Observers.Count; i++)
                        {
                            Observers[i].OnCompleted();
                        }
                    }
                }
                else
                {
                    Scheduler.Schedule(action);
                }
                return;
            };
            action();
        }
    }

    public RepeatSubject(T value)
    {
        this.value = value;
    }

    public RepeatSubject(T value, IScheduler scheduler) : this(value)
    {
        this.Scheduler = scheduler;
    }

    public RepeatSubject(T value, int repeatCount) : this(value)
    {            
        this.repeatCount = repeatCount;
        this.repeatEndless = false;
    }

    public RepeatSubject(T value, int repeatCount, IScheduler scheduler) : this(value, repeatCount)
    {
        this.Scheduler = scheduler;
    }

    public override void Execute()
    {
        ThreadExecuter<T> threadExecuter = 
            new ThreadExecuter<T>() { 
                Value = value, 
                Observers = observers,
                RepeatCount = repeatCount, 
                RepeatEndless = repeatEndless, 
                Scheduler = this.Scheduler};
        Scheduler.Schedule(threadExecuter.Execute);
    }
}

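The RepeatSubject implementation uses the repeatEndless field. If it is true, Repeat runs endlessly. If it is false, the action reschedules itself and decrements RepeatCount as long as RepeatCount is greater than 0, and sends OnCompleted once the counter has reached 0. Note that the value is emitted before the counter is checked, so as written it is emitted RepeatCount + 1 times.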
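The counting behaviour of the finite case is easy to trace in isolation. The following sketch replays the branch structure of the action from ThreadExecuter in a simple loop, assuming a scheduler that runs each scheduled action right away. RepeatCountSketch and CountEmissions are hypothetical names used only for this illustration.

```csharp
using System;

public static class RepeatCountSketch
{
    // Replays the finite branch of the action: emit first, then either
    // decrement the counter and go again, or complete.
    public static int CountEmissions(int repeatCount)
    {
        int emissions = 0;
        bool completed = false;
        while (!completed)
        {
            emissions++;              // corresponds to Observers[i].OnNext(Value)
            if (repeatCount > 0)
                repeatCount--;        // corresponds to Scheduler.Schedule(action)
            else
                completed = true;     // corresponds to Observers[i].OnCompleted()
        }
        return emissions;
    }
}
```

With this structure CountEmissions(3) returns 4, because the value goes out once before the counter is looked at. If exactly n emissions are wanted, one possible change would be to compare against 1 instead of 0, at the cost that a count of 0 would still emit once.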

Throw<T>

[TestMethod]
public void ThrowTest()
{
    IObservable<int> throwObject = Observable.Throw<int>(new Exception("OnError"));
    throwObject.Subscribe<int>(
        x => { var y = x; },
        ex => Assert.AreEqual("OnError", ex.Message),
        () => Console.WriteLine("OnCompleted"));
}

Throw is very simple: it passes the given exception to every observer through OnError. Here is the implementation of the Throw method and the ThrowSubject class.

public static IObservable<T> Throw<T>(Exception exception)
{
    return new ThrowSubject<T>(exception);
}
public class ThrowSubject<T> : BaseSubject<T>
{
    private Exception exception;

    public ThrowSubject(Exception exception)
    {
        this.exception = exception;
    }

    public override void Execute()
    {
        NotifyErrorObservers(exception);   
    }
}
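The same behaviour can be sketched without SimpleRX, using only the .NET IObservable<T> and IObserver<T> interfaces. ThrowSketch and ErrorLog are hypothetical names for this illustration. The point of the sketch is that the error is the only notification: no value and no completion are sent.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for ThrowSubject<T>; not part of SimpleRX.
public sealed class ThrowSketch<T> : IObservable<T>
{
    private readonly Exception exception;

    public ThrowSketch(Exception exception) { this.exception = exception; }

    public IDisposable Subscribe(IObserver<T> observer)
    {
        observer.OnError(exception); // terminal notification, nothing follows
        return new NothingToDispose();
    }

    private sealed class NothingToDispose : IDisposable
    {
        public void Dispose() { }
    }
}

// Records which notifications arrive.
public sealed class ErrorLog<T> : IObserver<T>
{
    public readonly List<string> Entries = new List<string>();
    public void OnNext(T value) { Entries.Add("OnNext"); }
    public void OnError(Exception error) { Entries.Add("OnError " + error.Message); }
    public void OnCompleted() { Entries.Add("OnCompleted"); }
}
```

Subscribing an ErrorLog<int> to new ThrowSketch<int>(new Exception("boom")) should leave a single entry for the error in Entries.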

In the next post I will show some generation and time-based Observable methods.

SimpleRx: (part1) (part2) (part3) (part4) (part5) (part6) (part7) (part8)

If you have a note or a question please write a comment.