Friday, 10 December 2010

Activemq stomp temporary queue

Temporary queues are used to the message-reply pattern. Should a service require a response from another service, the caller should create a temporary queue, and set the JMS header JMSReplyTo to the name of the temporary queue.

The answerer sends back response to the address specified in JMSReplyTo which is a temporary queue which can be read only the process who created it and that will be destroyed as soon as it has been used.

There is everywhere instructions how to implement the request-response pattern with jms, here how to do it with stomp with apache activemq.

This is the ask.rb file: it was just a slight modified version of the catstomp file in the activemq distribution.

#!/usr/bin/env ruby
#
#   Copyright 2006 LogicBlaze Inc.
#
#   Licensed under the Apache License, Version 2.0 (the "License");
#   you may not use this file except in compliance with the License.
#   You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
#   Unless required by applicable law or agreed to in writing, software
#   distributed under the License is distributed on an "AS IS" BASIS,
#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#   See the License for the specific language governing permissions and
#   limitations under the License.
#
begin; require 'rubygems'; rescue; end
require 'stomp'

begin

    @port = 61613
    @host = "localhost"
    @user = "system"
    @password = "manager"

    @destination = "/queue/hallo.world"

    $stderr.print "Connecting to stomp://#{@host}:#{@port} as #{@user}\n"
    @conn = Stomp::Connection.open(@user, @password, @host, @port, true)
    $stderr.print "Sending input to #{@destination}\n"

  reply_to = "/temp-queue/"

  @headers = {'persistent'=>'false', 'reply-to'=> reply_to}

  @conn.publish @destination, "hallo", @headers

  @conn.subscribe(reply_to, { :ack =>"client" })
  $stderr.print "waiting answer from #{reply_to}\n"
  @msg = @conn.receive
  $stdout.puts @msg.body
  $stdout.flush
  @conn.ack @msg.headers["message-id"]

rescue Exception => e
  puts "rescued #{e}"
end

All the magic is just send message to the /temp-queue/ destination. Activemq take care to create a temporary queue named /remote-temp-queue/ID:hostname-46977-1290515981859-6:31:1.

The answerer looks like (again thanks to the distribution stompcat):
#!/usr/bin/env ruby
#
#   Copyright 2006 LogicBlaze Inc.
#
#   Licensed under the Apache License, Version 2.0 (the "License");
#   you may not use this file except in compliance with the License.
#   You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
#   Unless required by applicable law or agreed to in writing, software
#   distributed under the License is distributed on an "AS IS" BASIS,
#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#   See the License for the specific language governing permissions and
#   limitations under the License.
#
begin; require 'rubygems'; rescue; end
require 'stomp'

begin

    @port = 61613
    @host = "localhost"
    @user = "system"
    @password = "manager"


    @destination = "/queue/hallo.world"

    $stderr.print "Connecting to stomp://#{@host}:#{@port} as #{@user}\n"
    @conn = Stomp::Connection.open(@user, @password, @host, @port, true)
    $stderr.print "Getting output from #{@destination}\n"

    @conn.subscribe(@destination, { :ack =>"client" })
    while true
      @msg = @conn.receive
      reply_to = @msg.headers['reply-to']
      $stdout.print "reply_to: #{reply_to}\n"
      $stdout.print @msg.body
      $stdout.flush
      @conn.ack @msg.headers["message-id"]
      @conn.publish reply_to, "#{@msg.body} world"
    end

rescue Exception => e
  puts "rescued: #{e}"
end

To post the answer, it is enought to read the reply-to header from the request message and post there.

No comments: