Sunday, September 13, 2015

Configuring RabbitMQ with WSO2 ESB 4.8.1 PART 3

In this post I'm going to create two services that one is publishing  messages to the RabbitMQ Message Que and other service which consume the messages from the queue.

Proxy Service to Publish message to a RabbitMQ message Queue

I'm just creating a http proxy service which can be invoked using SOAP messages, So Service will enqueue the message to the message queue.

Following is the sample Synapse code for the proxy configuration.

 <?xml version="1.0" encoding="UTF-8"?>  
 <proxy xmlns="http://ws.apache.org/ns/synapse"  
     name="rabitMQTestProxy"  
     transports="https,http"  
     statistics="disable"  
     trace="disable"  
     startOnLoad="true">  
   <target>  
    <inSequence>  
      <property name="FORCE_SC_ACCEPTED"  
           value="true"  
           scope="axis2"  
           type="STRING"/>  
      <property name="OUT_ONLY" value="true" scope="default" type="STRING"/>  
      <property name="ContentType" value="text/xml" scope="axis2"/>  
      <send description="send message to AMQP queue">  
       <endpoint name="RABBIT">  
         <address uri="rabbitmq:/PublishRabbitMQ?rabbitmq.queue.exclusive=false&amp;rabbitmq.queue.auto.delete=false&amp;rabbitmq.queue.routing.key=destination&amp;rabbitmq.server.host.name=172.22.217.31&amp;rabbitmq.server.port=5672&amp;rabbitmq.server.user.name=krishan&amp;rabbitmq.server.password=k123&amp;rabbitmq.queue.name=destination&amp;rabbitmq.exchange.name=amq.direct"/>  
       </endpoint>  
      </send>  
    </inSequence>  
    <outSequence/>  
   </target>  
   <description/>  
 </proxy>  

Lets discuss about the proxy service

1) <property name="FORCE_SC_ACCEPTED" value="true"  scope="axis2"  type="STRING"/>   
This property used to force the service to send 202 Accepted status code to the client


2)  <property name="OUT_ONLY" value="true" scope="default" type="STRING"/>   
This property is to say that the proxy service has only one way transmission. So message only enqueued and not wait for the response.

3) <property name="ContentType" value="text/xml" scope="axis2"/>   
This property used to activate correct message builder. We are sending xml message to the (soap envelope) service.


Lets see the address URL (RabbitMQ Queue)
 <address uri="rabbitmq:/PublishRabbitMQ?rabbitmq.queue.exclusive=false&amp;rabbitmq.queue.auto.delete=false&amp;rabbitmq.queue.routing.key=destination&amp;rabbitmq.server.host.name=172.22.217.31&amp;rabbitmq.server.port=5672&amp;rabbitmq.server.user.name=krishan&amp;rabbitmq.server.password=k123&amp;rabbitmq.queue.name=destination&amp;rabbitmq.exchange.name=amq.direct"/>   

This is the address endpoint URL and we use rabbitmq:/ as the AMQP protocol. Also we are sending some values as query strings to the queue. The values are used by the backend as properties.

Proxy Service To consume messages from the queue 

 <?xml version="1.0" encoding="UTF-8"?>  
 <proxy xmlns="http://ws.apache.org/ns/synapse"  
     name="rabitMQconsumer"  
     transports="rabbitmq"  
     statistics="disable"  
     trace="disable"  
     startOnLoad="true">  
   <target>  
    <inSequence>  
      <log level="full"/>  
    </inSequence>  
    <outSequence/>  
    <faultSequence/>  
   </target>  
   <parameter name="rabbitmq.queue.name">destination</parameter>  
   <parameter name="rabbitmq.connection.factory">AMQPConnectionFactory</parameter>  
   <parameter name="rabbitmq.exchange.name">amq.direct</parameter>  
   <parameter name="rabbitmq.queue.routing.key">destination</parameter>    
   <description/>  
 </proxy>  


rabbitmq.queue.name - Name of the queue which you want to consume.
rabbitmq.connection.factory - RabbitMQ factory name you created in axis2.xml.
rabbitmq.exchange.name - RabbitMQ exchange name (If you haven’t created exchange ‘use amq.direct’).
rabbitmq.queue.routing.key - this is optional , you can give queue name.

The Common exception occurs when we are going  to consume messages from RabbitMQ queue using ESB 4.8.1

 [2015-10-09 14:45:52,695] ERROR - ServiceTaskManager Error while receiving message from queue  
 java.lang.NullPointerException  
     at org.apache.axis2.transport.rabbitmq.ServiceTaskManager$MessageListenerTask.getConsumerDelivery(ServiceTaskManager.java:360)  
     at org.apache.axis2.transport.rabbitmq.ServiceTaskManager$MessageListenerTask.run(ServiceTaskManager.java:182)  
     at org.apache.axis2.transport.base.threads.NativeWorkerPool$1.run(NativeWorkerPool.java:172)  
     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)  
     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)  
     at java.lang.Thread.run(Thread.java:744)  
 [2015-10-09 14:45:52,704] ERROR - NativeWorkerPool Uncaught exception  
 org.apache.axis2.transport.rabbitmq.AxisRabbitMQException: Error while receiving message from queue  
     at org.apache.axis2.transport.rabbitmq.ServiceTaskManager.handleException(ServiceTaskManager.java:441)  
     at org.apache.axis2.transport.rabbitmq.ServiceTaskManager.access$300(ServiceTaskManager.java:45)  
     at org.apache.axis2.transport.rabbitmq.ServiceTaskManager$MessageListenerTask.run(ServiceTaskManager.java:214)  
     at org.apache.axis2.transport.base.threads.NativeWorkerPool$1.run(NativeWorkerPool.java:172)  
     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)  
     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)  
     at java.lang.Thread.run(Thread.java:744)  
 Caused by: java.lang.NullPointerException  
     at org.apache.axis2.transport.rabbitmq.ServiceTaskManager$MessageListenerTask.getConsumerDelivery(ServiceTaskManager.java:360)  
     at org.apache.axis2.transport.rabbitmq.ServiceTaskManager$MessageListenerTask.run(ServiceTaskManager.java:182)  
     ... 4 more  

When we are going to consume message we will get an exception telling content type is null. This is a bug in RabbitMQ transport and this has been resolved in ESB 4.9.0 release. Also for the ESB 4.8.1 users can ask for official patch (WSO2-CARBON-PATCH-4.2.0-1245) from the WSO2.

After adding the patch you can add the following parameters to the consumer proxy which will resolve above error and also give additional features.

 <parameter name="rabbitmq.message.content.type">text/xml</parameter>  
You can set the content type accordingly to the message content type.  ex - application/xml, application/json etc.

 <parameter name="rabbitmq.connection.retry.interval" locked="false">10000</parameter>  
 <parameter name="rabbitmq.connection.retry.count" locked="false">5</parameter>  
Also, you can add above parameters to recover connection if it failed.

So these features coming with WSO2 ESB 4.8.1 and there are more functions available in WSO2 ESB 4.9.0. Specially SSL communication.

Don't worry, I will write a blog post regarding SSL communication with RabbitMQ. Please check my latest posts. Cheers!

No comments :

Post a Comment