Commit bab5e66

mo khan <mo@mokhan.ca>
2015-02-04 04:10:08
poke disposition in worker.
1 parent 59d4503
app/controllers/dispositions_controller.rb
@@ -26,30 +26,21 @@ class DispositionsController < ApplicationController
   # POST /dispositions
   # POST /dispositions.json
   def create
-    @disposition = Disposition.new(disposition_params)
+    Publisher.publish("poke", disposition_params)
 
     respond_to do |format|
-      if @disposition.save
-        format.html { redirect_to @disposition, notice: 'Disposition was successfully created.' }
-        format.json { render :show, status: :created, location: @disposition }
-      else
-        format.html { render :new }
-        format.json { render json: @disposition.errors, status: :unprocessable_entity }
-      end
+      format.html { redirect_to dispositions_path, notice: 'Disposition was successfully created.' }
+      format.json { head :no_content }
     end
   end
 
   # PATCH/PUT /dispositions/1
   # PATCH/PUT /dispositions/1.json
   def update
+    Publisher.publish("poke", disposition_params)
     respond_to do |format|
-      if @disposition.update(disposition_params)
-        format.html { redirect_to @disposition, notice: 'Disposition was successfully updated.' }
-        format.json { render :show, status: :ok, location: @disposition }
-      else
-        format.html { render :edit }
-        format.json { render json: @disposition.errors, status: :unprocessable_entity }
-      end
+      format.html { redirect_to dispositions_path, notice: 'Disposition was successfully updated.' }
+      format.json { head :no_content }
     end
   end
 
app/models/disposition.rb
@@ -3,6 +3,7 @@ class Disposition < ActiveRecord::Base
   attr_readonly :fingerprint
 
   validates_uniqueness_of :fingerprint
+  validates_presence_of :fingerprint, :state
 
   def to_param
     fingerprint
app/workers/events_worker.rb
@@ -2,10 +2,7 @@ require 'json'
 
 class EventsWorker
   include Sneakers::Worker
-  from_queue "worker.events",
-    threads: 50,
-    prefetch: 50,
-    timeout_job_after: 1
+  from_queue "worker.events"
 
   def work(event_json)
     logger.info event_json
app/workers/poke.rb
@@ -0,0 +1,16 @@
+require 'json'
+
+class Poke
+  include Sneakers::Worker
+  from_queue "worker.poke"
+
+  def work(json)
+    attributes = JSON.parse(json)
+
+    disposition = Disposition.find_or_create_by(fingerprint: attributes["fingerprint"])
+    disposition.state = attributes["state"]
+    disposition.save!
+
+    ack!
+  end
+end
bin/setup
@@ -20,6 +20,9 @@ Dir.chdir APP_ROOT do
   puts "\n== Preparing database =="
   system "bin/rake db:setup"
 
+  puts "\n== Preparing RabbitMQ bindings =="
+  system "bin/rake rabbitmq:setup"
+
   puts "\n== Removing old logs and tempfiles =="
   system "rm -f log/*"
   system "rm -rf tmp/cache"
lib/tasks/rabbitmq.rake
@@ -6,13 +6,16 @@ namespace :rabbitmq do
     connection.start
     channel = connection.create_channel
 
-    # create exchange
+    # event intake bindings
     exchange = channel.fanout("malwer.events")
-
-    # get or create queue (note the durable setting)
     queue = channel.queue("worker.events", durable: true)
-    # bind queue to exchange
     queue.bind("malwer.events")
+
+    # poke bindings
+    exchange = channel.fanout("malwer.poke")
+    queue = channel.queue("worker.poke", durable: true)
+    queue.bind("malwer.poke")
+
     connection.close
   end
 end
Procfile
@@ -1,2 +1,3 @@
 web: rails s
-worker: env WORKERS=EventsWorker rake sneakers:run
+event_intake: env WORKERS=EventsWorker rake sneakers:run
+poke: env WORKERS=Poke rake sneakers:run