Commit 6676a68

mo khan <mo@mokhan.ca>
2010-07-09 00:32:23
replaced async command processor with synchronous one.
1 parent 95a62bd
code/client/Client.cs
@@ -62,8 +62,8 @@ namespace client
 
 
             // commanding
-            builder.Register<AsynchronousCommandProcessor>().As<CommandProcessor>().SingletonScoped();
-
+            //builder.Register<AsynchronousCommandProcessor>().As<CommandProcessor>().SingletonScoped();
+            builder.Register<SynchronousCommandProcessor>().As<CommandProcessor>().SingletonScoped();
 
             builder.Register<RequestHandler>().As<Handler>();
 
code/common/AsynchronousCommandProcessor.cs
@@ -1,106 +1,106 @@
-using System;
-using System.Collections.Generic;
-using System.Threading;
-
-namespace common
-{
-    public class AsynchronousCommandProcessor : CommandProcessor
-    {
-        readonly Queue<Command> queued_commands;
-        readonly EventWaitHandle manual_reset;
-        readonly IList<Thread> worker_threads;
-        bool keep_working;
-
-        static readonly Command Empty = new EmptyCommand();
-
-        public AsynchronousCommandProcessor()
-        {
-            queued_commands = new Queue<Command>();
-            worker_threads = new List<Thread>();
-            manual_reset = new ManualResetEvent(false);
-        }
-
-        public void add(Command command_to_process)
-        {
-            lock (queued_commands)
-            {
-                if (queued_commands.Contains(command_to_process)) return;
-                queued_commands.Enqueue(command_to_process);
-                reset_thread();
-            }
-        }
-
-        public void run()
-        {
-            reset_thread();
-            keep_working = true;
-            var worker_thread = new Thread(run_commands);
-            worker_thread.SetApartmentState(ApartmentState.STA);
-            worker_threads.Add(worker_thread);
-            worker_thread.Start();
-        }
-
-        public void stop()
-        {
-            keep_working = false;
-            manual_reset.Set();
-            //manual_reset.Close();
-        }
-
-        [STAThread]
-        void run_commands()
-        {
-            while (keep_working)
-            {
-                manual_reset.WaitOne();
-                run_next_command();
-            }
-        }
-
-        void run_next_command()
-        {
-            var command = Empty;
-            within_lock(() =>
-            {
-                if (queued_commands.Count == 0)
-                    manual_reset.Reset();
-                else
-                    command = queued_commands.Dequeue();
-            });
-            safely_invoke(() =>
-            {
-                command.run();
-            });
-            reset_thread();
-        }
-
-        void safely_invoke(Action action)
-        {
-            try
-            {
-                action();
-            }
-            catch (Exception e)
-            {
-                e.add_to_log();
-            }
-        }
-
-        void reset_thread()
-        {
-            within_lock(() =>
-            {
-                if (queued_commands.Count > 0) manual_reset.Set();
-                else manual_reset.Reset();
-            });
-        }
-
-        void within_lock(Action action)
-        {
-            lock (queued_commands)
-            {
-                action();
-            }
-        }
-    }
+using System;
+using System.Collections.Generic;
+using System.Threading;
+
+namespace common
+{
+    public class AsynchronousCommandProcessor : CommandProcessor
+    {
+        readonly Queue<Command> queued_commands;
+        readonly EventWaitHandle manual_reset;
+        readonly IList<Thread> worker_threads;
+        bool keep_working;
+
+        static readonly Command Empty = new EmptyCommand();
+
+        public AsynchronousCommandProcessor()
+        {
+            queued_commands = new Queue<Command>();
+            worker_threads = new List<Thread>();
+            manual_reset = new ManualResetEvent(false);
+        }
+
+        public void add(Command command_to_process)
+        {
+            lock (queued_commands)
+            {
+                if (queued_commands.Contains(command_to_process)) return;
+                queued_commands.Enqueue(command_to_process);
+                reset_thread();
+            }
+        }
+
+        public void run()
+        {
+            reset_thread();
+            keep_working = true;
+            var worker_thread = new Thread(run_commands);
+            worker_thread.SetApartmentState(ApartmentState.STA);
+            worker_threads.Add(worker_thread);
+            worker_thread.Start();
+        }
+
+        public void stop()
+        {
+            keep_working = false;
+            manual_reset.Set();
+            //manual_reset.Close();
+        }
+
+        [STAThread]
+        void run_commands()
+        {
+            while (keep_working)
+            {
+                manual_reset.WaitOne();
+                run_next_command();
+            }
+        }
+
+        void run_next_command()
+        {
+            var command = Empty;
+            within_lock(() =>
+            {
+                if (queued_commands.Count == 0)
+                    manual_reset.Reset();
+                else
+                    command = queued_commands.Dequeue();
+            });
+            safely_invoke(() =>
+            {
+                command.run();
+            });
+            reset_thread();
+        }
+
+        void safely_invoke(Action action)
+        {
+            try
+            {
+                action();
+            }
+            catch (Exception e)
+            {
+                e.add_to_log();
+            }
+        }
+
+        void reset_thread()
+        {
+            within_lock(() =>
+            {
+                if (queued_commands.Count > 0) manual_reset.Set();
+                else manual_reset.Reset();
+            });
+        }
+
+        void within_lock(Action action)
+        {
+            lock (queued_commands)
+            {
+                action();
+            }
+        }
+    }
 }
\ No newline at end of file
code/common/CommandProcessor.cs
@@ -1,8 +1,8 @@
-namespace common
-{
-    public interface CommandProcessor : Command
-    {
-        void add(Command command);
-        void stop();
-    }
+namespace common
+{
+    public interface CommandProcessor : Command
+    {
+        void add(Command command_to_process);
+        void stop();
+    }
 }
\ No newline at end of file
code/common/common.csproj
@@ -74,6 +74,7 @@
     <Compile Include="ServiceBus.cs" />
     <Compile Include="RequestHandler.cs" />
     <Compile Include="StringFormatting.cs" />
+    <Compile Include="SynchronousCommandProcessor.cs" />
   </ItemGroup>
   <Import Project="$(MSBuildToolsPath)\Microsoft.CSharp.targets" />
   <!-- To modify your build process, add your task inside one of the targets below and uncomment it. 
code/common/Logging.cs
@@ -1,5 +1,4 @@
 using System;
-using System.Reflection;
 
 namespace common
 {
@@ -7,7 +6,8 @@ namespace common
     {
         static public void log(this string item, params object[] arguments)
         {
-            Console.Out.WriteLine("{0}: {1}".format(Assembly.GetEntryAssembly().GetName().Name, item.format(arguments)));
+            //Console.Out.WriteLine("{0}: {1}".format(Assembly.GetEntryAssembly().GetName().Name, item.format(arguments)));
+            Console.Out.WriteLine(item.format(arguments));
         }
 
         static public void add_to_log(this Exception item)
code/common/RequestHandler.cs
@@ -16,8 +16,10 @@ namespace common
 
         public override void handle(Message item)
         {
+            //Console.Clear();
             "received {0} from {1} {2}".log(item.message, item.source, DateTime.Now);
-            Thread.Sleep(5000);
+            //Thread.Sleep(5000);
+            //Console.In.ReadLine();
             var source = Assembly.GetEntryAssembly().GetName().Name;
             "sending  {0} from {1} {2}".log(item.message.Equals("ping") ? "pong" : "ping", source, DateTime.Now);
             bus.publish<Message>(x =>
code/common/SynchronousCommandProcessor.cs
@@ -0,0 +1,29 @@
+using System.Collections.Generic;
+
+namespace common
+{
+    public class SynchronousCommandProcessor : CommandProcessor
+    {
+        readonly Queue<Command> queued_commands;
+
+        public SynchronousCommandProcessor()
+        {
+            queued_commands = new Queue<Command>();
+        }
+
+        public void add(Command command_to_process)
+        {
+            queued_commands.Enqueue(command_to_process);
+        }
+
+        public void run()
+        {
+            while (queued_commands.Count > 0) queued_commands.Dequeue().run();
+        }
+
+        public void stop()
+        {
+            queued_commands.Clear();
+        }
+    }
+}
\ No newline at end of file
code/server/Server.cs
@@ -53,7 +53,8 @@ namespace server
             builder.Register(x => new RhinoPublisher("client", 2201, manager)).As<ServiceBus>().SingletonScoped();
             builder.Register(x => new RhinoReceiver(manager.GetQueue("server"), x.Resolve<CommandProcessor>())).As<RhinoReceiver>().As<Receiver>().SingletonScoped();
 
-            builder.Register<AsynchronousCommandProcessor>().As<CommandProcessor>().SingletonScoped();
+            //builder.Register<AsynchronousCommandProcessor>().As<CommandProcessor>().SingletonScoped();
+            builder.Register<SynchronousCommandProcessor>().As<CommandProcessor>().SingletonScoped();
             builder.Register<RequestHandler>().As<Handler>();
 
             Resolve.the<IEnumerable<NeedStartup>>().each(x => x.run());