Proxy Service to Publish message to a RabbitMQ message Queue
I'm just creating a http proxy service which can be invoked using SOAP messages, So Service will enqueue the message to the message queue.
Following is the sample Synapse code for the proxy configuration.
Following is the sample Synapse code for the proxy configuration.
<? xml version="1.0" encoding="UTF-8"?>
<proxy xmlns="http://ws.apache.org/ns/synapse"
name="rabitMQTestProxy"
transports="https,http"
statistics="disable"
trace="disable"
startOnLoad="true">
<target>
<inSequence >
<property name="FORCE_SC_ACCEPTED"
value="true"
scope="axis2"
type="STRING"/>
<property name="OUT_ONLY" value="true" scope="default" type="STRING"/>
<property name="ContentType" value="text/xml" scope="axis2"/>
<send description="send message to AMQP queue">
<endpoint name="RABBIT">
<address uri="rabbitmq:/PublishRabbitMQ?rabbitmq.queue.exclusive=false&rabbitmq.queue.auto.delete=false&rabbitmq.queue.routing.key=destination&rabbitmq.server.host.name=172.22.217.31&rabbitmq.server.port=5672&rabbitmq.server.user.name=krishan&rabbitmq.server.password=k123&rabbitmq.queue.name=destination&rabbitmq.exchange.name=amq.direct"/>
</endpoint>
</send>
</inSequence>
<outSequence/ >
</target>
<description/>
</proxy>
Lets discuss about the proxy service
1) <property name="FORCE_SC_ACCEPTED" value="true" scope="axis2" type="STRING"/>
This property used to force the service to send 202 Accepted status code to the client2) <property name="OUT_ONLY" value="true" scope="default" type="STRING"/>
This property is to say that the proxy service has only one way transmission. So message only enqueued and not wait for the response.3) <property name="ContentType" value="text/xml" scope="axis2"/>
This property used to activate correct message builder. We are sending xml message to the (soap envelope) service.Lets see the address URL (RabbitMQ Queue)
<address uri="rabbitmq:/PublishRabbitMQ?rabbitmq.queue.exclusive=false&rabbitmq.queue.auto.delete=false&rabbitmq.queue.routing.key=destination&rabbitmq.server.host.name=172.22.217.31&rabbitmq.server.port=5672&rabbitmq.server.user.name=krishan&rabbitmq.server.password=k123&rabbitmq.queue.name=destination&rabbitmq.exchange.name=amq.direct"/>
This is the address endpoint URL and we use
Proxy Service To consume messages from the queue
<? xml version="1.0" encoding="UTF-8"?>
<proxy xmlns="http://ws.apache.org/ns/synapse"
name="rabitMQconsumer"
transports="rabbitmq"
statistics="disable"
trace="disable"
startOnLoad="true">
<target>
<inSequence>
<log level="full"/>
</inSequence>
<outSequence/>
<faultSequence/>
</target>
<parameter name="rabbitmq.queue.name">destination</parameter>
<parameter name="rabbitmq.connection.factory">AMQPConnectionFactory</parameter>
<parameter name="rabbitmq.exchange.name">amq.direct</parameter>
<parameter name="rabbitmq.queue.routing.key">destination</parameter>
<description/>
</proxy>
The Common exception occurs when we are going to consume messages from RabbitMQ queue using ESB 4.8.1
[2015-10-09 14:45:52,695] ERROR - ServiceTaskManager Error while receiving message from queue
java.lang.NullPointerException
at org.apache.axis2.transport.rabbitmq.ServiceTaskManager$MessageListenerTask.getConsumerDelivery(ServiceTaskManager.java:360)
at org.apache.axis2.transport.rabbitmq.ServiceTaskManager$MessageListenerTask.run(ServiceTaskManager.java:182)
at org.apache.axis2.transport.base.threads.NativeWorkerPool$1.run(NativeWorkerPool.java:172)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:744)
[2015-10-09 14:45:52,704] ERROR - NativeWorkerPool Uncaught exception
org.apache.axis2.transport.rabbitmq.AxisRabbitMQException: Error while receiving message from queue
at org.apache.axis2.transport.rabbitmq.ServiceTaskManager.handleException(ServiceTaskManager.java:441)
at org.apache.axis2.transport.rabbitmq.ServiceTaskManager.access$300(ServiceTaskManager.java:45)
at org.apache.axis2.transport.rabbitmq.ServiceTaskManager$MessageListenerTask.run(ServiceTaskManager.java:214)
at org.apache.axis2.transport.base.threads.NativeWorkerPool$1.run(NativeWorkerPool.java:172)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:744)
Caused by: java.lang.NullPointerException
at org.apache.axis2.transport.rabbitmq.ServiceTaskManager$MessageListenerTask.getConsumerDelivery(ServiceTaskManager.java:360)
at org.apache.axis2.transport.rabbitmq.ServiceTaskManager$MessageListenerTask.run(ServiceTaskManager.java:182)
... 4 more
When we are going to consume message we will get an exception telling content type is null. This is a bug in RabbitMQ transport and this has been resolved in ESB 4.9.0 release. Also for the ESB 4.8.1 users can ask for official patch (WSO2-CARBON-PATCH-4.2.0-1245) from the WSO2.
After adding the patch you can add the following parameters to the consumer proxy which will resolve above error and also give additional features.
<parameter name="rabbitmq . message. content. type">text/xml</parameter>
You can set the content type accordingly to the message content type. ex - application/xml <parameter name="rabbitmq . connection. retry. interval" locked="false">10000</parameter>
<parameter name="rabbitmq . connection. retry. count" locked="false">5</parameter>
Also, you can add above parameters to recover connection if it failed.So these features coming with WSO2 ESB 4.8.1 and there are more functions available in WSO2 ESB 4.9.0. Specially SSL communication.
Don't worry, I will write a blog post regarding SSL communication with RabbitMQ. Please check my latest posts. Cheers!
No comments :
Post a Comment