Commit b7a97ed

mo khan <mo@mokhan.ca>
2010-04-25 01:44:18
trying to get this rhino queues working with two queue managers per side.
1 parent 1269a21
product/client/presentation.windows/bootstrappers/Bootstrapper.cs
@@ -31,10 +31,13 @@ namespace presentation.windows.bootstrappers
             //needs startups
             builder.Register<ComposeShell>().As<NeedStartup>();
             builder.Register<ConfigureMappings>().As<NeedStartup>();
+            builder.Register<StartServiceBus>().As<NeedStartup>();
 
             // infrastructure
             builder.Register<Log4NetLogFactory>().As<LogFactory>().SingletonScoped();
             builder.Register<DefaultMapper>().As<Mapper>().SingletonScoped();
+            builder.Register(x => new RhinoPublisher(23456, "server", "client_sender.esent", 2201)).As<ServiceBus>().SingletonScoped();
+            builder.Register(x => new RhinoReceiver(23457, "client", "client_receiver.esent")).As<RhinoReceiver>().As<Receiver>().SingletonScoped();
 
             // presentation infrastructure
             SynchronizationContext.SetSynchronizationContext(new DispatcherSynchronizationContext());
product/client/presentation.windows/bootstrappers/StartServiceBus.cs
@@ -0,0 +1,19 @@
+using System;
+using Gorilla.Commons.Infrastructure.Container;
+using presentation.windows.common;
+using presentation.windows.common.messages;
+
+namespace presentation.windows.bootstrappers
+{
+    public class StartServiceBus : NeedStartup
+    {
+        public void run()
+        {
+            var receiver = Resolve.the<RhinoReceiver>();
+            receiver.register(x => Console.Out.WriteLine(x));
+            receiver.run();
+
+            //Resolve.the<ServiceBus>().publish<StartedApplication>(x => x.message = "client");
+        }
+    }
+}
\ No newline at end of file
product/client/presentation.windows/presentation.windows.csproj
@@ -42,6 +42,10 @@
       <SpecificVersion>False</SpecificVersion>
       <HintPath>..\..\..\build\lib\app\automapper\AutoMapper.dll</HintPath>
     </Reference>
+    <Reference Include="Rhino.Queues, Version=1.2.0.0, Culture=neutral, PublicKeyToken=0b3305902db7183f, processorArchitecture=MSIL">
+      <SpecificVersion>False</SpecificVersion>
+      <HintPath>..\..\..\thirdparty\rhino.queues\Rhino.Queues.dll</HintPath>
+    </Reference>
     <Reference Include="System" />
     <Reference Include="System.Core">
       <RequiredTargetFramework>3.5</RequiredTargetFramework>
@@ -114,6 +118,7 @@
     <Compile Include="ApplicationController.cs" />
     <Compile Include="bootstrappers\Bootstrapper.cs" />
     <Compile Include="bootstrappers\ConfigureMappings.cs" />
+    <Compile Include="bootstrappers\StartServiceBus.cs" />
     <Compile Include="events\SelectedFamilyMember.cs" />
     <Compile Include="service\infrastructure\UpdateOnLongRunningProcess.cs" />
     <Compile Include="Dialog.cs" />
product/commons/utility/Observer.cs
@@ -0,0 +1,4 @@
+namespace gorilla.commons.utility
+{
+    public delegate void Observer<T>(T item);
+}
\ No newline at end of file
product/commons/utility/utility.csproj
@@ -74,6 +74,7 @@
     <Compile Include="Import.cs" />
     <Compile Include="Mapper.cs" />
     <Compile Include="Notification.cs" />
+    <Compile Include="Observer.cs" />
     <Compile Include="Parser.cs" />
     <Compile Include="Query.cs" />
     <Compile Include="Registry.cs" />
product/presentation.windows.common/messages/AddedNewFamilyMember.cs
@@ -3,6 +3,7 @@ using MoMoney.Service.Infrastructure.Eventing;
 
 namespace presentation.windows.common.messages
 {
+    [Serializable]
     public class AddedNewFamilyMember : IEvent
     {
         public Guid id { get; set; }
product/presentation.windows.common/messages/CreateNewAccount.cs
@@ -1,5 +1,8 @@
+using System;
+
 namespace presentation.windows.common.messages
 {
+    [Serializable]
     public class CreateNewAccount
     {
         public string account_name { get; set; }
product/presentation.windows.common/messages/FamilyMemberToAdd.cs
@@ -2,6 +2,7 @@ using System;
 
 namespace presentation.windows.common.messages
 {
+    [Serializable]
     public class FamilyMemberToAdd
     {
         public string first_name { get; set; }
product/presentation.windows.common/messages/FindAllFamily.cs
@@ -1,4 +1,7 @@
+using System;
+
 namespace presentation.windows.common.messages
 {
+    [Serializable]
     public class FindAllFamily {}
 }
\ No newline at end of file
product/presentation.windows.common/messages/NewAccountCreated.cs
@@ -1,5 +1,8 @@
+using System;
+
 namespace presentation.windows.common.messages
 {
+    [Serializable]
     public class NewAccountCreated
     {
         public string name { get; set; }
product/presentation.windows.common/messages/StartedApplication.cs
@@ -0,0 +1,10 @@
+using System;
+
+namespace presentation.windows.common.messages
+{
+    [Serializable]
+    public class StartedApplication
+    {
+        public string message { get; set; }
+    }
+}
\ No newline at end of file
product/presentation.windows.common/MsmqBus.cs
@@ -36,6 +36,6 @@ namespace presentation.windows.common
             publish(item);
         }
 
-        void process(Message message) {}
+        void process(Message message) { }
     }
 }
\ No newline at end of file
product/presentation.windows.common/presentation.windows.common.csproj
@@ -39,15 +39,24 @@
       <SpecificVersion>False</SpecificVersion>
       <HintPath>..\..\build\lib\app\db40\Db4objects.Db4o.dll</HintPath>
     </Reference>
+    <Reference Include="Esent.Interop, Version=1.0.0.0, Culture=neutral, PublicKeyToken=b93b4ad6c4b80595, processorArchitecture=MSIL">
+      <SpecificVersion>False</SpecificVersion>
+      <HintPath>..\..\thirdparty\rhino.queues\Esent.Interop.dll</HintPath>
+    </Reference>
     <Reference Include="FluentNHibernate, Version=1.0.0.0, Culture=neutral, PublicKeyToken=8aa435e3cb308880, processorArchitecture=MSIL">
       <SpecificVersion>False</SpecificVersion>
       <HintPath>..\..\build\lib\app\nhibernate\FluentNHibernate.dll</HintPath>
     </Reference>
+    <Reference Include="Rhino.Queues, Version=1.2.0.0, Culture=neutral, PublicKeyToken=0b3305902db7183f, processorArchitecture=MSIL">
+      <SpecificVersion>False</SpecificVersion>
+      <HintPath>..\..\thirdparty\rhino.queues\Rhino.Queues.dll</HintPath>
+    </Reference>
     <Reference Include="System" />
     <Reference Include="System.Core">
       <RequiredTargetFramework>3.5</RequiredTargetFramework>
     </Reference>
     <Reference Include="System.Messaging" />
+    <Reference Include="System.Transactions" />
     <Reference Include="System.Xml.Linq">
       <RequiredTargetFramework>3.5</RequiredTargetFramework>
     </Reference>
@@ -64,9 +73,13 @@
     <Compile Include="messages\FamilyMemberToAdd.cs" />
     <Compile Include="messages\FindAllFamily.cs" />
     <Compile Include="messages\NewAccountCreated.cs" />
+    <Compile Include="messages\StartedApplication.cs" />
     <Compile Include="MsmqBus.cs" />
     <Compile Include="NeedStartup.cs" />
     <Compile Include="Properties\AssemblyInfo.cs" />
+    <Compile Include="Receiver.cs" />
+    <Compile Include="RhinoPublisher.cs" />
+    <Compile Include="RhinoReceiver.cs" />
     <Compile Include="ServiceBus.cs" />
   </ItemGroup>
   <ItemGroup>
product/presentation.windows.common/Receiver.cs
@@ -0,0 +1,9 @@
+using gorilla.commons.utility;
+
+namespace presentation.windows.common
+{
+    public interface Receiver : Command
+    {
+        void stop();
+    }
+}
\ No newline at end of file
product/presentation.windows.common/RhinoPublisher.cs
@@ -0,0 +1,54 @@
+using System;
+using System.IO;
+using System.Net;
+using System.Runtime.Serialization.Formatters.Binary;
+using System.Transactions;
+using gorilla.commons.utility;
+using Rhino.Queues;
+
+namespace presentation.windows.common
+{
+    public class RhinoPublisher : ServiceBus
+    {
+        BinaryFormatter formatter = new BinaryFormatter();
+        readonly int send_port;
+        string destination_queue;
+        QueueManager sender;
+
+        public RhinoPublisher(int listen_port, string destination_queue, string esent, int send_port)
+        {
+            this.send_port = send_port;
+            this.destination_queue = destination_queue;
+
+            if (Directory.Exists(esent)) Directory.Delete(esent, true);
+            sender = new QueueManager(new IPEndPoint(IPAddress.Loopback, listen_port), esent);
+        }
+
+        public void publish<T>() where T : new()
+        {
+            publish(new T());
+        }
+
+        public void publish<T>(T item) where T : new()
+        {
+            using (var tx = new TransactionScope())
+            {
+                var buffer = new byte[255];
+                using (var stream = new MemoryStream(buffer))
+                {
+                    formatter.Serialize(stream, item);
+                    sender.Send(new Uri("rhino.queues://localhost:{0}/{1}".formatted_using(send_port, destination_queue)), new MessagePayload {Data = buffer});
+                }
+                tx.Complete();
+            }
+            //sender.WaitForAllMessagesToBeSent();
+        }
+
+        public void publish<T>(Action<T> configure) where T : new()
+        {
+            var item = new T();
+            configure(item);
+            publish(item);
+        }
+    }
+}
\ No newline at end of file
product/presentation.windows.common/RhinoReceiver.cs
@@ -0,0 +1,52 @@
+using System.Collections.Generic;
+using System.IO;
+using System.Net;
+using System.Transactions;
+using gorilla.commons.utility;
+using Rhino.Queues;
+using Rhino.Queues.Model;
+
+namespace presentation.windows.common
+{
+    public class RhinoReceiver : Receiver
+    {
+        bool running = true;
+        List<Observer<Message>> observers = new List<Observer<Message>>();
+        string queue_name;
+        IQueue queue;
+
+        public RhinoReceiver(int port, string queue_name, string esent_name)
+        {
+            this.queue_name = queue_name;
+
+            if (Directory.Exists(esent_name)) Directory.Delete(esent_name, true);
+            var manager = new QueueManager(new IPEndPoint(IPAddress.Loopback, port), esent_name);
+            manager.CreateQueues(this.queue_name);
+            queue = manager.GetQueue(this.queue_name);
+        }
+
+        public void register(Observer<Message> observer)
+        {
+            observers.Add(observer);
+        }
+
+        public void run()
+        {
+            running = true;
+            while (running)
+            {
+                using (var transaction = new TransactionScope())
+                {
+                    var message = queue.Receive();
+                    observers.each(x => x(message));
+                    transaction.Complete();
+                }
+            }
+        }
+
+        public void stop()
+        {
+            running = false;
+        }
+    }
+}
\ No newline at end of file
product/presentation.windows.server/Bootstrapper.cs
@@ -38,6 +38,8 @@ namespace presentation.windows.server
             // infrastructure
             builder.Register<Log4NetLogFactory>().As<LogFactory>().SingletonScoped();
             builder.Register<DefaultMapper>().As<Mapper>().SingletonScoped();
+            builder.Register(x => new RhinoPublisher(23457, "client", "server_sender.esent", 23456)).As<ServiceBus>().SingletonScoped();
+            builder.Register(x => new RhinoReceiver(23456, "server", "server_receiver.esent")).As<RhinoReceiver>().As<Receiver>().SingletonScoped();
 
             var session_factory = bootstrap_nhibernate();
             builder.Register(x => session_factory).SingletonScoped();
product/presentation.windows.server/Program.cs
@@ -1,4 +1,6 @@
 using System;
+using Gorilla.Commons.Infrastructure.Container;
+using presentation.windows.common;
 
 namespace presentation.windows.server
 {
@@ -8,6 +10,7 @@ namespace presentation.windows.server
         {
             Bootstrapper.run();
             Console.ReadLine();
+            Resolve.the<Receiver>().stop();
         }
     }
 }
\ No newline at end of file
product/presentation.windows.server/StartServiceBus.cs
@@ -1,77 +1,18 @@
 using System;
-using System.Collections.Specialized;
-using System.IO;
-using System.Net;
-using System.Runtime.Serialization.Formatters.Binary;
-using System.Text;
-using System.Transactions;
-using Gorilla.Commons.Infrastructure.Logging;
+using Gorilla.Commons.Infrastructure.Container;
 using presentation.windows.common;
-using Rhino.Queues;
+using presentation.windows.common.messages;
 
 namespace presentation.windows.server
 {
     public class StartServiceBus : NeedStartup
     {
-        bool running = true;
-
         public void run()
         {
-            var manager = new QueueManager(new IPEndPoint(IPAddress.Loopback, 2200), "receiver.esent");
-            manager.CreateQueues("server");
-            var queue = manager.GetQueue("server");
-
-            while (running)
-            {
-                using (var transaction = new TransactionScope())
-                {
-                    var message = queue.Receive();
-                    this.log().debug(message.Headers["Source"]);
-                    this.log().debug(Encoding.Unicode.GetString(message.Data));
-                    Console.Out.WriteLine(message.Headers["Source"]);
-                    Console.Out.WriteLine(Encoding.Unicode.GetString(message.Data));
-                    transaction.Complete();
-                }
-            }
-        }
-    }
-
-    public class RhinoBus : ServiceBus
-    {
-        BinaryFormatter formatter = new BinaryFormatter();
-
-        public void publish<T>() where T : new()
-        {
-            publish(new T());
-        }
-
-        public void publish<T>(T item) where T : new()
-        {
-			using (var sender = new QueueManager(new IPEndPoint(IPAddress.Loopback, 4546), "sender.esent"))
-			{
-				using (var tx = new TransactionScope())
-				{
-                    var buffer = new byte[int.MaxValue];
-                    using (var stream = new MemoryStream(buffer))
-                    {
-                        formatter.Serialize(stream, item);
-                        sender.Send(new Uri("rhino.queues://localhost:4545/uno"), new MessagePayload
-                                           {
-                                               Headers = new NameValueCollection(),
-                                               Data = buffer
-                                           });
-                    }
-					tx.Complete();
-				}
-				sender.WaitForAllMessagesToBeSent();
-			}
-        }
-
-        public void publish<T>(Action<T> configure) where T : new()
-        {
-            var item = new T();
-            configure(item);
-            publish(item);
+            var receiver = Resolve.the<RhinoReceiver>();
+            receiver.register(x => Console.Out.WriteLine(x));
+            Resolve.the<ServiceBus>().publish<StartedApplication>(x => x.message = "server");
+            receiver.run();
         }
     }
 }
\ No newline at end of file