Sunday, September 13, 2015

Configuring RabbitMQ with WSO2 ESB 4.8.1 PART 3

In this post I'm going to create two services that one is publishing  messages to the RabbitMQ Message Que and other service which consume the messages from the queue.

Proxy Service to Publish message to a RabbitMQ message Queue

I'm just creating a http proxy service which can be invoked using SOAP messages, So Service will enqueue the message to the message queue.

Following is the sample Synapse code for the proxy configuration.

 <?xml version="1.0" encoding="UTF-8"?>  
 <proxy xmlns="http://ws.apache.org/ns/synapse"  
     name="rabitMQTestProxy"  
     transports="https,http"  
     statistics="disable"  
     trace="disable"  
     startOnLoad="true">  
   <target>  
    <inSequence>  
      <property name="FORCE_SC_ACCEPTED"  
           value="true"  
           scope="axis2"  
           type="STRING"/>  
      <property name="OUT_ONLY" value="true" scope="default" type="STRING"/>  
      <property name="ContentType" value="text/xml" scope="axis2"/>  
      <send description="send message to AMQP queue">  
       <endpoint name="RABBIT">  
         <address uri="rabbitmq:/PublishRabbitMQ?rabbitmq.queue.exclusive=false&amp;rabbitmq.queue.auto.delete=false&amp;rabbitmq.queue.routing.key=destination&amp;rabbitmq.server.host.name=172.22.217.31&amp;rabbitmq.server.port=5672&amp;rabbitmq.server.user.name=krishan&amp;rabbitmq.server.password=k123&amp;rabbitmq.queue.name=destination&amp;rabbitmq.exchange.name=amq.direct"/>  
       </endpoint>  
      </send>  
    </inSequence>  
    <outSequence/>  
   </target>  
   <description/>  
 </proxy>  

Lets discuss about the proxy service

1) <property name="FORCE_SC_ACCEPTED" value="true"  scope="axis2"  type="STRING"/>   
This property used to force the service to send 202 Accepted status code to the client


2)  <property name="OUT_ONLY" value="true" scope="default" type="STRING"/>   
This property is to say that the proxy service has only one way transmission. So message only enqueued and not wait for the response.

3) <property name="ContentType" value="text/xml" scope="axis2"/>   
This property used to activate correct message builder. We are sending xml message to the (soap envelope) service.


Lets see the address URL (RabbitMQ Queue)
 <address uri="rabbitmq:/PublishRabbitMQ?rabbitmq.queue.exclusive=false&amp;rabbitmq.queue.auto.delete=false&amp;rabbitmq.queue.routing.key=destination&amp;rabbitmq.server.host.name=172.22.217.31&amp;rabbitmq.server.port=5672&amp;rabbitmq.server.user.name=krishan&amp;rabbitmq.server.password=k123&amp;rabbitmq.queue.name=destination&amp;rabbitmq.exchange.name=amq.direct"/>   

This is the address endpoint URL and we use rabbitmq:/ as the AMQP protocol. Also we are sending some values as query strings to the queue. The values are used by the backend as properties.

Proxy Service To consume messages from the queue 

 <?xml version="1.0" encoding="UTF-8"?>  
 <proxy xmlns="http://ws.apache.org/ns/synapse"  
     name="rabitMQconsumer"  
     transports="rabbitmq"  
     statistics="disable"  
     trace="disable"  
     startOnLoad="true">  
   <target>  
    <inSequence>  
      <log level="full"/>  
    </inSequence>  
    <outSequence/>  
    <faultSequence/>  
   </target>  
   <parameter name="rabbitmq.queue.name">destination</parameter>  
   <parameter name="rabbitmq.connection.factory">AMQPConnectionFactory</parameter>  
   <parameter name="rabbitmq.exchange.name">amq.direct</parameter>  
   <parameter name="rabbitmq.queue.routing.key">destination</parameter>    
   <description/>  
 </proxy>  


rabbitmq.queue.name - Name of the queue which you want to consume.
rabbitmq.connection.factory - RabbitMQ factory name you created in axis2.xml.
rabbitmq.exchange.name - RabbitMQ exchange name (If you haven’t created exchange ‘use amq.direct’).
rabbitmq.queue.routing.key - this is optional , you can give queue name.

The Common exception occurs when we are going  to consume messages from RabbitMQ queue using ESB 4.8.1

 [2015-10-09 14:45:52,695] ERROR - ServiceTaskManager Error while receiving message from queue  
 java.lang.NullPointerException  
     at org.apache.axis2.transport.rabbitmq.ServiceTaskManager$MessageListenerTask.getConsumerDelivery(ServiceTaskManager.java:360)  
     at org.apache.axis2.transport.rabbitmq.ServiceTaskManager$MessageListenerTask.run(ServiceTaskManager.java:182)  
     at org.apache.axis2.transport.base.threads.NativeWorkerPool$1.run(NativeWorkerPool.java:172)  
     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)  
     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)  
     at java.lang.Thread.run(Thread.java:744)  
 [2015-10-09 14:45:52,704] ERROR - NativeWorkerPool Uncaught exception  
 org.apache.axis2.transport.rabbitmq.AxisRabbitMQException: Error while receiving message from queue  
     at org.apache.axis2.transport.rabbitmq.ServiceTaskManager.handleException(ServiceTaskManager.java:441)  
     at org.apache.axis2.transport.rabbitmq.ServiceTaskManager.access$300(ServiceTaskManager.java:45)  
     at org.apache.axis2.transport.rabbitmq.ServiceTaskManager$MessageListenerTask.run(ServiceTaskManager.java:214)  
     at org.apache.axis2.transport.base.threads.NativeWorkerPool$1.run(NativeWorkerPool.java:172)  
     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)  
     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)  
     at java.lang.Thread.run(Thread.java:744)  
 Caused by: java.lang.NullPointerException  
     at org.apache.axis2.transport.rabbitmq.ServiceTaskManager$MessageListenerTask.getConsumerDelivery(ServiceTaskManager.java:360)  
     at org.apache.axis2.transport.rabbitmq.ServiceTaskManager$MessageListenerTask.run(ServiceTaskManager.java:182)  
     ... 4 more  

When we are going to consume message we will get an exception telling content type is null. This is a bug in RabbitMQ transport and this has been resolved in ESB 4.9.0 release. Also for the ESB 4.8.1 users can ask for official patch (WSO2-CARBON-PATCH-4.2.0-1245) from the WSO2.

After adding the patch you can add the following parameters to the consumer proxy which will resolve above error and also give additional features.

 <parameter name="rabbitmq.message.content.type">text/xml</parameter>  
You can set the content type accordingly to the message content type.  ex - application/xml, application/json etc.

 <parameter name="rabbitmq.connection.retry.interval" locked="false">10000</parameter>  
 <parameter name="rabbitmq.connection.retry.count" locked="false">5</parameter>  
Also, you can add above parameters to recover connection if it failed.

So these features coming with WSO2 ESB 4.8.1 and there are more functions available in WSO2 ESB 4.9.0. Specially SSL communication.

Don't worry, I will write a blog post regarding SSL communication with RabbitMQ. Please check my latest posts. Cheers!

Saturday, September 12, 2015

Configuring RabbitMQ With WSO2 ESB 4.8.1 PART 2

Installing RabbitMQ AMQP Transport to WSO2 ESB 4.8.1


The RabbitMQ AMQP Transport is developed as a separate module from axis2 default transports. So we have to install this transport as a feature to the ESB in order to deal with RabbitMQ.

To install RabbitMQ Transport as a feature follow the steps below.

  1. In Management Console click Configure ->Features ->add repository
  2. Provide any name for repository name and give following URL to Location URL  http://product-dist.wso2.com/p2/extras/releases/4.2.0/rabbitmq-axis2-transport/
  3. Click add button to add repository.
  4. After adding the repository tick on the show only the latest versions (untick Group features by category) and click on Find Features button
  5. You will see Axis2 Transport RabbitMQ AMQP listed under available features and add a tick to checkbox and install the feature
  6. Go through the installation wizard until getting the Installation complete message
  7. Restart the server to take effect
  8. Diagram 1 : add RabbitMQ repositor








Diagram 2: install Axis2 Transport RabbitMQ AMQP


Configuring RabbitMQ Transport 

Open axis2.xml (located in ESB_HOME/repository/conf/axis2/axis2.xml) and add the following RabbitMQ listener under the listener section.
 <transportReceiver name="rabbitmq" class="org.apache.axis2.transport.rabbitmq.RabbitMQListener">  
   <parameter name="AMQPConnectionFactory" locked="false">  
    <parameter name="rabbitmq.server.host.name" locked="false">192.168.0.3</parameter>  
    <parameter name="rabbitmq.server.port" locked="false">5672</parameter>  
    <parameter name="rabbitmq.server.user.name" locked="false">user</parameter>  
    <parameter name="rabbitmq.server.password" locked="false">password</parameter>  
   </parameter>  
 </transportReceiver>  
In the Transport Sender section, add the following RabbitMQ sender
 <transportSender name="rabbitmq" class="org.apache.axis2.transport.rabbitmq.RabbitMQSender"/>  
Restart the Server to take effect above configurations. 
So we have finished the configuring ESB to communicate with RabbitMQ message broker, from next part we will see on configuring message provider proxy service and consumer proxy service.






Friday, September 4, 2015

Configuring RabbitMQ With WSO2 ESB 4.8.1 PART 1

Environment

  • WSO2 ESB 4.8.1
  • RabbitMQ 3.5.4
  • Erlang OTP 18.0
  • JDK 1.7

In this blog post I'm going to explain Integrating RabbitMQ with WSO2 ESB 4.8.1 for Message provider and consumer operations. 

ESB can act as message provider to the queue and consumer from the queue. Following diagram shows provider and consumer operations


Diagram 1: ESB Producer/Consumer Operations

Installing RabbitMQ 


RabbitMQ uses Erlang OTP so it is must  install Erlang before installing the RabbitMQ.

In Windows

  1. Download Erlang OTP  from here. Installing process is automated. Double click the .exe file and follow the default instructions.
  2. Download RabbitMQ installer from here and install it with default settings. After finishing the server will automatically up and run.

In Linux

  1. Install Erlang by using the following command. sudo apt-get install erlang erlang-doc
  2. Download RabbitMQ Generic Unix release (tar.gz, from rabbitmq.com)  from here

Enabling the RabbitMQ management plugin


By default management GUI is disable so we have to manually enable it. Go inside the sbin folder using terminal and run the command ./rabbitmq-plugins enable rabbitmq_management


You will see following plugins enable after you run the above command
  • mochiweb
  • webmachine
  • rabbitmq-web-dispatch
  • amqp_client
  • rabbitmq-management-agent
  • rabbitmq-management
So now we have successfully enable the management console plugin.

To start the server, please use the command. ./rabbitmq-server start within the sbin directory and now we can access the console using the URL http://localhost:15672/ (If you want to access the console remotely replace the localhost with the IP address)


Login to the management console using following user Name and Password.
User Name - guest
Password - guest

So you can see the RabbitMQ management console like following

Diagram 2: RabbitMQ Management Console


It's better to create new user account for future works. So go to the admin tab and create a new user with Admin tag. Click on the newly created user name and set admin permissions. So now the RabbitMQ Server is up and running. We will see how to configure the WSO2 ESB 4.8.1 to deal with the RabbitMQ. Please follow the second part of this post.



If your server doesn't start please make sure whether following ports are free and any firewall or security application not prevent the RabbitMQ from binding to a port.


  • 4369 (epmd), 25672 (Erlang distribution)
  • 5672, 5671 (AMQP 0-9-1 without and with TLS)
  • 15672 (if management plugin is enabled)
  • 61613, 61614 (if STOMP is enabled)
  • 1883, 8883 (if MQTT is enabled)