java - Why is there a delay in Spring AMQP message dispatching from a filled queue?


I am using Spring AMQP in a message-driven application. I noticed that there is a constant delay of around 300 ms between invocations of the message listener, even though I am sure that the queue is filled with messages. The log file below shows the delay between BlockingQueueConsumer.nextMessage and BlockingQueueConsumer.handle, with a call to BlockingQueueConsumer.handleDelivery from another thread in between:

2015-05-12 12:46:18,655 trace [simpleasynctaskexecutor-1] simplemessagelistenercontainer.doreceiveandexecute waiting message consumer.
2015-05-12 12:46:18,655 debug [simpleasynctaskexecutor-1] blockingqueueconsumer.nextmessage retrieving delivery consumer: tags=[{amq.ctag-wwui6qjs1fanfpm7j6givw=my-queue}], channel=cached rabbit channel: amqchannel(mybrokerip,1), acknowledgemode=auto local queue size=0
2015-05-12 12:46:18,967 debug [pool-1-thread-6 ] blockingqueueconsumer.handledelivery storing delivery consumer: tags=[{amq.ctag-wwui6qjs1fanfpm7j6givw=my-queue}], channel=cached rabbit channel: amqchannel(mybrokerip,1), acknowledgemode=auto local queue size=0
2015-05-12 12:46:18,967 debug [simpleasynctaskexecutor-1] blockingqueueconsumer.handle received message: (body:'[b@18dc305(byte[186])'messageproperties [headers={..headers..}, timestamp=tue may 12 01:16:06 cest 2015, messageid=143134227498011576, userid=null, appid=spt-t-2, clusterid=null, type=hbt, correlationid=null, replyto=null, contenttype=text, contentencoding=utf-8, contentlength=0, deliverymode=persistent, expiration=null, priority=null, redelivered=false, receivedexchange=incoming, receivedroutingkey=my-queue, deliverytag=8, messagecount=0])
2015-05-12 12:46:18,967 info  [simpleasynctaskexecutor-1] queuemessagehandler.onmessage incoming
2015-05-12 12:46:18,967 info  [simpleasynctaskexecutor-1] queuemessagehandler.onmessage done
2015-05-12 12:46:18,967 trace [simpleasynctaskexecutor-1] simplemessagelistenercontainer.doreceiveandexecute waiting message consumer.

2015-05-12 12:46:18,967 debug [simpleasynctaskexecutor-1] blockingqueueconsumer.nextmessage retrieving delivery consumer: tags=[{amq.ctag-wwui6qjs1fanfpm7j6givw=my-queue}], channel=cached rabbit channel: amqchannel(mybrokerip,1), acknowledgemode=auto local queue size=0
2015-05-12 12:46:19,280 debug [pool-1-thread-7 ] blockingqueueconsumer.handledelivery storing delivery consumer: tags=[{amq.ctag-wwui6qjs1fanfpm7j6givw=my-queue}], channel=cached rabbit channel: amqchannel(mybrokerip,1), acknowledgemode=auto local queue size=0
2015-05-12 12:46:19,280 debug [simpleasynctaskexecutor-1] blockingqueueconsumer.handle received message: (body:'[b@1aaa7d8(byte[186])'messageproperties [headers={..headers..}, timestamp=tue may 12 01:17:08 cest 2015, messageid=143134227498011584, userid=null, appid=spt-t-2, clusterid=null, type=hbt, correlationid=null, replyto=null, contenttype=text, contentencoding=utf-8, contentlength=0, deliverymode=persistent, expiration=null, priority=null, redelivered=false, receivedexchange=incoming, receivedroutingkey=my-queue, deliverytag=9, messagecount=0])
2015-05-12 12:46:19,280 info  [simpleasynctaskexecutor-1] queuemessagehandler.onmessage incoming
2015-05-12 12:46:19,280 info  [simpleasynctaskexecutor-1] queuemessagehandler.onmessage done
2015-05-12 12:46:19,280 trace [simpleasynctaskexecutor-1] simplemessagelistenercontainer.doreceiveandexecute waiting message consumer.

2015-05-12 12:46:19,280 debug [simpleasynctaskexecutor-1] blockingqueueconsumer.nextmessage retrieving delivery consumer: tags=[{amq.ctag-wwui6qjs1fanfpm7j6givw=my-queue}], channel=cached rabbit channel: amqchannel(mybrokerip,1), acknowledgemode=auto local queue size=0
2015-05-12 12:46:19,577 debug [pool-1-thread-3 ] blockingqueueconsumer.handledelivery storing delivery consumer: tags=[{amq.ctag-wwui6qjs1fanfpm7j6givw=my-queue}], channel=cached rabbit channel: amqchannel(mybrokerip,1), acknowledgemode=auto local queue size=0
2015-05-12 12:46:19,577 debug [simpleasynctaskexecutor-1] blockingqueueconsumer.handle received message: (body:'[b@1c893d2(byte[186])'messageproperties [headers={..headers..}, timestamp=tue may 12 01:18:07 cest 2015, messageid=143134227498011592, userid=null, appid=spt-t-2, clusterid=null, type=hbt, correlationid=null, replyto=null, contenttype=text, contentencoding=utf-8, contentlength=0, deliverymode=persistent, expiration=null, priority=null, redelivered=false, receivedexchange=incoming, receivedroutingkey=my-queue, deliverytag=10, messagecount=0])
2015-05-12 12:46:19,577 info  [simpleasynctaskexecutor-1] queuemessagehandler.onmessage incoming
2015-05-12 12:46:19,577 info  [simpleasynctaskexecutor-1] queuemessagehandler.onmessage done

The log file shows message processing at a time when the queue was definitely full of messages. The relevant parts of the Spring configuration file look like this:

<rabbit:connection-factory id="amqpconnectionfactory"
    connection-factory="clientconnectionfactory"
    host="${amqp.broker.ip}"
    port="${amqp.broker.port}"
    virtual-host="${amqp.broker.vhost}"
    username="${amqp.user}"
    password="${amqp.password}"/>

<bean id="clientconnectionfactory" class="org.springframework.amqp.rabbit.connection.RabbitConnectionFactoryBean">
    <property name="useSSL" value="true" />
    <property name="sslPropertiesLocation" value="classpath:server.ini"/>
</bean>

<bean id="amqptemplate" class="org.springframework.amqp.rabbit.core.RabbitTemplate">
    <property name="connectionFactory" ref="amqpconnectionfactory" />
    <property name="messageConverter" ref="marshallingmessageconverter"/>
</bean>

<bean id="marshallingmessageconverter" class="org.springframework.amqp.support.converter.MarshallingMessageConverter">
    <constructor-arg ref="jaxbmarshaller" />
</bean>

<oxm:jaxb2-marshaller id="jaxbmarshaller" context-path="com.my.package"/>

<rabbit:listener-container id="heartbeatlistenercontainer" connection-factory="amqpconnectionfactory" auto-startup="false">
    <rabbit:listener ref="queuemessagehandler" queue-names="heartbeat-bdwh" />
</rabbit:listener-container>

<bean id="queuemessagehandler" class="com.my.package.queuemessagehandler"/>

I am struggling to find the reason for the delay. As far as I understand, it originates in Spring's BlockingQueueConsumer. I am not sure what is happening there, or why there is a call from another thread to the BlockingQueueConsumer.handleDelivery method.

Any help is appreciated!

Maybe it is a network issue?

The default configuration handles one message at a time, and the next one is not sent by the broker until the ack has been sent.

Try increasing the prefetch on the listener container so that the container already has a message available when the consumer thread is ready.
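
As a minimal sketch, the prefetch could be raised on the listener container from the question's configuration roughly like this. The value 20 is only an illustrative assumption, not a recommendation; a suitable value depends on message size and processing time.

```xml
<!-- prefetch="20" is an assumed example value; tune it for your workload -->
<rabbit:listener-container id="heartbeatlistenercontainer"
        connection-factory="amqpconnectionfactory"
        auto-startup="false"
        prefetch="20">
    <rabbit:listener ref="queuemessagehandler" queue-names="heartbeat-bdwh" />
</rabbit:listener-container>
```

With a prefetch above 1, the broker can push further messages while earlier ones are still unacknowledged, so the consumer thread should no longer have to wait a full network round trip for each message.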

Take a look at a network trace (Wireshark or similar).

Edit:

If you have a poor network, and can live with an increased possibility of duplicate deliveries, you can consider increasing the txSize so that acks are not sent for every message. Be sure to set it to less than the prefetch, though.
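
A hedged sketch of that combination, assuming a Spring AMQP version whose rabbit namespace exposes txSize as the transaction-size attribute of the listener container. Both numbers are illustrative assumptions, chosen only so that the transaction size stays below the prefetch.

```xml
<!-- Assumed example values: acks are batched per 10 messages, prefetch is 20 -->
<rabbit:listener-container id="heartbeatlistenercontainer"
        connection-factory="amqpconnectionfactory"
        auto-startup="false"
        prefetch="20"
        transaction-size="10">
    <rabbit:listener ref="queuemessagehandler" queue-names="heartbeat-bdwh" />
</rabbit:listener-container>
```

The trade-off is the one named above: if the consumer fails before a batch is acknowledged, the broker may redeliver messages from that batch, so the listener has to tolerate duplicates.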

