现在java项目全部都会用sparing,所以我也用spring 的KafkaTemplate来 实现 kafka 消息发送案例之前,需要搭建kafka集群(单纯测试就可以用单例模式) ,请参考:
,1.maven添加kafka配置
<dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> <version>1.3.9.RELEASE</version> </dependency>
2.resources目录下添加spring-kafka.xml配置
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:p="http://www.springframework.org/schema/p" xmlns:context="http://www.springframework.org/schema/context" xmlns:aop="http://www.springframework.org/schema/aop" xmlns:util="http://www.springframework.org/schema/util" xmlns:tx="http://www.springframework.org/schema/tx" xmlns:jaxws="http://cxf.apache.org/jaxws" xmlns:task="http://www.springframework.org/schema/task" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.1.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.1.xsd http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-4.1.xsd http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-4.1.xsd http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-4.1.xsd"> <!-- 生产者配置 --> <bean id="kafkaProducerProperites" class="java.util.HashMap"> <constructor-arg> <map> <entry key="bootstrap.servers" value="192.168.1.11:9092,192.168.1.12:9092,192.168.1.13:9092" /> <entry key="key.serializer" value="org.apache.kafka.common.serialization.StringSerializer" /> <entry key="value.serializer" value="org.apache.kafka.common.serialization.StringSerializer" /> </map> </constructor-arg> </bean> <bean id="kafkaProducerFactory" class="org.springframework.kafka.core.DefaultKafkaProducerFactory"> <constructor-arg ref="kafkaProducerProperites"/> </bean> <bean id="kafkaTemplate" class="org.springframework.kafka.core.KafkaTemplate"> <constructor-arg ref="kafkaProducerFactory" /> <constructor-arg name="autoFlush" value="true"/> <property name="defaultTopic" value="javaTopic"/> </bean> <!-- 消费者配置 --> <bean id="kafkaConsumerProperites" class="java.util.HashMap"> <constructor-arg> <map> <entry key="bootstrap.servers" value="192.168.0.107:9092" /> <entry key="group.id" value="0"/> <entry key="enable.auto.commit" value="true"/> <entry key="auto.commit.interval.ms" value="1000"/> <entry key="session.timeout.ms" value="30000"/> <entry key="key.deserializer" value="org.apache.kafka.common.serialization.StringDeserializer" /> <entry key="value.deserializer" value="org.apache.kafka.common.serialization.StringDeserializer" /> </map> </constructor-arg> </bean> <bean id="kafkaConsumerFactory" class="org.springframework.kafka.core.DefaultKafkaConsumerFactory"> <constructor-arg ref="kafkaConsumerProperites"/> </bean> <bean id="consumerContainerProperties" class="org.springframework.kafka.listener.config.ContainerProperties"> <constructor-arg value="topicOne"/> <property name="messageListener" ref="kafkaConsumerListener" /> </bean> <bean id="conusmerContainer" class="org.springframework.kafka.listener.KafkaMessageListenerContainer"> <constructor-arg ref="kafkaConsumerFactory"/> <constructor-arg ref="consumerContainerProperties"/> </bean> </beans>
3.在spring xml引入kafka xml
<import resource="spring-kafka.xml"/>
4.java 生产发送消息代码
发送消息并监听发送是否成功回执
@Autowired private KafkaTemplate kafkaTemplate;
kafkaTemplate.sendDefault("hello IT学问网").addCallback(new ListenableFutureCallback<SendResult<String,String>>(){ @Override public void onSuccess(SendResult<String, String> result) { //处理发送成功代码; } @Override public void onFailure(Throwable ex) { //处理发送失败代码; } });
5.java 消费接受消息代码
添加kafka消费监听器
import org.apache.kafka.clients.consumer.ConsumerRecord; import org.springframework.kafka.listener.MessageListener; import org.springframework.stereotype.Component; @Component("kafkaConsumerListener") public class KafkaConsumerListener implements MessageListener<String, String> { @Override public void onMessage(ConsumerRecord<String, String> record) { String topic = record.topic(); String key = record.key(); String val = record.value(); long offset = record.offset(); int partition = record.partition(); System.out.printf("receive msg -- topic:%s key:%s val:%s offset:%s partition:%s \r\n",topic,key,val,offset,partition); } }