Friday 10 December 2010

Activemq stomp temporary queue

Temporary queues are used to the message-reply pattern. Should a service require a response from another service, the caller should create a temporary queue, and set the JMS header JMSReplyTo to the name of the temporary queue.

The answerer sends back response to the address specified in JMSReplyTo which is a temporary queue which can be read only the process who created it and that will be destroyed as soon as it has been used.

There is everywhere instructions how to implement the request-response pattern with jms, here how to do it with stomp with apache activemq.

This is the ask.rb file: it was just a slight modified version of the catstomp file in the activemq distribution.

#!/usr/bin/env ruby
#
#   Copyright 2006 LogicBlaze Inc.
#
#   Licensed under the Apache License, Version 2.0 (the "License");
#   you may not use this file except in compliance with the License.
#   You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
#   Unless required by applicable law or agreed to in writing, software
#   distributed under the License is distributed on an "AS IS" BASIS,
#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#   See the License for the specific language governing permissions and
#   limitations under the License.
#
begin; require 'rubygems'; rescue; end
require 'stomp'

begin

    @port = 61613
    @host = "localhost"
    @user = "system"
    @password = "manager"

    @destination = "/queue/hallo.world"

    $stderr.print "Connecting to stomp://#{@host}:#{@port} as #{@user}\n"
    @conn = Stomp::Connection.open(@user, @password, @host, @port, true)
    $stderr.print "Sending input to #{@destination}\n"

  reply_to = "/temp-queue/"

  @headers = {'persistent'=>'false', 'reply-to'=> reply_to}

  @conn.publish @destination, "hallo", @headers

  @conn.subscribe(reply_to, { :ack =>"client" })
  $stderr.print "waiting answer from #{reply_to}\n"
  @msg = @conn.receive
  $stdout.puts @msg.body
  $stdout.flush
  @conn.ack @msg.headers["message-id"]

rescue Exception => e
  puts "rescued #{e}"
end

All the magic is just send message to the /temp-queue/ destination. Activemq take care to create a temporary queue named /remote-temp-queue/ID:hostname-46977-1290515981859-6:31:1.

The answerer looks like (again thanks to the distribution stompcat):
#!/usr/bin/env ruby
#
#   Copyright 2006 LogicBlaze Inc.
#
#   Licensed under the Apache License, Version 2.0 (the "License");
#   you may not use this file except in compliance with the License.
#   You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
#   Unless required by applicable law or agreed to in writing, software
#   distributed under the License is distributed on an "AS IS" BASIS,
#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#   See the License for the specific language governing permissions and
#   limitations under the License.
#
begin; require 'rubygems'; rescue; end
require 'stomp'

begin

    @port = 61613
    @host = "localhost"
    @user = "system"
    @password = "manager"


    @destination = "/queue/hallo.world"

    $stderr.print "Connecting to stomp://#{@host}:#{@port} as #{@user}\n"
    @conn = Stomp::Connection.open(@user, @password, @host, @port, true)
    $stderr.print "Getting output from #{@destination}\n"

    @conn.subscribe(@destination, { :ack =>"client" })
    while true
      @msg = @conn.receive
      reply_to = @msg.headers['reply-to']
      $stdout.print "reply_to: #{reply_to}\n"
      $stdout.print @msg.body
      $stdout.flush
      @conn.ack @msg.headers["message-id"]
      @conn.publish reply_to, "#{@msg.body} world"
    end

rescue Exception => e
  puts "rescued: #{e}"
end

To post the answer, it is enought to read the reply-to header from the request message and post there.

Wednesday 20 October 2010

My first jruby javabean

I was going to write a javabean to be used inside camel in a java route like this:

from("file:src/data?noop=true").
     beanRef("addCorrelationId").
     log("log:endRoute");

in the MyRouteBuilder.java file of a standard mvn generated camel project.

Of course the bean has to be recalled in the came-context.xml:

<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="
       http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.5.xsd
       http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd">

  <bean id="addCorrelationId" class="com.test.AddCorrelationId"/>

  <camelContext xmlns="http://camel.apache.org/schema/spring">
    <package>com.test</package>
  </camelContext>

</beans>

So I created inside src/main/ruby/com/test the following file (add_correlation_id.rb):
require 'java'

java_import "org.apache.camel.Exchange"
java_import "org.apache.camel.Processor"
java_import "org.slf4j.Logger"
java_import "org.slf4j.LoggerFactory"
java_import "ch.qos.logback.classic.LoggerContext"

java_package "com.test"

class AddCorrelationId
  def initialize
    @logger = LoggerFactory.getLogger("com.test.#{__FILE__}");
    @logger.debug("Hello from #{__FILE__}.");    
  end  

#java_annotation 'Exchange'
java_signature 'void process(Exchange exchange)'
  def process(exchange)
    @logger.debug(File.open(exchange.in.body.getBody.toString).read);    
    exchange.out.set_header("JMSCorrelationId", "mytestcorrelationId")
  end

end

Please note the java_signature statement to instruct camel to bind to the process function.

According to CamelInAction it should be possible to enforce the binding of certain parameter in the process function using annotation. Unfortunately I wasn't able to.

Next one need to compile jruby to a java source class:
jruby -S jrubyc --java -t src/main/java src/main/ruby/con/test
And run the maven camel task:
mvn camel:run
You should see printed the body of the files, both on console and to the target of logback (defined in src/main/resources/logback.xml). The header adding task has no effect on a file.

In order to use the compelling logback library, I had to add to the pom.xml:
 <dependency>
      <groupId>ch.qos.logback</groupId>
      <artifactId>logback-core</artifactId>
      <version>${logback-version}</version>
    </dependency>
    <dependency>
      <groupId>ch.qos.logback</groupId>
      <artifactId>logback-classic</artifactId>
      <version>${logback-version}</version>
    </dependency>
    <dependency>
      <groupId>org.jruby</groupId>
      <artifactId>jruby-complete</artifactId>
      <version>1.5.3</version>
    </dependency>

Wednesday 6 October 2010

jruby processor for camel with jruby1.5

Here how to wrote a camel processor in jruby. A processor is a class able to modify message exchange like adding a header, changing message or just doing log.

First of all a jruby1.5 is required.

To start a camel project launch the maven task:
mvn archetype:create -DarchetypeGroupId=org.apache.camel.archetypes -DarchetypeArtifactId=camel-archetype-java -DarchetypeVersion=2.4.0 -DgroupId=com.test -DartifactId=rubyProcessor

Move to the created rubyProcessor directory and create a custom src/main/ruby/com/test directory and place there the following:

require 'java'
java_import "org.apache.camel.Exchange"
java_import "org.apache.camel.Processor"

java_package "com.test"

class DownloadLogger
  java_signature 'DownloadLogger()'
  java_implements 'Processor'

  java_signature "void process(Exchange exchange)"
  def process(exchange)
    puts "Ruby downloaded: #{exchange.in.get_header("CamelFileName")}"
  end
end

Modify java source file at src/main/java/com/test/MyRouteBuilder.java to:
 from("file:src/data?noop=true").
     process(new it.unimore.cesia.DownloadLogger()).
            choice().
                when(xpath("/person/city = 'London'")).to("file:target/messages/uk").
                otherwise().to("file:target/messages/others");

Add to the pom.xml file the jruby dependency:
 
      org.jruby
      jruby-complete
      1.5.3
    
located with sonatype nexus service.

Now compile with jrubyc the ruby source class with:
/opt/jruby-1.5.1/bin/jrubyc --java src/main/ruby/com/test/download_logger.rb
cp -v  com/test/DownloadLogger.java src/main/java/it/com/test
The java source class is created in the com/test directory of the presente working directory: it might be wise to move to src/main/java, maybe, so the cp can be saved.

Then execute camel application with mvn camel:run.

After a while you should be seeing:
Ruby downloaded: message2.xml
Ruby downloaded: message1.xml

Thursday 27 May 2010

Shibboleth IdP cluster with Terracotta

Even if strongly discouraged, I gave a try to Shibboleth clustering with Terracotta. Definitive instruction at: https://spaces.internet2.edu/display/SHIB2/IdPCluster.

I managed to have cluster working only with terracotta-3.1.1, with is a bit old, as newer version is 3.2.1. To download terracotta-3.1.1 I had to craft by hand the download url, after the require registration.

Instruction are so clear that I'm not going to add anything. I recommend to create beforhand the directories:
$IDP_HOME/cluster
$TC_HOME/logs

Here is my startup script for debian:
#!/bin/sh

JAVA_HOME=/usr/lib/jvm/java-6-sun
TC_HOME=/opt/terracotta
TC_CONFIG=/opt/shibboleth-idp/conf/tc-config.xml


# where to put stdout/stderr logs
TC_LOGS=$TC_HOME/logs

# user to run tc as
TC_USER=root
# the identity of this node
TC_SERVER=node1

JAVA_OPTS="\
-verbose:gc \
-XX:+PrintGCDetails \
-XX:-TraceClassUnloading \
-Xmx512M \
-Xms512M \
-XX:MaxGCPauseMillis=5000 \
-XX:+UseParallelOldGC \
-XX:+PrintCommandLineFlags \
"


export JAVA_HOME
export JAVA_OPTS
 
start () {
 echo "Starting Terracotta Server as " ${TC_SERVER}
  ${TC_HOME}/bin/start-tc-server.sh -n ${TC_SERVER} -f ${TC_CONFIG}  > ${TC_LOGS}/terracotta.log 2>&1 &
#        $TC_INSTALL_DIR/bin/start-tc-server.sh -f $TC_CONFIG_PATH &
}
stop () {
 echo "Stopping Terracotta Server ..."
 ${TC_HOME}/bin/stop-tc-server.sh 2>&1 > ${TC_LOGS}/terracotta.log &

#        $TC_INSTALL_DIR/bin/stop-tc-server.sh -f $TC_CONFIG_PATH  -n `hostname -s` &
}
 
case "$1" in
  start)
      start
      ;;
  stop)
      stop
      ;;
  restart)
      stop
      sleep 10
      start
      ;;
  *)
    echo "Usage terracotta start|stop|restart"
    exit 1;;
esac
 
exit $?

Of course TC_SERVER=node1 should reflect your configuration.

The flags to JAVA_OPTS were gathered in the mailing list.

Next it is necessary to modify tomcat6 to join terracotta. I didn't find a way to put configuration in /etc/default/tomcat, so I modified /etc/init.d/tomcat6 in CATALINA_OPTS which now reads:
CATALINA_OPTS="-Xms256M -Xmx512M -XX:MaxPermSize=192M -server -Dtc.install-root=/opt/terracotta -Dtc.config=/opt/shibboleth-idp/conf/tc-config.xml -Xbootclasspath/p:/opt/terracotta/lib/dso-
boot/dso-boot-hotspot_linux_160_12.jar"

Monday 19 April 2010

Durable Topic Subscribers from activemq with ruby stomp

Durable topic subscribers receive messages arrived in the topic while subscriber is disconnected.

Topic subcsribers (non-durable) lose messages posted while subscriber is off-line.

According to documentation, a ruby stomp durable subscriber for apache activemq-5.3.0 has to set client-id header at connection and subscriber name at subscription.

Former is achieved with:
@conn = Stomp::Connection.open(@user, @password, @host, @port, true, 5, {"client-id" => "stomp", 'client-id' => 'stomp', 'clientID' => "stomp1", 'client_id' => "stomp2"} )

latter with:
@conn.subscribe(@destination, {"activemq.subscriptionName" => "stomp", :ack =>"client" })

Thanks to Calliope Sounds' Working Ruby + Stomp example blog entry, I finally managed to locate the last missing piece: the "client.join" which is necessary.

Just as a reference, here the stompcat utility as Durable topic subscriber:
#!/usr/bin/env ruby
#
#   Copyright 2006 LogicBlaze Inc.
#
#   Licensed under the Apache License, Version 2.0 (the "License");
#   you may not use this file except in compliance with the License.
#   You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
#   Unless required by applicable law or agreed to in writing, software
#   distributed under the License is distributed on an "AS IS" BASIS,
#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#   See the License for the specific language governing permissions and
#   limitations under the License.
#
begin; require 'rubygems'; rescue; end
require 'stomp'

#
# This simple script is inspired by the netcat utility.  It allows you to receive
# data from a stomp destination and output it.
#
# Usage: stompcat (destination-name)
#
# Example: stompcat /topic/foo
#   Would display output that arrives at the /topic/foo stomp destination
#
begin

    @port = 61613
    @host = "localhost"
    @user = ENV["STOMP_USER"];
    @password = ENV["STOMP_PASSWORD"]

    @host = ENV["STOMP_HOST"] if ENV["STOMP_HOST"] != nil
    @port = ENV["STOMP_PORT"] if ENV["STOMP_PORT"] != nil

    @destination = "/topic/default"
    @destination = $*[0] if $*[0] != nil

    $stderr.print "Connecting to stomp://#{@host}:#{@port} as #{@user}\n"
#    @conn = Stomp::Client.new(:login => @user, :passcode => @password, :host => @host, :port => @port,  :headers => {'client-id' => 'stompcat'} )
    @conn = Stomp::Connection.open(@user, @password, @host, @port, true, 5, {"client-id" => "stomp"} )
    $stderr.print "Getting output from #{@destination}\n"

    @conn.subscribe(@destination, {"activemq.subscriptionName" => "stomp", :ack =>"client" })
    while true
        @msg = @conn.receive
        $stdout.print @msg.body
        $stdout.print @msg.headers.inspect
        $stdout.print "\n" 
        $stdout.flush
        @conn.ack @msg.headers["message-id"]
    end
    @conn.join

rescue Exception => e
 p e
end