main
1using System;
2using System.Collections.Generic;
3using System.Threading;
4using Gorilla.Commons.Infrastructure.Logging;
5using gorilla.commons.utility;
6
7namespace gorilla.commons.infrastructure.threading
8{
/// <summary>
/// Command processor that queues commands and executes them one at a time on
/// dedicated STA worker threads. Each call to <c>run</c> starts one more worker;
/// <c>stop</c> asks every worker to exit after the command it is currently running.
/// </summary>
public class AsynchronousCommandProcessor : CommandProcessor
{
    // Pending commands. The queue instance doubles as the lock that guards the
    // queue itself, the wait handle state and the worker list.
    readonly Queue<Command> queued_commands;

    // Signalled while there is work to pick up, or while the processor is stopped
    // (so that blocked workers can wake up and observe the stop request).
    readonly EventWaitHandle manual_reset;

    readonly IList<Thread> worker_threads;

    // Written by callers of run()/stop() and polled by the worker loop outside the
    // lock, so it must be volatile for the workers to reliably see the change.
    volatile bool keep_working;

    /// <summary>Command executed by a worker when it wakes up and finds the queue empty.</summary>
    public static readonly Command Empty = new EmptyCommand();

    public AsynchronousCommandProcessor()
    {
        queued_commands = new Queue<Command>();
        worker_threads = new List<Thread>();
        manual_reset = new ManualResetEvent(false);
    }

    /// <summary>Wraps <paramref name="command"/> in an <c>AnonymousCommand</c> and queues it.</summary>
    public void add(Action command)
    {
        add(new AnonymousCommand(command));
    }

    /// <summary>
    /// Queues <paramref name="command_to_process"/> unless the queue already contains it,
    /// then signals the workers that work is available.
    /// </summary>
    public void add(Command command_to_process)
    {
        lock (queued_commands)
        {
            if (queued_commands.Contains(command_to_process)) return;
            queued_commands.Enqueue(command_to_process);
            reset_thread();
        }
    }

    /// <summary>Starts one additional STA worker thread that processes queued commands.</summary>
    public void run()
    {
        var worker_thread = new Thread(run_commands);
        worker_thread.SetApartmentState(ApartmentState.STA);

        lock (queued_commands)
        {
            // The flag must be set before the wait handle is recomputed: while the
            // processor is stopped the handle is deliberately left signalled.
            keep_working = true;
            reset_thread();
            worker_threads.Add(worker_thread);
        }

        worker_thread.Start();
    }

    /// <summary>
    /// Asks all workers to exit. A worker finishes the command it is currently running;
    /// commands still in the queue are left there.
    /// </summary>
    public void stop()
    {
        // Changing the flag and signalling under the lock guarantees that no worker
        // can reset the handle afterwards, so every blocked worker wakes up and exits.
        lock (queued_commands)
        {
            keep_working = false;
            manual_reset.Set();
        }
    }

    // Worker loop. The apartment state is configured through SetApartmentState in
    // run(); STAThreadAttribute only affects an application's entry point, so it is
    // not applied here.
    void run_commands()
    {
        while (keep_working)
        {
            manual_reset.WaitOne();
            run_next_command();
        }
    }

    // Dequeues and runs the next command (or Empty when there is none), then
    // recomputes the wait handle state.
    void run_next_command()
    {
        var command = Empty;
        lock (queued_commands)
        {
            if (queued_commands.Count == 0)
                reset_thread();
            else
                command = queued_commands.Dequeue();
        }

        // Executed outside the lock so that a running command never blocks add()/stop().
        safely_invoke(() =>
        {
            command.run();
        });

        reset_thread();
    }

    // Runs the action and logs any exception instead of letting it end the worker thread.
    void safely_invoke(Action action)
    {
        try
        {
            action();
        }
        catch (Exception e)
        {
            this.log().error(e);
        }
    }

    // Recomputes the wait handle: signalled when commands are pending or when the
    // processor is stopped, otherwise reset so idle workers block in WaitOne().
    void reset_thread()
    {
        lock (queued_commands)
        {
            if (queued_commands.Count > 0 || !keep_working) manual_reset.Set();
            else manual_reset.Reset();
        }
    }
}
113}