main
 1using System;
 2using System.Collections.Generic;
 3using System.Linq.Expressions;
 4using System.Threading;
 5using gorilla.commons.utility;
 6
 7namespace MoMoney.Service.Infrastructure.Threading
 8{
 9    public class AsynchronousCommandProcessor : CommandProcessor
10    {
11        readonly Queue<Command> queued_commands;
12        readonly EventWaitHandle manual_reset;
13        readonly IList<Thread> worker_threads;
14        bool keep_working;
15
16        public AsynchronousCommandProcessor()
17        {
18            queued_commands = new Queue<Command>();
19            worker_threads = new List<Thread>();
20            manual_reset = new ManualResetEvent(false);
21        }
22
23        public void add(Expression<Action> action_to_process)
24        {
25            add(new AnonymousCommand(action_to_process));
26        }
27
28        public void add(Command command_to_process)
29        {
30            lock (queued_commands)
31            {
32                if (queued_commands.Contains(command_to_process)) return;
33                queued_commands.Enqueue(command_to_process);
34                reset_thread();
35            }
36        }
37
38        public void run()
39        {
40            reset_thread();
41            keep_working = true;
42            var worker_thread = new Thread(run_commands);
43            worker_thread.SetApartmentState(ApartmentState.STA);
44            worker_threads.Add(worker_thread);
45            worker_thread.Start();
46        }
47
48        public void stop()
49        {
50            keep_working = false;
51            manual_reset.Set();
52            //manual_reset.Close();
53        }
54
55        [STAThread]
56        void run_commands()
57        {
58            while (keep_working)
59            {
60                manual_reset.WaitOne();
61                run_next_command();
62            }
63        }
64
65        void run_next_command()
66        {
67            Command command;
68            lock (queued_commands)
69            {
70                if (queued_commands.Count == 0)
71                {
72                    manual_reset.Reset();
73                    return;
74                }
75                command = queued_commands.Dequeue();
76            }
77            command.run();
78            reset_thread();
79        }
80
81        void reset_thread()
82        {
83            lock (queued_commands)
84            {
85                if (queued_commands.Count > 0) manual_reset.Set();
86                else manual_reset.Reset();
87            }
88        }
89    }
90}