Implementing a synchronized priority queue in C#

In my last project I used a queue to manage the download and upload tasks of documents. After most of the implementation was done I discovered that some of the upload and download tasks should have a higher priority than others. For my implementation I had used a synchronized wrapper of the .NET Queue, and with a plain queue it is not possible to work with items that have different priorities. The following picture illustrates the use of a queue.
Queue
So at that point I was looking for a priority queue. The problem is that the .NET Framework does not provide such a data structure, so I had to implement it myself. The following picture shows how a priority queue works.
PriorityQueue
So if you want to implement a priority queue you need an underlying data structure. That data structure has to reorganize the enqueued items so that the item with the highest priority is dequeued first. One possible data structure is a list that you reorder: if an item with a higher priority is enqueued, you move it towards the front of the list. That works fine with hundreds of added items, but if you have several thousand items to enqueue, the list-based solution gets slower and slower because every insert has to scan the list for the right position. Then you need a priority queue that is implemented with a tree structure. You could use a balanced search tree like an AVL tree, but most data structure books recommend a binary heap. So I decided to implement my priority queue with two modes: in one mode it internally uses a list, in the second mode it uses a binary heap. If you want to look at the code or use the priority queue, I added it to codeplex (http://priorityQueue.codeplex.com).
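To illustrate how the binary heap mode reorganizes the items, here is a minimal, self-contained sketch of a max-heap based priority queue. The class name BinaryHeapSketch and all details are my own illustration of the general technique, not the code from the codeplex project.

public class BinaryHeapSketch<TPriority, TValue> where TPriority : IComparable<TPriority>
{
    private readonly List<KeyValuePair<TPriority, TValue>> heap =
        new List<KeyValuePair<TPriority, TValue>>();

    public int Count { get { return heap.Count; } }

    public void Enqueue(TPriority priority, TValue value)
    {
        // add the item at the end and sift it up until the heap property holds again
        heap.Add(new KeyValuePair<TPriority, TValue>(priority, value));
        int child = heap.Count - 1;
        while (child > 0)
        {
            int parent = (child - 1) / 2;
            if (heap[child].Key.CompareTo(heap[parent].Key) <= 0)
                break;
            Swap(child, parent);
            child = parent;
        }
    }

    public TValue Dequeue()
    {
        if (heap.Count == 0)
            throw new InvalidOperationException("the priority queue is empty");

        // the root holds the highest priority; replace it with the last item
        // and sift that item down until the heap property holds again
        TValue result = heap[0].Value;
        heap[0] = heap[heap.Count - 1];
        heap.RemoveAt(heap.Count - 1);
        int parent = 0;
        while (true)
        {
            int left = 2 * parent + 1;
            int right = left + 1;
            int largest = parent;
            if (left < heap.Count && heap[left].Key.CompareTo(heap[largest].Key) > 0)
                largest = left;
            if (right < heap.Count && heap[right].Key.CompareTo(heap[largest].Key) > 0)
                largest = right;
            if (largest == parent)
                break;
            Swap(parent, largest);
            parent = largest;
        }
        return result;
    }

    private void Swap(int i, int j)
    {
        var temp = heap[i];
        heap[i] = heap[j];
        heap[j] = temp;
    }
}
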
Here is the UML class diagram and two tests that show the use of the priority queue in linked list mode and in binary heap mode.
PriorityQueue

[TestClass]
public class PriorityQueueTests
{
    [TestMethod]
    public void PriorityQueueListModeTest()
    {
        PriorityQueue<int, string> priorityQueue =
            new PriorityQueue<int, string>(PriorityQueueMode.LinkedList);
        priorityQueue.Enqueue(1, "A");
        priorityQueue.Enqueue(2, "B");
        priorityQueue.Enqueue(3, "C");
        priorityQueue.Enqueue(4, "D");
        priorityQueue.Enqueue(5, "E");
        priorityQueue.Enqueue(6, "F");
        var count = priorityQueue.Count;
        var result = priorityQueue.Dequeue();
        Assert.AreEqual(result, "F");
        result = priorityQueue.Dequeue();
        Assert.AreEqual(result, "E");
        result = priorityQueue.Dequeue();
        Assert.AreEqual(result, "D");
        result = priorityQueue.Dequeue();
        Assert.AreEqual(result, "C");
        result = priorityQueue.Dequeue();
        Assert.AreEqual(result, "B");
        result = priorityQueue.Dequeue();
        Assert.AreEqual(result, "A");
    }

    [TestMethod]
    public void PriorityQueueBinaryHeapModeTest()
    {
        PriorityQueue<int, string> priorityQueue =
            new PriorityQueue<int, string>(PriorityQueueMode.BinaryHeap);
        priorityQueue.Enqueue(1, "A");
        priorityQueue.Enqueue(2, "B");
        priorityQueue.Enqueue(3, "C");
        priorityQueue.Enqueue(4, "D");
        priorityQueue.Enqueue(5, "E");
        priorityQueue.Enqueue(6, "F");
        var count = priorityQueue.Count;
        var result = priorityQueue.Dequeue();
        Assert.AreEqual(result, "F");
        result = priorityQueue.Dequeue();
        Assert.AreEqual(result, "E");
        result = priorityQueue.Dequeue();
        Assert.AreEqual(result, "D");
        result = priorityQueue.Dequeue();
        Assert.AreEqual(result, "C");
        result = priorityQueue.Dequeue();
        Assert.AreEqual(result, "B");
        result = priorityQueue.Dequeue();
        Assert.AreEqual(result, "A");
    }
}

The two tests show that, in both modes, the items with the higher priority are placed at the front of the priority queue and dequeued first, while the items with the lower priority are placed at the end and dequeued last.
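
The tests above are single threaded. Since the title talks about a synchronized priority queue, here is a minimal sketch of how the queue from the tests could be wrapped with a lock to make it safe for multiple threads. The SynchronizedPriorityQueue class and its simple locking strategy are my own illustration, not part of the codeplex project.

// Minimal sketch of a synchronized wrapper; it only uses the API shown in the tests above.
public class SynchronizedPriorityQueue
{
    private readonly object syncRoot = new object();
    private readonly PriorityQueue<int, string> queue =
        new PriorityQueue<int, string>(PriorityQueueMode.BinaryHeap);

    public int Count
    {
        get { lock (syncRoot) { return queue.Count; } }
    }

    public void Enqueue(int priority, string value)
    {
        // only one thread at a time may reorganize the underlying heap
        lock (syncRoot)
        {
            queue.Enqueue(priority, value);
        }
    }

    public string Dequeue()
    {
        lock (syncRoot)
        {
            return queue.Dequeue();
        }
    }
}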


What is new in C# 5.0 – Asynchronous Programming

Today I will give a talk about “What is new in C# 5.0 – Asynchronous Programming” at the .NET Usergroup South Austria. The agenda is:

  • what is new in C# 5.0
  • Async CTP – .NET 4.5
  • task-based asynchronous pattern
  • async – await (Asynchronous Control Flow)
  • async internals
  • SynchronizationContext in console apps, WinForms apps and ASP.NET
  • cancellation
  • exception handling
  • combinators
  • async – await with the Windows Runtime
  • caller information

The PowerPoint presentation and the examples are in my public SkyDrive folder:

What is new in C# 5.0 – Asynchronous Programming


Reimplementing the Reactive Extensions (Rx) (part 1: Introduction)

After exploring the Reactive Extensions (Rx) and trying some examples from “Curing the asynchronous blues with the Reactive Extensions for .NET”, I thought about how I could implement Rx myself to cover some special cases. Moreover, I read the famous blog post series “Reimplementing LINQ to Objects” by Jon Skeet, and so I started to reimplement the Reactive Extensions and put the project on codeplex (simpleRX). All the code examples I am writing about can be found in the simpleRX source code.

In this first post I will give an overview of the structure of my implementation. In the following posts I will show the implementation of the individual Rx commands. I will start with some simple commands like Return, Range, Repeat, Never and Throw. Then I will write about some time-based commands like GenerateFromTime, Buffer, Delay, Timer, GenerateWithTime, Throttle and DistinctUntilChanged. After that I will describe the FromEvent and FromAsyncPattern commands. In my last post I will explain some special commands like Amb and Zip.

Overview of the SimpleRX project:

The most important parts of the implementation are the IObserver interface (implemented by the Observer<T> class) and the IObservable interface (implemented by the BaseSubject<T> class). These two classes are the base of the Observer pattern. The IObserver interface has three methods: the OnNext method is called when the next item is available, the OnError method is called if an error occurred while generating an item, and the OnCompleted method is called when the production of items has ended. One very important thing about Rx is that it works push-based: you register for the production of values or items, and you get called when a value is available. IEnumerable and IEnumerator, in contrast, work pull-based, which means you drive the iteration yourself to get the next item. Here is the implementation of the Observer<T> class and the BaseSubject<T> class. The Observer<T> class implements the three methods of the IObserver interface and has three overloaded constructors to compose the observer.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;

namespace SimpleRX
{
    public class Observer<T> : IObserver<T>
    {
        private Action<T> OnNext_ = p => { };
        private Action OnCompleted_ = () => { };
        private Action<Exception> OnError_ = ex => { };

        public Observer(Action<T> onNext, Action<Exception> onError, Action onCompleted)
        {
            this.OnNext_ = onNext;
            this.OnError_ = onError;
            this.OnCompleted_ = onCompleted;
        }

        public Observer(Action<T> onNext, Action<Exception> onError)
        {
            this.OnNext_ = onNext;
            this.OnError_ = onError;
        }

        public Observer(Action<T> onNext)
        {
            this.OnNext_ = onNext;
        }

        public void OnCompleted()
        {
            OnCompleted_();
        }

        public void OnError(Exception error)
        {
            OnError_(error);
        }

        public void OnNext(T value)
        {
            OnNext_(value);
        }
    }
}
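
To illustrate the push-based model, here is a small usage sketch of the Observer<T> class; the lambdas and values are just placeholders and not from the simpleRX source.

// Composing an Observer<int> from lambdas and pushing values into it directly.
// Normally the observer is passed to a subject's Subscribe method instead of being called by hand.
var observer = new Observer<int>(
    value => Console.WriteLine("OnNext: " + value),          // called for every produced item
    error => Console.WriteLine("OnError: " + error.Message), // called if the production fails
    () => Console.WriteLine("OnCompleted"));                 // called when the production ends

observer.OnNext(1);
observer.OnNext(2);
observer.OnCompleted();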

The BaseSubject<T> class implements the IObservable<T> interface and is the entry point for the subscription. If a client wants to subscribe to a source that produces values, it passes in a class that implements the IObserver interface, and whenever an item is produced, an error is generated or the production has completed, the client gets called through this interface. The BaseSubject class is abstract because it produces no items itself; the subclasses of BaseSubject produce the items (for example the ReturnSubject or GenerateSubject class). The Scheduler property allows the subject to use different schedulers (for example a TaskScheduler) to produce the items. The ColdSubscribe method is used if one subject wants to subscribe to another subject without starting the production of items by the first subject. The DisposeObject<T> class implements the IDisposable interface, and by calling Dispose a subscriber can stop its subscription.

public abstract class BaseSubject<T> : IObservable<T>
{
    public List<IObserver<T>> observers;
    private IScheduler scheduler = new TaskScheduler();
    public bool running;

    public IScheduler Scheduler
    {
        get { return scheduler; }
        set { this.scheduler = value; }
    }

    public abstract void Execute();

    protected virtual void NotifyObservers(T value)
    {
        foreach (var item in observers)
        {
            item.OnNext(value);
        }     
    }        

    protected virtual void NotifyErrorObservers(Exception exception)
    {
        foreach (var item in observers)
        {
            item.OnError(exception);
        }
    }

    protected virtual void NotifyCompleteObservers()
    {
        foreach (var item in observers)
        {
            item.OnCompleted();
        }     
    }

    public BaseSubject()
    {
        observers = new List<IObserver<T>>();
    }

    public IDisposable Subscribe(IObserver<T> observer)
    {
        IDisposable disposable = new DisposeObject<T>(observers, this, observer);
        observers.Add(observer);
        if (!running)
        {
            this.running = true;
            Execute();
        }
        return disposable;
    }

    public IDisposable ColdSubscribe(IObserver<T> observer)
    {
        IDisposable disposable = new DisposeObject<T>(observers, this, observer);
        observers.Add(observer);
        return disposable;
    }
}
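
The DisposeObject<T> class is not shown above. A minimal sketch that matches the constructor call in Subscribe could look like the following; the field names and the exact behavior are assumptions, the simpleRX source contains the real implementation.

public class DisposeObject<T> : IDisposable
{
    private readonly List<IObserver<T>> observers;
    private readonly IObserver<T> observer;

    // the subject parameter is accepted to match the constructor call in Subscribe,
    // but this sketch only needs the observer list to unsubscribe
    public DisposeObject(List<IObserver<T>> observers, BaseSubject<T> subject, IObserver<T> observer)
    {
        this.observers = observers;
        this.observer = observer;
    }

    public void Dispose()
    {
        // unsubscribing removes the observer from the subject's observer list,
        // so it no longer receives OnNext/OnError/OnCompleted calls
        observers.Remove(observer);
    }
}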

Another interesting class is the Subject class. A subject implements both IObservable<T> and IObserver<T>, which means you can subscribe to a subject and also call OnNext, OnError and OnCompleted on it yourself. That way you control the creation of items: if you call OnNext, the observers that subscribed to your subject can react to the new item. This is the implementation of the Subject class. As you can see it implements IObservable<T>, IObserver<T> and IDisposable, and it is a child class of BaseSubject<T>.

public class Subject<T> : BaseSubject<T>, IObservable<T>, IObserver<T>, IDisposable
{
    private Action<T> OnNext_ = p => { };
    private Action OnCompleted_ = () => { };
    private Action<Exception> OnError_ = ex => { };

    public Subject(Action<T> onNext, Action<Exception> onError, Action onCompleted)
    {
        this.OnNext_ = onNext;
        this.OnError_ = onError;
        this.OnCompleted_ = onCompleted;
    }

    public Subject(Action<T> onNext, Action<Exception> onError)
    {
        this.OnNext_ = onNext;
        this.OnError_ = onError;
    }

    public Subject(Action<T> onNext)
    {
        this.OnNext_ = onNext;            
    }

    public Subject()
    {

    }

    public void OnCompleted()
    {
        NotifyCompleteObservers();
    }

    public void OnError(Exception error)
    {
        NotifyErrorObservers(error);
    }

    public void OnNext(T value)
    {
        NotifyObservers(value);
    }       

    public void Dispose()
    {
        NotifyCompleteObservers();
    }

    public override void Execute()
    {

    }
}
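
Here is a small usage sketch of the Subject class (the values are just placeholders): an observer is subscribed and items are then pushed manually with OnNext.

// A Subject<string> is subscribed like an observable and fed like an observer,
// so every value pushed with OnNext is forwarded to all subscribed observers.
var subject = new Subject<string>();

subject.Subscribe(new Observer<string>(
    value => Console.WriteLine("received: " + value),
    error => Console.WriteLine("error: " + error.Message),
    () => Console.WriteLine("completed")));

subject.OnNext("hello");
subject.OnNext("world");
subject.OnCompleted();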

Because the internal implementation for a subject is different, the Subjects class is needed. The Subjects class works like the Observable class, but for subjects. In my next post I will describe the Observable and the Subjects classes and their many different methods.

SimpleRx: (part1) (part2) (part3) (part4) (part5) (part6) (part7) (part8)


Implementing a lock free stack in C#

In .NET 4.0 the concurrent data structures were introduced (System.Collections.Concurrent). These data structures use lock-free algorithms to implement the access control. In this post I will write about implementing a lock free stack. To do so I will first develop a concurrent stack that works with locking, and then I will show what we need to change to make the stack lock free. To begin with, I will show how to use the ConcurrentStack that is implemented in the System.Collections.Concurrent namespace.

System.Collections.Concurrent.ConcurrentStack<int> concurrentStack =
    new System.Collections.Concurrent.ConcurrentStack<int>();

concurrentStack.Push(1);
concurrentStack.Push(2);

int result = 0;

concurrentStack.TryPop(out result);

Now I will implement this stack myself with a lock.

public class ConcurrentStack<T>
{
    private class Node
    {
        public Node Next;
        public T Item;
    }

    private Node head;

    public ConcurrentStack()
    {
        head = new Node();
    }

    public void Push(T item)
    {
        Node node = new Node();
        node.Item = item;
        lock (head)
        {
            node.Next = head.Next;
            head.Next = node;
        }
    }

    public bool TryPop(out T result)
    {
        // the top node has to be read and unlinked inside the lock,
        // otherwise two threads could pop the same node
        lock (head)
        {
            Node node = head.Next;
            if (node == null)
            {
                result = default(T);
                return false;
            }
            head.Next = node.Next;
            result = node.Item;
            return true;
        }
    }

    public bool TryPeek(out T result)
    {
        // peek returns the item on top of the stack without removing it
        lock (head)
        {
            Node node = head.Next;
            if (node == null)
            {
                result = default(T);
                return false;
            }
            result = node.Item;
            return true;
        }
    }
}

If you use this stack from multiple threads, the locks in Push and TryPop become the bottleneck. That is why Microsoft introduced a so-called lock free stack implementation. To implement such a lock free stack we need to know which commands produce the problems when they are executed from different threads. These commands are:

lock (head)
{
    node.Next = head.Next;
    head.Next = node;
}

in the Push method and

lock (head)
{
    head.Next = node.Next;
    result = node.Item;
    return true;
}

in the TryPop method. If a thread context switch happens between these commands, the stack can break: for example, two threads can both read the same head.Next in Push and link their new nodes to it, so one of the pushed items gets lost. The lock prevents this interleaving and keeps the stack consistent. If we want to implement this without a lock, we have to replace the lock with something else. To do that I will use the compare-and-swap technique.
So here is my implementation of the lock free stack:

public class ConcurrentStack<T>
{
    private class Node
    {
        public Node Next;
        public T Item;
    }

    private Node head;

    public ConcurrentStack()
    {
        head = new Node();
    }

    public void Push(T item)
    {
        Node node = new Node();
        node.Item = item;
        do
        {
            node.Next = head.Next;
        } while (!CompareAndSwap(ref head.Next, node.Next, node));
    }        

    public bool TryPop(out T result)
    {
        Node node;
        do
        {
            node = head.Next;
            if (node == null)
            {
                result = default(T);
                return false;
            }
        } while (!CompareAndSwap(ref head.Next, node, node.Next));
        result = node.Item;
        return true;
    }

    // If destination (head.Next) still holds currentValue, no other thread has inserted or
    // removed a node in the meantime, so newValue can safely be assigned to destination.
    // If another thread did change head.Next, the compare fails and the caller has to retry.
    private static bool CompareAndSwapUnsafe(ref Node destination,
      Node currentValue, Node newValue)
    {
        // the compare below and the assignment have to happen atomically (without a thread
        // context switch in between); this naive version cannot guarantee that
        if (destination == currentValue)
        {
            destination = newValue;
            return true;
        }
        return false;
    }

    // Here the compare-and-swap is implemented thread safe with the Interlocked.CompareExchange
    // method (from System.Threading). CompareExchange compares destination with currentValue and,
    // if they are equal, assigns newValue to destination. This happens as a single atomic
    // operation, so no thread context switch is possible in between.
    private static bool CompareAndSwap(ref Node destination, Node currentValue, Node newValue)
    {
        // returns true if destination still contained currentValue and was replaced by newValue
        return currentValue == Interlocked.CompareExchange<Node>(ref destination, newValue, currentValue);
    }
}

Here we can see that the CompareAndSwap function replaces the lock. If the stack is used from many threads, this implementation is faster than the implementation with the lock because no blocking is necessary; a thread that loses the race simply retries. The CompareAndSwapUnsafe function only shows the principle: the problem is that the comparison between destination and currentValue and the assignment of newValue to destination have to be atomic. These two steps are executed atomically by the Interlocked.CompareExchange method, which guarantees that the two commands are executed without a thread context switch in between. If you want to implement another lock free data structure (e.g. a queue or a list) you can use the same technique.
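
As a small usage sketch (assuming the lock free ConcurrentStack<T> class from above), many threads can push and pop concurrently without any lock. Parallel.For and Interlocked come from System.Threading.Tasks and System.Threading; the counts are arbitrary.

ConcurrentStack<int> stack = new ConcurrentStack<int>();

// push 1000 items from many threads at the same time
System.Threading.Tasks.Parallel.For(0, 1000, i => stack.Push(i));

// pop them again from many threads and count the successful pops
int popped = 0;
System.Threading.Tasks.Parallel.For(0, 1000, i =>
{
    int value;
    if (stack.TryPop(out value))
    {
        System.Threading.Interlocked.Increment(ref popped);
    }
});

Console.WriteLine("popped " + popped + " items"); // should print: popped 1000 items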