web123456

Spring Boot and Netty create TCP server (solve sticky packet problems)

import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.util.CharsetUtil; import lombok.extern.slf4j.Slf4j; import java.io.IOException; import java.net.InetSocketAddress; /** * I/O data read and write processing class * * @author xiaobo */ @Slf4j public class CarTcpNettyChannelInboundHandlerAdapter extends ChannelInboundHandlerAdapter { /** * When new data is received from the client, this method will be called when the message is received. * * @param ctx * @param msg */ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception, IOException { // Here is the DelimiterBasedFrameDecoder in the previous section to be converted to ByteBuf to verify whether it is ByteBuf if (msg instanceof ByteBuf) { ByteBuf byteBuf = (ByteBuf) msg; try { String receivedData = byteBuf.toString(CharsetUtil.UTF_8); // Receive complete data handleReceivedData(receivedData); } finally { // Release resources occupied by ByteBuf byteBuf.release(); // Reply to the message ctx.writeAndFlush(Unpooled.copiedBuffer("Received over", CharsetUtil.UTF_8)); } } } private void handleReceivedData(String receivedData) { // Data processing // If you want to implement bean injection in spring, you can use geBean to obtain it } /** * Called when new data is received from the client and read is completed * * @param ctx */ @Override public void channelReadComplete(ChannelHandlerContext ctx) throws IOException { log.info("channelReadComplete"); ctx.flush(); } /** * When a Throwable object appears, it will be called, that is, when Netty is thrown due to an IO error or an exception thrown by the processor when processing an event * * @param ctx * @param cause */ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws IOException { cause.printStackTrace(); ctx.close();// Throw an exception and disconnect from the client } /** * Execute when the client and the server establish a connection for the first time * * @param ctx * @throws Exception */ @Override public void channelActive(ChannelHandlerContext ctx) throws Exception, IOException { super.channelActive(ctx); ctx.channel().read(); InetSocketAddress socket = (InetSocketAddress) ctx.channel().remoteAddress(); String clientIp = socket.getAddress().getHostAddress(); // Cannot use() here, otherwise the client will never be able to establish a connection with the server System.out.println("channelActive:" + clientIp + ctx.name()); // Here is a response to the client ctx.writeAndFlush(Unpooled.copiedBuffer("Received over", CharsetUtil.UTF_8)); } /** * When the client and the server are disconnected, execute * * @param ctx * @throws Exception */ @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception, IOException { super.channelInactive(ctx); InetSocketAddress socket = (InetSocketAddress) ctx.channel().remoteAddress(); String clientIp = socket.getAddress().getHostAddress(); // When disconnecting, it must be closed, otherwise it will cause waste of resources and may cause downtime if the concurrency is large. ctx.close(); log.info("channelInactive:{}", clientIp); } /** * When the read timeouts on the server, this method will be called * * @param ctx * @param evt * @throws Exception */ @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception, IOException { super.userEventTriggered(ctx, evt); InetSocketAddress socket = (InetSocketAddress) ctx.channel().remoteAddress(); String clientIp = socket.getAddress().getHostAddress(); ctx.close();// Disconnect when timeout log.info("userEventTriggered:" + clientIp); } @Override public void channelRegistered(ChannelHandlerContext ctx) throws Exception { log.info("channelRegistered"); } @Override public void channelUnregistered(ChannelHandlerContext ctx) throws Exception { log.info("channelUnregistered"); } @Override public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception { log.info("channelWritabilityChanged"); } }