Spring Integration: XML to Java config
How can I convert this code to Java config?
    <int-kafka:outbound-channel-adapter id="mainoutboundchanneladapter"
            kafka-producer-context-ref="kafkaproducercontext"
            channel="mainoutboundtopicchanel">
    </int-kafka:outbound-channel-adapter>
Yes, you can. Please take a look at the latest Spring Integration Java DSL.
Your case might look like this:
    @Bean
    public IntegrationFlow sendToKafkaFlow(String serverAddress) {
        // Split each payload into 100 copies of itself, then send every copy to Kafka.
        return f -> f.<String>split(p -> FastList.newWithNValues(100, () -> p), null)
                .handle(kafkaMessageHandler(serverAddress));
    }

    private KafkaProducerMessageHandlerSpec kafkaMessageHandler(String serverAddress) {
        return Kafka.outboundChannelAdapter(props -> props.put("queue.buffering.max.ms", "15000"))
                // Use the sequence number set by the splitter as the message key.
                .messageKey(m -> m.getHeaders().get(IntegrationMessageHeaderAccessor.SEQUENCE_NUMBER))
                .addProducer(TEST_TOPIC, serverAddress, this::producer);
    }

    private void producer(KafkaProducerMessageHandlerSpec.ProducerMetadataSpec metadata) {
        metadata.async(true)
                .batchNumMessages(10)
                .valueClassType(String.class)
                .<String>valueEncoder(String::getBytes)
                .keyEncoder(new IntEncoder(null));
    }
UPDATE: without lambdas, but still with Spring Integration:
    @Bean
    @ServiceActivator(inputChannel = "mainoutboundtopicchanel")
    public MessageHandler kafkaProducer() throws Exception {
        return new KafkaProducerMessageHandler<String, String>(kafkaProducerContext());
    }

    @Bean
    public KafkaProducerContext<String, String> kafkaProducerContext() throws Exception {
        KafkaProducerContext<String, String> kafkaProducerContext = new KafkaProducerContext<String, String>();

        // topic is the Kafka topic name, defined elsewhere.
        ProducerMetadata<String, String> producerMetadata = new ProducerMetadata<String, String>(topic);
        producerMetadata.setValueClassType(String.class);
        producerMetadata.setKeyClassType(String.class);
        Encoder<String> encoder = new StringEncoder<String>();
        producerMetadata.setValueEncoder(encoder);
        producerMetadata.setKeyEncoder(encoder);
        producerMetadata.setAsync(true);

        Properties props = new Properties();
        props.put("queue.buffering.max.ms", "15000");

        // getObject() follows the FactoryBean contract, which is declared to throw Exception.
        ProducerFactoryBean<String, String> producer =
                new ProducerFactoryBean<String, String>(producerMetadata, kafkaRule.getBrokersAsString(), props);
        ProducerConfiguration<String, String> config =
                new ProducerConfiguration<String, String>(producerMetadata, producer.getObject());

        kafkaProducerContext.setProducerConfigurations(Collections.singletonMap(topic, config));
        return kafkaProducerContext;
    }
And don't forget to add @EnableIntegration alongside @Configuration.
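As a rough sketch of where those two annotations go, the configuration class might look like the following. The class name KafkaOutboundConfig and the method name mainOutboundTopicChannel are hypothetical, and declaring the channel explicitly as a DirectChannel is an assumption rather than something the question requires. It needs the Spring Integration libraries on the classpath.

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.messaging.MessageChannel;

// Hypothetical class name; it would hold the @Bean methods shown above.
@Configuration
@EnableIntegration
public class KafkaOutboundConfig {

    // Assumption: the channel named in @ServiceActivator(inputChannel = ...)
    // is declared explicitly as a DirectChannel under the same bean name.
    @Bean(name = "mainoutboundtopicchanel")
    public MessageChannel mainOutboundTopicChannel() {
        return new DirectChannel();
    }

    // The kafkaProducer() and kafkaProducerContext() @Bean methods would go here.
}
```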
For the future: every XML tag in Spring is parsed by a particular NamespaceHandler, e.g. KafkaNamespaceHandler in this case. Reading its source code, you can find these lines:
    registerBeanDefinitionParser("outbound-channel-adapter", new KafkaOutboundChannelAdapterParser());
    registerBeanDefinitionParser("producer-context", new KafkaProducerContextParser());
When you go to KafkaOutboundChannelAdapterParser, you can see that it populates a BeanDefinition:
    final BeanDefinitionBuilder kafkaProducerMessageHandlerBuilder =
            BeanDefinitionBuilder.genericBeanDefinition(KafkaProducerMessageHandler.class);
And so on through the source code.
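To make the lookup idea concrete, here is a simplified, stdlib-only model of the dispatch: an element name is mapped to a parser, and the parser decides which class the bean will be. None of these types are Spring classes. NamespaceDispatchSketch and its methods are hypothetical names, and the target class shown for producer-context is an assumption taken from the Java configuration above, not from the parser source.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Simplified model of namespace-handler dispatch. Not Spring's implementation.
final class NamespaceDispatchSketch {

    // Element name -> "parser" that turns an element id into a bean description.
    private final Map<String, Function<String, String>> parsers = new HashMap<>();

    // Plays the role of registerBeanDefinitionParser(elementName, parser).
    void registerParser(String elementName, Function<String, String> parser) {
        parsers.put(elementName, parser);
    }

    // Looks up the parser registered for the element and delegates to it.
    String parse(String elementName, String id) {
        Function<String, String> parser = parsers.get(elementName);
        if (parser == null) {
            throw new IllegalArgumentException("No parser registered for element: " + elementName);
        }
        return parser.apply(id);
    }

    // Registers the two element names from the Kafka namespace discussed here.
    static NamespaceDispatchSketch kafkaLike() {
        NamespaceDispatchSketch handler = new NamespaceDispatchSketch();
        handler.registerParser("outbound-channel-adapter", id -> id + " -> KafkaProducerMessageHandler");
        // Assumed target class for this element.
        handler.registerParser("producer-context", id -> id + " -> KafkaProducerContext");
        return handler;
    }

    public static void main(String[] args) {
        NamespaceDispatchSketch handler = kafkaLike();
        System.out.println(handler.parse("outbound-channel-adapter", "mainoutboundchanneladapter"));
        System.out.println(handler.parse("producer-context", "kafkaproducercontext"));
    }
}
```

So when converting any XML tag, find its parser through the handler, see which class the parser builds, and declare a @Bean of that class with the same properties.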
UPDATE 2
The consumer part:
    @Bean
    @InboundChannelAdapter(value = "fromkafkachannel",
            poller = @Poller(fixedRate = "10", maxMessagesPerPoll = "1"))
    public MessageSource<Map<String, Map<Integer, List<Object>>>> kafkaMessageSource() {
        return new KafkaHighLevelConsumerMessageSource<String, String>();
    }

    @Bean
    public KafkaConsumerContext<String, String> kafkaConsumerContext() {
        KafkaConsumerContext<String, String> kafkaConsumerContext = new KafkaConsumerContext<String, String>();
        .....
        kafkaConsumerContext.setConsumerConfigurations(map);
        return kafkaConsumerContext;
    }