Netty搭建WebSocket服务端

2023-11-09

Netty服务端

1.引入依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>1.5.9.RELEASE</version> <!-- 我这里用的1.5.9 -->
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.blaze</groupId>
    <artifactId>netty-demo</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>netty-demo</name>
    <description>Demo project for Spring Boot</description>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-thymeleaf</artifactId>
        </dependency>

        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
            <version>3.4</version>
        </dependency>

        <!--fastjson-->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.50</version>
        </dependency>

        <!--netty依赖-->
        <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
            <version>4.1.43.Final</version>
        </dependency>

    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <configuration>
                    <mainClass>com.blaze.nettydemo.server.WebSocketServer</mainClass>
                </configuration>
            </plugin>
        </plugins>
        <finalName>netty-server</finalName>
    </build>

</project>

2.服务端

WebSocketServer

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslContextBuilder;
import io.netty.handler.ssl.util.SelfSignedCertificate;

/**
 * create by zy 2019/11/28 14:50
 * TODO
 */
public final class WebSocketServer {

    //static final boolean SSL = System.getProperty("ssl") != null;
    //static final int PORT = Integer.parseInt(System.getProperty("port", SSL ? "8443" : "8888"));
    static final boolean SSL = false;
    static final int PORT = 8888;

    public static void main(String[] args) throws Exception {
        // Configure SSL. 配置 SSL
        final SslContext sslCtx;
        if (SSL) {
            SelfSignedCertificate ssc = new SelfSignedCertificate();
            sslCtx = SslContextBuilder.forServer(ssc.certificate(), ssc.privateKey()).build();
        } else {
            sslCtx = null;
        }
        /**
         * interface EventLoopGroup extends EventExecutorGroup extends ScheduledExecutorService extends ExecutorService
         * 配置服务端的 NIO 线程池,用于网络事件处理,实质上他们就是 Reactor 线程组
         * bossGroup 用于服务端接受客户端连接,workerGroup 用于进行 SocketChannel 网络读写
         */
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            // ServerBootstrap 是 Netty 用于启动 NIO 服务端的辅助启动类,用于降低开发难度
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .handler(new LoggingHandler(LogLevel.INFO))
                    .childHandler(new WebSocketServerInitializer(sslCtx));

            //服务器启动辅助类配置完成后,调用 bind 方法绑定监听端口,调用 sync 方法同步等待绑定操作完成,服务开启
            Channel ch = b.bind(PORT).sync().channel();
            System.out.println("服务已开启,等待客户端连接......");

            //下面会进行阻塞,等待服务器连接关闭之后 main 方法退出,程序结束
            ch.closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            //退出 释放资源
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}

WebSocketServerInitializer

import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.codec.http.websocketx.extensions.compression.WebSocketServerCompressionHandler;
import io.netty.handler.ssl.SslContext;

/**
 * create by zy 2019/11/28 14:53
 * TODO
 */
public class WebSocketServerInitializer extends ChannelInitializer<SocketChannel> {

    private static final String WEBSOCKET_PATH = "/";

    private final SslContext sslCtx;

    public WebSocketServerInitializer(SslContext sslCtx) {
        this.sslCtx = sslCtx;
    }

    @Override
    public void initChannel(SocketChannel ch) {
        ChannelPipeline pipeline = ch.pipeline();
        if (sslCtx != null) {
            pipeline.addLast(sslCtx.newHandler(ch.alloc())); // 设置 https 相关
        }
        pipeline.addLast(new HttpServerCodec()); // http 编码
        pipeline.addLast(new HttpObjectAggregator(65536)); // http 消息聚合器
        pipeline.addLast(new WebSocketServerCompressionHandler()); // 压缩 可以不设置
        pipeline.addLast(new WebSocketServerProtocolHandler(WEBSOCKET_PATH, null, true)); // 协议
        pipeline.addLast(new WebSocketFrameHandler()); // 处理WebSocketFrame
    }
}

WebSocketFrameHandler

import com.alibaba.fastjson.JSON;
import com.rising.netty.model.RequestModel;
import com.rising.netty.model.ResultModel;
import com.rising.netty.util.MQUtil;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.websocketx.BinaryWebSocketFrame;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketFrame;

/**
 * create by zy 2019/11/28 14:57
 * TODO
 */
public class WebSocketFrameHandler extends SimpleChannelInboundHandler<WebSocketFrame> {

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, WebSocketFrame frame) throws Exception {
        if (frame instanceof TextWebSocketFrame) {
            String request = ((TextWebSocketFrame) frame).text();
            System.out.println("接收消息:" + request);

            String msg = "接收成功";
            //返回信息
            ctx.channel().writeAndFlush(new TextWebSocketFrame(msg));
        } else if (frame instanceof BinaryWebSocketFrame) {
            //二进制
            ByteBuf content = frame.content();
            byte[] reg = new byte[content.readableBytes()];
            content.readBytes(reg);
            String request = new String(reg, "UTF-8");
            System.out.println("接收消息:" + request);

            String msg = "接收成功";
            //返回信息
            ByteBuf respByteBuf = Unpooled.copiedBuffer(msg.getBytes());
            ctx.channel().writeAndFlush(new BinaryWebSocketFrame(respByteBuf));
        } else {
            String message = "unsupported frame type: " + frame.getClass().getName();
            throw new UnsupportedOperationException(message);
        }
    }
}

3.客户端

WebSocketClient

import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.http.DefaultHttpHeaders;
import io.netty.handler.codec.http.HttpClientCodec;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.websocketx.*;
import io.netty.handler.codec.http.websocketx.extensions.compression.WebSocketClientCompressionHandler;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslContextBuilder;
import io.netty.handler.ssl.util.InsecureTrustManagerFactory;

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.URI;

/**
 * create by zy 2019/11/28 14:57
 * TODO
 */
public final class WebSocketClient {

    static final String URL = System.getProperty("url", "ws://127.0.0.1:8888/");

    public static void main(String[] args) throws Exception {
        URI uri = new URI(URL);
        String scheme = uri.getScheme() == null ? "ws" : uri.getScheme();

        final String host = uri.getHost() == null ? "127.0.0.1" : uri.getHost();
        final int port;
        if (uri.getPort() == -1) {
            if ("ws".equalsIgnoreCase(scheme)) {
                port = 80;
            } else if ("wss".equalsIgnoreCase(scheme)) {
                port = 443;
            } else {
                port = -1;
            }
        } else {
            port = uri.getPort();
        }

        if (!"ws".equalsIgnoreCase(scheme) && !"wss".equalsIgnoreCase(scheme)) {
            System.err.println("Only WS(S) is supported.");
            return;
        }

        final boolean ssl = "wss".equalsIgnoreCase(scheme);
        final SslContext sslCtx;
        if (ssl) {
            sslCtx = SslContextBuilder.forClient()
                    .trustManager(InsecureTrustManagerFactory.INSTANCE).build();
        } else {
            sslCtx = null;
        }

        //配置客户端 NIO 线程组/池
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            // Connect with V13 (RFC 6455 aka HyBi-17). You can change it to V08 or V00.
            // If you change it to V00, ping is not supported and remember to change
            // HttpResponseDecoder to WebSocketHttpResponseDecoder in the pipeline.
            final WebSocketClientHandler handler = new WebSocketClientHandler(
                    WebSocketClientHandshakerFactory.newHandshaker(
                            uri, WebSocketVersion.V13, null, true, new DefaultHttpHeaders()));

            /**
             * Bootstrap 与 ServerBootstrap 都继承(extends)于 AbstractBootstrap
             * 创建客户端辅助启动类,并对其配置,与服务器稍微不同,这里的 Channel 设置为 NioSocketChannel
             * 然后为其添加 Handler,这里直接使用匿名内部类,实现 initChannel 方法
             * 作用是当创建 NioSocketChannel 成功后,在进行初始化时,将它的ChannelHandler设置到ChannelPipeline中,用于处理网络I/O事件
             */
            Bootstrap b = new Bootstrap();
            b.group(group)
                    .channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) {
                            ChannelPipeline p = ch.pipeline();
                            if (sslCtx != null) {
                                p.addLast(sslCtx.newHandler(ch.alloc(), host, port));
                            }
                            p.addLast(
                                    new HttpClientCodec(),
                                    new HttpObjectAggregator(8192),
                                    WebSocketClientCompressionHandler.INSTANCE,
                                    handler);
                        }
                    });

            //客户端与服务端建立连接
            Channel ch = b.connect(uri.getHost(), port).sync().channel();
            handler.handshakeFuture().sync();


            /**
             * 将输入信息传输到 server 端
             */
            BufferedReader console = new BufferedReader(new InputStreamReader(System.in));
            while (true) {
                String msg = console.readLine();
                if (msg == null) {
                    break;
                } else if ("bye".equals(msg.toLowerCase())) {
                    //输入bye 断开连接
                    ch.writeAndFlush(new CloseWebSocketFrame());
                    ch.closeFuture().sync();
                    break;
                } else if ("ping".equals(msg.toLowerCase())) {
                    WebSocketFrame frame = new PingWebSocketFrame(Unpooled.wrappedBuffer(new byte[]{8, 1, 8, 1}));
                    ch.writeAndFlush(frame);
                } else {
                    WebSocketFrame frame = new TextWebSocketFrame(msg);
                    ch.writeAndFlush(frame);
                }
            }
        } finally {
            group.shutdownGracefully();
        }
    }
}

WebSocketClientHandler

import io.netty.channel.*;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.websocketx.*;
import io.netty.util.CharsetUtil;

/**
 * create by zy 2019/11/28 14:58
 * TODO
 */
public class WebSocketClientHandler extends SimpleChannelInboundHandler<Object> {

    private final WebSocketClientHandshaker handshaker;
    private ChannelPromise handshakeFuture;

    public WebSocketClientHandler(WebSocketClientHandshaker handshaker) {
        this.handshaker = handshaker;
    }

    public ChannelFuture handshakeFuture() {
        return handshakeFuture;
    }

    @Override
    public void handlerAdded(ChannelHandlerContext ctx) {
        handshakeFuture = ctx.newPromise();
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        handshaker.handshake(ctx.channel());
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) {
        System.out.println("WebSocket Client disconnected!");
    }

    @Override
    public void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
        Channel ch = ctx.channel();
        if (!handshaker.isHandshakeComplete()) {
            try {
                //握手成功 建立连接
                handshaker.finishHandshake(ch, (FullHttpResponse) msg);
                System.out.println("WebSocket Client connected!");
                handshakeFuture.setSuccess();
            } catch (WebSocketHandshakeException e) {
                //握手失败
                System.out.println("WebSocket Client failed to connect");
                handshakeFuture.setFailure(e);
            }
            return;
        }

        if (msg instanceof FullHttpResponse) {
            FullHttpResponse response = (FullHttpResponse) msg;
            throw new IllegalStateException(
                    "Unexpected FullHttpResponse (getStatus=" + response.status() +
                            ", content=" + response.content().toString(CharsetUtil.UTF_8) + ')');
        }

        WebSocketFrame frame = (WebSocketFrame) msg;
        if (frame instanceof TextWebSocketFrame) {
            //接收客户端返回消息
            TextWebSocketFrame textFrame = (TextWebSocketFrame) frame;
            System.out.println("WebSocket Client received message: " + textFrame.text());
        } else if (frame instanceof PongWebSocketFrame) {
            System.out.println("WebSocket Client received pong");
        } else if (frame instanceof CloseWebSocketFrame) {
            System.out.println("WebSocket Client received closing");
            ch.close();
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        if (!handshakeFuture.isDone()) {
            handshakeFuture.setFailure(cause);
        }
        ctx.close();
    }
}

4.web客户端

<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Transitional//EN" "http://www.w3.org/TR/xhtml1/DTD/xhtml1transitional.dtd">
<html xmlns="http://www.w3.org/1999/xhtml">
<head>
    <meta http-equiv="Content-Type" content="text/html; charset=utf-8" />
    <title>Netty-Websocket</title>
    <script type="text/javascript">
        var socket;
        if(!window.WebSocket){
            window.WebSocket = window.MozWebSocket;
        }
        if(window.WebSocket){
            socket = new WebSocket("ws://127.0.0.1:9898/");
            socket.onmessage = function(event){
                var ta = document.getElementById('responseText');
                ta.value += event.data+"\r\n";
            };
            socket.onopen = function(event){
                var ta = document.getElementById('responseText');
                ta.value = "Netty-WebSocket服务器。。。。。。连接  \r\n";
                login();
            };
            socket.onclose = function(event){
                var ta = document.getElementById('responseText');
                ta.value = "Netty-WebSocket服务器。。。。。。关闭 \r\n";
            };
        }else{
            alert("您的浏览器不支持WebSocket协议!");
        }
        function send(msg){
            if(!window.WebSocket){return;}
            if(socket.readyState == WebSocket.OPEN){
                socket.send(msg);
            }else{
                alert("WebSocket 连接没有建立成功!");
            }
        }
        function login(){
            if(!window.WebSocket){return;}
            if(socket.readyState == WebSocket.OPEN){
                socket.send("建立连接成功!");
            }else{
                alert("WebSocket 连接没有建立成功!");
            }
        }
        function closeSocket(){
            if(!window.WebSocket){return;}
            socket.close();
        }
    </script>
</head>
<body>
<form onSubmit="return false;">   
    <label>TEXT</label><input type="text" name="blaze" value="" /> <br />
    <br /> <input type="button" value="发送ws消息"
                  onClick="send(this.form.blaze.value)" />
    <hr color="black" />
    <br /> <input type="button" value="断开连接"
                  onClick="closeSocket()" />
    <hr color="black" />
    <h3>服务端返回的应答消息</h3>
    <textarea id="responseText" style="width: 1024px;height: 300px;"></textarea>
</form>
</body>
</html>

 

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Netty搭建WebSocket服务端 的相关文章

随机推荐

  • 在IBM p6 570 LPAR之间动态切换磁盘机/光驱

    小机上的一些外设比如磁盘机和光驱平时用的不多 所以大多都是在一台小机的各LPAR之间共享使用的 这些IO设备在不同的LPAR之间使用时 只能被一个LPAR独占 所以必要的时候就必须要做切换 客户的一台p6 570 里面做了4个LPAR 需要
  • 回顾篇-mysql索引-读书笔记

    事务日志 事务日志可以帮助提高事务的效率 使用事务日志 存储引擎在修改表的数据时只需要修改其内存拷贝 再把该修改行为记录到持久在硬盘上的事务日志中 而不用每次都将修改的数据本身持久到磁盘 事务日志采用的是追加的方式 因此写日志的操作是磁盘上
  • STM32学习---时钟系统

    1 时钟树 STM32的时钟系统比较复杂 我们主要通过时钟树来了解单片机内部的时钟配置情况 时钟树可以从开发指南中找到 以f1为例 学习一下他的树 明确几个缩写定义 AHB 先进高速总线 APB1 先进设备总线1 APB2 先进设备总线2
  • ORM总结(单表,一对多,多对多)

    一 表记录的增删改查 单表操作 1 添加 时间的格式必须写成YYYY MM DD 2 删除 filter筛选多条记录 返回的是QuerySet集合对象 3 修改 这三种都是类 objects 4 查询 values是具体拿一个字段 不再拿整
  • Linux内核memcpy的不同实现

    目录 1 概述 2 高级SIMD和浮点寄存器介绍 2 NEON指令 2 1 VLDR 2 2 VLDM 2 3 VSTR 2 4 VSTM 3 ARM架构程序调用寄存器使用规则 3 1 ARM寄存器使用规则 3 2 NEON寄存器使用规则
  • 【Python】range函数

    range函数 Python3 range 函数返回的是一个可迭代对象 类型是对象 而不是列表类型 所以打印的时候不会打印列表 res range 6 print res gt gt gt range 0 6 打印出来的不是列表 Pytho
  • 2.1 主窗口

    Qt用QMainWindow和相关的类来管理主窗口 QMainWindow继承自QWidget类 以下介绍几种常用操作 1 close 关闭当前窗口 2 hide 隐藏当前窗口 相当于 setVisible false 设置窗口可见或是不可
  • CocosCreator3.0加载远程图片资源

    在微信小游戏平台 需要获取了微信头像 对于这个需求 需要这样来做 获取微信用户信息 得到微信小游戏头像的http地址 在Cocos引擎使用loadRemote来加载 这其中的问题在于 使用loadRemote加载时获得的对象和2 x的版本不
  • redis服务停止(NOAUTH Authentication required)问题处理

    redis服务停止报NOAUTH Authentication required错误 处理方法 命令处理 redis cli a 密码 p 6379 shutdown 脚本处理 进入脚本文件 stop命令增加密码 完整配置文件 bin ba
  • 【统计模型】生存分析基本知识介绍

    目录 一 生存分析介绍 1 生存分析用途 2 传统方法在分析随访资料时的困难 1 生存时间和生存结局都是我们关心的因素 2 存在大量失访 3 显然 将失访数据无论是算作死亡还是存活都不合理 3 生存分析的优劣势 1 优势 2 劣势 4 生存
  • 机器学习经典算法,原理及代码实现

    机器学习知识体系 岭回归和LASSO回归 朴素贝叶斯 支持向量机 Logistic回归 K 近邻算法 线性回归 决策树 EM最大期望算法 Apriori算法 自适应增强 Adaboost 算法 PageRank算法
  • java.lang.ClassCastException: com.sun.proxy.$Proxy0 cannot be cast to java.sql.Connection异常问题解决

    Connection proxy Connection Proxy newProxyInstance Connection class getClassLoader Connection class getInterfaces new In
  • 数据结构——线性结构(7)——链队列的实现

    链队列的实现 头文件 这部分文件实现我们之前所使用的queue类 它主要的原理为 后进后出 LILO ifndef Queue h define Queue h 类型 Queue
  • 使用vue-video-player,播放rtmp直播流

    可直接在新的页面复制使用 测试可用
  • 对cpu与load的理解及线上问题处理思路解读

    前言 2019双11还有不到2个月就要到来了 大家也都知道服务器在大促期间由于流量的增加势必导致机器的cpu与load变高 因此趁着这个时机正好再好好学习 巩固一下cpu和load的概念 为双11做准备的同时也是增加自己的技能储备 不过cp
  • 华为OD机试真题- 宜居星球改造计划-2023年OD统一考试(B卷)

    题目描述 2XXX年 人类通过对火星的大气进行宜居改造分析 使得火星已在理论上具备人类宜居的条件 由于技术原因 无法一次性将火星大气全部改造 只能通过局部处理形式 假设将火星待改造的区域为row column的网格 每个网格有3个值 宜居区
  • Unity Shader入门精要第七章 基础纹理渐变纹理

    Unity系列文章目录 文章目录 Unity系列文章目录 前言 一 渐变纹理是什么 参考 前言 尽管在一开始 我们在渲染中使用纹理是为了定义一个物体的颜色 但后来人们发现 纹理 其实可以用于存储任何表面属性 一种常见的用法就是使用渐变纹理来
  • 静态链表的基础操作(详解)

    目录 一 闵版 1 完整代码 2 运行结果 二 钦版 1 结构体的创建 2 静态链表的初始化 3 尾插法 4 按值插入 5 删除元素 6 打印静态链表 7 摧毁链表 8 完整代码 9 运行结果展示 一 闵版 1 完整代码 include
  • Flutter视频播放、Flutter VideoPlayer 视频播详解

    1 添加依赖 视频播放 video player 1 0 1 2 播放视频前的准备 2 1 网络访问权限 在 ios 目录下的 info plist 清单文件中配置 iOS设置的http网络访问权限
  • Netty搭建WebSocket服务端

    Netty服务端 1 引入依赖