Spring + Kafka：通过注解和 XML 两种方式配置消费者

kafka | 2019-10-08 17:25:11

1、通过XML配置

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
                  http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
                  http://www.springframework.org/schema/context
                  http://www.springframework.org/schema/context/spring-context.xsd">
    <!--
        Wires a Kafka consumer purely through XML: consumer properties, a consumer
        factory, the listener bean, the container properties and finally the
        concurrent listener container.
        The ${...} values below are placeholders; no placeholder configurer is
        declared in this file, so they must be resolved by configuration defined
        elsewhere.
    -->
    <!-- 1. Consumer configuration properties, passed to the consumer factory as a map -->
    <bean id="consumerProperties" class="java.util.HashMap">
        <constructor-arg>
            <map>
                <entry key="bootstrap.servers" value="${bootstrap.servers}" />
                <entry key="group.id" value="${group.id}" />
                <entry key="enable.auto.commit" value="${enable.auto.commit}" />
                <entry key="session.timeout.ms" value="${session.timeout.ms}" />
                <entry key="key.deserializer" value="org.apache.kafka.common.serialization.StringDeserializer" />
                <entry key="value.deserializer" value="org.apache.kafka.common.serialization.StringDeserializer" />
            </map>
        </constructor-arg>
    </bean>

    <!-- 2. Create the consumerFactory bean from the properties above -->
    <bean id="consumerFactory"
          class="org.springframework.kafka.core.DefaultKafkaConsumerFactory" >
        <constructor-arg>
            <ref bean="consumerProperties" />
        </constructor-arg>
    </bean>

    <!-- 3. Define the message listener implementation that consumes the records -->
    <bean id="kafkaConsumerService" class="com.demo.bd.tasks.kafka.KafkaConsumerSerivceImpl" />

    <!-- 4. Listener container properties: the topics to subscribe to and the listener to invoke -->
    <bean id="containerProperties" class="org.springframework.kafka.listener.config.ContainerProperties">
        <constructor-arg name="topics">
            <list>
                <value>${kafka.consumer.topic}</value>
            </list>
        </constructor-arg>
        <property name="messageListener" ref="kafkaConsumerService" />
    </bean>
    <!-- 5. Concurrent message listener container; doStart() is invoked as the bean's init-method -->
    <!-- NOTE(review): the container presumably is also started by the Spring lifecycle
         on context refresh, which would make init-method="doStart" redundant. Confirm
         before removing it. -->
    <bean id="messageListenerContainer"
          class="org.springframework.kafka.listener.ConcurrentMessageListenerContainer"
          init-method="doStart" >
        <constructor-arg ref="consumerFactory" />
        <constructor-arg ref="containerProperties" />
        <property name="concurrency" value="${concurrency}" />
    </bean>
</beans>

定义消费类:

public class KafkaConsumerSerivceImpl implements MessageListener<String, String> {
    private Logger log = LoggerFactory.getLogger(this.getClass());
    public void onMessage(ConsumerRecord<String, String> data) {
        String topic = data.topic();
        String value = data.value();
        log.info("----<<<<<<topic={},value={}",topic,value);
    }
}

 

2、通过注解配置

需要在配置类上添加 @EnableKafka 注解，以启用 @KafkaListener 注解支持：

/**
 * Annotation-based Kafka consumer configuration.
 *
 * <p>Builds the consumer properties, wraps them in a consumer factory, and exposes
 * a concurrent listener container factory plus the listener bean itself.
 */
@Configuration
@EnableKafka
public class KafkaConsumerConfig {

    /** Bootstrap server address (host:port) put into the consumer properties. */
    static final String list = "10.28.18.103:6667";

    /**
     * Builds the consumer properties.
     *
     * <p>The resulting map is also printed to standard output as JSON.
     *
     * @return a new mutable map of Kafka consumer settings
     */
    private Map<String, Object> consumerConfigs() {
        Map<String, Object> props = Maps.newHashMap();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, list);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        // NOTE(review): auto-commit is disabled on the line above, so this interval
        // presumably has no effect. Confirm before removing.
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "group1");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        System.out.println("KafkaConsumer consumerConfigs "+ JSON.toJSONString(props));
        return props;
    }

    /**
     * Creates the consumer factory from {@link #consumerConfigs()}.
     *
     * @return a consumer factory typed for String keys and String values
     */
    private ConsumerFactory<String, String> consumerFactory() {
        // Diamond instead of the raw type: avoids an unchecked conversion.
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    /**
     * Creates the listener container factory bean.
     *
     * <p>Concurrency is set to 2 and the poll timeout to 3000. The factory is also
     * printed to standard output as JSON.
     *
     * @return the configured listener container factory
     */
    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
        // Diamond instead of the raw type: avoids an unchecked conversion.
        ConcurrentKafkaListenerContainerFactory<String, String> factory1 = new ConcurrentKafkaListenerContainerFactory<>();
        factory1.setConsumerFactory(consumerFactory());
        factory1.setConcurrency(2);
        factory1.getContainerProperties().setPollTimeout(3000);
        System.out.println("KafkaConsumer kafkaListenerContainerFactory factory"+ JSON.toJSONString(factory1));
        return factory1;
    }

    /**
     * Registers the listener bean for the topic (the original comment calls it the
     * listener for "consumer group 1").
     *
     * @return a new {@code KafkaConsumerSerivce2Impl} instance
     */
    @Bean
    public KafkaConsumerSerivce2Impl listener1() {
        return new KafkaConsumerSerivce2Impl();
    }
}

/**
 * Annotation-driven listener that prints a marker line and logs the topic and
 * value of every record it receives.
 */
public class KafkaConsumerSerivce2Impl {

    /** Shared, immutable logger; one instance per class rather than per listener object. */
    private static final Logger log = LoggerFactory.getLogger(KafkaConsumerSerivce2Impl.class);

    /**
     * Handles one consumed record.
     *
     * <p>The topic name is given as a {@code ${...}} placeholder rather than a
     * literal (the original comment says expressions are supported here).
     *
     * <p>NOTE(review): {@code group = "2"} may not set the Kafka consumer
     * {@code group.id}; in spring-kafka 1.x this attribute appears to name the
     * listener container group. Confirm the intended meaning.
     *
     * @param data the consumed record
     */
    @KafkaListener(topics = {"${kafka.consumer.topic}"},group = "2")
    public void templarAgreementNoticewithhold(ConsumerRecord<String, String> data){
        // Business logic for the consumed record goes here.
        System.out.println("++++++++++++++++++++++++++++");
        String topic = data.topic();
        String value = data.value();
        log.info("---****->topic={},value={}", topic, value);
    }
}

 

登录后即可回复 登录 | 注册
    
关注编程学问公众号