web123456

Redis message queue

  • package ;
  • import ;
  • import ;
  • import .slf4j.Slf4j;
  • import ;
  • import ;
  • import ;
  • import ;
  • import ;
  • import .*;
  • import ;
  • import ;
  • import ;
  • import ;
  • import ;
  • import ;
  • import ;
  • import ;
  • import ;
  • import ;
  • import ;
  • import ;
  • import ;
  • import ;
  • import ;
  • /**
  • * Using redis native message queue implementation
  • * Copyright © 2021 meicet. All rights reserved.
  • * @author zyx
  • * @date 2021-06-21 09:49:11
  • */
  • @Slf4j
  • @Configuration
  • @Component
  • public class RedisConsumerConfig implements DisposableBean {
  • @Resource
  • private ApplicationContext context;
  • @Resource
  • private StringRedisTemplate stringRedisTemplate;
  • private Vector<StreamMessageListenerContainer<String, ObjectRecord<String, MeiceUser>>> containerList = new Vector<>();
  • @Resource
  • ForkJoinPool forkJoinPool;
  • @Resource
  • RedisConnectionFactory factory;
  • @Bean(name = "forkJoinPool")
  • public ExecutorService forkJoinPool() {
  • return new ForkJoinPool();
  • }
  • @PostConstruct
  • public void initRedisStream() throws Exception {
  • Map<String, Object> beansWithAnnotation = ();
  • if (() == 0) {
  • return;
  • }
  • for (Object item : ()) {
  • if (!(item instanceof StreamListener)) {
  • continue;
  • }
  • Method method = ().getDeclaredMethod("onMessage", );
  • MeiceRedisStreamListener annotation = ();
  • if (annotation == null) {
  • continue;
  • }
  • creasteSubscription(factory, (StreamListener) item, (), (), ());
  • }
  • }
  • private void creatGroup(String key, String group) {
  • StreamOperations<String, String, MeiceUser> streamOperations = this.();
  • String groupName = (key, group);
  • ("creatGroup:{}", groupName);
  • }
  • private Subscription creasteSubscription(RedisConnectionFactory factory, StreamListener streamListener, String streamKey, String group, String consumerName) {
  • StreamOperations<String, String, MeiceUser> streamOperations = this.();
  • if ((streamKey)) {
  • StreamInfo.XInfoGroups groups = (streamKey);
  • if (()) {
  • creatGroup(streamKey, group);
  • } else {
  • ().forEach(g -> {
  • ("XInfoGroups:{}", g);
  • StreamInfo.XInfoConsumers consumers = (streamKey, ());
  • ("XInfoConsumers:{}", consumers);
  • });
  • }
  • } else {
  • creatGroup(streamKey, group);
  • }
  • <String, ObjectRecord<String, MeiceUser>> options =
  • ()
  • .batchSize(10)
  • .serializer(new StringRedisSerializer())
  • .executor(forkJoinPool)
  • .pollTimeout()
  • .targetType()
  • .build();
  • StreamMessageListenerContainer<String, ObjectRecord<String, MeiceUser>> listenerContainer = (factory, options);
  • StreamOffset<String> streamOffset = (streamKey, ());
  • Consumer consumer = (group, consumerName);
  • Subscription subscription = (consumer, streamOffset, streamListener);
  • ();
  • this.(listenerContainer);
  • return subscription;
  • }
  • @Override
  • public void destroy() {
  • this.(StreamMessageListenerContainer::stop);
  • }
  • }