BlockingCollection

19 08 2011

When you try to take an item from an empty ConcurrentQueue<T>, ConcurrentBag<T> or ConcurrentStack<T> (TryDequeue, TryTake or TryPop respectively), you immediately get false. In producer/consumer situations you often use events to wait while a collection is empty and to signal when it is not. BlockingCollection<T> wraps any collection that implements IProducerConsumerCollection<T> and provides this wait-and-signal functionality for you. When you call Take on an empty BlockingCollection<T>, the calling thread blocks until an item is present. TryTake can also wait, but only if you pass it a timeout.

To use BlockingCollection:

1. Instantiate the class, optionally specifying the IProducerConsumerCollection<T> to wrap and the maximum size (bound) of the collection.

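2. Call Add or TryAdd to add elements to the underlying collection. If the collection is bounded and full, Add blocks until space frees up, whereas TryAdd returns false (or waits up to a timeout you supply).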

3. Call Take or TryTake to remove (consume) elements from the underlying collection.

If you call the constructor without passing in a collection, the class defaults to wrapping a ConcurrentQueue<T>, so items are consumed in first-in, first-out order.
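The three steps can be sketched in a few lines. This is a minimal illustration, assuming a ConcurrentStack<int> as the wrapped collection and a bound of 2 purely to make the blocking behaviour visible; the class name BoundedStackDemo is hypothetical.

```csharp
using System;
using System.Collections.Concurrent;

// Hypothetical demo class; the stack and the bound of 2 are illustrative choices
static class BoundedStackDemo
{
    public static string Run()
    {
        // 1. Wrap a ConcurrentStack<int> (LIFO) instead of the default ConcurrentQueue<int> (FIFO),
        //    bounded to two items.
        using (var items = new BlockingCollection<int>(new ConcurrentStack<int>(), 2))
        {
            // 2. Add elements. Add would block on a full collection; TryAdd without
            //    a timeout returns false immediately instead.
            items.Add(1);
            items.Add(2);
            bool addedThird = items.TryAdd(3);

            // 3. Take elements. The wrapped stack hands them back newest first.
            int first = items.Take();
            int second = items.Take();

            // TryTake with a timeout waits up to 100 ms on the now-empty collection, then returns false.
            int unused;
            bool tookFromEmpty = items.TryTake(out unused, TimeSpan.FromMilliseconds(100));

            return string.Format("{0} {1} {2} {3}", addedThird, first, second, tookFromEmpty);
        }
    }
}
```

Calling BoundedStackDemo.Run() returns "False 2 1 False": the third add is refused, the stack yields 2 before 1, and the timed take on the empty collection gives up.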

Another way to consume elements is to call GetConsumingEnumerable. This returns a (potentially) infinite sequence that yields elements as they become available, blocking while the collection is empty. To end the sequence, call CompleteAdding: this marks the collection as complete, so any further call to Add throws an InvalidOperationException, and the enumeration finishes once the remaining elements have been consumed.
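A small, self-contained producer/consumer sketch of GetConsumingEnumerable and CompleteAdding, assuming a single producer (the calling thread) and a single dedicated consumer thread; ConsumingDemo and SumViaConsumer are hypothetical names.

```csharp
using System.Collections.Concurrent;
using System.Threading;

static class ConsumingDemo
{
    // The producer adds 1..count and then calls CompleteAdding; the consumer sums what it receives.
    public static int SumViaConsumer(int count)
    {
        using (var queue = new BlockingCollection<int>())   // wraps a ConcurrentQueue<int> by default
        {
            int sum = 0;
            var consumer = new Thread(() =>
            {
                // Blocks while the collection is empty; the loop ends only after
                // CompleteAdding has been called AND every remaining item has been taken.
                foreach (int item in queue.GetConsumingEnumerable())
                    sum += item;
            });
            consumer.Start();

            for (int i = 1; i <= count; i++)
                queue.Add(i);
            queue.CompleteAdding();   // from here on, Add throws InvalidOperationException

            consumer.Join();          // wait for the consumer to drain the queue and exit its loop
            return sum;
        }
    }
}
```

SumViaConsumer(100) returns 5050, showing that no item is lost even though CompleteAdding is called while the consumer may still be working through the queue.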

Here is an example from my OrderMatchingEngine project:

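using System.Collections.Concurrent;
using System.Threading;

// Order, BuyOrders, SellOrders, Trades and OrderBook.OrderProcessor are types from the OrderMatchingEngine project
public class DedicatedThreadOrderProcessor : OrderBook.OrderProcessor
{
    private readonly Thread m_Thread;
    private readonly BlockingCollection<Order> m_PendingOrders = new BlockingCollection<Order>();

    public DedicatedThreadOrderProcessor(BuyOrders buyOrders, SellOrders sellOrders, Trades trades)
        : base(buyOrders, sellOrders, trades)
    {
        m_Thread = new Thread(ProcessOrders);
        m_Thread.Start();
    }

    private void ProcessOrders()
    {
        // Blocks while no orders are pending; exits once Stop has been called and the queue is drained
        foreach (var order in m_PendingOrders.GetConsumingEnumerable())
        {
            ProcessOrder(order);
        }
    }

    public void Stop()
    {
        // No further orders are accepted; the processing thread finishes the remaining ones and exits
        m_PendingOrders.CompleteAdding();
    }

    public override void InsertOrder(Order order)
    {
        // Runs on the caller's thread; the order is processed later on m_Thread
        m_PendingOrders.Add(order);
    }
}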




Semaphores

11 07 2011

A semaphore allows a special kind of critical region that is not mutually exclusive. Threads perform take and put operations on a semaphore, atomically decrementing or incrementing its current count, respectively. When a thread tries to take from a semaphore whose count is already 0, the thread blocks until the count becomes non-zero. The initial count therefore sets how many threads can be inside the region at the same time. Unlike mutexes, semaphores are never owned by a specific thread, so one thread can safely take from a semaphore while a different thread puts back.
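The ownership point is easy to demonstrate. In this sketch (the class SemaphoreHandOff is hypothetical) SemaphoreSlim's Wait() performs the take and Release() the put, and the put comes from a different thread than the take. The 100 ms sleep only makes it likely that the main thread is already blocked; the result is the same either way.

```csharp
using System.Threading;

// Hypothetical demo: one thread takes, a different thread puts.
static class SemaphoreHandOff
{
    public static int[] Run()
    {
        using (var sem = new SemaphoreSlim(0))      // count 0: the first Wait() blocks
        {
            int before = sem.CurrentCount;          // 0

            // A different thread performs the put. Nobody owns a semaphore, so this is legal
            // (calling ReleaseMutex on a Mutex from a thread that does not own it throws).
            var releaser = new Thread(() =>
            {
                Thread.Sleep(100);
                sem.Release(2);                     // count 0 -> 2, waking the waiting thread
            });
            releaser.Start();

            sem.Wait();                             // take: blocks until the release above, count 2 -> 1
            int after = sem.CurrentCount;           // 1
            releaser.Join();
            return new[] { before, after };
        }
    }
}
```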

With SemaphoreSlim, a thread calls Wait() to take and Release() to put. Semaphores are typically used to protect resources of finite capacity, such as a pool of database connections or a swimming pool with a maximum number of swimmers. The Semaphore class (but not the lighter-weight SemaphoreSlim used below) can span processes in the same way as a Mutex if you give it a name on construction.

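using System;
using System.Threading;

class SwimmingPool
{
  // Initial count of 2: at most two threads can be between Wait and Release at once
  static SemaphoreSlim m_Sem = new SemaphoreSlim (2);

  static void Main()
  {
    for (int i = 1; i <= 5; i++) new Thread (Swim).Start (i);
  }

  static void Swim(object id)
  {
    Console.WriteLine (id + " wants to swim");
    m_Sem.Wait();                                        // take: blocks while two swimmers are in
    try
    {
      Console.WriteLine (id + " is swimming");           // Only two swimmers
      Thread.Sleep (1000 * (int) id);                    // can be here at
      Console.WriteLine (id + " is done swimming");      // a time.
    }
    finally
    {
      m_Sem.Release();                                   // put: lets a waiting swimmer in
    }
  }
}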




Mutexes

10 07 2011

A mutex is a synchronization primitive, similar to a monitor, but one that can also be used for interprocess synchronization. When you create a mutex you can give it a name to identify it across processes. To lock a mutex you call its WaitOne method, and to unlock it you call ReleaseMutex. A mutex is owned by the thread that acquired it, so ReleaseMutex must be called from that same thread.
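A minimal in-process sketch of the lock/unlock pattern, assuming an unnamed mutex guarding a shared counter that Parallel.For increments from several threads; MutexCounter is a hypothetical name. For a single process a monitor would be the cheaper choice; the mutex is used here only to show the API.

```csharp
using System.Threading;
using System.Threading.Tasks;

static class MutexCounter
{
    // An unnamed mutex is local to this process; pass a name to share it across processes.
    static readonly Mutex s_Mutex = new Mutex();
    static int s_Count;

    public static int IncrementInParallel(int iterations)
    {
        s_Count = 0;
        Parallel.For(0, iterations, _ =>
        {
            s_Mutex.WaitOne();           // lock: blocks while another thread owns the mutex
            try
            {
                s_Count++;               // protected read-modify-write
            }
            finally
            {
                s_Mutex.ReleaseMutex();  // unlock, from the same thread that called WaitOne
            }
        });
        return s_Count;
    }
}
```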

A common use of a mutex is to ensure that only one instance of a program runs at a time. Mutexes are about 50 times slower to acquire than monitors, because acquiring a mutex requires a transition into the kernel.

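using System;
using System.Threading;

class Program
{
  static void Main()
  {
    // The name identifies the mutex machine-wide; choose one unique to your application
    using (var mutex = new Mutex (false, "My Program"))
    {
      //try to obtain the mutex, give it some time in case another instance is exiting
      if (!mutex.WaitOne (TimeSpan.FromSeconds (3), false))
      {
        Console.WriteLine ("Another instance is running.");
        return;
      }

      try
      {
        //Launch: run the application while holding the mutex
      }
      finally
      {
        // Release explicitly so the next instance does not find an abandoned mutex
        mutex.ReleaseMutex();
      }
    }
  }
}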




Monitors (CLR locks)

9 07 2011

The CLR provides “monitors” as the managed-code equivalent of critical regions and Win32 critical sections. You call Monitor.Enter on an object to acquire its monitor; if another thread already holds that monitor, the calling thread blocks until the owning thread releases it by calling Monitor.Exit.

Don’t use ‘this’ as the object to lock on. Any code that holds a reference to your object can lock on it too, which can cause unexpected contention or even deadlocks, and it exposes an implementation detail of your locking scheme to clients of your object. It is better practice to keep your synchronization objects private and walled off from the rest of the world. To ensure that a thread always leaves the monitor, acquire it immediately before a try block and release it in the matching finally block, so the monitor is released even if an exception is thrown.
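Both recommendations together, as a sketch: a private lock object instead of ‘this’, and Monitor.Enter/Exit wrapped in try/finally. The Account class and its members are hypothetical.

```csharp
using System.Threading;

// Hypothetical class showing a private lock object instead of 'this'.
class Account
{
    // Only this class can see m_Sync, so no outside code can lock on it
    // and contend with (or deadlock against) our own locking.
    private readonly object m_Sync = new object();
    private decimal m_Balance;

    public void Deposit(decimal amount)
    {
        Monitor.Enter(m_Sync);
        try
        {
            m_Balance += amount;
        }
        finally
        {
            Monitor.Exit(m_Sync);   // released even if the code above throws
        }
    }

    public decimal Balance
    {
        get
        {
            Monitor.Enter(m_Sync);
            try { return m_Balance; }
            finally { Monitor.Exit(m_Sync); }
        }
    }
}
```

Callers can lock on an Account instance all they like; it no longer interferes with the account’s own synchronization.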

Monitors are different from mutexes: a mutex works across multiple processes, while a monitor works only within a single process. An uncontended monitor is also much faster to acquire than a mutex (roughly 50 times faster), because it is acquired in user mode rather than kernel mode.

object monitorObj = new object();

Monitor.Enter(monitorObj);
try
{
    // critical region: only one thread at a time runs this code for monitorObj
}
finally
{
    Monitor.Exit(monitorObj);
}

This pattern is so common that C# provides syntactic sugar for it in the form of the lock statement.

object monitorObj = new object();

lock(monitorObj)
{
    // critical region
}

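Up to C# 3, the compiler emitted the same IL for both of the cases above. Since C# 4 it instead uses the Monitor.Enter(object, ref bool lockTaken) overload and only calls Monitor.Exit if the lock was actually taken, which closes a small window in which an exception could leave the monitor held.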
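The C# 4+ expansion can be written out by hand. This is a sketch of roughly what the compiler generates for lock (monitorObj) { result = ++s_Counter; }, not its literal output; LockExpansion and s_Counter are hypothetical names.

```csharp
using System.Threading;

// Sketch of the C# 4+ expansion of: lock (monitorObj) { result = ++s_Counter; }
static class LockExpansion
{
    static readonly object monitorObj = new object();
    static int s_Counter;

    public static int Increment()
    {
        int result;
        object temp = monitorObj;         // the lock expression is evaluated only once
        bool lockTaken = false;
        try
        {
            // lockTaken becomes true only once the monitor has actually been acquired
            Monitor.Enter(temp, ref lockTaken);
            result = ++s_Counter;
        }
        finally
        {
            if (lockTaken)
                Monitor.Exit(temp);       // never calls Exit for a monitor that was not acquired
        }
        return result;
    }
}
```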





Using the Thread Pool to Process Many Items at Once

6 07 2011

A common requirement of concurrent programming is the ability to wait until a work item, or a set of work items, queued to the thread pool has completed. A simple approach for a single work item is to allocate an event that the work item sets when it finishes, and have the calling thread wait on it.

using(ManualResetEvent finishedWork = new ManualResetEvent(false))
{
    ThreadPool.QueueUserWorkItem(delegate
    {
        //do the work on a Thread Pool thread
        finishedWork.Set();
    });

    //do some work at the same time as the Thread Pool
    //wait for the work to be finished
    finishedWork.WaitOne();
}

One approach for many work items would be to create one event per work item and wait for them all to be set, although WaitHandle.WaitAll accepts at most 64 handles. A more efficient approach is to create a single event and have the last work item to complete signal the calling thread that all the work has finished.

const int numWorkers = 10;
int remaining = numWorkers;   // decremented by the workers; kept separate so the loop bound cannot shrink mid-loop
using(ManualResetEvent finishedWork = new ManualResetEvent(false))
{
    for (int i = 0; i < numWorkers; ++i)
    {
        ThreadPool.QueueUserWorkItem(delegate
        {
            //do the work on a Thread Pool thread

            //the last worker to finish signals the waiting thread
            if(Interlocked.Decrement(ref remaining) == 0)
                finishedWork.Set();
        });
    }

    //do some work at the same time as the Thread Pool
    //wait for the work to be finished
    finishedWork.WaitOne();
}
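Since .NET 4, System.Threading.CountdownEvent packages the same "last one out signals" idea: it starts at a count, each Signal() decrements it, and Wait() returns once it reaches zero. A sketch of the example above rewritten with it, assuming the work items only need to be counted; CountdownDemo and RunWorkers are hypothetical names.

```csharp
using System.Threading;

static class CountdownDemo
{
    // Queues numWorkers work items and blocks until every one of them has signalled.
    public static int RunWorkers(int numWorkers)
    {
        int completed = 0;
        using (var allDone = new CountdownEvent(numWorkers))   // starts at numWorkers (already set if 0)
        {
            for (int i = 0; i < numWorkers; ++i)
            {
                ThreadPool.QueueUserWorkItem(delegate
                {
                    // do the work on a Thread Pool thread
                    Interlocked.Increment(ref completed);

                    allDone.Signal();   // decrements the count; the event is set when it reaches 0
                });
            }

            allDone.Wait();   // blocks until every worker has called Signal
        }
        return completed;
    }
}
```

The counting logic now lives in one tested class instead of being repeated at each call site.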




Using Reflection to Choose a Method at Runtime

5 07 2011

One area of our code is responsible for serializing a given object to a given BinaryWriter. Most of the time the object is of a primitive type, so all that is needed is to work out its type, cast it and call the appropriate Write() overload on the BinaryWriter. The obvious solution is a big if statement that checks the object’s type and forwards the cast object to Write(). This approach is a pain: the method becomes big and hard to maintain, and it is frustrating to read essentially the same code repeated over and over.

I tried a different approach using the reflection classes and a little LINQ. The resulting method is small and covers all the primitive types in one go. It is probably slower than the if/else version (I’ll profile it later), since it searches BinaryWriter’s methods and invokes the match through reflection on every call, but the reduced code size and improved maintainability make it a good option.

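using System;
using System.IO;
using System.Linq;

static void Serialize(Object obj, BinaryWriter writer)
{
    var objRunTimeType = obj.GetType();
    var binaryWriterType = typeof(BinaryWriter);

    // Pick the Write overload that takes exactly one parameter of the runtime type;
    // the length check skips overloads such as Write(byte[], int, int)
    var writeMethod = (from m in binaryWriterType.GetMethods()
                       let ps = m.GetParameters()
                       where m.Name == "Write" && ps.Length == 1 && ps[0].ParameterType == objRunTimeType
                       select m).FirstOrDefault();

    if (writeMethod == null)
        throw new NotSupportedException("BinaryWriter has no Write overload for " + objRunTimeType);

    writeMethod.Invoke(writer, new object[] {obj});
}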
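One way to reduce the reflection cost without giving up the compact code is to do the method search once and cache the result per type. This is a sketch, not profiled: CachedSerializer is a hypothetical name, and each call still pays for MethodInfo.Invoke.

```csharp
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Reflection;

static class CachedSerializer
{
    // Built once: maps each parameter type to its single-argument BinaryWriter.Write overload.
    static readonly Dictionary<Type, MethodInfo> s_WriteMethods =
        typeof(BinaryWriter).GetMethods()
            .Where(m => m.Name == "Write" && m.GetParameters().Length == 1)
            .ToDictionary(m => m.GetParameters()[0].ParameterType);

    public static void Serialize(object obj, BinaryWriter writer)
    {
        MethodInfo writeMethod;
        if (!s_WriteMethods.TryGetValue(obj.GetType(), out writeMethod))
            throw new NotSupportedException("BinaryWriter has no Write overload for " + obj.GetType());

        writeMethod.Invoke(writer, new[] { obj });
    }
}
```

The output is byte-for-byte what calling the matching Write overload directly would produce, so existing readers do not need to change.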




Object.GetType()

4 07 2011

All classes and structs in C# ultimately derive from System.Object, so every object has a GetType() method. It returns an instance of Type that represents the exact runtime type of the object it is called on, which may be more derived than the static type of the variable that refers to it.
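A short sketch of the difference between the static type of a variable and the runtime type that GetType() reports (RuntimeTypeDemo is a hypothetical name). Note that calling GetType() on a null reference throws a NullReferenceException, which is why code that records types has to check for null first.

```csharp
using System;
using System.IO;

static class RuntimeTypeDemo
{
    public static Type[] Run()
    {
        object boxed = 42;                   // static type object, runtime type Int32
        Stream stream = new MemoryStream();  // static type Stream, runtime type MemoryStream

        return new[]
        {
            boxed.GetType(),    // typeof(int)
            stream.GetType(),   // typeof(MemoryStream), not typeof(Stream)
        };
    }
}
```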

This method came in very handy today: I had to keep an in-memory record of the type of every object I serialized to disk so that I could deserialize it correctly later. So I set up a List<T> and stored each type as I serialized the data (actually I did a bit more than that, see the code). I think relying on GetType() is acceptable in this sort of situation; if you find that your code uses it heavily in many places, that’s a symptom of a design that needs rethinking.

private class TypeTracker
{
    public class ColumnComparisonTypes
    {
        public Type SrcType { get; set; }
        public Type TgtType { get; set; }

        public bool Complete { get { return SrcType != null && TgtType != null; } }
    }

    private readonly List<ColumnComparisonTypes> m_ColumnComparisonTypes;
    private bool Complete
    {
        get { return m_ColumnComparisonTypes.TrueForAll(c => c.Complete); }
    }

    public TypeTracker(ColumnComparisonSettingsCollection columnComparisonSettingsCollection)
    {
        m_ColumnComparisonTypes = new List<ColumnComparisonTypes>(columnComparisonSettingsCollection.Count);
        for(int i=0; i<columnComparisonSettingsCollection.Count; ++i)
            m_ColumnComparisonTypes.Add(new ColumnComparisonTypes());
    }

    public ColumnComparisonTypes this[int i]
    {
        get { return m_ColumnComparisonTypes[i]; }
    }

    public void Track(RowComparisonResult rowComparisonResult)
    {
        if (rowComparisonResult == null) throw new ArgumentNullException("rowComparisonResult");
        if (Complete) return;

        for(int i = 0; i < rowComparisonResult.CellResults.Count; ++i)
        {
            var cellResult = rowComparisonResult.CellResults[i];

            if (cellResult.SourceValue != null && m_ColumnComparisonTypes[i].SrcType == null)
            {
                m_ColumnComparisonTypes[i].SrcType = cellResult.SourceValue.GetType();
            }
            if (cellResult.TargetValue != null && m_ColumnComparisonTypes[i].TgtType == null)
            {
                m_ColumnComparisonTypes[i].TgtType = cellResult.TargetValue.GetType();
            }
        }
    }

}