Commit 4b2ba4d

mo khan <mo@mokhan.ca>
2010-04-26 03:29:16
trying to figure out why the message cannot be processed... the transaction is commited but it still blows up in rhino queues for some weird reason.
1 parent 7e72e49
product/client/presentation.windows/bootstrappers/Bootstrapper.cs
@@ -33,9 +33,9 @@ namespace presentation.windows.bootstrappers
             builder.Register(x => shell_window).As<RegionManager>().SingletonScoped();
 
             //needs startups
+            builder.Register<StartServiceBus>().As<NeedStartup>();
             builder.Register<ComposeShell>().As<NeedStartup>();
             builder.Register<ConfigureMappings>().As<NeedStartup>();
-            builder.Register<StartServiceBus>().As<NeedStartup>();
 
             // infrastructure
             builder.Register<Log4NetLogFactory>().As<LogFactory>().SingletonScoped();
@@ -44,7 +44,7 @@ namespace presentation.windows.bootstrappers
             var manager = new QueueManager(new IPEndPoint(IPAddress.Loopback, 2201), "client.esent");
             manager.CreateQueues("client");
             builder.Register(x => new RhinoPublisher("server", 2200, manager)).As<ServiceBus>().SingletonScoped();
-            builder.Register(x => new RhinoReceiver(manager.GetQueue("client"))).As<RhinoReceiver>().As<Receiver>().SingletonScoped();
+            builder.Register(x => new RhinoReceiver(manager.GetQueue("client"), x.Resolve<CommandProcessor>())).As<RhinoReceiver>().As<Receiver>().SingletonScoped();
 
             // presentation infrastructure
             SynchronizationContext.SetSynchronizationContext(new DispatcherSynchronizationContext());
product/client/presentation.windows/bootstrappers/StartServiceBus.cs
@@ -1,5 +1,5 @@
-using System.Threading;
 using Gorilla.Commons.Infrastructure.Container;
+using MoMoney.Service.Infrastructure.Threading;
 using presentation.windows.common;
 using presentation.windows.common.messages;
 
@@ -16,12 +16,8 @@ namespace presentation.windows.bootstrappers
                 // synchronize with ui thread?
                 handler.handler(x);
             });
-            ThreadPool.QueueUserWorkItem(x =>
-            {
-                receiver.run();
-            });
-
-            Resolve.the<ServiceBus>().publish<StartedApplication>(x => x.message = "client");
+            Resolve.the<CommandProcessor>().add(receiver);
+            //Resolve.the<ServiceBus>().publish<StartedApplication>(x => x.message = "client");
         }
     }
 }
\ No newline at end of file
product/presentation.windows.common/messages/StartedApplication.cs
@@ -9,5 +9,10 @@ namespace presentation.windows.common.messages
     {
         [ProtoMember(1)]
         public string message { get; set; }
+
+        public override string ToString()
+        {
+            return base.ToString() + message;
+        }
     }
 }
\ No newline at end of file
product/presentation.windows.common/AbstractHandler.cs
@@ -1,17 +1,23 @@
 using System;
+using Gorilla.Commons.Infrastructure.Logging;
 
 namespace presentation.windows.common
 {
     public abstract class AbstractHandler<T> : Handler<T>, Handler
     {
-        public bool can_handle(Type type)
+        bool can_handle(Type type)
         {
+            this.log().debug("{0} can handle {1} = {2}", this, type, typeof (T).Equals(type));
             return typeof (T).Equals(type);
         }
 
         public void handle(object item)
         {
-            handle((T) item);
+            if (can_handle(item.GetType()))
+            {
+                this.log().debug("handling... {0}", item);
+                handle((T) item);
+            }
         }
 
         public abstract void handle(T item);
product/presentation.windows.common/Handler.cs
@@ -1,10 +1,7 @@
-using System;
-
 namespace presentation.windows.common
 {
     public interface Handler
     {
-        bool can_handle(Type type);
         void handle(object item);
     }
 
product/presentation.windows.common/MessageHandler.cs
@@ -1,8 +1,10 @@
+using System;
 using System.IO;
 using System.Runtime.Serialization.Formatters.Binary;
 using Gorilla.Commons.Infrastructure.Container;
 using Gorilla.Commons.Infrastructure.Logging;
 using gorilla.commons.utility;
+using ProtoBuf;
 using Rhino.Queues.Model;
 
 namespace presentation.windows.common
@@ -18,12 +20,20 @@ namespace presentation.windows.common
         }
 
         public void handler(Message item)
+        {
+            var payload = parse_payload_from(item);
+            this.log().debug("received: {0}", payload);
+            registry
+                .get_all<Handler>()
+                .each(x => x.handle(payload));
+        }
+
+        object parse_payload_from(Message item)
         {
             using (var stream = new MemoryStream(item.Data))
             {
-                var payload = formatter.Deserialize(stream);
-                registry.get_all<Handler>().where(x => x.can_handle(payload.GetType())).each(x => x.handle(payload));
-                this.log().debug("received: {0}", payload);
+                //return formatter.Deserialize(stream);
+                return Serializer.NonGeneric.Deserialize(Type.GetType(item.Headers["type"]), stream);
             }
         }
     }
product/presentation.windows.common/Receiver.cs
@@ -2,8 +2,5 @@ using gorilla.commons.utility;
 
 namespace presentation.windows.common
 {
-    public interface Receiver : Command
-    {
-        void stop();
-    }
+    public interface Receiver : Command {}
 }
\ No newline at end of file
product/presentation.windows.common/RhinoPublisher.cs
@@ -2,6 +2,7 @@ using System;
 using System.IO;
 using System.Runtime.Serialization.Formatters.Binary;
 using System.Transactions;
+using Gorilla.Commons.Infrastructure.Logging;
 using gorilla.commons.utility;
 using ProtoBuf;
 using Rhino.Queues;
@@ -29,17 +30,26 @@ namespace presentation.windows.common
 
         public void publish<T>(T item) where T : new()
         {
-            using (var tx = new TransactionScope())
+            using (var transaction = new TransactionScope())
             {
-                using (var stream = new MemoryStream())
-                {
-                    //Serializer.Serialize(stream, item);
-                    formatter.Serialize(stream, item);
-                    sender.Send(new Uri("rhino.queues://localhost:{0}/{1}".formatted_using(port, destination_queue)), new MessagePayload {Data = stream.ToArray()});
-                }
-                tx.Complete();
+                var destination = "rhino.queues://localhost:{0}/{1}".formatted_using(port, destination_queue);
+                this.log().debug("sending {0} to {1}", item, destination);
+                sender.Send(new Uri(destination), create_payload_from(item));
+                transaction.Complete();
+            }
+        }
+
+        MessagePayload create_payload_from<T>(T item)
+        {
+            using (var stream = new MemoryStream())
+            {
+                Serializer.Serialize(stream, item);
+                //formatter.Serialize(stream, item);
+
+                var payload = new MessagePayload {Data = stream.ToArray()};
+                payload.Headers["type"] = typeof (T).FullName;
+                return payload;
             }
-            //sender.WaitForAllMessagesToBeSent();
         }
 
         public void publish<T>(Action<T> configure) where T : new()
product/presentation.windows.common/RhinoReceiver.cs
@@ -1,6 +1,9 @@
+using System;
 using System.Collections.Generic;
 using System.Transactions;
+using Gorilla.Commons.Infrastructure.Logging;
 using gorilla.commons.utility;
+using MoMoney.Service.Infrastructure.Threading;
 using Rhino.Queues;
 using Rhino.Queues.Model;
 
@@ -8,13 +11,14 @@ namespace presentation.windows.common
 {
     public class RhinoReceiver : Receiver
     {
-        bool running = true;
         List<Observer<Message>> observers = new List<Observer<Message>>();
         IQueue queue;
+        CommandProcessor processor;
 
-        public RhinoReceiver(IQueue queue)
+        public RhinoReceiver(IQueue queue, CommandProcessor processor)
         {
             this.queue = queue;
+            this.processor = processor;
         }
 
         public void register(Observer<Message> observer)
@@ -24,21 +28,23 @@ namespace presentation.windows.common
 
         public void run()
         {
-            running = true;
-            while (running)
+            try
             {
                 using (var transaction = new TransactionScope())
                 {
                     var message = queue.Receive();
-                    observers.each(x => x(message));
+                    observers.each(observer => observer(message));
                     transaction.Complete();
                 }
             }
-        }
-
-        public void stop()
-        {
-            running = false;
+            catch (Exception e)
+            {
+                e.add_to_log();
+            }
+            finally
+            {
+                processor.add(this);
+            }
         }
     }
 }
\ No newline at end of file
product/presentation.windows.common/ServiceBus.cs
@@ -4,8 +4,8 @@ namespace presentation.windows.common
 {
     public interface ServiceBus
     {
-        void publish<T>() where T : new();
-        void publish<T>(T item) where T : new();
-        void publish<T>(Action<T> configure) where T : new();
+        void publish<Message>() where Message : new();
+        void publish<Message>(Message item) where Message : new();
+        void publish<Message>(Action<Message> configure) where Message : new();
     }
 }
\ No newline at end of file
product/presentation.windows.server/Bootstrapper.cs
@@ -38,8 +38,8 @@ namespace presentation.windows.server
 
             builder.Register(x => registry).As<DependencyRegistry>().SingletonScoped();
             //needs startups
-            builder.Register<ConfigureMappings>().As<NeedStartup>();
             builder.Register<StartServiceBus>().As<NeedStartup>();
+            builder.Register<ConfigureMappings>().As<NeedStartup>();
 
             // infrastructure
             builder.Register<Log4NetLogFactory>().As<LogFactory>().SingletonScoped();
@@ -48,7 +48,7 @@ namespace presentation.windows.server
             var manager = new QueueManager(new IPEndPoint(IPAddress.Loopback, 2200), "server.esent");
             manager.CreateQueues("server");
             builder.Register(x => new RhinoPublisher("client", 2201, manager)).As<ServiceBus>().SingletonScoped();
-            builder.Register(x => new RhinoReceiver(manager.GetQueue("server"))).As<RhinoReceiver>().As<Receiver>().SingletonScoped();
+            builder.Register(x => new RhinoReceiver(manager.GetQueue("server"), x.Resolve<CommandProcessor>())).As<RhinoReceiver>().As<Receiver>().SingletonScoped();
 
             var session_factory = bootstrap_nhibernate();
             builder.Register<ISessionFactory>(x => session_factory).SingletonScoped();
product/presentation.windows.server/Program.cs
@@ -1,7 +1,8 @@
 using System;
 using Gorilla.Commons.Infrastructure.Container;
 using Gorilla.Commons.Infrastructure.Logging;
-using presentation.windows.common;
+using MoMoney.Service.Infrastructure.Threading;
+using Rhino.Queues;
 
 namespace presentation.windows.server
 {
@@ -15,9 +16,15 @@ namespace presentation.windows.server
                 {
                     (e.ExceptionObject as Exception).add_to_log();
                 };
+                AppDomain.CurrentDomain.ProcessExit += (o, e) =>
+                {
+                    "shutting down".log();
+                    Resolve.the<CommandProcessor>().stop();
+                    Resolve.the<IQueueManager>().Dispose();
+                    Environment.Exit(Environment.ExitCode);
+                };
                 Bootstrapper.run();
                 Console.ReadLine();
-                Resolve.the<Receiver>().stop();
             }
             catch (Exception e)
             {
product/presentation.windows.server/StartServiceBus.cs
@@ -1,5 +1,6 @@
 using System.Threading;
 using Gorilla.Commons.Infrastructure.Container;
+using MoMoney.Service.Infrastructure.Threading;
 using momoney.service.infrastructure.transactions;
 using presentation.windows.common;
 using presentation.windows.common.messages;
@@ -20,7 +21,8 @@ namespace presentation.windows.server
                     unit_of_work.commit();
                 }
             });
-            ThreadPool.QueueUserWorkItem(x => receiver.run());
+            Resolve.the<CommandProcessor>().add(receiver);
+            //ThreadPool.QueueUserWorkItem(x => receiver.run());
             Resolve.the<ServiceBus>().publish<StartedApplication>(x => x.message = "server");
         }
     }
thirdparty/rhino.queues/Rhino.Queues.dll
Binary file