main
1using System;
2using System.Collections.Generic;
3using System.Threading;
4
namespace common
{
    /// <summary>
    /// Queues commands and runs them one at a time on worker threads started by
    /// <see cref="run"/>. Workers sleep on a manual-reset event while the queue is
    /// empty and are woken when a command is added or when <see cref="stop"/> is called.
    /// </summary>
    public class AsynchronousCommandProcessor : CommandProcessor
    {
        // Pending commands; this object also serves as the lock guarding the queue,
        // the wake-up signal and transitions of keep_working.
        readonly Queue<Command> queued_commands;

        // Signaled while there is queued work or while the processor is stopping.
        readonly EventWaitHandle manual_reset;

        // Every worker thread started by run().
        readonly IList<Thread> worker_threads;

        // Read by worker loops outside the lock, so it must be volatile to guarantee
        // that a write made by run()/stop() on another thread is observed.
        volatile bool keep_working;

        // Stand-in command executed when a worker wakes up and finds the queue empty.
        static readonly Command Empty = new EmptyCommand();

        /// <summary>
        /// Creates a processor with an empty queue, no workers and a non-signaled event.
        /// </summary>
        public AsynchronousCommandProcessor()
        {
            queued_commands = new Queue<Command>();
            worker_threads = new List<Thread>();
            manual_reset = new ManualResetEvent(false);
        }

        /// <summary>
        /// Enqueues a command unless an equal command is already waiting in the queue,
        /// then updates the wake-up signal.
        /// </summary>
        /// <param name="command_to_process">The command to queue.</param>
        public void add(Command command_to_process)
        {
            lock (queued_commands)
            {
                if (queued_commands.Contains(command_to_process))
                {
                    return;
                }

                queued_commands.Enqueue(command_to_process);
                reset_thread();
            }
        }

        /// <summary>
        /// Marks the processor as running and starts one additional STA worker thread.
        /// </summary>
        public void run()
        {
            // The flag must be raised before the signal is recomputed: reset_thread()
            // keeps the event signaled while keep_working is false, so doing it in this
            // order also clears a signal left over from an earlier stop().
            within_lock(() =>
            {
                keep_working = true;
                reset_thread();
            });

            var worker_thread = new Thread(run_commands);
            worker_thread.SetApartmentState(ApartmentState.STA);
            // NOTE(review): worker_threads is not synchronized; this assumes run() is
            // not called concurrently from several threads - confirm against callers.
            worker_threads.Add(worker_thread);
            worker_thread.Start();
        }

        /// <summary>
        /// Asks every worker to finish and wakes the ones that are currently waiting.
        /// </summary>
        public void stop()
        {
            // Changing the flag and signaling under the lock means no worker can
            // re-close the gate based on a stale keep_working value, so every worker
            // blocked in WaitOne() is released and can observe the stop request.
            lock (queued_commands)
            {
                keep_working = false;
                manual_reset.Set();
            }
            // The handle is intentionally not closed here: workers may still be
            // inside, or about to enter, WaitOne() on it.
        }

        // Worker loop: wait for the signal, process one command, repeat until stopped.
        // The apartment state is configured in run() via SetApartmentState.
        void run_commands()
        {
            while (keep_working)
            {
                manual_reset.WaitOne();
                run_next_command();
            }
        }

        // Takes the next queued command (or the Empty stand-in when there is none),
        // runs it outside the lock, and then recomputes the wake-up signal.
        void run_next_command()
        {
            var command = Empty;
            within_lock(() =>
            {
                if (queued_commands.Count > 0)
                {
                    command = queued_commands.Dequeue();
                }
                else
                {
                    // Nothing to do: let idle workers sleep (unless we are stopping).
                    reset_thread();
                }
            });
            safely_invoke(() =>
            {
                command.run();
            });
            reset_thread();
        }

        // Runs the action and logs, rather than propagates, any exception so that a
        // failing command cannot terminate the worker thread.
        void safely_invoke(Action action)
        {
            try
            {
                action();
            }
            catch (Exception e)
            {
                e.add_to_log();
            }
        }

        // Recomputes the wake-up signal under the lock. The event is signaled when
        // there is queued work or when the processor is not running; the latter keeps
        // the gate open after stop() so that no worker stays blocked in WaitOne().
        void reset_thread()
        {
            within_lock(() =>
            {
                if (queued_commands.Count > 0 || !keep_working)
                {
                    manual_reset.Set();
                }
                else
                {
                    manual_reset.Reset();
                }
            });
        }

        // Runs the action while holding the queue lock (the lock is re-entrant, so
        // nested calls from code that already holds it are safe).
        void within_lock(Action action)
        {
            lock (queued_commands)
            {
                action();
            }
        }
    }
}