main
  1using System;
  2using System.Collections.Generic;
  3using System.Threading;
  4
  5namespace common
  6{
  7    public class AsynchronousCommandProcessor : CommandProcessor
  8    {
  9        readonly Queue<Command> queued_commands;
 10        readonly EventWaitHandle manual_reset;
 11        readonly IList<Thread> worker_threads;
 12        bool keep_working;
 13
 14        static readonly Command Empty = new EmptyCommand();
 15
 16        public AsynchronousCommandProcessor()
 17        {
 18            queued_commands = new Queue<Command>();
 19            worker_threads = new List<Thread>();
 20            manual_reset = new ManualResetEvent(false);
 21        }
 22
 23        public void add(Command command_to_process)
 24        {
 25            lock (queued_commands)
 26            {
 27                if (queued_commands.Contains(command_to_process)) return;
 28                queued_commands.Enqueue(command_to_process);
 29                reset_thread();
 30            }
 31        }
 32
 33        public void run()
 34        {
 35            reset_thread();
 36            keep_working = true;
 37            var worker_thread = new Thread(run_commands);
 38            worker_thread.SetApartmentState(ApartmentState.STA);
 39            worker_threads.Add(worker_thread);
 40            worker_thread.Start();
 41        }
 42
 43        public void stop()
 44        {
 45            keep_working = false;
 46            manual_reset.Set();
 47            //manual_reset.Close();
 48        }
 49
 50        [STAThread]
 51        void run_commands()
 52        {
 53            while (keep_working)
 54            {
 55                manual_reset.WaitOne();
 56                run_next_command();
 57            }
 58        }
 59
 60        void run_next_command()
 61        {
 62            var command = Empty;
 63            within_lock(() =>
 64            {
 65                if (queued_commands.Count == 0)
 66                    manual_reset.Reset();
 67                else
 68                    command = queued_commands.Dequeue();
 69            });
 70            safely_invoke(() =>
 71            {
 72                command.run();
 73            });
 74            reset_thread();
 75        }
 76
 77        void safely_invoke(Action action)
 78        {
 79            try
 80            {
 81                action();
 82            }
 83            catch (Exception e)
 84            {
 85                e.add_to_log();
 86            }
 87        }
 88
 89        void reset_thread()
 90        {
 91            within_lock(() =>
 92            {
 93                if (queued_commands.Count > 0) manual_reset.Set();
 94                else manual_reset.Reset();
 95            });
 96        }
 97
 98        void within_lock(Action action)
 99        {
100            lock (queued_commands)
101            {
102                action();
103            }
104        }
105    }
106}