java - Why is there a delay in Spring AMQP message dispatching from a filled queue?
I am using Spring AMQP in a message-driven application. I noticed that there is a constant delay of around 300 ms between invocations of the message listener, even though I am sure the queue is filled with messages. The logfile below shows the delay between BlockingQueueConsumer.nextMessage and BlockingQueueConsumer.handle, with a call to BlockingQueueConsumer.handleDelivery from another thread in between:
2015-05-12 12:46:18,655 trace [simpleasynctaskexecutor-1] simplemessagelistenercontainer.doreceiveandexecute waiting message consumer.
2015-05-12 12:46:18,655 debug [simpleasynctaskexecutor-1] blockingqueueconsumer.nextmessage retrieving delivery consumer: tags=[{amq.ctag-wwui6qjs1fanfpm7j6givw=my-queue}], channel=cached rabbit channel: amqchannel(mybrokerip,1), acknowledgemode=auto local queue size=0
2015-05-12 12:46:18,967 debug [pool-1-thread-6 ] blockingqueueconsumer.handledelivery storing delivery consumer: tags=[{amq.ctag-wwui6qjs1fanfpm7j6givw=my-queue}], channel=cached rabbit channel: amqchannel(mybrokerip,1), acknowledgemode=auto local queue size=0
2015-05-12 12:46:18,967 debug [simpleasynctaskexecutor-1] blockingqueueconsumer.handle received message: (body:'[b@18dc305(byte[186])'messageproperties [headers={..headers..}, timestamp=tue may 12 01:16:06 cest 2015, messageid=143134227498011576, userid=null, appid=spt-t-2, clusterid=null, type=hbt, correlationid=null, replyto=null, contenttype=text, contentencoding=utf-8, contentlength=0, deliverymode=persistent, expiration=null, priority=null, redelivered=false, receivedexchange=incoming, receivedroutingkey=my-queue, deliverytag=8, messagecount=0])
2015-05-12 12:46:18,967 info [simpleasynctaskexecutor-1] queuemessagehandler.onmessage incoming
2015-05-12 12:46:18,967 info [simpleasynctaskexecutor-1] queuemessagehandler.onmessage done
2015-05-12 12:46:18,967 trace [simpleasynctaskexecutor-1] simplemessagelistenercontainer.doreceiveandexecute waiting message consumer.
2015-05-12 12:46:18,967 debug [simpleasynctaskexecutor-1] blockingqueueconsumer.nextmessage retrieving delivery consumer: tags=[{amq.ctag-wwui6qjs1fanfpm7j6givw=my-queue}], channel=cached rabbit channel: amqchannel(mybrokerip,1), acknowledgemode=auto local queue size=0
2015-05-12 12:46:19,280 debug [pool-1-thread-7 ] blockingqueueconsumer.handledelivery storing delivery consumer: tags=[{amq.ctag-wwui6qjs1fanfpm7j6givw=my-queue}], channel=cached rabbit channel: amqchannel(mybrokerip,1), acknowledgemode=auto local queue size=0
2015-05-12 12:46:19,280 debug [simpleasynctaskexecutor-1] blockingqueueconsumer.handle received message: (body:'[b@1aaa7d8(byte[186])'messageproperties [headers={..headers..}, timestamp=tue may 12 01:17:08 cest 2015, messageid=143134227498011584, userid=null, appid=spt-t-2, clusterid=null, type=hbt, correlationid=null, replyto=null, contenttype=text, contentencoding=utf-8, contentlength=0, deliverymode=persistent, expiration=null, priority=null, redelivered=false, receivedexchange=incoming, receivedroutingkey=my-queue, deliverytag=9, messagecount=0])
2015-05-12 12:46:19,280 info [simpleasynctaskexecutor-1] queuemessagehandler.onmessage incoming
2015-05-12 12:46:19,280 info [simpleasynctaskexecutor-1] queuemessagehandler.onmessage done
2015-05-12 12:46:19,280 trace [simpleasynctaskexecutor-1] simplemessagelistenercontainer.doreceiveandexecute waiting message consumer.
2015-05-12 12:46:19,280 debug [simpleasynctaskexecutor-1] blockingqueueconsumer.nextmessage retrieving delivery consumer: tags=[{amq.ctag-wwui6qjs1fanfpm7j6givw=my-queue}], channel=cached rabbit channel: amqchannel(mybrokerip,1), acknowledgemode=auto local queue size=0
2015-05-12 12:46:19,577 debug [pool-1-thread-3 ] blockingqueueconsumer.handledelivery storing delivery consumer: tags=[{amq.ctag-wwui6qjs1fanfpm7j6givw=my-queue}], channel=cached rabbit channel: amqchannel(mybrokerip,1), acknowledgemode=auto local queue size=0
2015-05-12 12:46:19,577 debug [simpleasynctaskexecutor-1] blockingqueueconsumer.handle received message: (body:'[b@1c893d2(byte[186])'messageproperties [headers={..headers..}, timestamp=tue may 12 01:18:07 cest 2015, messageid=143134227498011592, userid=null, appid=spt-t-2, clusterid=null, type=hbt, correlationid=null, replyto=null, contenttype=text, contentencoding=utf-8, contentlength=0, deliverymode=persistent, expiration=null, priority=null, redelivered=false, receivedexchange=incoming, receivedroutingkey=my-queue, deliverytag=10, messagecount=0])
2015-05-12 12:46:19,577 info [simpleasynctaskexecutor-1] queuemessagehandler.onmessage incoming
2015-05-12 12:46:19,577 info [simpleasynctaskexecutor-1] queuemessagehandler.onmessage done
The logfile shows message processing at a time when the queue was definitely full of messages. The relevant parts of the Spring configuration file look like this:
<rabbit:connection-factory id="amqpconnectionfactory"
    connection-factory="clientconnectionfactory"
    host="${amqp.broker.ip}"
    port="${amqp.broker.port}"
    virtual-host="${amqp.broker.vhost}"
    username="${amqp.user}"
    password="${amqp.password}"/>

<bean id="clientconnectionfactory" class="org.springframework.amqp.rabbit.connection.RabbitConnectionFactoryBean">
    <property name="useSSL" value="true" />
    <property name="sslPropertiesLocation" value="classpath:server.ini"/>
</bean>

<bean id="amqptemplate" class="org.springframework.amqp.rabbit.core.RabbitTemplate">
    <property name="connectionFactory" ref="amqpconnectionfactory" />
    <property name="messageConverter" ref="marshallingmessageconverter"/>
</bean>

<bean id="marshallingmessageconverter" class="org.springframework.amqp.support.converter.MarshallingMessageConverter">
    <constructor-arg ref="jaxbmarshaller" />
</bean>

<oxm:jaxb2-marshaller id="jaxbmarshaller" context-path="com.my.package"/>

<rabbit:listener-container id="heartbeatlistenercontainer" connection-factory="amqpconnectionfactory" auto-startup="false">
    <rabbit:listener ref="queuemessagehandler" queue-names="heartbeat-bdwh" />
</rabbit:listener-container>

<bean id="queuemessagehandler" class="com.my.package.queuemessagehandler"/>
I am struggling to find the reason for the delay. As far as I understand, it originates in Spring's BlockingQueueConsumer. I am not sure what is happening there, or why there is a call from another thread to the BlockingQueueConsumer.handleDelivery method.
Any help is appreciated!
Maybe it is a network issue?
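The default configuration handles 1 message at a time, and the next one is not sent by the broker until the ack for the current one has been sent. Each message therefore costs at least one network round trip, which would explain a constant gap between listener invocations. Try increasing the prefetch on the listener container so that the container already has a message available when the consumer thread is ready.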
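As a rough sketch of the prefetch suggestion, the listener container from the question could be given a prefetch attribute. The value 10 is an arbitrary assumption chosen for illustration, not a recommendation; everything else is copied from the question's configuration.

```xml
<!-- prefetch="10" is an illustrative value (assumption); tune it for your workload.
     It lets the broker push up to 10 unacknowledged messages to the consumer. -->
<rabbit:listener-container id="heartbeatlistenercontainer"
        connection-factory="amqpconnectionfactory"
        auto-startup="false"
        prefetch="10">
    <rabbit:listener ref="queuemessagehandler" queue-names="heartbeat-bdwh" />
</rabbit:listener-container>
```

With a higher prefetch, the "local queue size" reported in the BlockingQueueConsumer log lines should be greater than 0 most of the time while the broker queue is full.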
Take a look at a network trace (Wireshark or similar).
Edit:
If you have a poor network, and can live with an increased possibility of duplicate deliveries, you can also consider increasing txSize so that acks are not sent for every message. Be sure to set it to less than the prefetch, though.
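A minimal sketch of combining the two settings, assuming the rabbit namespace of the Spring AMQP version in use exposes txSize as the transaction-size attribute (newer versions may name it differently, so check the schema for your release). The values 5 and 10 are illustrative assumptions; the only constraint taken from the answer is that the transaction size stays below the prefetch.

```xml
<!-- Illustrative values (assumptions): acks are sent once per 5 messages,
     while up to 10 unacknowledged messages may be in flight.
     transaction-size must stay below prefetch. -->
<rabbit:listener-container id="heartbeatlistenercontainer"
        connection-factory="amqpconnectionfactory"
        auto-startup="false"
        prefetch="10"
        transaction-size="5">
    <rabbit:listener ref="queuemessagehandler" queue-names="heartbeat-bdwh" />
</rabbit:listener-container>
```

The trade-off is the one named above: if the consumer fails before the batched ack is sent, the unacknowledged messages are redelivered, so the listener should tolerate duplicates.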