Calculating string difference values with the Levenshtein fuzzy string algorithm (part 1: introduction and implementation in C#).

In this post I will write about the Levenshtein algorithm. With this algorithm it is possible to compare two strings and calculate a difference value for them. I read about it in a SQL blog, but the implementation there was much too complicated, so I decided to implement the algorithm myself in C# and in T-SQL. After that I thought about the best use of the algorithm and decided to implement a special Levenshtein subject in my SimpleRX project that checks a string against a list of strings passed to it.

First of all I want to describe the Levenshtein algorithm, which belongs to the fuzzy string search algorithms. The basic idea is very simple. You take two strings and compare them character by character. If both characters are the same you add zero. If you have to exchange a character for another one to transform the source string into the target string you add two. If you have to insert or delete a character you add one. (The classic Levenshtein distance counts an exchange as one; in this post an exchange is weighted like a delete followed by an insert.) You do that for every character in the source string, and the closer the source string is to the target string, the smaller the resulting number is. As an example we take the string “must” and compare it to “dust”.

The result is 2 because “must” and “dust” differ in only one character (exchange m for d, which costs 2).

If we calculate the difference between “mus” and “dust”, the result is 3 because we have to exchange one character (d for m, cost 2) and add one character (t, cost 1).

For the strings “mustard” and “dust” the result is 5 because we have to exchange one character (d for m, cost 2) and delete three characters (a, r and d, cost 3).

Those are the basic operations used to calculate the Levenshtein distance between two strings. The calculation in C# works with a two-dimensional array that compares each character of the source string with each character of the target string, plus one extra row and one extra column that hold the positions of the characters in the source and target string. For “must” and “dust” the start array therefore contains the values 0 to 4 in its first row and in its first column.

To accomplish this I used the following two loops.

var sourceLength = source.Length + 1;
var targetLength = target.Length + 1;
var array = new int[sourceLength, targetLength];
for (int i = 0; i <= source.Length; i++)
{
     array[i, 0] = i;
}
for (int i = 0; i <= target.Length; i++)
{
     array[0, i] = i;
}

With that array we start the calculation. We use two loops to compare every character of the source string with every character of the target string. For every pair of characters we calculate the value for exchange, insert or delete and take the minimum to fill the corresponding item in the array. The outer loop runs from 1 to the length of the source string and the inner loop runs from 1 to the length of the target string.
Now we check whether the current characters in the source and target string are the same. If yes we add zero, if not we add two. Then we calculate the insert value, the delete value and the interchange value from the values already stored in the array. For i = 1 and j = 1 we compare the ‘m’ of must with the ‘d’ of dust. Equal is 2 because d and m are not the same. MinInsert is the value of [0,1], which is 1, plus 1 for the insert, so minInsert is 2. MinDelete is the value of [1,0], which is 1, plus 1 for the delete, so minDelete is 2. MinInterchange is the value of [0,0], which is 0, plus 2 because equal is 2, so minInterchange is 2. Now we take the minimum of minInsert, minDelete and minInterchange. That means we store 2 at position [1,1] of the array. That is the result of the first calculation.

Now we look at the next iteration of the inner loop. With i = 1 and j = 2 we compare the ‘m’ of must with the ‘u’ of dust. Equal is 2 because m and u are not the same. MinInsert is the value of [0,2], which is 2, plus 1 for the insert, so minInsert is 3. MinDelete is the value of [1,1], which is 2, plus 1 for the delete, so minDelete is 3. MinInterchange is the value of [0,1], which is 1, plus 2 because equal is 2, so minInterchange is 3. Now we take the minimum of minInsert, minDelete and minInterchange. That means we store 3 at position [1,2] of the array. That is the array after the comparison of ‘m’ and ‘u’.

We continue like this until the whole array is filled with values.

The following code generates the array values.

for (int i = 1; i <= source.Length; i++)
{
    for (int j = 1; j <= target.Length; j++)
    {
        var equal = (source[i - 1] == target[j - 1])? 0 : 2;
        var minInsert = array[i - 1, j] + 1;
        var minDelete = array[i, j - 1] + 1;
        var minInterchange = array[i - 1, j - 1] + equal;
        array[i, j] = Math.Min(Math.Min(minInsert, minDelete), minInterchange);                                        
    }
}
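To inspect the intermediate values, the filled array can also be dumped as text. The following is only a small sketch under the weights used in this post (insert 1, delete 1, exchange 2); the class name LevenshteinMatrixSketch and its two helper methods are made up for illustration and are not part of SimpleRX.

```csharp
using System;
using System.Text;

// Hypothetical helper for illustration only.
public static class LevenshteinMatrixSketch
{
    // Builds the complete cost array: rows belong to the source string,
    // columns to the target string.
    public static int[,] Build(string source, string target)
    {
        var array = new int[source.Length + 1, target.Length + 1];
        for (int i = 0; i <= source.Length; i++) array[i, 0] = i;
        for (int j = 0; j <= target.Length; j++) array[0, j] = j;

        for (int i = 1; i <= source.Length; i++)
        {
            for (int j = 1; j <= target.Length; j++)
            {
                int equal = source[i - 1] == target[j - 1] ? 0 : 2;
                int best = Math.Min(array[i - 1, j] + 1, array[i, j - 1] + 1);
                array[i, j] = Math.Min(best, array[i - 1, j - 1] + equal);
            }
        }
        return array;
    }

    // Renders the array as text, one line per row.
    public static string Format(int[,] array)
    {
        var text = new StringBuilder();
        for (int i = 0; i < array.GetLength(0); i++)
        {
            for (int j = 0; j < array.GetLength(1); j++)
            {
                text.Append(array[i, j]).Append(' ');
            }
            text.AppendLine();
        }
        return text.ToString();
    }
}
```

Calling Console.Write(LevenshteinMatrixSketch.Format(LevenshteinMatrixSketch.Build("must", "dust"))) prints the 5 x 5 array; the entries [1,1] and [1,2] are the values 2 and 3 derived step by step above.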

Now we take the value at position [n,m] of the array, where n is the length of the source string and m is the length of the target string. In our case that is array[4,4], and its value is 2. That means the difference between ‘must’ and ‘dust’ is 2. Here is the complete method that calculates the Levenshtein difference.

public int CalculateLevenshteinValue(string source, string target)
{
    var sourceLength = source.Length;
    var targetLength = target.Length;
    var array = new int[sourceLength + 1, targetLength + 1];
    for (int i = 0; i <= source.Length; i++)
    {
        array[i, 0] = i;
    }
    for (int i = 0; i <= target.Length; i++)
    {
        array[0, i] = i;
    }                       
    for (int i = 1; i <= source.Length; i++)
    {
        for (int j = 1; j <= target.Length; j++)
        {
            var equal = (source[i - 1] == target[j - 1])? 0 : 2;
            var minInsert = array[i - 1, j] + 1;
            var minDelete = array[i, j - 1] + 1;
            var minInterchange = array[i - 1, j - 1] + equal;
            array[i, j] = Math.Min(Math.Min(minInsert, minDelete), minInterchange);                                        
        }
    }
    return array[sourceLength, targetLength];
}
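As a variation on the method above, here is a sketch that keeps only two rows of the array in memory and makes the exchange cost a parameter. The class name LevenshteinSketch and the parameter substitutionCost are my own naming for this example; a cost of 2 reproduces the values from this post, a cost of 1 gives the classic Levenshtein distance.

```csharp
using System;

// Hypothetical variant for illustration only.
public static class LevenshteinSketch
{
    public static int Distance(string source, string target, int substitutionCost = 2)
    {
        if (source == null) throw new ArgumentNullException(nameof(source));
        if (target == null) throw new ArgumentNullException(nameof(target));

        // previous holds the finished row i - 1, current the row being filled.
        var previous = new int[target.Length + 1];
        var current = new int[target.Length + 1];
        for (int j = 0; j <= target.Length; j++) previous[j] = j;

        for (int i = 1; i <= source.Length; i++)
        {
            current[0] = i;
            for (int j = 1; j <= target.Length; j++)
            {
                int exchange = source[i - 1] == target[j - 1] ? 0 : substitutionCost;
                int best = Math.Min(previous[j] + 1, current[j - 1] + 1);
                current[j] = Math.Min(best, previous[j - 1] + exchange);
            }
            // The finished row becomes the previous row of the next iteration.
            var swap = previous;
            previous = current;
            current = swap;
        }
        return previous[target.Length];
    }
}
```

With the default cost, Distance("must", "dust"), Distance("mus", "dust") and Distance("mustard", "dust") give 2, 3 and 5, the three examples from the beginning of this post. Because each row only depends on the row before it, the two-row version needs memory proportional to the target length instead of the product of both lengths.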

After finishing the implementation I wanted to put the Levenshtein calculation to use, so I built a LevenshteinSubject in my SimpleRX project. Here is the code that shows the usage.

[TestMethod]
public void LevenshteinSubjectTest()
{
    List<string> list = new List<string>() { "label", "length", "lamp", "lab",
        "lacy", "lady", "lager", "lair", "lake", "lam", "lamb" };
    LevenshteinSubject subject = new LevenshteinSubject(list);
    subject.Subscribe(x => 
        { 
            var y = x;
            Debug.WriteLine(x);
        },
        ex => Console.WriteLine("OnError {0}", ex.Message),
        () => Console.WriteLine("OnCompleted")
        );
    subject.OnNext("lay");
    subject.OnCompleted();
}

That is the result of the execution:

In the second part of this blog post I will show how to implement the Levenshtein algorithm in T-SQL.
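Independently of SimpleRX, the idea of checking one string against a list of strings can be sketched with plain LINQ. This is not the LevenshteinSubject implementation; ClosestMatchSketch and its Closest method are assumptions made up for this example, and the sketch simply returns the candidates with the smallest difference value.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical helper for illustration only.
public static class ClosestMatchSketch
{
    // Same weights as in this post: insert = 1, delete = 1, exchange = 2.
    public static int Distance(string source, string target)
    {
        var array = new int[source.Length + 1, target.Length + 1];
        for (int i = 0; i <= source.Length; i++) array[i, 0] = i;
        for (int j = 0; j <= target.Length; j++) array[0, j] = j;
        for (int i = 1; i <= source.Length; i++)
        {
            for (int j = 1; j <= target.Length; j++)
            {
                int equal = source[i - 1] == target[j - 1] ? 0 : 2;
                int best = Math.Min(array[i - 1, j] + 1, array[i, j - 1] + 1);
                array[i, j] = Math.Min(best, array[i - 1, j - 1] + equal);
            }
        }
        return array[source.Length, target.Length];
    }

    // Returns every candidate whose distance to the input is minimal,
    // in the order in which the candidates were given.
    public static List<string> Closest(string input, IEnumerable<string> candidates)
    {
        var scored = candidates
            .Select(c => new { Candidate = c, Score = Distance(c, input) })
            .ToList();
        if (scored.Count == 0) return new List<string>();

        int best = scored.Min(s => s.Score);
        return scored.Where(s => s.Score == best)
                     .Select(s => s.Candidate)
                     .ToList();
    }
}
```

For the word list from the test above and the input "lay", Closest returns "lacy" and "lady", because each of them needs only one deleted character to become "lay".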

(part1) (part2)


reimplementing reactive extensions (rx) (part 5: Contains and Count)

In the last two posts (part3, part4) I wrote about generation functions that build Observable<T> sequences. In this post I will write about some simple transformation methods that take an IObservable sequence and transform it into another IObservable sequence. The transformation methods I will explain are:

  • Contains
  • Count 

Contains<T>

The Contains method checks whether an IObservable sequence contains a specific value. In this example I generate an Observable sequence with the Observable.Range method. That method generates a sequence that starts at the start value (1) and contains four values, so it ends at 4. On this sequence we use the Contains extension method, which checks whether the given value (2) is in the sequence. It returns an IObservable<bool>, which means every Observer is notified exactly once with the information whether the sequence contains the value.

[TestMethod]
public void ContainsTest()
{
    IObservable<bool> returnObject = Observable.Range(1, 4).Contains(2);
    returnObject.Subscribe<bool>(
        x => Assert.IsTrue(x),
        ex => Console.WriteLine("OnError {0}", ex.Message),
        () => Console.WriteLine("OnCompleted"));
}

The Contains method is the first extension method in SimpleRX that I write about. It checks whether a given value is in the Observable sequence.

public static IObservable<bool> Contains<T>(this IObservable<T> source, T value)
{
    return new ContainsSubject<T>(source, value);
}

The ContainsSubject constructor takes the IObservable<T> source and the value to check. It casts the source to a BaseSubject<T>, which means that this extension method only works if the source is a BaseSubject. If you use another source that implements IObservable<T> but is not a BaseSubject, the cast fails with an InvalidCastException. After that we create a new Observer<T> object. The Observer is described in part1 of this post series. We pass an OnNext, an OnError and an OnComplete action to the Observer and call ColdSubscribe on the source subject with it. That means the ContainsSubject reacts to the OnNext, OnError and OnComplete of the source. To see the implementation of ColdSubscribe look at part1 of this post series.

Now let us look at the methods we passed in. The InnerExecute method takes the current value produced by the source and checks whether it equals the value we are looking for. If it does, it sets the boolean containsValue to true, which means the source contains the value. When the source ends (by calling OnComplete), the ContainsSubject passes containsValue to all Observers that subscribed to it. After that it calls the NotifyCompleteObservers method to notify all Observers that the generation completed. The starting point that triggers the generation of the source items is the overridden Execute method, which executes the Execute method of the source.

public class ContainsSubject<T> : BaseSubject<bool>
{
    private IObservable<T> source;        
    private IDisposable subscriped;
    private BaseSubject<T> subject;
    private bool containsValue;
    private T value;

    public ContainsSubject(IObservable<T> source, T value)
    {
        this.source = source;
        this.value = value;
        BaseSubject<T> subject = (BaseSubject<T>)source;
        this.subject = subject;
        var observer =
            new Observer<T>(
                localvalue => InnerExecute(localvalue),
                ex => NotifyErrorObservers(ex),
                () => InnerComplete());
        this.subscriped = subject.ColdSubscribe(observer);
    }

    private void InnerExecute(T value)
    {
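        // EqualityComparer (System.Collections.Generic) also works when the searched value is null
        if (EqualityComparer<T>.Default.Equals(this.value, value))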
        {
            containsValue = true;                
        }                        
    }

    private void InnerComplete()
    {
        NotifyObservers(containsValue);
        NotifyCompleteObservers();
    }

    public override void Execute()
    {
        try
        {
            subject.Execute();
        }
        catch (Exception exception)
        {
            NotifyErrorObservers(exception);
        }         
    }
}
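The cast to BaseSubject<T> is the main restriction of this implementation. As a sketch of how the same idea could work for any IObservable<T>, the following example only uses the IObservable<T> and IObserver<T> interfaces from the System namespace. ContainsSketch, ArraySource, ContainsValue and Run are hypothetical names for this example and are not part of SimpleRX; ArraySource is just a stand-in source that pushes its items synchronously.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical sketch for illustration only.
public static class ContainsSketch
{
    // Stand-in source: pushes all items and completes inside Subscribe.
    public sealed class ArraySource<T> : IObservable<T>
    {
        private readonly T[] items;
        public ArraySource(params T[] items) { this.items = items; }

        public IDisposable Subscribe(IObserver<T> observer)
        {
            foreach (var item in items) observer.OnNext(item);
            observer.OnCompleted();
            return new Unsubscriber();
        }
    }

    private sealed class Unsubscriber : IDisposable
    {
        public void Dispose() { } // nothing to release in this sketch
    }

    // Remembers whether the value was seen and reports it on completion.
    private sealed class ContainsObserver<T> : IObserver<T>
    {
        private readonly IObserver<bool> downstream;
        private readonly T value;
        private bool containsValue;

        public ContainsObserver(IObserver<bool> downstream, T value)
        {
            this.downstream = downstream;
            this.value = value;
        }

        public void OnNext(T item)
        {
            if (EqualityComparer<T>.Default.Equals(item, value)) containsValue = true;
        }

        public void OnError(Exception error) { downstream.OnError(error); }

        public void OnCompleted()
        {
            downstream.OnNext(containsValue);
            downstream.OnCompleted();
        }
    }

    private sealed class ContainsObservable<T> : IObservable<bool>
    {
        private readonly IObservable<T> source;
        private readonly T value;

        public ContainsObservable(IObservable<T> source, T value)
        {
            this.source = source;
            this.value = value;
        }

        // Every subscriber gets its own ContainsObserver on the source.
        public IDisposable Subscribe(IObserver<bool> observer)
        {
            return source.Subscribe(new ContainsObserver<T>(observer, value));
        }
    }

    public static IObservable<bool> ContainsValue<T>(this IObservable<T> source, T value)
    {
        return new ContainsObservable<T>(source, value);
    }

    private sealed class Collector : IObserver<bool>
    {
        public readonly List<bool> Results = new List<bool>();
        public void OnNext(bool item) { Results.Add(item); }
        public void OnError(Exception error) { throw error; }
        public void OnCompleted() { }
    }

    // Subscribes to the result sequence and returns everything it pushed.
    public static List<bool> Run<T>(IObservable<T> source, T value)
    {
        var collector = new Collector();
        source.ContainsValue(value).Subscribe(collector);
        return collector.Results;
    }
}
```

ContainsSketch.Run(new ContainsSketch.ArraySource<int>(1, 2, 3, 4), 2) returns a list with the single entry true. The design difference to ContainsSubject is that the subscription to the source happens inside Subscribe and not in the constructor, so no cast to a concrete subject type is needed.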


Count<T>

The Count method is, like the Contains method, an extension method that works on a given sequence. It simply counts the values in the sequence. In this example I generate an observable sequence with the Observable.Range method and then count the items in the sequence.

[TestMethod]
public void CountTest()
{
    IObservable<int> returnObject = Observable.Range(1, 4).Count();
    returnObject.Subscribe<int>(
        x => Assert.AreEqual<int>(x, 4),
        ex => Console.WriteLine("OnError {0}", ex.Message),
        () => Console.WriteLine("OnCompleted"));
}

The implementation is similar to the Contains implementation, but there are some differences. The Count method returns a CountSubject.

public static IObservable<int> Count<T>(this IObservable<T> source)
{
    return new CountSubject<T>(source);
}

This CountSubject observes the source sequence by using the ColdSubscribe method. In the InnerExecute method the items of the underlying sequence (source) are counted. When the sequence completes, the InnerComplete method is called, which calls NotifyObservers(counter) and NotifyCompleteObservers() to notify the Observers of the CountSubject sequence.

public class CountSubject<T> : BaseSubject<int>
{
    private IObservable<T> source;        
    private IDisposable subscriped;
    private BaseSubject<T> subject;
    private int counter;

    public CountSubject(IObservable<T> source)
    {
        this.source = source;
        BaseSubject<T> subject = (BaseSubject<T>)source;
        this.subject = subject;
        var observer =
            new Observer<T>(
                localvalue => InnerExecute(localvalue),
                ex => NotifyErrorObservers(ex),
                () => InnerComplete());
        this.subscriped = subject.ColdSubscribe(observer);
    }

    private void InnerExecute(T value)
    {
        counter++;        
    }

    private void InnerComplete()
    {
        NotifyObservers(counter);
        NotifyCompleteObservers();
    }

    public override void Execute()
    {
        try
        {
            subject.Execute();
        }
        catch (Exception exception)
        {
            NotifyErrorObservers(exception);
        }           
    }
}

There are a lot of transformation methods in SimpleRX (First, Last, Skip, Take), but they work very similarly to the Contains and Count methods. So in the next part of this blog series I will write about the Where, Select and SelectMany methods.

SimpleRx: (part1) (part2) (part3) (part4) (part5) (part6) (part7) (part8)


reimplementing reactive extensions (rx) (part 4: Generate and GenerateWithTime)

In part3 of my blog series about reimplementing reactive extensions with SimpleRX I wrote about some basic generation methods. In this post I will write about the

  • Generate<int>
  • GenerateWithTime<int>

generation methods.

To explain the methods I will first show their use in a test case and then explain the implementation.

Generate<int>

I will start with the Generate method, which generates items from an initValue, a condition func, an iterate func and a resultSelector func. Here is a test that shows how Generate<int> works.

[TestMethod]
public void GenerateTest()
{
    IObservable<int> generateObject = Observable.Generate(1, i => i <= 10, i => i + 1, i => i);
    generateObject.Subscribe<int>(
        x => Trace.WriteLine(x),
        ex => Console.WriteLine("OnError {0}", ex.Message),
        () => Console.WriteLine("OnCompleted"));
}

Generate<int> builds an IObservable<int> sequence. The first parameter (initValue) sets the starting point of the sequence, the second (condition) tells the sequence when to end. The third parameter (iterate) tells the sequence how to increment or decrement the value, and the fourth parameter (resultSelector) defines the result value. Now let's look at the implementation of Generate<int>. The Generate method returns a GenerateSubject.

public static IObservable<int> Generate(int initValue, Predicate<int> condition, 
    Func<int, int> iterate, Func<int, int> resultSelector)
{
    return new GenerateSubject(initValue, condition, iterate, resultSelector);
}
public static IObservable<int> Generate(int initValue, Predicate<int> condition,
    Func<int, int> iterate, Func<int, int> resultSelector, IScheduler scheduler)
{
    return new GenerateSubject(initValue, condition, iterate, resultSelector, scheduler);
}
public class GenerateSubject : BaseSubject<int>
{
    private int initValue;
    private Predicate<int> condition;
    private Func<int, int> iterate;
    private Func<int, int> resultSelector;

    public GenerateSubject(int initValue, Predicate<int> condition, 
        Func<int, int> iterate, Func<int, int> resultSelector)
    {
        this.initValue = initValue;
        this.condition = condition;
        this.iterate = iterate;
        this.resultSelector = resultSelector;
        this.Scheduler = new CurrentThreadScheduler();
    }

    public GenerateSubject(int initValue, Predicate<int> condition, 
        Func<int, int> iterate, Func<int, int> resultSelector, IScheduler scheduler) :
        this(initValue, condition, iterate, resultSelector)
    {
        this.Scheduler = scheduler;
    }

    public class ThreadExecuter
    {
        public int Value { get; set; }
        public Predicate<int> Condition { get; set; }
        public Func<int, int> Iterate { get; set; }
        public Func<int, int> ResultSelector { get; set; }
        public List<IObserver<int>> Observers { get; set; }
        public IScheduler Scheduler { get; set; }

        public void Execute()
        {
            Action<int> action = null;
            action = (int value) =>
                {
                    // if no Observer are listening then the generation ends
                    if (Observers.Count <= 0)
                        return;
                    // here the Condition func is executed if it returns true then
                    // the next item is generated and the Observers are notified,
                    // if it returns false the Observers are notified about the completion.
                    if (Condition(value))
                    {
                        // here the ResultSelector func is executed and the
                        // Observers are notified with the selectorValue
                        var selectorValue = ResultSelector(value);
                        for (int i = 0; i < Observers.Count; i++)
                        {
                            Observers[i].OnNext(selectorValue);
                        }
                        var iterateValue = Iterate(value);
                        // here we start the next iteration but we are not using a loop.
                        // We schedule the next iteration item with the Scheduler.
                        // So here we give another thread the chance to execute if we are using
                        // a thread or task scheduler
                        Scheduler.Schedule(action, iterateValue);
                    }
                    else
                    {
                        for (int i = 0; i < Observers.Count; i++)
                        {
                            Observers[i].OnCompleted();
                        }
                    }
                };
            action(Value);
        }
    }

    public override void Execute()
    {
        // the ThreadExecuter is the class that gets scheduled by the Scheduler
        // (it takes all the parameters and executes on the thread
        // specified in the Scheduler variable)
        ThreadExecuter threadExecuter =
            new ThreadExecuter()
            {
                Value = initValue,
                Observers = observers,
                Condition = condition,
                Iterate = iterate,
                ResultSelector = resultSelector,
                Scheduler = this.Scheduler
            };
        // Schedule the threadExecuter
        Scheduler.Schedule(threadExecuter.Execute);
    }
}
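To make the roles of the four parameters concrete, the same generation logic can be written as a plain synchronous iterator. This is only a sketch that ignores Observers and Schedulers; GenerateSketch and its generic Generate method are hypothetical and not part of SimpleRX, where Generate works on int values.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical sketch for illustration only.
public static class GenerateSketch
{
    public static IEnumerable<TResult> Generate<TState, TResult>(
        TState initValue,
        Func<TState, bool> condition,
        Func<TState, TState> iterate,
        Func<TState, TResult> resultSelector)
    {
        // Start at initValue, stop as soon as condition is false,
        // move on with iterate and project every state with resultSelector.
        for (var state = initValue; condition(state); state = iterate(state))
        {
            yield return resultSelector(state);
        }
    }
}
```

GenerateSketch.Generate(1, i => i <= 10, i => i + 1, i => i) yields the numbers 1 to 10, the same values the test above writes to the trace. With i => i * i as resultSelector and i <= 3 as condition it yields 1, 4 and 9, which shows that the condition and the iteration work on the state while the resultSelector only shapes the output.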

GenerateWithTime<int>

The GenerateWithTime<int> method works very much like the Generate<int> method, but it has an additional time component, which means the values are not generated immediately. It has an additional parameter, timeSelector. This parameter is a func that takes the value (i) and returns a TimeSpan. The returned TimeSpan is the delay before the next value is generated. In this example the func returns TimeSpan.FromSeconds(i). Because i grows with every step (it starts at 1 and ends at 10), the first item takes 1 second to generate, the second takes 2 seconds and the tenth takes 10 seconds. In the output you can see the second at which each item was generated.

[TestMethod]
public void GenerateWithTimeTest()
{
    IObservable<int> generateObject =
        Observable.GenerateWithTime(1, i => i <= 10, i => i + 1, i => i, i => TimeSpan.FromSeconds(i));
    generateObject.Subscribe(
        x => Trace.WriteLine(string.Format("{0} / {1}", x, DateTime.Now.Second)),
        ex => Console.WriteLine("OnError {0}", ex.Message),
        () => Console.WriteLine("OnCompleted"));
    Console.ReadLine();
}
public static IObservable<int> GenerateWithTime(int initValue, 
    Predicate<int> condition, Func<int, int> iterate, 
    Func<int, int> resultSelector, Func<int, TimeSpan> timeSelector)
{
    return new GenerateWithTimeSubject(initValue, condition, iterate, resultSelector, timeSelector);
}
public class GenerateWithTimeSubject : BaseSubject<int>
{
    private int initValue;
    private Predicate<int> condition;
    private Func<int, int> iterate;
    private Func<int, int> resultSelector;
    private Func<int, TimeSpan> timeSelector;
    private Timer timer;

    public GenerateWithTimeSubject(int initValue, Predicate<int> condition, Func<int, int> iterate,
        Func<int, int> resultSelector, Func<int, TimeSpan> timeSelector)
    {
        this.initValue = initValue;
        this.condition = condition;
        this.iterate = iterate;
        this.resultSelector = resultSelector;
        this.timeSelector = timeSelector;
    }

    public class ThreadExecuter
    {
        public int Value { get; set; }
        public Predicate<int> Condition { get; set; }
        public Func<int, int> Iterate { get; set; }
        public Func<int, int> ResultSelector { get; set; }
        public Func<int, TimeSpan> TimespanSelector { get; set; }
        public List<IObserver<int>> Observers { get; set; }
        private Timer timer;
        public IScheduler Scheduler { get; set; }

        private AutoResetEvent autoResetEvent = new AutoResetEvent(false);

        public void Execute()
        {
            Action<int> action = null;
            action = (int value) =>
            {
                //if no observers observe then the generation ends
                if (Observers.Count <= 0)
                    return;
                var tempInitValue = value;
                // the timespanSelector calculates the timespan to the generation of the next item
                var timeSpan = TimespanSelector(tempInitValue);
                // if the Condition is true the generation continues
                if (Condition(value))
                {
                    // here the timer starts with the calculated timeSpan
                    timer = new Timer(obj =>
                    {
                        int localInitValue = (int)obj;
                        var selectorValue = ResultSelector(localInitValue);
                        for (int i = 0; i < Observers.Count; i++)
                        {
                            Observers[i].OnNext(selectorValue);
                        }
                        // once all Observers are notified, autoResetEvent.Set() is called
                        // to release the waiting action
                        autoResetEvent.Set();
                        // every timer runs just one time (a period of -1 ms disables repetition)
                    }, tempInitValue, timeSpan, TimeSpan.FromMilliseconds(-1));
                    // here we wait until the generation of the previous element in the timer ends
                    autoResetEvent.WaitOne();
                    var iterateValue = Iterate(value);
                    // here we schedule the next item generation
                    Scheduler.Schedule(action, iterateValue);
                }
                else
                {
                    // if the condition is false all Observers are notified that the generation
                    // completed.
                    for (int i = 0; i < Observers.Count; i++)
                    {
                        Observers[i].OnCompleted();
                    }
                }
            };
            action(Value);
        }
    }

    //private AutoResetEvent autoResetEvent = new AutoResetEvent(false);

    public override void Execute()
    {
        ThreadExecuter threadExecuter =
            new ThreadExecuter()
            {
                Value = initValue,
                Observers = observers,
                Condition = condition,
                Iterate = iterate,
                ResultSelector = resultSelector,
                TimespanSelector = timeSelector,
                Scheduler = this.Scheduler
            };
        // here the first generation starts
        Scheduler.Schedule(threadExecuter.Execute);
    }
}
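Because every item waits for its own delay before the next one is scheduled, the delays add up. The following sketch computes the expected emission offsets without actually waiting, ignoring timer and scheduling overhead. GenerateWithTimeSketch and Timeline are hypothetical names used only for this example.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical sketch for illustration only.
public static class GenerateWithTimeSketch
{
    // Returns each generated value together with the time offset (measured
    // from the start) at which it would be emitted.
    public static List<KeyValuePair<int, TimeSpan>> Timeline(
        int initValue,
        Predicate<int> condition,
        Func<int, int> iterate,
        Func<int, int> resultSelector,
        Func<int, TimeSpan> timeSelector)
    {
        var timeline = new List<KeyValuePair<int, TimeSpan>>();
        var offset = TimeSpan.Zero;
        for (int value = initValue; condition(value); value = iterate(value))
        {
            // The delay of the current item is added to all earlier delays.
            offset += timeSelector(value);
            timeline.Add(new KeyValuePair<int, TimeSpan>(resultSelector(value), offset));
        }
        return timeline;
    }
}
```

For the parameters of the test above, the first four items are due after 1, 3, 6 and 10 seconds, so the whole sequence of ten items needs roughly 55 seconds before OnCompleted is called.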

In the next post I will explain some extension methods for Observable that allow functional method chaining and transformation of Observable sequences.

SimpleRx: (part1) (part2) (part3) (part4) (part5) (part6) (part7) (part8)


reimplementing reactive extensions (rx) (part 3: return, repeat, range and throw)

part1 and part2 of my blog series about reimplementing reactive extensions with SimpleRX focused on the basic parts of the implementation of SimpleRX. Now I will show the use of SimpleRX and explain the different methods that generate Observable items. When generating these items the starting point is always the Observable class. I will start with the simple generation methods.

  • Return<T>
  • Range<int>
  • Repeat<T>
  • Throw<T>

To explain each method I will first show its use in a test case and then explain the implementation.

Return<T>

[TestMethod]
public void ReturnTest()
{
    IObservable<int> returnObject = Observable.Return(10);
    returnObject.Subscribe<int>(
        x => Assert.AreEqual(x, 10),
        ex => Console.WriteLine("OnError {0}", ex.Message),
        () => Console.WriteLine("OnCompleted"));
}

This is a first, very simple example of the use of SimpleRX. The Observable class has a static method Return. Return generates an IObservable sequence from a single value (in this case the integer value 10). Here is the implementation of the Return method.

public static IObservable<T> Return<T>(T p)
{
    return new ReturnSubject<T>(p);
}

As we see, the Return method returns a ReturnSubject object that is derived from BaseSubject.

public class ReturnSubject<T> : BaseSubject<T>
{
    private T value;

    public ReturnSubject(T value)
    {
        this.value = value;
    }

    public override void Execute()
    {
        NotifyObservers(value);
        NotifyCompleteObservers();            
    }
}

The ReturnSubject implementation is very simple. ReturnSubject takes the generic value, and when the Execute method is executed the Observers get notified with the value (NotifyObservers(value)) and after that the Observers get notified that the sequence completed (NotifyCompleteObservers()).

Range<int>

[TestMethod]
public void RangeTest()
{
    IObservable<int> rangeObject = Observable.Range(10,10);
    rangeObject.Subscribe<int>(
        // Range(10, 10) produces the values 10 to 19
        x => Assert.IsTrue(x >= 10 && x <= 19),
        ex => Console.WriteLine("OnError {0}", ex.Message),
        () => Console.WriteLine("OnCompleted"));
}

Range generates an IObservable sequence of consecutive integers: it starts at the given value and produces n items, incrementing the value by one each time. Here are the two implementations of the Range method. The second implementation takes an IScheduler. With this scheduler the generation of the values can take place on another thread. I will write about the implementation of schedulers in a later part of this series.

public static IObservable<int> Range(int p, int xtime)
{
    return new RangeSubject(p, xtime);
}
public static IObservable<int> Range(int p, int xtime, IScheduler scheduler)
{
    return new RangeSubject(p, xtime, scheduler);
}

The Range method returns a RangeSubject object that is derived from BaseSubject.

public class RangeSubject : BaseSubject<int>
{
    private int value;
    private int xtime;

    public RangeSubject(int value, int xtime)
    {
        this.value = value;
        this.xtime = xtime;
    }

    public RangeSubject(int value, int xtime, IScheduler scheduler)
    {
        this.value = value;
        this.xtime = xtime;
        this.Scheduler = scheduler;
    }

    public class ThreadExecuter
    {
        public int Value { get; set; }
        public int XTime { get; set; }
        public List<IObserver<int>> Observers { get; set; }
        public IScheduler Scheduler { get; set; }
        private int repeatCounter;

        public void Execute()
        {
            Action<int> action = null;
            action = (int value) =>
            {
                if (Observers.Count <= 0)
                    return;
                for (int i = 0; i < Observers.Count; i++)
                {
                    Observers[i].OnNext(value);
                }
                if (repeatCounter < XTime - 1)
                {
                    repeatCounter++;                        
                    Scheduler.Schedule(action, ++value);
                }
                else
                {
                    for (int i = 0; i < Observers.Count; i++)
                    {
                        Observers[i].OnCompleted();
                    }
                }
                return;
            };
            action(Value);
        }
    }

    public override void Execute()
    {
        ThreadExecuter threadExecuter = 
            new ThreadExecuter() { 
                Value = value, 
                Observers = observers,
                XTime = xtime, 
                Scheduler = this.Scheduler};
        Scheduler.Schedule(threadExecuter.Execute);
    }
}

The implementation of Range is a little bit more complicated than the Return implementation. We could implement the range with a simple loop and notify the Observers. The problem is that if we want the items to be generated on another thread (or task), we need the Scheduler.Schedule call, because then the thread gives back control and the calling thread can do some work.
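To illustrate the difference between looping and scheduling, here is a sketch of one possible way a single-threaded scheduler could run an action that keeps rescheduling itself. It is not the SimpleRX scheduler; TrampolineSketch, Schedule and CountTo are hypothetical names, and the sketch assumes that everything runs on one thread. Scheduled actions are put into a queue and executed by the outermost Schedule call, so a self-rescheduling action does not grow the call stack.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical sketch for illustration only (single-threaded).
public sealed class TrampolineSketch
{
    private readonly Queue<Action> queue = new Queue<Action>();
    private bool running;

    public void Schedule(Action action)
    {
        queue.Enqueue(action);
        // A nested call only enqueues; the outermost call drains the queue.
        if (running) return;

        running = true;
        try
        {
            while (queue.Count > 0)
            {
                queue.Dequeue()();
            }
        }
        finally
        {
            running = false;
        }
    }

    // Demo: an action that reschedules itself until the limit is reached,
    // in the same style as the action in the ThreadExecuter above.
    public static int CountTo(int limit)
    {
        var trampoline = new TrampolineSketch();
        int count = 0;
        Action step = null;
        step = () =>
        {
            count++;
            if (count < limit) trampoline.Schedule(step);
        };
        trampoline.Schedule(step);
        return count;
    }
}
```

TrampolineSketch.CountTo(100000) runs the step 100000 times, and each step returns before the next one starts. A scheduler that works with threads or tasks would hand the queued action to another thread instead of draining the queue itself.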

Repeat<T>

[TestMethod]
public void RepeatTest()
{
    IObservable<int> repeatObject = Observable.Repeat(10);
    repeatObject.Subscribe<int>(
        x => Assert.AreEqual(x, 10),
        ex => Console.WriteLine("OnError {0}", ex.Message),
        () => Console.WriteLine("OnCompleted"));
}

Repeat generates an IObservable sequence from a value and emits this value until no Observers listen to it. Here are the three implementations of the Repeat method. The first implementation starts when the first Observer subscribes and ends when the last subscriber disposes. The second works like the first, but you have to pass a scheduler. The third one ends after n repetitions.

public static IObservable<T> Repeat<T>(T value)
{
    return new RepeatSubject<T>(value);
}
public static IObservable<T> Repeat<T>(T value, IScheduler scheduler)
{
    return new RepeatSubject<T>(value, scheduler);
}

public static IObservable<T> Repeat<T>(T value, int repeatCount)
{
    return new RepeatSubject<T>(value, repeatCount);
}

The Repeat method returns a RepeatSubject object that is derived from BaseSubject.

public class RepeatSubject<T> : BaseSubject<T>
{
    private T value;
    private int repeatCount;
    private bool repeatEndless = true;

    public class ThreadExecuter<T>
    {
        public T Value { get; set; }
        public int RepeatCount { get; set; }
        public bool RepeatEndless { get; set; }
        public List<IObserver<T>> Observers { get; set; }
        public IScheduler Scheduler { get; set; }

        public void Execute()
        {
            Action action = null;
            action = () =>
            {
                if (Observers.Count <= 0)
                    return;
                for(int i = 0; i < Observers.Count; i++)
                {
                    Observers[i].OnNext(Value);
                }
                if (!RepeatEndless)
                {
                    // Decrement first so that a repeatCount of n emits n values, not n + 1.
                    RepeatCount--;
                    if (RepeatCount > 0)
                    {
                        //Console.WriteLine("Repeat Number {0} - {1}", RepeatCount, Value);
                        Scheduler.Schedule(action);
                    }
                    else
                    {
                        for (int i = 0; i < Observers.Count; i++)
                        {
                            Observers[i].OnCompleted();
                        }
                    }
                }
                else
                {
                    Scheduler.Schedule(action);
                }
                return;
            };
            action();
        }
    }

    public RepeatSubject(T value)
    {
        this.value = value;
    }

    public RepeatSubject(T value, IScheduler scheduler) : this(value)
    {
        this.Scheduler = scheduler;
    }

    public RepeatSubject(T value, int repeatCount) : this(value)
    {            
        this.repeatCount = repeatCount;
        this.repeatEndless = false;
    }

    public RepeatSubject(T value, int repeatCount, IScheduler scheduler) : this(value, repeatCount)
    {
        this.Scheduler = scheduler;
    }

    public override void Execute()
    {
        ThreadExecuter<T> threadExecuter = 
            new ThreadExecuter<T>() { 
                Value = value, 
                Observers = observers,
                RepeatCount = repeatCount, 
                RepeatEndless = repeatEndless, 
                Scheduler = this.Scheduler};
        Scheduler.Schedule(threadExecuter.Execute);
    }
}

The RepeatSubject implementation uses the repeatEndless field. If it is true, Repeat runs endlessly; if it is false, Repeat runs until RepeatCount reaches 0 and then calls OnCompleted on every Observer.
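
The self-rescheduling action is the core of this technique, so here is a minimal sketch of it in isolation. It does not use the SimpleRX classes: ISketchScheduler, QueueScheduler and RepeatSketch are hypothetical names invented for this example, and the only assumption taken from the code above is that a scheduler offers a Schedule(Action) method.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for the SimpleRX IScheduler; only Schedule(Action) is assumed.
public interface ISketchScheduler
{
    void Schedule(Action action);
}

// Queues actions and runs them one after another on the calling thread.
public sealed class QueueScheduler : ISketchScheduler
{
    private readonly Queue<Action> queue = new Queue<Action>();
    private bool draining;

    public void Schedule(Action action)
    {
        queue.Enqueue(action);
        if (draining)
            return; // the loop further down the call stack picks the action up

        draining = true;
        try
        {
            while (queue.Count > 0)
                queue.Dequeue()();
        }
        finally
        {
            draining = false;
        }
    }
}

public static class RepeatSketch
{
    // Passes value to onNext exactly count times, then calls onCompleted.
    public static void Repeat<T>(T value, int count, ISketchScheduler scheduler,
                                 Action<T> onNext, Action onCompleted)
    {
        int remaining = count;
        Action step = null;
        step = () =>
        {
            if (remaining <= 0)
            {
                onCompleted();
                return;
            }
            onNext(value);
            remaining--;
            scheduler.Schedule(step); // hand control back before the next item
        };
        scheduler.Schedule(step);
    }

    public static void Main()
    {
        var received = new List<int>();
        Repeat(10, 3, new QueueScheduler(), x => received.Add(x),
               () => Console.WriteLine("completed after {0} items", received.Count));
    }
}
```

Because every step ends by scheduling the next one instead of looping, a scheduler that runs actions on another thread or task gets a chance to interleave other work between two items.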

Throw<T>

[TestMethod]
public void ThrowTest()
{
    IObservable<int> throwObject = Observable.Throw<int>(new Exception("OnError"));
    throwObject.Subscribe<int>(
        x => { var y = x; },
        ex => Assert.AreEqual("OnError", ex.Message),
        () => Console.WriteLine("OnCompleted"));
}

Throw is very simple: it passes the supplied exception to the OnError method of every Observer. Here is the implementation of the Throw method and the ThrowSubject class.

public static IObservable<T> Throw<T>(Exception exception)
{
    return new ThrowSubject<T>(exception);
}
public class ThrowSubject<T> : BaseSubject<T>
{
    private Exception exception;

    public ThrowSubject(Exception exception)
    {
        this.exception = exception;
    }

    public override void Execute()
    {
        NotifyErrorObservers(exception);   
    }
}

In the next post I will show some generation and time-based Observable methods.

SimpleRx: (part1) (part2) (part3) (part4) (part5) (part6) (part7) (part8)


Reimplementing reactive extensions (rx) (part 2: Observable, Subjects, ObservableExtensions and BaseSubject)

In part 1 I started with the introduction of SimpleRX. In this second post I will give an overview of the Observable class, the ObservableExtensions class and the concrete implementations of the abstract BaseSubject<T> class. The Observable class contains all the calls for producing values. For example, the Observable.Return<T>(T value) call delivers the supplied value to every observer. For every call on the Observable class there is a subject class (derived from BaseSubject<T>) that produces the concrete values.

//http://msdn.microsoft.com/en-us/library/hh212048(v=vs.103).aspx
public static class Observable
{
    public static IObservable<T> Empty<T>()
    {
        return new EmptySubject<T>();
    }

    public static IObservable<T> Return<T>(T p)
    {
        return new ReturnSubject<T>(p);
    }

    public static IObservable<T> Return<T>(T[] p)
    {
        return new ReturnSubjectArray<T>(p);
    }
    ...

The ObservableExtensions class contains some useful extension methods. With the ToObservable methods it is possible to convert a single value, an array or an IEnumerable into an IObservable. The Subscribe methods create an Observer for an IObservable implementation from the supplied onNext, onError and onCompleted actions.

namespace CustomReactiveExtension
{
    public static class ObservableExtensions
    {
        public static IObservable<T> ToObservable<T>(this T[] array)
        {
            return new ReturnSubjectArray<T>(array);
        }

        public static IObservable<T> ToObservable<T>(this IEnumerable<T> collection)
        {
            return new ReturnSubjectCollection<T>(collection);
        }

        public static IObservable<T> ToObservable<T>(this T value)
        {
            return new ReturnSubject<T>(value);
        }

        public static IDisposable Subscribe<T>(this IObservable<T> value, Action<T> onNext, Action<Exception> onError, Action onCompleted)
        {
            Observer<T> observer = new Observer<T>(onNext, onError, onCompleted);
            IDisposable subject = value.Subscribe(observer);
            return subject;
        }

        public static IDisposable Subscribe<T>(this IObservable<T> value, Action<T> onNext, Action<Exception> onError)
        {
            Observer<T> observer = new Observer<T>(onNext, onError);
            IDisposable subject = value.Subscribe(observer);
            return subject;
        }       

        public static IDisposable Subscribe<T>(this IObservable<T> value, Action<T> onNext)
        {
            Observer<T> observer = new Observer<T>(onNext);
            IDisposable subject = value.Subscribe(observer);
            return subject;
        }
    }
}

For each method in the Observable class a subject object is created. All these subjects are derived from BaseSubject<T>. The BaseSubject<T> class holds a list of Observers that are notified by the concrete implementation of BaseSubject<T>. To do so, the concrete implementation uses the NotifyObservers, NotifyErrorObservers and NotifyCompleteObservers methods. With the Subscribe and ColdSubscribe methods, Observers can register themselves to get notified when an item is produced, an error occurred or the creation of items is completed. The difference between Subscribe and ColdSubscribe is that Subscribe starts the creation of items (unless the subject is already running). As you can see, the abstract method Execute is called in Subscribe. That means that the production of items starts here and the Observers are notified. In ColdSubscribe the Observer is added to the observers collection, but the production of items does not start, because Execute is not called.

public abstract class BaseSubject<T> : IObservable<T>
{
    public List<IObserver<T>> observers;
    private IScheduler scheduler = new TaskScheduler();
    public bool running;

    public IScheduler Scheduler
    {
        get { return scheduler; }
        set { this.scheduler = value; }
    }

    public abstract void Execute();

    protected virtual void NotifyObservers(T value)
    {
        foreach (var item in observers)
        {
            item.OnNext(value);
        }     
    }        

    protected virtual void NotifyErrorObservers(Exception exception)
    {
        foreach (var item in observers)
        {
            item.OnError(exception);
        }
    }

    protected virtual void NotifyCompleteObservers()
    {
        foreach (var item in observers)
        {
            item.OnCompleted();
        }     
    }

    public BaseSubject()
    {
        observers = new List<IObserver<T>>();
    }

    public IDisposable Subscribe(IObserver<T> observer)
    {
        IDisposable disposable = new DisposeObject<T>(observers, this, observer);
        observers.Add(observer);
        if (!running)
        {
            this.running = true;
            Execute();
        }
        return disposable;
    }

    public IDisposable ColdSubscribe(IObserver<T> observer)
    {
        IDisposable disposable = new DisposeObject<T>(observers, this, observer);
        observers.Add(observer);
        return disposable;
    }
}
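
To make the difference between the two subscription methods easier to see, here is a stripped-down sketch. SketchRangeSubject and ColdSubscribeSketch are hypothetical names used only for this example; the sketch leaves out the scheduler and the DisposeObject and uses plain Action<int> delegates instead of IObserver<T>, so it only mirrors the running flag and the Execute call from BaseSubject<T>.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical, stripped-down subject for illustration: no scheduler, no disposal.
public sealed class SketchRangeSubject
{
    private readonly List<Action<int>> observers = new List<Action<int>>();
    private bool running;

    // Registers the observer and starts production if it is not running yet.
    public void Subscribe(Action<int> onNext)
    {
        observers.Add(onNext);
        if (!running)
        {
            running = true;
            Execute();
        }
    }

    // Registers the observer only; production is left untouched.
    public void ColdSubscribe(Action<int> onNext)
    {
        observers.Add(onNext);
    }

    // Pushes the values 1, 2 and 3 to every registered observer.
    private void Execute()
    {
        for (int value = 1; value <= 3; value++)
        {
            foreach (Action<int> observer in observers)
                observer(value);
        }
    }
}

public static class ColdSubscribeSketch
{
    public static void Main()
    {
        var cold = new List<int>();
        var hot = new List<int>();
        var subject = new SketchRangeSubject();

        subject.ColdSubscribe(cold.Add);
        Console.WriteLine("after ColdSubscribe: {0} items", cold.Count);

        subject.Subscribe(hot.Add);
        Console.WriteLine("after Subscribe: {0} and {1} items", cold.Count, hot.Count);
    }
}
```

The cold observer receives nothing until somebody calls Subscribe; from then on both observers get the same items.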

With this post the introduction of the SimpleRX structure ends. In the next posts I will show the use of SimpleRX and explain the different Observable methods.



Reimplementing reactive extensions (rx) (part 1: Introduction)

After exploring the reactive extensions (rx) and trying some examples from “Curing the asynchronous blues with the Reactive Extensions for .NET”, I thought about how I could implement rx myself to cover some special cases. Moreover, I had read Jon Skeet's famous blog series “Reimplementing LINQ to Objects”, so I started to reimplement the reactive extensions and put the project on CodePlex (SimpleRX). All the code examples I am writing about can be found in the SimpleRX source code.

In this first post I will give an overview of the structure of my implementation. In the following posts I will show the implementation of the individual rx commands. I will start with some simple commands like Return, Range, Repeat, Never and Throw. Then I will write about some time-based commands (like GenerateFromTime, Buffer, Delay, Timer, GenerateWithTime, Throttle and DistinctUntilChanged). After that I will describe the FromEvent and FromAsyncPattern commands. In my last post I will explain some special commands like Amb and Zip.

Overview of the SimpleRX project:

The most important parts of the implementation are the IObserver interface (implemented by the Observer<T> class) and the IObservable interface (implemented by the BaseSubject<T> class). These two classes are the basis of the Observable pattern. The IObserver interface has three methods. The OnNext method is called when the next item is available. The OnError method is called if an error occurred during the generation of an item, and the OnCompleted method is called when the production of items has ended. One very important thing when using rx is that it is push based. That means you register for the production of some values or items, and you get called when a value is available. IEnumerable and IEnumerator, in contrast, are pull based: you call the iterator to get the next item. Here is the implementation of the Observer<T> class and the BaseSubject<T> class. The Observer<T> class implements the three methods of the IObserver interface and has three overloaded constructors to compose the Observer.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;

namespace SimpleRX
{
    public class Observer<T> : IObserver<T>
    {
        private Action<T> OnNext_ = p => { };
        private Action OnCompleted_ = () => { };
        private Action<Exception> OnError_ = ex => { };

        public Observer(Action<T> onNext, Action<Exception> onError, Action onCompleted)
        {
            this.OnNext_ = onNext;
            this.OnError_ = onError;
            this.OnCompleted_ = onCompleted;
        }

        public Observer(Action<T> onNext, Action<Exception> onError)
        {
            this.OnNext_ = onNext;
            this.OnError_ = onError;
        }

        public Observer(Action<T> onNext)
        {
            this.OnNext_ = onNext;
        }

        public void OnCompleted()
        {
            OnCompleted_();
        }

        public void OnError(Exception error)
        {
            OnError_(error);
        }

        public void OnNext(T value)
        {
            OnNext_(value);
        }
    }
}
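
The push/pull difference described above can be sketched with the interfaces from the System namespace alone. RecordingObserver and PushPullSketch are hypothetical names made up for this example and are not part of SimpleRX; the Push method simply wraps an IEnumerable so that both styles work on the same data.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical observer for this sketch; it records every callback it receives.
public sealed class RecordingObserver : IObserver<int>
{
    public readonly List<string> Log = new List<string>();

    public void OnNext(int value) { Log.Add("next " + value); }
    public void OnError(Exception error) { Log.Add("error " + error.Message); }
    public void OnCompleted() { Log.Add("completed"); }
}

public static class PushPullSketch
{
    // Pull: the consumer drives the loop and asks for each item.
    public static List<int> Pull(IEnumerable<int> source)
    {
        var result = new List<int>();
        using (IEnumerator<int> enumerator = source.GetEnumerator())
        {
            while (enumerator.MoveNext())
                result.Add(enumerator.Current);
        }
        return result;
    }

    // Push: the producer drives the loop and calls the consumer.
    public static void Push(IEnumerable<int> source, IObserver<int> observer)
    {
        try
        {
            foreach (int item in source)
                observer.OnNext(item);
        }
        catch (Exception ex)
        {
            // Any exception raised inside the loop ends the sequence with OnError.
            observer.OnError(ex);
            return;
        }
        observer.OnCompleted();
    }

    public static void Main()
    {
        int[] items = { 1, 2, 3 };
        Console.WriteLine(string.Join(", ", Pull(items)));

        var observer = new RecordingObserver();
        Push(items, observer);
        Console.WriteLine(string.Join(", ", observer.Log));
    }
}
```

In the pull case the caller decides when MoveNext is called; in the push case the observer has no influence on when OnNext arrives, it only reacts.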

The BaseSubject<T> class implements the IObservable<T> interface and is the entry point for the subscription. If a client wants to subscribe to a source that produces values, it passes in an object that implements the IObserver interface; when an item is produced, an error occurs or the production has completed, the client gets called through this interface. The BaseSubject class is abstract because it produces no items itself; the subclasses of BaseSubject produce the items (for example the ReturnSubject or GenerateSubject class). The Scheduler property allows the subject to use different schedulers (for example a TaskScheduler) to produce the items. The ColdSubscribe method is used if one subject wants to subscribe to another subject without starting the production of items of the subject it subscribes to. The DisposeObject<T> class implements the IDisposable interface, and by calling Dispose a subscriber can end its subscription.

public abstract class BaseSubject<T> : IObservable<T>
{
    public List<IObserver<T>> observers;
    private IScheduler scheduler = new TaskScheduler();
    public bool running;

    public IScheduler Scheduler
    {
        get { return scheduler; }
        set { this.scheduler = value; }
    }

    public abstract void Execute();

    protected virtual void NotifyObservers(T value)
    {
        foreach (var item in observers)
        {
            item.OnNext(value);
        }     
    }        

    protected virtual void NotifyErrorObservers(Exception exception)
    {
        foreach (var item in observers)
        {
            item.OnError(exception);
        }
    }

    protected virtual void NotifyCompleteObservers()
    {
        foreach (var item in observers)
        {
            item.OnCompleted();
        }     
    }

    public BaseSubject()
    {
        observers = new List<IObserver<T>>();
    }

    public IDisposable Subscribe(IObserver<T> observer)
    {
        IDisposable disposable = new DisposeObject<T>(observers, this, observer);
        observers.Add(observer);
        if (!running)
        {
            this.running = true;
            Execute();
        }
        return disposable;
    }

    public IDisposable ColdSubscribe(IObserver<T> observer)
    {
        IDisposable disposable = new DisposeObject<T>(observers, this, observer);
        observers.Add(observer);
        return disposable;
    }
}

Another interesting class is the Subject class. A subject implements IObservable<T> and IObserver<T>, which means you can subscribe to a Subject and call OnNext, OnError and OnCompleted yourself. You therefore control the creation of items: when you call OnNext, the Observers that have subscribed to your Subject can react to the new item. This is the implementation of the Subject class. As you can see, it implements IObservable<T>, IObserver<T> and IDisposable and is a child class of BaseSubject<T>.

public class Subject<T> : BaseSubject<T>, IObservable<T>, IObserver<T>, IDisposable
{
    private Action<T> OnNext_ = p => { };
    private Action OnCompleted_ = () => { };
    private Action<Exception> OnError_ = ex => { };

    public Subject(Action<T> onNext, Action<Exception> onError, Action onCompleted)
    {
        this.OnNext_ = onNext;
        this.OnError_ = onError;
        this.OnCompleted_ = onCompleted;
    }

    public Subject(Action<T> onNext, Action<Exception> onError)
    {
        this.OnNext_ = onNext;
        this.OnError_ = onError;
    }

    public Subject(Action<T> onNext)
    {
        this.OnNext_ = onNext;            
    }

    public Subject()
    {

    }

    public void OnCompleted()
    {
        NotifyCompleteObservers();
    }

    public void OnError(Exception error)
    {
        NotifyErrorObservers(error);
    }

    public void OnNext(T value)
    {
        NotifyObservers(value);
    }       

    public void Dispose()
    {
        NotifyCompleteObservers();
    }

    public override void Execute()
    {

    }
}
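
Here is a small, self-contained sketch of how such a subject is typically used. It is not the SimpleRX Subject<T>: SketchSubject, ListObserver and SubjectSketch are hypothetical names for this example, and the sketch is built directly on the IObservable<T> and IObserver<T> interfaces from the System namespace.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical minimal subject: it is both an observer and an observable.
public sealed class SketchSubject<T> : IObservable<T>, IObserver<T>
{
    private readonly List<IObserver<T>> observers = new List<IObserver<T>>();

    public IDisposable Subscribe(IObserver<T> observer)
    {
        observers.Add(observer);
        return new Unsubscriber(observers, observer);
    }

    // Each notification iterates over a copy, so observers may unsubscribe in a callback.
    public void OnNext(T value)
    {
        foreach (IObserver<T> observer in observers.ToArray())
            observer.OnNext(value);
    }

    public void OnError(Exception error)
    {
        foreach (IObserver<T> observer in observers.ToArray())
            observer.OnError(error);
    }

    public void OnCompleted()
    {
        foreach (IObserver<T> observer in observers.ToArray())
            observer.OnCompleted();
    }

    // Removes one observer from the list when disposed.
    private sealed class Unsubscriber : IDisposable
    {
        private readonly List<IObserver<T>> observers;
        private readonly IObserver<T> observer;

        public Unsubscriber(List<IObserver<T>> observers, IObserver<T> observer)
        {
            this.observers = observers;
            this.observer = observer;
        }

        public void Dispose() { observers.Remove(observer); }
    }
}

// Hypothetical observer that collects the values it receives.
public sealed class ListObserver<T> : IObserver<T>
{
    public readonly List<T> Values = new List<T>();
    public bool Completed;

    public void OnNext(T value) { Values.Add(value); }
    public void OnError(Exception error) { }
    public void OnCompleted() { Completed = true; }
}

public static class SubjectSketch
{
    public static void Main()
    {
        var subject = new SketchSubject<string>();
        var observer = new ListObserver<string>();

        IDisposable subscription = subject.Subscribe(observer);
        subject.OnNext("a");
        subject.OnNext("b");
        subscription.Dispose();
        subject.OnNext("c"); // not delivered, the observer has unsubscribed

        Console.WriteLine(string.Join(",", observer.Values));
    }
}
```

The caller decides when items are produced by calling OnNext, and an observer only sees the items that are pushed while it is subscribed.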

Because the internal implementation of a subject is different, the Subjects class is needed. The Subjects class works like the Observable class, but for subjects. In my next post I will describe the Observer and the Subjects classes and their many different methods.
