基于netty实现的websocket

发布时间:2019-10-08
技术:jdk1.8 + maven3.0.5 + netty

概述

Netty是一个高性能事件驱动、异步非阻塞的IO java开源框架,它由JBoss提供,用于建立TCP等底层的链接,基于Netty可以建立高性能的http服务器,快速开发高性能、高可靠的网络服务和客户端程序,它支持http,websocket,probuff,binary,tcp和udp,同时Netty又是基于NIO的客户端,服务器端编程框架,Netty可以快速开发出一个网络应用

详细

一、运行效果

image.png

image.png

二、Netty使用场景

1.高性能领域


2.多线程并发领域


3.异步通信领域


三、实现过程

①、添加netty相关jar

	<dependency>
	    <groupId>io.netty</groupId>
	    <artifactId>netty-all</artifactId>
	    <version>5.0.0.Alpha1</version>
	</dependency>


②、添加netty配置类

public class NettyConfig {
	
	/**
	 * 存储每一个客户端接入进来时的channel对象
	 */
	public static ChannelGroup group = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
}

③、添加websocket核心业务处理类

private WebSocketServerHandshaker handshaker;
	private static final String WEB_SOCKET_URL = "ws://localhost:8888/websocket";
	//客户端与服务端创建连接的时候调用
	@Override
	public void channelActive(ChannelHandlerContext ctx) throws Exception {
		NettyConfig.group.add(ctx.channel());
		System.out.println("客户端与服务端连接开启...");
	}

	//客户端与服务端断开连接的时候调用
	@Override
	public void channelInactive(ChannelHandlerContext ctx) throws Exception {
		NettyConfig.group.remove(ctx.channel());
		System.out.println("客户端与服务端连接关闭...");
	}

	//服务端接收客户端发送过来的数据结束之后调用
	@Override
	public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
			ctx.flush();
	}

	//工程出现异常的时候调用
	@Override
	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
		cause.printStackTrace();
		ctx.close();
	}

	//服务端处理客户端websocket请求的核心方法
	@Override
	protected void messageReceived(ChannelHandlerContext context, Object msg) throws Exception {
		//处理客户端向服务端发起http握手请求的业务
		if (msg instanceof FullHttpRequest) {
			handHttpRequest(context,  (FullHttpRequest)msg);
		}else if (msg instanceof WebSocketFrame) { //处理websocket连接业务
			handWebsocketFrame(context, (WebSocketFrame)msg);
		}
	}
	
	/**
	 * 处理客户端与服务端之前的websocket业务
	 * @param ctx
	 * @param frame
	 */
	private void handWebsocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame){
		//判断是否是关闭websocket的指令
		if (frame instanceof CloseWebSocketFrame) {
			handshaker.close(ctx.channel(), (CloseWebSocketFrame)frame.retain());
		}
		//判断是否是ping消息
		if (frame instanceof PingWebSocketFrame) {
			ctx.channel().write(new PongWebSocketFrame(frame.content().retain()));
			return;
		}
		
		//判断是否是二进制消息,如果是二进制消息,抛出异常
		if( ! (frame instanceof TextWebSocketFrame) ){
			System.out.println("目前我们不支持二进制消息");
			throw new RuntimeException("【"+this.getClass().getName()+"】不支持消息");
		}
		//返回应答消息
		//获取客户端向服务端发送的消息
		String request = ((TextWebSocketFrame) frame).text();
		System.out.println("服务端收到客户端的消息====>>>" + request);
		TextWebSocketFrame tws = new TextWebSocketFrame(new Date().toString() 
																								+ ctx.channel().id() 
																								+ " ===>>> " 
																								+ request);
		//群发,服务端向每个连接上来的客户端群发消息
		NettyConfig.group.writeAndFlush(tws);
	}

④、websocket初始化时各个组件的类实现

public class MyWebSocketChannelHandler extends ChannelInitializer<SocketChannel> {

	@Override
	protected void initChannel(SocketChannel e) throws Exception {
		e.pipeline().addLast("http-codec", new HttpServerCodec());
		e.pipeline().addLast("aggregator", new HttpObjectAggregator(65536));
		e.pipeline().addLast("http-chunked", new ChunkedWriteHandler());
		e.pipeline().addLast("handler", new MyWebSocketHandler());
	}

}

四、项目结构图

image.png



五、补充

image.png

本实例支付的费用只是购买源码的费用,如有疑问欢迎在文末留言交流,如需作者在线代码指导、定制等,在作者开启付费服务后,可以点击“购买服务”进行实时联系,请知悉,谢谢
手机上随时阅读、收藏该文章 ?请扫下方二维码