Commit 65145e6

mo khan <mo@mokhan.ca>
2015-02-07 19:33:06
use a single topic exchange instead of multiple fanout exchanges.
1 parent 2bbe409
app/controllers/agents/events_controller.rb
@@ -11,7 +11,9 @@ module Agents
     end
 
     def create
-      Publisher.publish("events", event_params.merge({agent_id: @agent.id}))
+      message = event_params.merge({agent_id: @agent.id})
+      routing_key = "events.#{event_params[:name]}.#{@agent.id}"
+      Publisher.publish(routing_key, message)
       redirect_to agent_events_url, notice: 'Event was successfully created.'
     end
 
app/controllers/agents/files_controller.rb
@@ -16,12 +16,19 @@ module Agents
         name: params[:name],
         data: params[:data]
       })
+
+      message = {
+        agent_id: params[:id],
+        name: params[:name],
+        data: params[:data]
+      }
+      Publisher.publish("events.scanned.#{@agent.id}", message)
     end
 
     private
 
     def load_agent
-      Agent.find(params[:agent_id])
+      @agent = Agent.find(params[:agent_id])
     end
   end
 end
app/controllers/dispositions_controller.rb
@@ -26,7 +26,8 @@ class DispositionsController < ApplicationController
   # POST /dispositions
   # POST /dispositions.json
   def create
-    Publisher.publish("poke", disposition_params)
+    fingerprint = disposition_params[:fingerprint]
+    Publisher.publish("commands.poke.#{fingerprint}", disposition_params)
 
     respond_to do |format|
       format.html { redirect_to dispositions_path, notice: 'Disposition was successfully created.' }
app/jobs/fingerprint_lookup_job.rb
@@ -9,7 +9,6 @@ class FingerprintLookupJob < ActiveJob::Base
       apiKey: ENV.fetch("VIRUS_TOTAL_API_KEY"),
     })
     report = JSON.parse(response.response_body)
-    puts report.inspect
     disposition = Disposition.find_by(fingerprint: fingerprint)
     disposition.file_reports.create!(data: report)
   end
app/services/publisher.rb
@@ -1,7 +1,7 @@
 class Publisher
-  def self.publish(exchange, message = {})
-    exchange = channel.fanout("malwer.#{exchange}")
-    exchange.publish(message.to_json)
+  def self.publish(routing_key, message = {})
+    exchange = channel.topic("malwer")
+    exchange.publish(message.to_json, routing_key: routing_key)
   end
 
   def self.channel
app/workers/cloud_queries.rb
@@ -11,21 +11,8 @@ class CloudQueries
     fingerprint = attributes["data"]["fingerprint"]
     disposition = Disposition.find_by(fingerprint: fingerprint)
 
-    publish(JSON.generate({
-      name: :scanned,
-      agent_id: attributes["agent_id"],
-      data: attributes["data"]
-    }), to_queue: "worker.events")
-
-    if disposition.nil?
-      #publish(JSON.generate({
-      #command: :request_analysis,
-      #agent_id: attributes["agent_id"],
-      #fingerprint: fingerprint,
-      #}), routing_key: "malwer.commands")
-      Disposition.create!(fingerprint: fingerprint, state: :unknown)
-      FingerprintLookupJob.perform_later(fingerprint)
-    end
+    Disposition.create!(fingerprint: fingerprint, state: :unknown) if disposition.nil?
+    FingerprintLookupJob.perform_later(fingerprint) if disposition.state == :unknown
 
     ack!
   end
lib/tasks/rabbitmq.rake
@@ -6,20 +6,26 @@ namespace :rabbitmq do
     connection.start
     channel = connection.create_channel
 
-    # event intake bindings
-    exchange = channel.fanout("malwer.events")
-    queue = channel.queue("worker.events", durable: true)
-    queue.bind("malwer.events")
+    # single malwer topic exchange
+    # routing keys:
+    # * commands.command_type.(agent_id/fingerprint)
+      # * commands can be issued for specific agents
+      # * commands can be issued globally. (e.g. poke a dispostion)
+    # * events.event_type.agent_id
 
-    # poke bindings
-    exchange = channel.fanout("malwer.poke")
-    queue = channel.queue("worker.poke", durable: true)
-    queue.bind("malwer.poke")
+    channel.topic("malwer").tap do |exchange|
+      # event intake bindings
+      queue = channel.queue("worker.events", durable: true)
+      queue.bind(exchange, routing_key: "events.#")
 
-    # cloud queries bindings
-    exchange = channel.fanout("malwer.queries")
-    queue = channel.queue("worker.queries", durable: true)
-    queue.bind("malwer.queries")
+      # poke bindings
+      queue = channel.queue("worker.poke", durable: true)
+      queue.bind(exchange, routing_key: "commands.poke.#")
+
+      # cloud queries bindings
+      queue = channel.queue("worker.queries", durable: true)
+      queue.bind(exchange, routing_key: 'events.scanned.#')
+    end
 
     connection.close
   end