Netty 实现WebSocket 服务
在使用 Netty 实现 WebSocket 服务时,我们需要几个关键步骤来确保服务的正确运行和优化性能。首先,我们需要设置 Netty 服务器,并配置相应的通道初始化器来处理 WebSocket 请求。其次,我们需要实现 WebSocket 处理器来管理连接、消息和关闭事件。
设置 Netty 服务器 :启动一个 Netty 服务器,监听特定端口并等待 WebSocket 客户端的连接。
配置通道初始化器 :在通道初始化器中添加必要的处理器,例如 HttpServerCodec
、HttpObjectAggregator
和 WebSocketServerProtocolHandler
。
实现 WebSocket 处理器 :编写自定义处理器来处理 WebSocket 的连接、消息和断开事件。确保处理器能够处理文本消息、二进制消息以及心跳检测。
实现WebSocket握手鉴权 :实际情况服务端需要对客户端的握手请求进行相关判断策略验证,通过才可以连接成功,不通过就认为是非法链接拒绝链接。
通过以上步骤,我们可以使用 Netty 搭建一个高效的 WebSocket 服务,支持实时通信需求。
在工作中通常是使用的是将Netty服务当作一个bean交给了spring管理。下面我们开始数据搭建过程
一、设置Netty服务器
Menservants 类是spring的一个bean 在初始化的时候,和销毁时自动对WebSocket服务进行启动和关闭
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 @Component public class CAgentServer { private static Logger log = LoggerFactory.getLogger(CAgentServer.class); @Value("${server.cagent.listener.port}") private int port = 8888 ; @Autowired private CAgentServerInitializer serverInitializer; private EventLoopGroup bossGroup; private EventLoopGroup workerGroup; private ChannelFuture channelFuture; @PostConstruct public void run () throws Exception { log.info("Before run server" ); bossGroup = new NioEventLoopGroup (); workerGroup = new NioEventLoopGroup (512 ); ServerBootstrap b = new ServerBootstrap (); b.group(bossGroup, workerGroup); b.channel(NioServerSocketChannel.class); b.childHandler(serverInitializer); b.option(ChannelOption.SO_BACKLOG, 128 ); b.childOption(ChannelOption.SO_KEEPALIVE, true ); channelFuture = b.bind(port).sync(); } @PreDestroy public void destory () throws Exception { try { channelFuture.channel().close().sync(); log.info("after stop server" ); } finally { workerGroup.shutdownGracefully(); bossGroup.shutdownGracefully(); } } }
这一部分代码基本上没什么变化 ,b.childHandler(serverInitializer);
这里设置了一个通道初始化器,这里包含了我们对通道的所有处理逻辑。
二、配置通道初始化器
ChannelInitializer 类用于设置 Netty 处理管道,包括处理 HTTP 请求和 WebSocket 协议的处理器。以下是一个示例实现:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 @Component public class CAgentServerInitializer extends ChannelInitializer <SocketChannel> { private static final Logger log = LoggerFactory.getLogger(CAgentServerInitializer.class); @Override protected void initChannel (SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast(new HttpServerCodec ()); pipeline.addLast(new HttpObjectAggregator (65536 )); pipeline.addLast(new WebSocketServerProtocolHandler ("/ws" )); pipeline.addLast(new CAgentWebSocketHandler ()); log.info("WebSocket channel initialized" ); } }
通过以上步骤,我们可以确保 Netty 服务器能够正确处理 WebSocket 请求,并且可以在必要的情况下进行握手鉴权。
三、自定义 WebSocket 处理器
自定义的 WebSocket 处理器 CAgentWebSocketHandler
用于处理连接成功事件、消息事件和断开事件。以下是一个示例实现:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 public class CAgentWebSocketHandler extends SimpleChannelInboundHandler <TextWebSocketFrame> { private static final Logger log = LoggerFactory.getLogger(CAgentWebSocketHandler.class); @Override public void handlerAdded (ChannelHandlerContext ctx) throws Exception { log.info("Client connected: {}" , ctx.channel().id().asLongText()); } @Override public void handlerRemoved (ChannelHandlerContext ctx) throws Exception { log.info("Client disconnected: {}" , ctx.channel().id().asLongText()); } @Override protected void channelRead0 (ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception { String request = msg.text(); log.info("Received message: {}" , request); ctx.channel().writeAndFlush(new TextWebSocketFrame ("Server received your message: " + request)); } @Override public void exceptionCaught (ChannelHandlerContext ctx, Throwable cause) throws Exception { log.error("Error occurred: " , cause); ctx.close(); } }
在这个处理器中,我们实现了以下方法:
handlerAdded(ChannelHandlerContext ctx)
:当客户端成功连接时调用。
handlerRemoved(ChannelHandlerContext ctx)
:当客户端断开连接时调用。
channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg)
:当服务器接收到客户端发送的消息时调用。
exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
:当发生异常时调用,记录错误并关闭连接。
3.1 ChannelHandler的生命周期与事件处理机制
概述
Netty的ChannelHandler
是处理网络事件(如数据读取、数据写入、连接建立、连接关闭等)的核心组件。
在Netty中,ChannelHandler
的生命周期与Channel
的状态紧密相关,主要涉及到以下几个阶段:
初始化(Initialization) :
handlerAdded
方法被调用,这通常发生在ChannelPipeline
初始化时,表示一个新的ChannelHandler
被加入到ChannelPipeline
中。
注册(Registration) :
channelRegistered
方法被调用,这表示Channel
已经成功注册到它的EventLoop
上。
激活(Activation) :
channelActive
方法被调用,表示Channel
已经成功激活,可以开始接收和发送数据。
读取数据(Read) :
channelRead
方法被调用,这表示从Channel
中读取到了数据。
读完成(Read Complete) :
channelReadComplete
方法被调用,这表示一次读取操作完成。
关闭(Deactivation) :
channelInactive
方法被调用,表示Channel
与远端主机失去了连接,变成了非激活状态。
注销(Deregistration) :
channelUnregistered
方法被调用,表示Channel
从它的EventLoop
上注销。
移除(Removal) :
handlerRemoved
方法被调用,表示ChannelHandler
被从ChannelPipeline
中移除。
这些方法的调用顺序与Channel
的状态转换顺序相对应,形成了一个完整的生命周期。在实际应用中,根据不同的需求,开发者可以重写这些方法来实现自定义的逻辑处理,比如处理超时、心跳保活、数据编解码等。
💡 常见用法
handlerAdded() 与 handlerRemoved()可以用在一些资源的申请和释放
channelActive() 与 channelInActive()可以统计单机的连接数,channelActive() 被调用,连接数加一,channelInActive() 被调用,连接数减一。channelActive() 还可以实现ip黑白名单的过滤
channelRead()用来拆包读取信息
channelReadComplete()实现批量刷新的机制,这样channelRead()中只使用write() 方法而不用writeAndFlush()每次都刷新写入到缓存,从而提高性能。
生命周期Handler Dem
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 package com.artisan.reconnect;import io.netty.channel.ChannelHandlerContext;import io.netty.channel.ChannelInboundHandlerAdapter;public class LifeCycleInBoundHandler extends ChannelInboundHandlerAdapter { @Override public void channelRegistered (ChannelHandlerContext ctx) throws Exception { System.out.println("channelRegistered: channel注册到NioEventLoop" ); super .channelRegistered(ctx); } @Override public void channelUnregistered (ChannelHandlerContext ctx) throws Exception { System.out.println("channelUnregistered: channel取消和NioEventLoop的绑定" ); super .channelUnregistered(ctx); } @Override public void channelActive (ChannelHandlerContext ctx) throws Exception { System.out.println("channelActive: channel准备就绪" ); super .channelActive(ctx); } @Override public void channelInactive (ChannelHandlerContext ctx) throws Exception { System.out.println("channelInactive: channel被关闭" ); super .channelInactive(ctx); } @Override public void channelRead (ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println("channelRead: channel中有可读的数据" ); super .channelRead(ctx, msg); } @Override public void channelReadComplete (ChannelHandlerContext ctx) throws Exception { System.out.println("channelReadComplete: channel读数据完成" ); super .channelReadComplete(ctx); } @Override public void handlerAdded (ChannelHandlerContext ctx) throws Exception { System.out.println("handlerAdded: handler被添加到channel的pipeline" ); super .handlerAdded(ctx); } @Override public void handlerRemoved (ChannelHandlerContext ctx) throws Exception { System.out.println("handlerRemoved: handler从channel的pipeline中移除" ); super .handlerRemoved(ctx); } }
四、心跳超时剔除
为了确保 WebSocket 连接的稳定性和及时释放资源,我们可以在服务器中实现心跳检测机制,并在连接超时时剔除不活跃的连接。以下是一个实现心跳检测的示例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 @Component public class CAgentServerInitializer extends ChannelInitializer <SocketChannel> { private static final Logger log = LoggerFactory.getLogger(CAgentServerInitializer.class); @Override protected void initChannel (SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast(new HttpServerCodec ()); pipeline.addLast(new HttpObjectAggregator (65536 )); pipeline.addLast(new WebSocketServerProtocolHandler ("/ws" )); pipeline.addLast(new IdleStateHandler (60 , 0 , 0 , TimeUnit.SECONDS)); pipeline.addLast(new HeartbeatHandler ()); pipeline.addLast(new CAgentWebSocketHandler ()); log.info("WebSocket channel initialized" ); } }public class HeartbeatHandler extends ChannelInboundHandlerAdapter { private static final Logger log = LoggerFactory.getLogger(HeartbeatHandler.class); @Override public void userEventTriggered (ChannelHandlerContext ctx, Object evt) throws Exception { if (evt instanceof IdleStateEvent) { IdleStateEvent event = (IdleStateEvent) evt; if (event.state() == IdleState.READER_IDLE) { log.info("No heartbeat received from client, closing connection: {}" , ctx.channel().id().asLongText()); ctx.close(); } } else { super .userEventTriggered(ctx, evt); } } }
在此实现中,我们在 CAgentServerInitializer
中添加了 IdleStateHandler
和 HeartbeatHandler
两个处理器:
IdleStateHandler
:用于检测连接的空闲状态。如果在指定时间内(如60秒)没有接收到客户端的任何数据,该处理器会触发 IdleStateEvent
事件。
HeartbeatHandler
:继承自 ChannelInboundHandlerAdapter
,用于处理 IdleStateEvent
事件。如果检测到读取空闲状态(即超过指定时间没有接收到客户端的心跳包),则关闭该连接。
通过这种方式,我们可以确保 WebSocket 服务能够及时剔除不活跃的连接,保持连接的健康状态。
另外一种操作,阅读下源码
IdleStateHandler
内有三个内部类, ReaderIdleTimeoutTask
,WriterIdleTimeoutTask ,AllIdleTimeoutTask
里面都会调用 channelIdle 方法 也就是出现超时事件时 都会执行这个方法
1 2 3 4 5 6 7 protected void channelIdle (ChannelHandlerContext ctx, IdleStateEvent evt) throws Exception { ctx.fireUserEventTriggered(evt); }
如果出现超时直接关闭channel 其实也可以写一个类继承 IdleStateHandler
直接重写 channelIdle
然后加入判断pipeline内,逻辑就是对应的事件处理,这样也可以做到,而且方法的参数就是IdleStateEvent 不用判断类型,也算是一个骚操作吧 不过这样不是官方设计的用法。官方推荐第一种用法
1 2 3 4 5 6 7 8 @Override protected void channelIdle (ChannelHandlerContext ctx, IdleStateEvent evt) throws Exception { if (evt.state() == IdleState.READER_IDLE) { log.warn("no data received after 60s, channel=" + ctx.channel().remoteAddress().toString() + " will close" ); ctx.close(); } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 * <pre> * * * * * public class MyChannelInitializer extends {@link ChannelInitializer}<{@link Channel}> { * {@code @Override } * public void initChannel ({@link Channel} channel) { * channel.pipeline().addLast("idleStateHandler" , new {@link IdleStateHandler}(60 , 30 , 0 )); * channel.pipeline().addLast("myHandler" , new MyHandler ()); * } * } * * * public class MyHandler extends {@link ChannelDuplexHandler} { * {@code @Override } * public void userEventTriggered ({@link ChannelHandlerContext} ctx, {@link Object} evt) throws {@link Exception} { * if (evt instanceof {@link IdleStateEvent}) { * {@link IdleStateEvent} e = ({@link IdleStateEvent}) evt; * if (e.state() == {@link IdleState}.READER_IDLE) { * ctx.close(); * } else if (e.state() == {@link IdleState}.WRITER_IDLE) { * ctx.writeAndFlush(new PingMessage ()); * } * } * } * }
相关链接
https://blog.csdn.net/weixin_43935927/article/details/112001309
https://blog.csdn.net/m0_60259116/article/details/137680824
https://blog.csdn.net/RisenMyth/article/details/104441155