java - Why is there a delay in Spring AMQP Message dispatching from a filled Queue? -


i using spring amqp in message driven application. noticed there constant delay of around 300ms between invocations of message listener, though sure queue filled messages. logfile below shows delay between blockingqueueconsumer.nextmessage , blockingqueueconsumer.handle call blockingqueueconsumer.handledelivery thread in between:

2015-05-12 12:46:18,655 trace [simpleasynctaskexecutor-1] simplemessagelistenercontainer.doreceiveandexecute waiting message consumer. 2015-05-12 12:46:18,655 debug [simpleasynctaskexecutor-1] blockingqueueconsumer.nextmessage retrieving delivery consumer: tags=[{amq.ctag-wwui6qjs1fanfpm7j6givw=my-queue}], channel=cached rabbit channel: amqchannel(mybrokerip,1), acknowledgemode=auto local queue size=0 2015-05-12 12:46:18,967 debug [pool-1-thread-6 ] blockingqueueconsumer.handledelivery storing delivery consumer: tags=[{amq.ctag-wwui6qjs1fanfpm7j6givw=my-queue}], channel=cached rabbit channel: amqchannel(mybrokerip,1), acknowledgemode=auto local queue size=0 2015-05-12 12:46:18,967 debug [simpleasynctaskexecutor-1] blockingqueueconsumer.handle received message: (body:'[b@18dc305(byte[186])'messageproperties [headers={..headers..}, timestamp=tue may 12 01:16:06 cest 2015, messageid=143134227498011576, userid=null, appid=spt-t-2, clusterid=null, type=hbt, correlationid=null, replyto=null, contenttype=text, contentencoding=utf-8, contentlength=0, deliverymode=persistent, expiration=null, priority=null, redelivered=false, receivedexchange=incoming, receivedroutingkey=my-queue, deliverytag=8, messagecount=0]) 2015-05-12 12:46:18,967 info  [simpleasynctaskexecutor-1] queuemessagehandler.onmessage incoming 2015-05-12 12:46:18,967 info  [simpleasynctaskexecutor-1] queuemessagehandler.onmessage done 2015-05-12 12:46:18,967 trace [simpleasynctaskexecutor-1] simplemessagelistenercontainer.doreceiveandexecute waiting message consumer. 2015-05-12 12:46:18,967 debug [simpleasynctaskexecutor-1] blockingqueueconsumer.nextmessage retrieving delivery consumer: tags=[{amq.ctag-wwui6qjs1fanfpm7j6givw=my-queue}], channel=cached rabbit channel: amqchannel(mybrokerip,1), acknowledgemode=auto local queue size=0 2015-05-12 12:46:19,280 debug [pool-1-thread-7 ] blockingqueueconsumer.handledelivery storing delivery consumer: tags=[{amq.ctag-wwui6qjs1fanfpm7j6givw=my-queue}], channel=cached rabbit channel: amqchannel(mybrokerip,1), acknowledgemode=auto local queue size=0 2015-05-12 12:46:19,280 debug [simpleasynctaskexecutor-1] blockingqueueconsumer.handle received message: (body:'[b@1aaa7d8(byte[186])'messageproperties [headers={..headers..}, timestamp=tue may 12 01:17:08 cest 2015, messageid=143134227498011584, userid=null, appid=spt-t-2, clusterid=null, type=hbt, correlationid=null, replyto=null, contenttype=text, contentencoding=utf-8, contentlength=0, deliverymode=persistent, expiration=null, priority=null, redelivered=false, receivedexchange=incoming, receivedroutingkey=my-queue, deliverytag=9, messagecount=0]) 2015-05-12 12:46:19,280 info  [simpleasynctaskexecutor-1] queuemessagehandler.onmessage incoming 2015-05-12 12:46:19,280 info  [simpleasynctaskexecutor-1] queuemessagehandler.onmessage done 2015-05-12 12:46:19,280 trace [simpleasynctaskexecutor-1] simplemessagelistenercontainer.doreceiveandexecute waiting message consumer. 2015-05-12 12:46:19,280 debug [simpleasynctaskexecutor-1] blockingqueueconsumer.nextmessage retrieving delivery consumer: tags=[{amq.ctag-wwui6qjs1fanfpm7j6givw=my-queue}], channel=cached rabbit channel: amqchannel(mybrokerip,1), acknowledgemode=auto local queue size=0 2015-05-12 12:46:19,577 debug [pool-1-thread-3 ] blockingqueueconsumer.handledelivery storing delivery consumer: tags=[{amq.ctag-wwui6qjs1fanfpm7j6givw=my-queue}], channel=cached rabbit channel: amqchannel(mybrokerip,1), acknowledgemode=auto local queue size=0 2015-05-12 12:46:19,577 debug [simpleasynctaskexecutor-1] blockingqueueconsumer.handle received message: (body:'[b@1c893d2(byte[186])'messageproperties [headers={..headers..}, timestamp=tue may 12 01:18:07 cest 2015, messageid=143134227498011592, userid=null, appid=spt-t-2, clusterid=null, type=hbt, correlationid=null, replyto=null, contenttype=text, contentencoding=utf-8, contentlength=0, deliverymode=persistent, expiration=null, priority=null, redelivered=false, receivedexchange=incoming, receivedroutingkey=my-queue, deliverytag=10, messagecount=0]) 2015-05-12 12:46:19,577 info  [simpleasynctaskexecutor-1] queuemessagehandler.onmessage incoming 2015-05-12 12:46:19,577 info  [simpleasynctaskexecutor-1] queuemessagehandler.onmessage done 

the logfile shows message processing when queue definately full of messages. relevant parts of spring configuration file looks this:

<rabbit:connection-factory id="amqpconnectionfactory" connection-factory="clientconnectionfactory"     host="${amqp.broker.ip}"     port="${amqp.broker.port}"     virtual-host="${amqp.broker.vhost}"     username="${amqp.user}"     password="${amqp.password}"/>  <bean id="clientconnectionfactory" class="org.springframework.amqp.rabbit.connection.rabbitconnectionfactorybean">     <property name="usessl" value="true" />     <property name="sslpropertieslocation" value="classpath:server.ini"/> </bean>  <bean id="amqptemplate" class="org.springframework.amqp.rabbit.core.rabbittemplate">     <property name="connectionfactory" ref="amqpconnectionfactory" />     <property name="messageconverter" ref="marshallingmessageconverter"/> </bean>  <bean id="marshallingmessageconverter" class="org.springframework.amqp.support.converter.marshallingmessageconverter">     <constructor-arg ref="jaxbmarshaller" /> </bean>  <oxm:jaxb2-marshaller id="jaxbmarshaller" context-path="com.my.package"/>  <rabbit:listener-container id="heartbeatlistenercontainer" connection-factory="amqpconnectionfactory" auto-startup="false">     <rabbit:listener ref="queuemessagehandler" queue-names="heartbeat-bdwh" /> </rabbit:listener-container>  <bean id="queuemessagehandler" class="com.my.package.queuemessagehandler"/> 

i struggling find reason delay. far understand originates spring blockingqueueconsumer. not sure happening , why there call thread blockingqueueconsumer.handledelivery method.

any apprectiated!

maybe network issue?

the default configuration handles 1 message @ time , next not sent broker until ack sent.

try increasing prefetch on listener container container has message available when consumer thread ready.

take @ network trace (wireshark or similar).

edit:

if have poor network, , can live increased possibility of duplicate deliveries, can consider increasing txsize acks not sent every message. sure set less prefetch, though.


Comments

Popular posts from this blog

android - MPAndroidChart - How to add Annotations or images to the chart -

javascript - Add class to another page attribute using URL id - Jquery -

firefox - Where is 'webgl.osmesalib' parameter? -