reimplementing reactive extensions (rx) (part 5: Contains and Count)

In the last two posts (part3, part4) i wrote about generation functions that build Observabe<T> sequences. In this post i will write about some simple transformation methodes that take a IObservable sequence and transform it into another IObservable sequence. The transformation methodes i will explain are:

  • Contains
  • Count 

Contains<T>

The Contains method checks if an IObservable sequence contains a specific value. In this example i generate an Observable sequence with the Observable.Range method. That method generates a sequence from the startvalue (1) to the endvalue (4). At this sequence we use the contains extension method that should check if the given value (2) is in the sequence. It returns a IObservable<bool> that means every Observer is notified one time, if the sequence contains the value or not.

[TestMethod]
public void ContainsTest()
{
    IObservable<bool> returnObject = Observable.Range(1, 4).Contains(2);
    returnObject.Subscribe<bool>(
        x => Assert.IsTrue(x),
        ex => Console.WriteLine("OnError {0}", ex.Message),
        () => Console.WriteLine("OnCompleted"));
}

The Contains method is the first Extension method in SimpleRX i write about. It checks if a given value is in the Observable sequence.

public static IObservable<bool> Contains<T>(this IObservable<T> source, T value)
{
    return new ContainsSubject<T>(source, value);
}

The ContainsSubject method takes the IObservable<T> source and the value to check. It casts the source to a base Subject that means that this extension method only works if the source is a BaseSubject. If you use another source that implements IObservable<T> but is not a BaseSubject it will not work. After that we create a new Observer<T> object. The Observer is described in part1 of this post series. So what we do is we overtake an OnNext, OnError and OnComplete Action to the Observer. With that Observer we call a ColdSubscribe of the overtaken subject. That means the ContainsSubject reacts on the OnNext, OnError and OnComplete of the overtaken source. To see the implementation of ColdSubscribe look at part1 of this post series. Now after subscribing the overtaken source we look at the methodes we have overtaken. The InnerExecute method takes the aktuell value that was produced by the source. It simply checks if the generated value of the source has the same value as the overtaken value. If it has the same value it set the boolean containsValue to true. That means the source contains the value. If the generation of the source ends (calling OnComplete) the ContainsSubject calls all Observers that subscribed to it an tells overtakes them the containsValue. After that it calls the NotifyCompleteObservers method to notify all Observers that the generation completed. The starting point that starts the generation of the source items it the overloaded Execute method. In this method the Execute method of the overtaken source is executed.

public class ContainsSubject<T> : BaseSubject<bool>
{
    private IObservable<T> source;        
    private IDisposable subscriped;
    private BaseSubject<T> subject;
    private bool containsValue;
    private T value;

    public ContainsSubject(IObservable<T> source, T value)
    {
        this.source = source;
        this.value = value;
        BaseSubject<T> subject = (BaseSubject<T>)source;
        this.subject = subject;
        var observer =
            new Observer<T>(
                localvalue => InnerExecute(localvalue),
                ex => NotifyErrorObservers(ex),
                () => InnerComplete());
        this.subscriped = subject.ColdSubscribe(observer);
    }

    private void InnerExecute(T value)
    {
        if (this.value.Equals(value))
        {
            containsValue = true;                
        }                        
    }

    private void InnerComplete()
    {
        NotifyObservers(containsValue);
        NotifyCompleteObservers();
    }

    public override void Execute()
    {
        try
        {
            subject.Execute();
        }
        catch (Exception exception)
        {
            NotifyErrorObservers(exception);
        }         
    }
}


Count<T>

The count method is like the Contains method an extension method that works on a given sequence. It simply counts the values in the sequence. In this example i generate a observable sequence with the Observable.Range method and then i count the items in the sequence. 

[TestMethod]
public void CountTest()
{
    IObservable<int> returnObject = Observable.Range(1, 4).Count();
    returnObject.Subscribe<int>(
        x => Assert.AreEqual<int>(x, 4),
        ex => Console.WriteLine("OnError {0}", ex.Message),
        () => Console.WriteLine("OnCompleted"));
}

The implementation is simular to the Contains implementation but there are some differences. The count method returns a CountSubject.

public static IObservable<int> Count<T>(this IObservable<T> source)
{
    return new CountSubject<T>(source);
}

This CountSubject observes the overtaken observable sequnce by using the ColdSubscribe method. In the InnerExecute methode the items of the underlying sequence (source) counted. If the sequence completed the InnerComplete method is called and the NotifyObserver(counter) and NotifyCompleteObservers() is called to notify the Observers of the CountSubject sequence.

public class CountSubject<T> : BaseSubject<int>
{
    private IObservable<T> source;        
    private IDisposable subscriped;
    private BaseSubject<T> subject;
    private bool containsValue;
    private int counter;

    public CountSubject(IObservable<T> source)
    {
        this.source = source;
        BaseSubject<T> subject = (BaseSubject<T>)source;
        this.subject = subject;
        var observer =
            new Observer<T>(
                localvalue => InnerExecute(localvalue),
                ex => NotifyErrorObservers(ex),
                () => InnerComplete());
        this.subscriped = subject.ColdSubscribe(observer);
    }

    private void InnerExecute(T value)
    {
        counter++;        
    }

    private void InnerComplete()
    {
        NotifyObservers(counter);
        NotifyCompleteObservers();
    }

    public override void Execute()
    {
        try
        {
            subject.Execute();
        }
        catch (Exception exception)
        {
            NotifyErrorObservers(exception);
        }           
    }
}

There are a lot of transformation methodes in SimpleRX (First, Last, Skip, Take) but they work very simular to the Contains and Count methodes. So in the next part of this blog series i will write about the Where, Select and SelectMany methodes. 

SimpleRx: (part1) (part2) (part3) (part4) (part5) (part6) (part7) (part8)

Advertisements


If you have a note or a question please write a comment.

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out / Change )

Twitter picture

You are commenting using your Twitter account. Log Out / Change )

Facebook photo

You are commenting using your Facebook account. Log Out / Change )

Google+ photo

You are commenting using your Google+ account. Log Out / Change )

Connecting to %s