main
  1using System;
  2using System.Collections.Generic;
  3using System.Threading;
  4using jive;
  5
  6namespace jive
  7{
  /// <summary>
  /// Command processor that queues commands and executes them on dedicated
  /// worker threads rather than on the calling thread.
  /// </summary>
  /// <remarks>
  /// The command queue doubles as the lock object. The queue, the
  /// <c>keep_working</c> flag and the wake-up signal are only changed while
  /// that lock is held, and the signal is kept set whenever there is queued
  /// work OR the processor is stopped. The second condition guarantees that
  /// every worker blocked in <c>WaitOne</c> is released by <see cref="stop"/>,
  /// even when several workers are running.
  /// NOTE(review): the wait handle is never closed; the original code had a
  /// commented-out <c>Close()</c> call in <see cref="stop"/>. Closing it while
  /// workers may still be waiting would need extra coordination.
  /// </remarks>
  public class AsynchronousCommandProcessor : CommandProcessor
  {
    readonly Queue<Command> queued_commands;
    readonly EventWaitHandle manual_reset;
    readonly IList<Thread> worker_threads;

    // Written by run()/stop() and polled by worker threads outside the lock,
    // hence volatile.
    volatile bool keep_working;

    /// <summary>
    /// Command a worker runs when it wakes up and finds the queue empty
    /// (presumably a no-op; see <c>EmptyCommand</c>).
    /// </summary>
    public static readonly Command Empty = new EmptyCommand();

    /// <summary>
    /// Creates a processor with an empty queue, no worker threads and an
    /// unsignalled wake-up event. No work is processed until
    /// <see cref="run"/> is called.
    /// </summary>
    public AsynchronousCommandProcessor()
    {
      queued_commands = new Queue<Command>();
      worker_threads = new List<Thread>();
      manual_reset = new ManualResetEvent(false);
    }

    /// <summary>
    /// Wraps <paramref name="command"/> in an <c>AnonymousCommand</c> and queues it.
    /// </summary>
    /// <param name="command">Delegate to execute on a worker thread.</param>
    public void add(Action command)
    {
      add(new AnonymousCommand(command));
    }

    /// <summary>
    /// Queues a command for execution. A command that is already waiting in
    /// the queue is not queued a second time.
    /// </summary>
    /// <param name="command_to_process">Command to execute on a worker thread.</param>
    public void add(Command command_to_process)
    {
      lock (queued_commands)
      {
        if (queued_commands.Contains(command_to_process))
        {
          return;
        }

        queued_commands.Enqueue(command_to_process);
        reset_thread();
      }
    }

    /// <summary>
    /// Starts one additional worker thread that processes queued commands
    /// until <see cref="stop"/> is called.
    /// </summary>
    public void run()
    {
      var worker_thread = new Thread(run_commands);
      // The apartment state must be set on the Thread object before it starts.
      worker_thread.SetApartmentState(ApartmentState.STA);

      lock (queued_commands)
      {
        keep_working = true;
        // Re-evaluate the signal now that the processor is running: it stays
        // set only if work is already queued.
        reset_thread();
        worker_threads.Add(worker_thread);
      }

      worker_thread.Start();
    }

    /// <summary>
    /// Asks all worker threads to finish. Each worker leaves its loop the
    /// next time it checks the flag; a worker that is currently waiting is
    /// woken up so it can do so.
    /// </summary>
    public void stop()
    {
      // Flag and signal change together under the lock so that no worker can
      // reset the signal after it has been set here.
      lock (queued_commands)
      {
        keep_working = false;
        manual_reset.Set();
      }
    }

    /// <summary>
    /// Worker thread body: waits for the wake-up signal and runs one command
    /// per iteration for as long as the processor is running.
    /// </summary>
    void run_commands()
    {
      while (keep_working)
      {
        manual_reset.WaitOne();
        run_next_command();
      }
    }

    /// <summary>
    /// Takes the next command off the queue (or <see cref="Empty"/> when the
    /// queue is empty) and runs it outside the lock.
    /// </summary>
    void run_next_command()
    {
      var command = Empty;
      lock (queued_commands)
      {
        if (queued_commands.Count > 0)
        {
          command = queued_commands.Dequeue();
        }

        // Bring the signal in line with what is left in the queue.
        reset_thread();
      }

      // Kept as a lambda so that a failure to invoke the command (including a
      // null command) happens inside safely_invoke's try block.
      safely_invoke(() => command.run());
    }

    /// <summary>
    /// Invokes <paramref name="action"/> and logs any exception it throws,
    /// so a failing command does not terminate the worker thread.
    /// </summary>
    void safely_invoke(Action action)
    {
      try
      {
        action();
      }
      catch (Exception e)
      {
        this.log().error(e);
      }
    }

    /// <summary>
    /// Updates the wake-up signal: set while there is queued work or the
    /// processor is not running, reset otherwise.
    /// </summary>
    void reset_thread()
    {
      // Monitor locks are re-entrant, so this is safe to call from code that
      // already holds the queue lock.
      lock (queued_commands)
      {
        if (queued_commands.Count > 0 || !keep_working)
        {
          manual_reset.Set();
        }
        else
        {
          manual_reset.Reset();
        }
      }
    }
  }
112}