package ;
import ;
import ;
import .slf4j.Slf4j;
import ;
import ;
import ;
import ;
import ;
import .*;
import ;
import ;
import ;
import ;
import ;
import ;
import ;
import ;
import ;
import ;
import ;
import ;
import ;
import ;
import ;
/**
* Using redis native message queue implementation
* Copyright © 2021 meicet. All rights reserved.
* @author zyx
* @date 2021-06-21 09:49:11
*/
public class RedisConsumerConfig implements DisposableBean {
private ApplicationContext context;
private StringRedisTemplate stringRedisTemplate;
private Vector<StreamMessageListenerContainer<String, ObjectRecord<String, MeiceUser>>> containerList = new Vector<>();
ForkJoinPool forkJoinPool;
RedisConnectionFactory factory;
public ExecutorService forkJoinPool() {
return new ForkJoinPool();
}
public void initRedisStream() throws Exception {
Map<String, Object> beansWithAnnotation = ();
if (() == 0) {
return;
}
for (Object item : ()) {
if (!(item instanceof StreamListener)) {
continue;
}
Method method = ().getDeclaredMethod("onMessage", );
MeiceRedisStreamListener annotation = ();
if (annotation == null) {
continue;
}
creasteSubscription(factory, (StreamListener) item, (), (), ());
}
}
private void creatGroup(String key, String group) {
StreamOperations<String, String, MeiceUser> streamOperations = this.();
String groupName = (key, group);
("creatGroup:{}", groupName);
}
private Subscription creasteSubscription(RedisConnectionFactory factory, StreamListener streamListener, String streamKey, String group, String consumerName) {
StreamOperations<String, String, MeiceUser> streamOperations = this.();
if ((streamKey)) {
StreamInfo.XInfoGroups groups = (streamKey);
if (()) {
creatGroup(streamKey, group);
} else {
().forEach(g -> {
("XInfoGroups:{}", g);
StreamInfo.XInfoConsumers consumers = (streamKey, ());
("XInfoConsumers:{}", consumers);
});
}
} else {
creatGroup(streamKey, group);
}
<String, ObjectRecord<String, MeiceUser>> options =
()
.batchSize(10)
.serializer(new StringRedisSerializer())
.executor(forkJoinPool)
.pollTimeout()
.targetType()
.build();
StreamMessageListenerContainer<String, ObjectRecord<String, MeiceUser>> listenerContainer = (factory, options);
StreamOffset<String> streamOffset = (streamKey, ());
Consumer consumer = (group, consumerName);
Subscription subscription = (consumer, streamOffset, streamListener);
();
this.(listenerContainer);
return subscription;
}
public void destroy() {
this.(StreamMessageListenerContainer::stop);
}
}