Reimplementing Reactive Extensions (Rx) (part 4: Generate and GenerateWithTime)

In part 3 of my blog series about reimplementing Reactive Extensions with SimpleRX, I wrote about some basic generation methods. In this post I will write about the

  • Generate<int>
  • GenerateWithTime<int>

generation methods.

To explain the methods, I will first show how each one is used in a test case and then explain the implementation.

Generate<int>

I will start with the Generate method, which generates items from an initValue, a condition func, an iterate func and a resultSelector func. Here is a test that shows how Generate<int> works.

[TestMethod]
public void GenerateTest()
{
    IObservable<int> generateObject = Observable.Generate(1, i => i <= 10, i => i + 1, i => i);
    generateObject.Subscribe<int>(
        x => Trace.WriteLine(x),
        ex => Console.WriteLine("OnError {0}", ex.Message),
        () => Console.WriteLine("OnCompleted"));
}

Generate<int> builds an IObservable<int> sequence. The first parameter (initValue) sets the starting point of the sequence, and the second (condition) tells the sequence when to end. The third parameter (iterate) tells the sequence how to increment or decrement the value, and the fourth parameter (resultSelector) defines the result value. Now let's look at the implementation of Generate<int>. The Generate method returns a GenerateSubject.
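
Before looking at the subject itself, it may help to see the four parameters as the parts of an ordinary for loop. The following is only a sketch under that assumption: Unfold and GenerateSketch are hypothetical names that are not part of SimpleRX, and the sketch ignores observers and scheduling completely.

```csharp
using System;
using System.Collections.Generic;

public static class GenerateSketch
{
    // Hypothetical helper (not part of SimpleRX): collects the values that
    // Generate would push to its observers, using a plain loop.
    public static List<int> Unfold(int initValue, Predicate<int> condition,
        Func<int, int> iterate, Func<int, int> resultSelector)
    {
        var results = new List<int>();
        // initValue -> loop start, condition -> loop test, iterate -> loop step
        for (int value = initValue; condition(value); value = iterate(value))
        {
            // resultSelector maps the loop state to the emitted item
            results.Add(resultSelector(value));
        }
        return results;
    }

    public static void Main()
    {
        // Same arguments as the test, but with a squaring resultSelector
        var items = Unfold(1, i => i <= 10, i => i + 1, i => i * i);
        Console.WriteLine(string.Join(", ", items));
    }
}
```

The SimpleRX implementation that follows does the same work, but it pushes each value to the subscribed observers and schedules every step on an IScheduler instead of running a loop.
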

public static IObservable<int> Generate(int initValue, Predicate<int> condition, 
    Func<int, int> iterate, Func<int, int> resultSelector)
{
    return new GenerateSubject(initValue, condition, iterate, resultSelector);
}
public static IObservable<int> Generate(int initValue, Predicate<int> condition,
    Func<int, int> iterate, Func<int, int> resultSelector, IScheduler scheduler)
{
    return new GenerateSubject(initValue, condition, iterate, resultSelector, scheduler);
}
public class GenerateSubject : BaseSubject<int>
{
    private int initValue;
    private Predicate<int> condition;
    private Func<int, int> iterate;
    private Func<int, int> resultSelector;

    public GenerateSubject(int initValue, Predicate<int> condition, 
        Func<int, int> iterate, Func<int, int> resultSelector)
    {
        this.initValue = initValue;
        this.condition = condition;
        this.iterate = iterate;
        this.resultSelector = resultSelector;
        this.Scheduler = new CurrentThreadScheduler();
    }

    public GenerateSubject(int initValue, Predicate<int> condition, 
        Func<int, int> iterate, Func<int, int> resultSelector, IScheduler scheduler) :
        this(initValue, condition, iterate, resultSelector)
    {
        this.Scheduler = scheduler;
    }

    public class ThreadExecuter
    {
        public int Value { get; set; }
        public Predicate<int> Condition { get; set; }
        public Func<int, int> Iterate { get; set; }
        public Func<int, int> ResultSelector { get; set; }
        public List<IObserver<int>> Observers { get; set; }
        public IScheduler Scheduler { get; set; }

        public void Execute()
        {
            Action<int> action = null;
            action = (int value) =>
                {
                    // if no observers are listening, the generation ends
                    if (Observers.Count <= 0)
                        return;
                    // here the Condition func is executed if it returns true then
                    // the next item is generated and the Observers are notified,
                    // if it returns false the Observers are notified about the completion.
                    if (Condition(value))
                    {
                        // here the ResultSelector func is executed and notifies
                        // the Observers with the selectorValue
                        var selectorValue = ResultSelector(value);
                        for (int i = 0; i < Observers.Count; i++)
                        {
                            Observers[i].OnNext(selectorValue);
                        }
                        var iterateValue = Iterate(value);
                        // here we start the next iteration, but we are not using a loop.
                        // We schedule the next iteration item with the Scheduler.
                        // This gives another thread the chance to execute if we are using
                        // a thread or task scheduler
                        Scheduler.Schedule(action, iterateValue);
                    }
                    else
                    {
                        for (int i = 0; i < Observers.Count; i++)
                        {
                            Observers[i].OnCompleted();
                        }
                    }
                };
            action(Value);
        }
    }

    public override void Execute()
    {
        // the ThreadExecuter is the class that gets scheduled by the Scheduler
        // (it takes all the parameters and executes them on the thread
        // specified in the Scheduler variable)
        ThreadExecuter threadExecuter =
            new ThreadExecuter()
            {
                Value = initValue,
                Observers = observers,
                Condition = condition,
                Iterate = iterate,
                ResultSelector = resultSelector,
                Scheduler = this.Scheduler
            };
        // Schedule the threadExecuter
        Scheduler.Schedule(threadExecuter.Execute);
    }
}
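
The effect of scheduling each step instead of looping can be sketched in isolation. The example below assumes a scheduler that queues work and drains the queue itself. QueueScheduler and ScheduledGenerateSketch are hypothetical names, and the IScheduler implementations in SimpleRX may behave differently. A list stands in for the observers.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for a scheduler: work scheduled while the queue is
// being drained is only enqueued, so the call stack does not grow per item.
public class QueueScheduler
{
    private readonly Queue<Action> queue = new Queue<Action>();
    private bool draining;

    public void Schedule<T>(Action<T> action, T state)
    {
        queue.Enqueue(() => action(state));
        if (draining)
            return; // the outer drain loop will pick this item up
        draining = true;
        try
        {
            while (queue.Count > 0)
                queue.Dequeue()();
        }
        finally
        {
            draining = false;
        }
    }
}

public static class ScheduledGenerateSketch
{
    public static List<int> Run(int initValue, Predicate<int> condition,
        Func<int, int> iterate, Func<int, int> resultSelector)
    {
        var scheduler = new QueueScheduler();
        var results = new List<int>();
        Action<int> step = null;
        step = value =>
        {
            if (!condition(value))
                return; // GenerateSubject notifies OnCompleted at this point
            results.Add(resultSelector(value)); // stands in for OnNext
            // no loop: the next step is handed to the scheduler
            scheduler.Schedule(step, iterate(value));
        };
        scheduler.Schedule(step, initValue);
        return results;
    }

    public static void Main()
    {
        // prints the number of generated items
        Console.WriteLine(Run(1, i => i <= 100000, i => i + 1, i => i).Count);
    }
}
```

With a queueing scheduler like this one, every step returns before the next one runs, so long sequences do not build up nested calls. A scheduler that hands each step to another thread would interleave the steps with other work in the same way.
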

GenerateWithTime<int>

The GenerateWithTime<int> method works very much like the Generate<int> method, but it has an additional time component, which means it does not generate the values immediately. It has an additional parameter, timeSelector. This parameter is a func that takes the value (i) and returns a TimeSpan. The returned TimeSpan is the delay before that value is generated. In this example the func returns TimeSpan.FromSeconds(i). Because i gets bigger every time (it starts at 1 and ends at 10), the first item takes 1 second to generate, the second takes 2 seconds, and the tenth takes 10 seconds. The test prints each value together with the seconds component of the time at which it was generated.

[TestMethod]
public void GenerateWithTimeTest()
{
    IObservable<int> generateObject =
        Observable.GenerateWithTime(1, i => i <= 10, i => i + 1, i => i, i => TimeSpan.FromSeconds(i));
    generateObject.Subscribe(
        x => Trace.WriteLine(string.Format("{0} / {1}", x, DateTime.Now.Second)),
        ex => Console.WriteLine("OnError {0}", ex.Message),
        () => Console.WriteLine("OnCompleted"));
    Console.ReadLine();
}
public static IObservable<int> GenerateWithTime(int initValue, 
    Predicate<int> condition, Func<int, int> iterate, 
    Func<int, int> resultSelector, Func<int, TimeSpan> timeSelector)
{
    return new GenerateWithTimeSubject(initValue, condition, iterate, resultSelector, timeSelector);
}
public class GenerateWithTimeSubject : BaseSubject<int>
{
    private int initValue;
    private Predicate<int> condition;
    private Func<int, int> iterate;
    private Func<int, int> resultSelector;
    private Func<int, TimeSpan> timeSelector;
    private Timer timer;

    public GenerateWithTimeSubject(int initValue, Predicate<int> condition, Func<int, int> iterate,
        Func<int, int> resultSelector, Func<int, TimeSpan> timeSelector)
    {
        this.initValue = initValue;
        this.condition = condition;
        this.iterate = iterate;
        this.resultSelector = resultSelector;
        this.timeSelector = timeSelector;
    }

    public class ThreadExecuter
    {
        public int Value { get; set; }
        public Predicate<int> Condition { get; set; }
        public Func<int, int> Iterate { get; set; }
        public Func<int, int> ResultSelector { get; set; }
        public Func<int, TimeSpan> TimespanSelector { get; set; }
        public List<IObserver<int>> Observers { get; set; }
        private Timer timer;
        public IScheduler Scheduler { get; set; }

        private AutoResetEvent autoResetEvent = new AutoResetEvent(false);

        public void Execute()
        {
            Action<int> action = null;
            action = (int value) =>
            {
                // if no observers are listening, the generation ends
                if (Observers.Count <= 0)
                    return;
                var tempInitValue = value;
                // the timespanSelector calculates the timespan to the generation of the next item
                var timeSpan = TimespanSelector(tempInitValue);
                // if the Condition is true the generation continues
                if (Condition(value))
                {
                    // here the timer starts with the calculated timeSpan
                    timer = new Timer(obj =>
                    {
                        int localInitValue = (int)obj;
                        var selectorValue = ResultSelector(localInitValue);
                        for (int i = 0; i < Observers.Count; i++)
                        {
                            Observers[i].OnNext(selectorValue);
                        }
                        // once all Observers are notified, autoResetEvent.Set() is called
                        // to release the waiting action
                        autoResetEvent.Set();
                        // every timer fires only once (a period of -1 ms disables repetition)
                    }, tempInitValue, timeSpan, TimeSpan.FromMilliseconds(-1));
                    // here we wait until the generation of the previous element in the timer ends
                    autoResetEvent.WaitOne();
                    var iterateValue = Iterate(value);
                    // here we schedule the next item generation
                    Scheduler.Schedule(action, iterateValue);
                }
                else
                {
                    // if the condition is false, all Observers are notified that the generation
                    // completed.
                    for (int i = 0; i < Observers.Count; i++)
                    {
                        Observers[i].OnCompleted();
                    }
                }
            };
            action(Value);
        }
    }

    //private AutoResetEvent autoResetEvent = new AutoResetEvent(false);

    public override void Execute()
    {
        ThreadExecuter threadExecuter =
            new ThreadExecuter()
            {
                Value = initValue,
                Observers = observers,
                Condition = condition,
                Iterate = iterate,
                ResultSelector = resultSelector,
                TimespanSelector = timeSelector,
                Scheduler = this.Scheduler
            };
        // here the first generation starts
        Scheduler.Schedule(threadExecuter.Execute);
    }
}
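
To make the timing of the GenerateWithTime test above concrete, the delays can be added up without actually waiting. This is a rough sketch: Plan and GenerateWithTimeSchedule are hypothetical names, the resultSelector is left out because the test uses the identity, and scheduler or callback overhead is ignored.

```csharp
using System;
using System.Collections.Generic;

public static class GenerateWithTimeSchedule
{
    // Hypothetical helper (not part of SimpleRX): computes the approximate
    // elapsed time at which each value would be emitted, assuming each delay
    // starts only after the previous value has been delivered.
    public static List<KeyValuePair<int, TimeSpan>> Plan(int initValue,
        Predicate<int> condition, Func<int, int> iterate,
        Func<int, TimeSpan> timeSelector)
    {
        var plan = new List<KeyValuePair<int, TimeSpan>>();
        var elapsed = TimeSpan.Zero;
        for (int value = initValue; condition(value); value = iterate(value))
        {
            // the delay for this value is added to the time already spent
            elapsed += timeSelector(value);
            plan.Add(new KeyValuePair<int, TimeSpan>(value, elapsed));
        }
        return plan;
    }

    public static void Main()
    {
        var plan = Plan(1, i => i <= 10, i => i + 1, i => TimeSpan.FromSeconds(i));
        foreach (var entry in plan)
            Console.WriteLine("{0} after {1} s", entry.Key, entry.Value.TotalSeconds);
    }
}
```

Under these assumptions the values arrive after roughly 1, 3, 6, 10 and so on seconds, and the whole test needs about 55 seconds, because the timer for the next value is only started after the previous one was delivered.
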

In the next post I will explain some extension methods for Observable that allow functional method chaining and transformation of Observable sequences.

SimpleRx: (part1) (part2) (part3) (part4) (part5) (part6) (part7) (part8)

If you have a note or a question please write a comment.