1、通过XML配置
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context.xsd">
<!-- 1.定义consumer的参数 -->
<bean id="consumerProperties" class="java.util.HashMap">
<constructor-arg>
<map>
<entry key="bootstrap.servers" value="${bootstrap.servers}" />
<entry key="group.id" value="${group.id}" />
<entry key="enable.auto.commit" value="${enable.auto.commit}" />
<entry key="session.timeout.ms" value="${session.timeout.ms}" />
<entry key="key.deserializer" value="org.apache.kafka.common.serialization.StringDeserializer" />
<entry key="value.deserializer" value="org.apache.kafka.common.serialization.StringDeserializer" />
</map>
</constructor-arg>
</bean>
<!-- 2.创建consumerFactory bean -->
<bean id="consumerFactory"
class="org.springframework.kafka.core.DefaultKafkaConsumerFactory" >
<constructor-arg>
<ref bean="consumerProperties" />
</constructor-arg>
</bean>
<!--<!– 3.定义消费实现类 –>-->
<bean id="kafkaConsumerService" class="com.demo.bd.tasks.kafka.KafkaConsumerSerivceImpl" />
<!-- 4.消费者容器配置信息 -->
<bean id="containerProperties" class="org.springframework.kafka.listener.config.ContainerProperties">
<constructor-arg name="topics">
<list>
<value>${kafka.consumer.topic}</value>
</list>
</constructor-arg>
<property name="messageListener" ref="kafkaConsumerService" />
</bean>
<!-- 5.消费者并发消息监听容器,执行doStart()方法 -->
<bean id="messageListenerContainer"
class="org.springframework.kafka.listener.ConcurrentMessageListenerContainer"
init-method="doStart" >
<constructor-arg ref="consumerFactory" />
<constructor-arg ref="containerProperties" />
<property name="concurrency" value="${concurrency}" />
</bean>
</beans>
定义消费类:
public class KafkaConsumerSerivceImpl implements MessageListener<String, String> {
private Logger log = LoggerFactory.getLogger(this.getClass());
public void onMessage(ConsumerRecord<String, String> data) {
String topic = data.topic();
String value = data.value();
log.info("----<<<<<<topic={},value={}",topic,value);
}
}
2、通过注解配置
需要配置启用kafka注解
@Configuration
@EnableKafka
public class KafkaConsumerConfig {
final static String list ="10.28.18.103:6667";
/**
* Description:获取配置
* Date: 2017年7月11日
* @author shaqf
*/
private Map<String, Object> consumerConfigs() {
Map<String, Object> props = Maps.newHashMap();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, list);
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.GROUP_ID_CONFIG, "group1");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
System.out.println("KafkaConsumer consumerConfigs "+ JSON.toJSONString(props));
return props;
}
/** 获取工厂 */
private ConsumerFactory<String, String> consumerFactory() {
return new DefaultKafkaConsumerFactory(consumerConfigs());
}
/** 获取实例 */
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory1 = new ConcurrentKafkaListenerContainerFactory();
factory1.setConsumerFactory(consumerFactory());
factory1.setConcurrency(2);
factory1.getContainerProperties().setPollTimeout(3000);
System.out.println("KafkaConsumer kafkaListenerContainerFactory factory"+ JSON.toJSONString(factory1));
return factory1;
}
/**
* topic的消费者组1监听
* @return
*/
@Bean
public KafkaConsumerSerivce2Impl listener1() {
return new KafkaConsumerSerivce2Impl();
}
}
public class KafkaConsumerSerivce2Impl {
private Logger log = LoggerFactory.getLogger(this.getClass());
//支持EL表达式
@KafkaListener(topics = {"${kafka.consumer.topic}"},group = "2")
public void templarAgreementNoticewithhold(ConsumerRecord<String, String> data){
//消费业务逻辑
System.out.println("++++++++++++++++++++++++++++");
String topic = data.topic();
String value = data.value();
log.info("---****->topic={},value={}",topic,value);
}
}