web123456

Kafka Consumer Parameters Strikes

@Configuration @EnableKafka public class KafkaConsumerConfig { @Value("${-servers:ip1:9092,ip2:9092,ip3:9092}") private String connectionPath; @Value("${-id:consumer-group-1}") private String groupId; @Value("${:30000}") private String zookeeperSessionTimeOut; @Value("${:200}") private String zookeeperSyncTime; @Value("${-commit-interval:5000}") private String autoCommitIntervalTime; @Value("${:10}") private int timeSection; // @Value("${-offset-reset:latest}") @Value("${-offset-reset:earliest}") private String autoOffSetCommit; @Bean public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); (consumerFactory()); (3); ().setPollTimeout(3000); (true); return factory; } @Bean public ConsumerFactory<String, String> consumerFactory() { return new DefaultKafkaConsumerFactory<>(consumerConfigs()); } @Bean public Map<String, Object> consumerConfigs() { Map<String, Object> propsMap = new HashMap<String, Object>();// Configure the zookeeper address and port to connect to("", connectionPath);// Configure the group id of zookeeper("", groupId);// zookeeper session expiration time("", zookeeperSessionTimeOut);// // Configure the timeout interval of zookeeper connection("", zookeeperSyncTime);// Automatically submit displacement variable time interval("",autoCommitIntervalTime);// Set the pull time interval("",300000); ("", timeSection);// Quantity per batch("", autoOffSetCommit); ("", true); ("", ""); ("", ""); return propsMap; } }