Using the Strategy Pattern to Change Threading at Runtime

21 08 2011

The strategy pattern involves one class delegating some behaviour to a member that implements a known interface (or abstract base). Because the work is delegated, you can change the behaviour at runtime simply by swapping the member for another type from the same inheritance hierarchy. In my OrderMatchingEngine project I found a cool (to me at least) use for this pattern: you can switch a component's behaviour from synchronous, to multithreaded via the ThreadPool, to multithreaded via dedicated heavyweight threads, and back again as required at runtime.
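In general terms the pattern looks something like this (a generic sketch; these types are illustrative and not part of the project):

using System;
using System.Threading;

//Illustrative only: a strategy interface and a couple of implementations.
public interface IWorkStrategy
{
    void Execute(Action work);
}

public class SynchronousStrategy : IWorkStrategy
{
    public void Execute(Action work) { work(); }   //run on the calling thread
}

public class ThreadPoolStrategy : IWorkStrategy
{
    public void Execute(Action work) { ThreadPool.QueueUserWorkItem(_ => work()); }
}

public class Worker
{
    //swap this property at runtime to change how DoWork behaves
    public IWorkStrategy Strategy { get; set; }

    public Worker() { Strategy = new SynchronousStrategy(); }

    public void DoWork(Action work) { Strategy.Execute(work); }
}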

In my project each OrderBook has an OrderProcessor. Three types inherit from OrderProcessor: SynchronousOrderProcessor, ThreadPooledOrderProcessor and DedicatedThreadOrderProcessor. With some careful locking inside the OrderProcessingStrategy property setter and the InsertOrder method you can safely switch which type of OrderProcessor is used, committing more resources when and where they are needed.

On receipt of its first order the Market kicks off a timer which fires every second and prioritises the OrderBooks by number of orders received. The top 10% of OrderBooks are assigned dedicated threads, the next 20% use the ThreadPool and the remainder are given synchronous behaviour. In this way the Market can react to peaks of activity during the day and give more performance to those OrderBooks that need it.

using System;
using System.Collections.Concurrent;
using System.Threading;

namespace OrderMatchingEngine.OrderBook
{
    public class SynchronousOrderProcessor : OrderProcessor
    {
        public SynchronousOrderProcessor(BuyOrders buyOrders, SellOrders sellOrders, Trades trades)
            : base(buyOrders, sellOrders, trades)
        {
        }

        public override void InsertOrder(Order order)
        {
            ProcessOrder(order);
        }
    }

    public class ThreadPooledOrderProcessor : OrderProcessor
    {
        public ThreadPooledOrderProcessor(BuyOrders buyOrders, SellOrders sellOrders, Trades trades)
            : base(buyOrders, sellOrders, trades)
        {
        }

        public override void InsertOrder(Order order)
        {
            ThreadPool.QueueUserWorkItem((o) => ProcessOrder(order));
        }
    }

    public class DedicatedThreadOrderProcessor : OrderProcessor
    {
        private readonly Thread m_Thread;
        private readonly BlockingCollection<Order> m_PendingOrders = new BlockingCollection<Order>();

        public DedicatedThreadOrderProcessor(BuyOrders buyOrders, SellOrders sellOrders, Trades trades)
            : base(buyOrders, sellOrders, trades)
        {
            m_Thread = new Thread(ProcessOrders);
            m_Thread.Start();
        }

        private void ProcessOrders()
        {
            foreach (Order order in m_PendingOrders.GetConsumingEnumerable())
                ProcessOrder(order);
        }

        public void Stop()
        {
            m_PendingOrders.CompleteAdding();
        }

        public override void InsertOrder(Order order)
        {
            m_PendingOrders.Add(order);
        }
    }

    public class OrderBook
    {
        private OrderProcessor m_OrderProcessingStrategy;

        private readonly Object m_Locker = new Object();

        public Instrument Instrument { get; private set; }
        public BuyOrders BuyOrders { get; private set; }
        public SellOrders SellOrders { get; private set; }
        public Trades Trades { get; private set; }
        public Statistics Statistics { get; private set; }

        public OrderProcessor OrderProcessingStrategy
        {
            get { return m_OrderProcessingStrategy; }
            set
            {
                lock (m_Locker)
                {
                    var dedicatedThreadOrderProcessor = m_OrderProcessingStrategy as DedicatedThreadOrderProcessor;

                    if (dedicatedThreadOrderProcessor != null)
                        dedicatedThreadOrderProcessor.Stop();

                    m_OrderProcessingStrategy = value;
                }
            }
        }

        public OrderBook(Instrument instrument, BuyOrders buyOrders, SellOrders sellOrders, Trades trades,
                         OrderProcessor orderProcessingStrategy)
        {
            if (instrument == null) throw new ArgumentNullException("instrument");
            if (buyOrders == null) throw new ArgumentNullException("buyOrders");
            if (sellOrders == null) throw new ArgumentNullException("sellOrders");
            if (trades == null) throw new ArgumentNullException("trades");
            if (orderProcessingStrategy == null) throw new ArgumentNullException("orderProcessingStrategy");
            if (!(instrument == buyOrders.Instrument && instrument == sellOrders.Instrument))
                throw new ArgumentException("instrument does not match buyOrders and sellOrders instrument");

            Instrument = instrument;
            BuyOrders = buyOrders;
            SellOrders = sellOrders;
            Trades = trades;
            OrderProcessingStrategy = orderProcessingStrategy;
            Statistics = new Statistics();
        }

        public OrderBook(Instrument instrument)
            : this(instrument, new BuyOrders(instrument), new SellOrders(instrument), new Trades(instrument))
        {
        }

        public OrderBook(Instrument instrument, BuyOrders buyOrders, SellOrders sellOrders, Trades trades)
            : this(
                instrument, buyOrders, sellOrders, trades, new SynchronousOrderProcessor(buyOrders, sellOrders, trades))
        {
        }

        public void InsertOrder(Order order)
        {
            if (order == null) throw new ArgumentNullException("order");
            if (order.Instrument != Instrument)
                throw new OrderIsNotForThisBookException();

            OrderReceived();

            //the strategy can change at runtime so lock here and in OrderProcessingStrategy property
            lock (m_Locker)
                OrderProcessingStrategy.InsertOrder(order);
        }

        private void OrderReceived()
        {
            //count the order so the Market can prioritise the busiest books
            Statistics[Statistics.Stat.NumOrders]++;
        }

        public class OrderIsNotForThisBookException : Exception
        {
        }
    }
}
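
The prioritisation described above could be driven by something like the following sketch. This is not the Market code from the project: the method name, the collection of books passed in and the assumption that the NumOrders statistic is a plain number are all mine.

using System.Collections.Generic;
using System.Linq;

namespace OrderMatchingEngine.OrderBook
{
    //Sketch only: not the project's Market class.
    public static class MarketPrioritisationSketch
    {
        public static void PrioritiseOrderBooks(ICollection<OrderBook> books)
        {
            //busiest books first, judged by how many orders each has received
            OrderBook[] ranked = books.OrderByDescending(b => b.Statistics[Statistics.Stat.NumOrders])
                                      .ToArray();

            int dedicated = (int)(ranked.Length * 0.1); //top 10% get a dedicated thread
            int pooled = (int)(ranked.Length * 0.2);    //next 20% use the ThreadPool

            for (int i = 0; i < ranked.Length; ++i)
            {
                OrderBook book = ranked[i];

                //the OrderProcessingStrategy setter stops a previous dedicated-thread processor for us
                if (i < dedicated)
                    book.OrderProcessingStrategy = new DedicatedThreadOrderProcessor(book.BuyOrders, book.SellOrders, book.Trades);
                else if (i < dedicated + pooled)
                    book.OrderProcessingStrategy = new ThreadPooledOrderProcessor(book.BuyOrders, book.SellOrders, book.Trades);
                else
                    book.OrderProcessingStrategy = new SynchronousOrderProcessor(book.BuyOrders, book.SellOrders, book.Trades);
            }
        }
    }
}

Because the OrderProcessingStrategy setter locks and stops any previous DedicatedThreadOrderProcessor, the swap is safe to perform while orders are still arriving.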




BlockingCollection

19 08 2011

When you call TryTake on a ConcurrentQueue, ConcurrentBag or ConcurrentStack and the collection is empty you get false. In producer/consumer situations you often use events to wait when a collection is empty and signal when it is not. A BlockingCollection wraps any collection that implements IProducerConsumerCollection<T> and provides this wait-and-signal functionality for you. So when you call Take on a BlockingCollection the calling thread blocks until an item is available.

To use BlockingCollection:

1. Instantiate the class, optionally specifying the IProducerConsumerCollection<T> to wrap and the maximum size (bound) of the collection.

2. Call Add or TryAdd to add elements to the underlying collection.

3. Call Take or TryTake to remove (consume) elements from the underlying collection.

If you call the constructor without passing in a collection, the class will default to instantiating a ConcurrentQueue<T>.

Another way to consume elements is to call GetConsumingEnumerable. This returns a (potentially) infinite sequence that yields elements as they become available. You can force the sequence to end by calling CompleteAdding: this method also prevents further elements from being enqueued.
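
For instance, a minimal bounded producer/consumer sketch (the queue, the bound of 100 and the item strings are all illustrative):

using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

class Program
{
    static void Main()
    {
        //wraps a ConcurrentQueue<string> by default and bounds it to 100 items
        var queue = new BlockingCollection<string>(100);

        Task consumer = Task.Factory.StartNew(() =>
        {
            //blocks when the collection is empty; ends once CompleteAdding has been called
            //and the remaining items have been consumed
            foreach (string item in queue.GetConsumingEnumerable())
                Console.WriteLine("Consumed " + item);
        });

        for (int i = 0; i < 10; i++)
            queue.Add("item " + i);     //would block if the bound of 100 were reached

        queue.CompleteAdding();         //no more items; lets the consuming enumeration finish
        consumer.Wait();
    }
}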

Here is an example from my OrderMatchingEngine project:

public class DedicatedThreadOrderProcessor : OrderBook.OrderProcessor
{
    private readonly Thread m_Thread;
    private readonly BlockingCollection<Order> m_PendingOrders = new BlockingCollection<Order>();

    public DedicatedThreadOrderProcessor(BuyOrders buyOrders, SellOrders sellOrders, Trades trades)
        : base(buyOrders, sellOrders, trades)
    {
        m_Thread = new Thread(ProcessOrders);
        m_Thread.Start();
    }

    private void ProcessOrders()
    {
        foreach (var order in m_PendingOrders.GetConsumingEnumerable())
        {
            ProcessOrder(order);
        }
    }

    public void Stop()
    {
        this.m_PendingOrders.CompleteAdding();
    }

    public override void InsertOrder(Order order)
    {
        m_PendingOrders.Add(order);
    }

}




How Does the ThreadPool’s Minimum Thread Count Work?

17 08 2011

ThreadPool threads are created only on demand; increasing the minimum thread count to x does not mean that x threads are created straight away. Rather, it instructs the pool manager to create up to x threads the instant they are required.

The pool manager's base strategy is to keep the same number of threads as there are cores. If the work queue remains stationary for more than half a second, it responds by creating more threads (one every half-second) up to the capacity of the ThreadPool. This prevents a brief burst of short-lived activity from causing a full allocation of threads and a sudden swelling of the application's memory footprint. Where your tasks block on something, e.g. waiting for a database query to return, it can be useful to avoid this half-second delay.

You can tell the pool manager not to delay the allocation of the first x threads by calling SetMinThreads, for instance:

ThreadPool.SetMinThreads (50, 50);

The second value indicates how many threads to assign to I/O completion ports.
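
As a sketch, you might read the current minimums first and raise only the worker count (the figure of 50 is arbitrary):

using System;
using System.Threading;

class Program
{
    static void Main()
    {
        int workers, completionPorts;
        ThreadPool.GetMinThreads(out workers, out completionPorts);
        Console.WriteLine("Current minimums: {0} worker, {1} I/O completion port", workers, completionPorts);

        //raise only the worker minimum and leave the I/O completion port minimum alone
        ThreadPool.SetMinThreads(50, completionPorts);
    }
}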





Thread Priority

14 08 2011

A thread’s Priority property determines how much execution time it gets relative to other active threads in the operating system. The options are:

enum ThreadPriority { Lowest, BelowNormal, Normal, AboveNormal, Highest }
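
For example, you might give one worker thread more execution time than another like this (a small sketch; the work delegates are just placeholders):

using System;
using System.Threading;

class Program
{
    static void Main()
    {
        var background = new Thread(() => Console.WriteLine("low priority work"));
        var urgent = new Thread(() => Console.WriteLine("high priority work"));

        background.Priority = ThreadPriority.BelowNormal;   //priority can be set before Start or while the thread runs
        urgent.Priority = ThreadPriority.AboveNormal;

        background.Start();
        urgent.Start();
    }
}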

This becomes relevant only when multiple threads are simultaneously active. Raising a thread’s priority can lead to problems such as resource starvation for other threads. A thread’s priority is throttled by the application’s process priority. To perform real-time work, you must also elevate the process priority using the Process class in System.Diagnostics:

using (Process p = Process.GetCurrentProcess()) 
    p.PriorityClass = ProcessPriorityClass.High;

The top ProcessPriorityClass is Realtime. Setting a process's priority to Realtime instructs the OS that your process should never yield CPU time to another. If your program enters an accidental infinite loop, you can find that the power button is your only option! For this reason, High is usually the best choice for real-time applications.





Semaphores

11 07 2011

A semaphore allows a special kind of critical region that is not mutually exclusive. Threads may perform a take or put operation on a semaphore, atomically decreasing or increasing its current count, respectively. When a thread tries to take from a semaphore whose count is already 0, it blocks until the count becomes non-zero. The initial count of the semaphore specifies the maximum number of threads that can be inside the region at once. Unlike mutexes, semaphores are never considered to be owned by a specific thread; different threads can safely put and take from the semaphore without any problems.

To take from the semaphore a thread calls Wait(), and to put it calls Release(). Semaphores are typically used to protect resources that are finite in capacity: a pool of database connections, for example, or a swimming pool with a maximum number of swimmers. A Semaphore (though not the lighter-weight SemaphoreSlim) can span processes in the same way as a Mutex if you give it a name on construction.

class SwimmingPool
{
  static SemaphoreSlim m_Sem = new SemaphoreSlim (2);

  static void Main()
  {
    for (int i = 1; i <= 5; i++) new Thread (Swim).Start (i);
  }

  static void Swim(object id)
  {
    Console.WriteLine (id + " wants to swim");
    m_Sem.Wait();
    Console.WriteLine (id + " is swimming");           // Only two swimmers
    Thread.Sleep (1000 * (int) id);                    // can be here at
    Console.WriteLine (id + " is done swimming");       // a time.
    m_Sem.Release();
  }
}




Mutexes

10 07 2011

A mutex is a synchronization primitive, similar to a monitor, but one that can also be used for interprocess synchronization. When you create a mutex you can give it a name to identify it across processes. To lock a mutex you call its WaitOne method; to unlock it you call ReleaseMutex.

A common use of a mutex is to ensure that only one instance of a program can run at a time. Mutexes are about 50 times slower to obtain than monitors.

class Program
{
  static void Main()
  {
    using (var mutex = new Mutex (false, "My Program"))
    {
      //try to obtain the mutex, give it some time in case another instance is exiting
      if (!mutex.WaitOne (TimeSpan.FromSeconds (3), false))
      {
        Console.WriteLine ("Another instance is running.");
        return;
      }
      else
      {
           //Launch
      }
    }
  }
}




Monitors (CLR locks)

9 07 2011

The CLR provides “monitors” as the managed-code equivalent to critical regions and Win32’s critical sections. You call Monitor.Enter on an object to acquire the monitor; if another thread already holds that monitor, the calling thread blocks until the owning thread releases it. The owning thread releases the monitor by calling Monitor.Exit.

Don’t use ‘this’ as your object to lock on. It is better practice to explicitly manage and wall off your synchronization objects from the rest of the world; using ‘this’ can make encapsulation difficult by exposing the implementation detail of your locking mechanism to clients of your object. To ensure that a thread always leaves the monitor, wrap the protected code in a try / finally block so that the monitor is released even in the face of an exception.

Monitors are different from mutexes. A mutex works across multiple processes while monitors are for a single process. Monitors are a lot faster (~ 50 times faster) to acquire than mutexes because they are acquired in user mode, not kernel mode.

object monitorObj = new object();

Monitor.Enter(monitorObj);
try
{
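    //the code protected by the monitor goes here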

}
finally
{
    Monitor.Exit(monitorObj);
}

This pattern is so common that C# provides syntactic sugar for it in the form of the lock statement.

object monitorObj = new object();

lock(monitorObj)
{
}

The C# compiler emits essentially the same IL in both of the cases above.