Sunday, 11 August 2013

RabbitMQ implementation in Rails using the amqp gem


RabbitMQ is a message broker. It accepts messages from producers and delivers them to consumers. In between, it can route, buffer, and persist the messages according to rules we give it.
To use it from Rails, we need Ruby's amqp gem.
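
To make the broker's role concrete before bringing in the real gem, here is a toy in-memory sketch of the "accept, buffer, route, deliver" idea. It is not RabbitMQ and does not use the amqp gem; the ToyBroker class and its methods are made up purely for illustration, and persistence is left out.

```ruby
# Toy in-memory stand-in for a broker; not RabbitMQ and not the amqp gem API.
class ToyBroker
  def initialize
    @buffers   = Hash.new { |hash, key| hash[key] = [] } # queue name => buffered messages
    @consumers = {}                                      # queue name => consumer callback
  end

  # Producer side: route the message to the queue named by the routing key.
  def publish(message, routing_key:)
    consumer = @consumers[routing_key]
    if consumer
      consumer.call(message)            # a consumer is attached: deliver right away
    else
      @buffers[routing_key] << message  # nobody is listening yet: buffer the message
    end
  end

  # Consumer side: register a callback and drain anything already buffered.
  def subscribe(queue_name, &consumer)
    @consumers[queue_name] = consumer
    consumer.call(@buffers[queue_name].shift) until @buffers[queue_name].empty?
  end
end

broker   = ToyBroker.new
received = []

broker.publish("first", routing_key: "amqpgem.examples.helloworld")  # buffered
broker.subscribe("amqpgem.examples.helloworld") { |payload| received << payload }
broker.publish("second", routing_key: "amqpgem.examples.helloworld") # delivered directly

puts received.inspect # prints ["first", "second"]
```

The first message waits in the buffer until a consumer subscribes, which is the same decoupling between producer and consumer that the real broker gives us.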

For installing RabbitMQ and the amqp gem, check out the GitHub page of the amqp gem.

After installation, create a Ruby file in config/initializers and paste in this code snippet:

require "amqp"

error_handler = Proc.new do |settings|
  puts "!!! Failed to connect..."
  EM.stop
end
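puts "Creating a subscriber"
# Run the EventMachine reactor in its own thread so it does not block Rails.
Thread.new {
  EventMachine.run do
    # Pass the handler so a failed TCP connection is reported instead of ignored.
    $connection = AMQP.connect(:host => '127.0.0.1', :on_tcp_connection_failure => error_handler)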
    puts "Initialize Subscriber..."
    $channel = AMQP::Channel.new($connection)
    $queue = $channel.queue("amqpgem.examples.helloworld", :auto_delete => true)
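    # The empty name selects the default exchange, which routes by queue name.
    $exchange = $channel.direct("")
    $queue.subscribe do |payload|
      puts "Received a message: #{payload}..."
      # sleep only simulates slow work; it blocks the reactor thread while it runs.
      sleep(10)
      puts "After sleep"
    end
  end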
}
class MessageQueue

  def push(message)
    puts "Got a message for pushing: " + message
    EventMachine.next_tick do
      #puts "Published to the routing key "+$exchange.to_yaml
      $exchange.publish(message, routing_key: "amqpgem.examples.helloworld")
    end
  end
end

MESSAGE_QUEUE = MessageQueue.new

Now, to push a message onto the queue, call this in the controller:

MESSAGE_QUEUE.push("Pushing my first message")


Now, whenever you call the controller, the message "After sleep" appears in the console after around 10 seconds.
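
One consequence worth keeping in mind: the subscriber callback runs on the single reactor thread, so while it sleeps, the next message has to wait. The sketch below imitates that with the standard library only (a plain Queue and Thread, not the amqp gem) and a much shorter delay; the DELAY constant and the variable names are assumptions made for this illustration.

```ruby
# Stand-in for the reactor thread; uses only the standard library, not the amqp gem.
DELAY = 0.05 # seconds; plays the role of sleep(10) in the subscriber

inbox    = Queue.new
finished = []

consumer = Thread.new do
  while (payload = inbox.pop) != :done
    sleep(DELAY)        # the "work" blocks this thread, just as sleep does in the callback
    finished << payload
  end
end

started = Process.clock_gettime(Process::CLOCK_MONOTONIC)
3.times { |i| inbox << "message #{i}" } # three quick pushes, like three controller calls
inbox << :done
consumer.join
elapsed = Process.clock_gettime(Process::CLOCK_MONOTONIC) - started

puts "handled #{finished.inspect} in #{elapsed.round(2)}s"
```

The three messages are handled one after another, so the total time is roughly three times the delay. With the 10 second sleep from the initializer, three quick requests would take around 30 seconds to finish.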

Here is a little explanation of the code:

Whenever we start our server, our Ruby file gets loaded, which in turn creates a thread and starts EventMachine in it.

EventMachine is an event-driven I/O and lightweight concurrency library for Ruby. It provides event-driven I/O using the Reactor pattern, much like JBoss Netty, Apache MINA, and Node.js.
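
As a rough picture of the Reactor pattern, here is a miniature event loop in plain Ruby. It is only a sketch: TinyReactor is a hypothetical class written for this post, and EventMachine's real implementation is far more involved (it also watches sockets and timers). It does show the two ideas our initializer relies on: one thread runs the loop, and other threads hand work to it with next_tick.

```ruby
# Hypothetical miniature reactor; EventMachine's real implementation differs.
class TinyReactor
  def initialize
    @events = Queue.new # thread-safe inbox of pending callbacks
  end

  # Safe to call from any thread: schedule a block to run on the reactor thread.
  def next_tick(&callback)
    @events << callback
  end

  def stop
    @events << :stop
  end

  # The event loop: wait for the next event and dispatch it, one at a time.
  def run
    loop do
      event = @events.pop # blocks until something is scheduled
      break if event == :stop
      event.call
    end
  end
end

reactor = TinyReactor.new
handled = []

reactor_thread = Thread.new { reactor.run }

# Stands in for a controller thread calling MESSAGE_QUEUE.push.
reactor.next_tick { handled << [:published, Thread.current == reactor_thread] }
reactor.next_tick { reactor.stop }

reactor_thread.join
puts handled.inspect # prints [[:published, true]]
```

The block is scheduled from the main thread but executes on the reactor thread, which is why push wraps the publish call in EventMachine.next_tick.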

After EventMachine has started, we open a connection to the AMQP broker (RabbitMQ). Then we create a channel on the connection and declare a queue on that channel. Finally, we start our subscriber and attach it to the queue. It is responsible for taking each message, along with other information, from the queue and processing it as instructed.

Now, whenever we call the controller, it calls MESSAGE_QUEUE.push and passes a message to the push method, which in turn publishes it to the queue. From there, the subscriber picks it up and does the rest of the work.

For a detailed explanation, check out the GitHub page of the amqp gem.