main
  1using System;
  2using System.Collections.Generic;
  3using System.Threading;
  4using Gorilla.Commons.Infrastructure.Logging;
  5using gorilla.commons.utility;
  6
  7namespace gorilla.commons.infrastructure.threading
  8{
  9    public class AsynchronousCommandProcessor : CommandProcessor
 10    {
 11        readonly Queue<Command> queued_commands;
 12        readonly EventWaitHandle manual_reset;
 13        readonly IList<Thread> worker_threads;
 14        bool keep_working;
 15
 16        static public readonly Command Empty = new EmptyCommand();
 17
 18        public AsynchronousCommandProcessor()
 19        {
 20            queued_commands = new Queue<Command>();
 21            worker_threads = new List<Thread>();
 22            manual_reset = new ManualResetEvent(false);
 23        }
 24
 25        public void add(Action command)
 26        {
 27            add(new AnonymousCommand(command));
 28        }
 29
 30        public void add(Command command_to_process)
 31        {
 32            lock (queued_commands)
 33            {
 34                if (queued_commands.Contains(command_to_process)) return;
 35                queued_commands.Enqueue(command_to_process);
 36                reset_thread();
 37            }
 38        }
 39
 40        public void run()
 41        {
 42            reset_thread();
 43            keep_working = true;
 44            var worker_thread = new Thread(run_commands);
 45            worker_thread.SetApartmentState(ApartmentState.STA);
 46            worker_threads.Add(worker_thread);
 47            worker_thread.Start();
 48        }
 49
 50        public void stop()
 51        {
 52            keep_working = false;
 53            manual_reset.Set();
 54            //manual_reset.Close();
 55        }
 56
 57        [STAThread]
 58        void run_commands()
 59        {
 60            while (keep_working)
 61            {
 62                manual_reset.WaitOne();
 63                run_next_command();
 64            }
 65        }
 66
 67        void run_next_command()
 68        {
 69            var command = Empty;
 70            within_lock(() =>
 71                        {
 72                            if (queued_commands.Count == 0)
 73                                manual_reset.Reset();
 74                            else
 75                                command = queued_commands.Dequeue();
 76                        });
 77            safely_invoke(() =>
 78                          {
 79                              command.run();
 80                          });
 81            reset_thread();
 82        }
 83
 84        void safely_invoke(Action action)
 85        {
 86            try
 87            {
 88                action();
 89            }
 90            catch (Exception e)
 91            {
 92                this.log().error(e);
 93            }
 94        }
 95
 96        void reset_thread()
 97        {
 98            within_lock(() =>
 99                        {
100                            if (queued_commands.Count > 0) manual_reset.Set();
101                            else manual_reset.Reset();
102                        });
103        }
104
105        void within_lock(Action action)
106        {
107            lock (queued_commands)
108            {
109                action();
110            }
111        }
112    }
113}