/**
 * In-memory cache of recent conversation history: session id -> messages that are sent to the
 * model as context.
 * <p>
 * Entries are created/extended by {@code buildHistory}, trimmed by {@code getHistory} and wiped
 * by the scheduled {@code autoCleanHistory}. NOTE(review): the map itself is concurrent, but the
 * {@code List} values are plain, non-synchronized lists that are also mutated from async tasks —
 * confirm that access to a single session's list is serialized.
 */
private static final ConcurrentHashMap<String, List<Message>> history = new ConcurrentHashMap<>();
/**
 * Streams a model reply to a user message back to the client over the websocket.
 * <p>
 * The user message is first appended to the session's cached history (seeding the cache from
 * storage if needed) and then persisted asynchronously. The cached history is sent to the model,
 * and every non-empty streamed chunk is forwarded to the client with {@code sendMsg}. When the
 * stream terminates for any reason, the end marker {@code "!$$---END---$$!"} is sent. The
 * accumulated reply, if non-empty, is then added to the history and persisted asynchronously.
 *
 * @param msg       the incoming request; supplies the session id and the message content
 * @param principal the websocket user the chunks are sent to
 * @throws NoApiKeyException      declared for {@code gen.streamCall}
 * @throws InputRequiredException declared for {@code gen.streamCall}
 */
@Override
public void chat(ChatMessageRequest msg, Principal principal) throws NoApiKeyException, InputRequiredException {
    String sessionId = msg.getSessionId();
    // Put the user's message into the cached history so it is part of the context sent below.
    // This happens BEFORE the async save: on a cold cache buildHistory seeds from storage, and
    // seeding after the save could pick the same message up a second time.
    buildHistory(sessionId, Message.builder().role(Role.USER.getValue()).content(msg.getContent()));
    CompletableFuture.runAsync(() -> saveMsg(msg.getContent(), sessionId, Role.USER, getLocalDate()))
            .exceptionally(ex -> {
                log.error("Failed to save user message for session {}", sessionId, ex);
                return null;
            });

    GenerationParam param = GenerationParam.builder()
            .model(module) // model version, e.g. qwen-max
            .messages(getHistory(sessionId)) // multi-turn context, including the message added above
            .resultFormat(GenerationParam.ResultFormat.MESSAGE)
            .topP(0.8)
            .enableSearch(true)
            .apiKey(apiKey) // API key issued by the Alibaba Cloud Bailian console
            .incrementalOutput(true) // chunks are concatenated below, which relies on incremental output
            .build();

    Flux<GenerationResult> result = Flux.from(gen.streamCall(param));
    StringBuffer reply = new StringBuffer();
    DateTime replyTime = getLocalDate();
    result
            // Throttle how often chunks are pushed to the client.
            .delayElements(Duration.ofMillis(200))
            .doOnNext(res -> {
                if (res.getOutput() == null || res.getOutput().getChoices() == null
                        || res.getOutput().getChoices().isEmpty()) {
                    return;
                }
                String output = res.getOutput().getChoices().get(0).getMessage().getContent();
                if (output == null || output.isEmpty()) {
                    return;
                }
                sendMsg(output, sessionId, principal);
                reply.append(output);
            })
            .doFinally(signalType -> {
                // Always tell the client the stream is over: completed, failed or cancelled.
                sendMsg("!$$---END---$$!", sessionId, principal);
                String content = reply.toString();
                if (content.isEmpty()) {
                    // Nothing was generated (e.g. the call failed); do not store an empty reply.
                    return;
                }
                CompletableFuture.runAsync(() -> {
                    // Update the cache before persisting, for the same reason as the user message above.
                    buildHistory(sessionId, Message.builder().role(Role.ASSISTANT.getValue()).content(content));
                    saveMsg(content, sessionId, Role.ASSISTANT, replyTime);
                }).exceptionally(ex -> {
                    log.error("Failed to save assistant reply for session {}", sessionId, ex);
                    return null;
                });
            })
            .onErrorResume(err -> {
                // Log every failure with its stack trace, not only ApiException messages.
                log.error("Error occurred in interface call: {}", err.getMessage(), err);
                sendMsg("The GPT interface call error occurs, and this function cannot be used for the time being, so stay tuned.", sessionId, principal);
                return Mono.empty();
            })
            .subscribeOn(Schedulers.boundedElastic()) // run the pipeline on the bounded elastic scheduler
            .subscribe();
}
/**
 * Drops the whole in-memory history cache once a day so it cannot grow without bound.
 * <p>
 * Scheduled with cron {@code 0 59 23 * * ?}, i.e. at 23:59:00 every day. A session's history is
 * seeded again from storage by {@code buildHistory} the next time a message is added to it.
 */
@Scheduled(cron = "0 59 23 * * ?")
private void autoCleanHistory() {
    history.clear();
}
/**
 * Appends a message to the cached conversation history of a session.
 * <p>
 * If the session has no cached history yet, the cache is first seeded from the session's stored
 * messages. Only the most recent 60 are taken, because too much history consumes tokens quickly
 * and adds load. Stored messages with {@code null} or empty content are skipped. An unknown
 * session, or one with no stored messages, starts from an empty history.
 *
 * @param sessionId the session whose history is extended
 * @param message   builder for the message to append; built once by this method
 */
private void buildHistory(String sessionId, MessageBuilder<?, ?> message) {
    Message toAppend = message.build();
    // compute() is atomic per key, so seeding and appending cannot interleave with another append
    // for the same session, and no caller can replace the list another caller just extended.
    history.compute(sessionId, (key, cached) -> {
        List<Message> messages = cached;
        if (messages == null) {
            messages = new ArrayList<>();
            List<ChatMessageVO> stored = java.util.Optional.ofNullable(sessionService.getById(key))
                    .map(session -> session.getMessages())
                    .orElse(null);
            if (stored != null) {
                int from = Math.max(0, stored.size() - 60);
                for (ChatMessageVO item : stored.subList(from, stored.size())) {
                    String content = item.getContent();
                    if (content != null && !content.isEmpty()) {
                        messages.add(Message.builder().role(item.getRole()).content(content).build());
                    }
                }
            }
        }
        messages.add(toAppend);
        return messages;
    });
}
/**
 * Returns a snapshot of the cached history of a session, to be sent to the model as context.
 * <p>
 * As a side effect, messages with {@code null} or empty content are removed from the cache and
 * the cached list is trimmed to its most recent 80 messages.
 *
 * @param sessionId the session to read
 * @return a new mutable list holding the retained messages; empty if nothing is cached
 */
private List<Message> getHistory(String sessionId) {
    List<Message> snapshot = new ArrayList<>();
    history.computeIfPresent(sessionId, (key, cached) -> {
        cached.removeIf(item -> item.getContent() == null || item.getContent().isEmpty());
        // Copy the tail instead of storing a subList view. A view pins the whole backing list in
        // memory, so trimming would free nothing, and each call would nest another view on top.
        List<Message> trimmed =
                new ArrayList<>(cached.subList(Math.max(0, cached.size() - 80), cached.size()));
        snapshot.addAll(trimmed);
        return trimmed;
    });
    // Return a copy so the caller never shares a list that async appends keep mutating.
    return snapshot;
}