Correct organization of event processing in a separate thread

Posted on December 26, 2011

Multithreaded applications have become commonplace. The main reason is the need to run resource-intensive tasks in parallel with the GUI, so that long operations do not freeze it. The easiest way to process data asynchronously is to use ThreadPool, which is also very efficient. For all its merits, it has one major drawback: an application cannot manage the ThreadPool’s lifetime, so at shutdown it cannot be sure that the ThreadPool is not still processing previously queued tasks, which may crash the application.

Let’s review a specific case: a thread pool implementation consisting of a single thread. The requirements for this implementation are:

  • The delay between adding a task to the thread pool and the start of its execution should be minimal.
  • If there are no tasks, the thread pool should not consume resources. Specifically, a thread that is not processing tasks should not cause context switches.
  • The thread pool should provide an interface for correct shutdown (IDisposable suits this best) and guarantee that once Dispose() has returned, no task, including one that was in execution, is still running.
  • Tasks should be executed sequentially, without having to acquire synchronization objects.
  • Recursive task dispatch should be possible, i.e. a running task may add new tasks.
  • The thread pool should provide an interface for exception handling.

So, there are many requirements, and all of them are significant. The Monitor class is the most convenient way to implement the thread synchronization. It protects the task queue, lets the thread sleep in a state where it consumes no resources, and wakes it from this state when a signal arrives. A signal should be sent to the thread whenever a new task is added to the queue and when thread termination is requested. Another useful property of Monitor is that the same thread can safely acquire the same synchronization object recursively, which allows a task to add new tasks.
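The wait-and-signal pattern described here can be sketched as follows. This is only a minimal illustration, not the dispatcher itself: the class MonitorSketch and its method ProduceAndConsume are hypothetical names introduced for the example, and only lock, Monitor.Wait and Monitor.Pulse come from .NET.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// Hypothetical demo class, not part of the article's dispatcher.
public static class MonitorSketch
{
    // Pushes 'count' integers through a queue guarded by Monitor and
    // returns them in the order the consumer thread received them.
    public static List<int> ProduceAndConsume(int count)
    {
        var queue = new Queue<int>();
        var seen = new List<int>();

        var consumer = new Thread(() =>
        {
            while (seen.Count < count)
            {
                int item;
                lock (queue)
                {
                    // Wait releases the lock and blocks the thread
                    // until another thread calls Pulse on the same object.
                    while (queue.Count == 0)
                    {
                        Monitor.Wait(queue);
                    }
                    item = queue.Dequeue();
                }
                // Process the item outside the lock.
                seen.Add(item);
            }
        });
        consumer.Start();

        for (int i = 0; i < count; i++)
        {
            lock (queue)
            {
                // Monitor is reentrant: the owning thread may lock again.
                lock (queue)
                {
                    queue.Enqueue(i);
                }
                // Wake the consumer if it is waiting.
                Monitor.Pulse(queue);
            }
        }

        consumer.Join();
        return seen;
    }
}
```

The consumer re-checks the queue in a while loop before waiting, so a Pulse sent while it is busy is not lost: the item is already in the queue when the consumer next takes the lock.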

Now let’s review how the thread pool is terminated. When a new thread is launched, it is given a method to run, which usually contains the task processing loop. While running, this loop constantly checks a flag that specifies whether task processing should stop. The flag is initially false, and Dispose() sets it to true. After the calling thread sets the flag and notifies the thread pool about it, it should wait for the thread pool thread to finish using Join(). If the thread pool thread does not finish within the allotted time, the calling thread may forcibly terminate it using Abort().
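The shutdown sequence (set the flag, signal, then Join with a timeout) might look like the sketch below. StoppableWorker, StoppedInTime and IsAlive are hypothetical names used for illustration, and the 2000 ms timeout is an arbitrary choice. The sketch only records whether the thread stopped in time and leaves out the Abort() fallback, because Thread.Abort() throws PlatformNotSupportedException on .NET Core and .NET 5 or later.

```csharp
using System;
using System.Threading;

// Hypothetical worker that idles until it is asked to stop.
public sealed class StoppableWorker : IDisposable
{
    private readonly object _sync = new object();
    private readonly Thread _thread;
    private bool _stopping; // always read and written under _sync

    public StoppableWorker()
    {
        _thread = new Thread(Run) { IsBackground = true };
        _thread.Start();
    }

    // True once Dispose() has seen the thread exit within the timeout.
    public bool StoppedInTime { get; private set; }

    public bool IsAlive
    {
        get { return _thread.IsAlive; }
    }

    private void Run()
    {
        lock (_sync)
        {
            // Sleep without consuming CPU until the stop flag is set.
            while (!_stopping)
            {
                Monitor.Wait(_sync);
            }
        }
    }

    public void Dispose()
    {
        lock (_sync)
        {
            _stopping = true;
            Monitor.Pulse(_sync); // wake the worker so it sees the flag
        }

        // Block the caller until the worker exits or the timeout elapses.
        StoppedInTime = _thread.Join(2000);
    }
}
```

Because the flag is only touched under the lock, the worker cannot miss the stop request even if Dispose() runs before the thread has reached its first Wait.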

Exception handling. All task exceptions occur in the thread pool thread, so if they are not handled, the thread stops. To prevent this, the thread should catch all exceptions thrown during task execution. In a well-designed system the thread pool should provide an interface for notification of such exceptional situations; an UnhandledException event is most suitable for this purpose.
Because a new thread does not start instantly, it is reasonable to have the thread pool constructor wait on a Monitor until the thread has started, so that tasks are never sent to an uninitialized thread.
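As a small illustration of the exception handling idea, the sketch below catches whatever a task throws and forwards it to subscribers through an event. SafeRunner is a hypothetical class made up for this example; UnhandledExceptionEventHandler and UnhandledExceptionEventArgs are the standard .NET types.

```csharp
using System;

// Hypothetical helper showing only the catch-and-notify step.
public sealed class SafeRunner
{
    public event UnhandledExceptionEventHandler UnhandledException;

    // Runs the action and reports any exception instead of letting it escape.
    public void Run(Action action)
    {
        try
        {
            action();
        }
        catch (Exception e)
        {
            // Copy the delegate first so that a concurrent unsubscribe
            // cannot make it null between the check and the call.
            var handler = UnhandledException;
            if (handler != null)
            {
                // 'false' means the runtime is not terminating.
                handler(this, new UnhandledExceptionEventArgs(e, false));
            }
        }
    }
}
```

A subscriber can read the original exception from the ExceptionObject property of the event arguments, for example to log it.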

Now we have just a few things to do – define the interfaces:

public interface ITask
{
    void Execute();
}

public interface IDispatcher
{
    void Dispatch(ITask task);
}

 

An example of ITask implementation enabling use of anonymous methods and lambda expressions for code reduction:

public class Task : ITask
{
    private readonly Action _action;

    public Task(Action action)
    {
        if (action == null)
        {
            throw new ArgumentNullException("action");
        }
        _action = action;
    }
    public void Execute()
    {
        _action();
    }
}

Now comes the hero: the code implementing a thread pool based on a single thread:

internal sealed class SingleThreadDispatcher : IDispatcher, IDisposable
{
    private readonly Thread _thread;
    private bool _stopping;
    private readonly Queue<ITask> _queue = new Queue<ITask>();
    private readonly object _startSynchro = new object();

    public SingleThreadDispatcher()
    {
        lock (_startSynchro)
        {
            _thread = new Thread(Run) { IsBackground = true };
            _thread.Start();

            //Wait until the thread is started
            Monitor.Wait(_startSynchro);
        }
    }

    private void Run()
    {
        //Release the calling thread
        lock (_startSynchro)
        {
            Monitor.Pulse(_startSynchro);
        }

        while(true)
        {
            bool exit;
            ITask task = null;
            lock (_queue)
            {
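                //Sleep only if there is nothing to do and no stop was requested
                if (_queue.Count == 0 && !_stopping)
                {
                    Monitor.Wait(_queue, 500);
                }

                //Take the next task right after waking up, if there is one
                if (_queue.Count > 0)
                {
                    task = _queue.Dequeue();
                }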

                exit = _stopping && _queue.Count == 0;
            }

            //Execute task without lock
            if (task != null)
            {
                try
                {
                    task.Execute();
                }
                catch (Exception e)
                {
                    //Copy the delegate so a concurrent unsubscribe cannot null it before the call
                    var handler = UnhandledException;
                    if(handler != null)
                    {
                        handler(this, new UnhandledExceptionEventArgs(e, false));
                    }
                }
            }

            if(exit)
            {
                break;
            }
        }
    }

    public void Dispatch(ITask task)
    {
        lock (_queue)
        {
            _queue.Enqueue(task);
            Monitor.Pulse(_queue);
        }
    }

    public void Dispose()
    {
        lock (_queue)
        {
            _stopping = true;
            Monitor.Pulse(_queue);
        }

        //Wait for the thread to exit; if it doesn't exit in time, abort it.
        if(!_thread.Join(2000))
        {
            _thread.Abort();
        }
    }

    public event UnhandledExceptionEventHandler UnhandledException;
}

 

Example of initialization:

[STAThread]
static void Main()
{
    using (var t = new SingleThreadDispatcher())
    {
        Application.EnableVisualStyles();
        Application.SetCompatibleTextRenderingDefault(false);
        Application.Run(new MyForm());
    }
}

 

Example of use:

t.Dispatch(new Task(()=>
{
    //Some code here
}));

 

We hope this helps you in your development.
Kind regards,
Dapfor Team
