/**
 * Spring configuration class (annotated with {@code @Configuration} and {@code @EnableKafka})
 * that declares three beans for a String/String Kafka consumer: a listener container factory,
 * a consumer factory, and the map of consumer properties the consumer factory is built from.
 *
 * NOTE(review): this copy of the file looks damaged. Every {@code @Value} placeholder key is
 * missing its leading part, and several statements inside the bean methods have lost their
 * receiver, method name and map keys. The notes below mark each spot; restore them from the
 * original source before relying on this class.
 */
@Configuration
@EnableKafka
public class KafkaConsumerConfig {
// Connection string; the text after the first ':' in the placeholder (a three-host list on
// port 9092) is the fallback value. NOTE(review): the property key appears truncated.
@Value("${-servers:ip1:9092,ip2:9092,ip3:9092}")
private String connectionPath;
// Consumer group id, falling back to "consumer-group-1". NOTE(review): key appears truncated.
@Value("${-id:consumer-group-1}")
private String groupId;
// Session timeout kept as a String, fallback "30000" (presumably milliseconds; confirm).
// NOTE(review): the placeholder has no key at all in this copy.
@Value("${:30000}")
private String zookeeperSessionTimeOut;
// Sync time kept as a String, fallback "200" (unit not shown here; confirm).
// NOTE(review): the placeholder has no key at all in this copy.
@Value("${:200}")
private String zookeeperSyncTime;
// Auto-commit interval kept as a String, fallback "5000" (presumably milliseconds; confirm).
// NOTE(review): key appears truncated.
@Value("${-commit-interval:5000}")
private String autoCommitIntervalTime;
// Integer setting with fallback 10; it is the only field referenced outside a comment in
// consumerConfigs(). NOTE(review): the placeholder has no key, so its meaning cannot be
// confirmed from this file.
@Value("${:10}")
private int timeSection;
// Offset-reset setting. The previous fallback ("latest") is kept commented out just below;
// the active fallback is "earliest". NOTE(review): key appears truncated.
// @Value("${-offset-reset:latest}")
@Value("${-offset-reset:earliest}")
private String autoOffSetCommit;
/**
 * Creates the listener container factory bean for String keys and String values.
 *
 * @return the {@code ConcurrentKafkaListenerContainerFactory} instance created in this method
 */
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
// NOTE(review): the next four statements have no receiver or method name in this copy and
// are not valid Java as written. Presumably they were configuration calls on `factory`
// taking, in order: the consumerFactory() bean, the value 3, a poll timeout of 3000 on some
// nested properties object, and the flag `true`. Confirm against the original source.
(consumerFactory());
(3);
().setPollTimeout(3000);
(true);
return factory;
}
/**
 * Creates the consumer factory bean from the property map returned by consumerConfigs().
 *
 * @return a new {@code DefaultKafkaConsumerFactory} wrapping that property map
 */
@Bean
public ConsumerFactory<String, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
/**
 * Creates the bean holding the consumer properties passed to the consumer factory.
 *
 * NOTE(review): as written, nothing is ever stored in the map. On the first two lines of the
 * body, everything after the first "//" is comment text, so the ("", field) fragments there
 * are not code. The remaining fragments have no receiver, and every key is an empty string.
 * Presumably each was a put call on propsMap with a consumer property key; confirm against
 * the original source.
 *
 * @return the {@code HashMap} created in this method
 */
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> propsMap = new HashMap<String, Object>();// Configure the zookeeper address and port to connect to("", connectionPath);// Configure the group id of zookeeper("", groupId);// zookeeper session expiration time("", zookeeperSessionTimeOut);//
// Configure the timeout interval of zookeeper connection("", zookeeperSyncTime);// Automatically submit displacement variable time interval("",autoCommitIntervalTime);// Set the pull time interval("",300000);
("", timeSection);// Quantity per batch("", autoOffSetCommit);
// NOTE(review): the three fragments below have lost both their keys and (for the last two)
// their values; what they originally configured cannot be determined from this file.
("", true);
("", "");
("", "");
return propsMap;
}
}