Netty源码解读

2023-05-16

Netty源码解读

Netty线程模型

在这里插入图片描述
1、定义了两组线程池BossGroup和WorkerGroup,BossGroup专门负责接收客户端的连接, WorkerGroup专门负责网络的读写
2、BossGroup和WorkerGroup类型都是NioEventLoopGroup,Group中维护了多个事件循环线程NioEventLoop,每个NioEventLoop维护了一个Selector和TaskQueue
3、每个Boss NioEventLoop线程内部循环执行的步骤有 3 步
3.1、处理accept事件 , 与client 建立连接 , 生成 NioSocketChannel
3.2、将NioSocketChannel注册到某个worker NIOEventLoop上的selector
3.3、runAllTasks处理任务队列TaskQueue的任务
4、 每个worker NioEventLoop线程循环执行的步骤
4.1、轮询注册到自己selector上的所有NioSocketChannel 的read, write事件
4.2、处理 I/O 事件, 即read , write 事件, 在对应NioSocketChannel 处理业务
4.3、runAllTasks处理任务队列TaskQueue的任务 ,一些耗时的业务处理一般可以放入TaskQueue中慢慢处理,这样不影响数据在pipeline中的流动处理
4.4、处理NioSocketChannel业务时,会使用 pipeline (管道),管道中维护了很多 handler处理器用来处理 channel 中的数据

Netty服务启动示例

// 创建两个线程组bossGroup和workerGroup, 含有的子线程NioEventLoop的个数默认为cpu核数的两倍
// bossGroup只是处理连接请求 ,真正的和客户端业务处理,会交给workerGroup完成
EventLoopGroup bossGroup = new NioEventLoopGroup(2);
EventLoopGroup workerGroup = new NioEventLoopGroup(4);
// 创建服务器端的启动对象
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup)
        .channel(NioServerSocketChannel.class)
        .option(ChannelOption.SO_BACKLOG, 1024)
        .childHandler(new ChannelInitializer<SocketChannel>() {//创建通道初始化对象,设置初始化参数,在 SocketChannel 建立起来之前执行
            @Override
            protected void initChannel(SocketChannel ch) throws Exception {
                //对workerGroup的SocketChannel设置handler处理器
                ch.pipeline().addLast(new NettyServerHandler());
            }
        });
// 启动服务器(并绑定端口)
ChannelFuture cf = bootstrap.bind(9099).sync();

Netty源码分析

从bootstrap.bind作为入口分析启动流程,进入后可以看到会调用AbstractBootstrap#doBind,最终会调用initAndRegister()方法,主要逻辑都在前三步中实现,本次也主要分析这三个步骤

# AbstractBootstrap类
// 1、创建一个服务端Channel,即NioServerSocketChannel
channel = channelFactory.newChannel();
// 2、初始化NioServerSocketChannel,在pipeline中添加一些处理器hander
init(channel);
// 3、进行注册
ChannelFuture regFuture = config().group().register(channel);
// 把NioServerSocketChannel绑定到指定端口
channel.bind(localAddress, promise);

channelFactory.newChannel();

bootstrap.channel(NioServerSocketChannel.class) 会将serverChannel绑定到ReflectiveChannelFactory上

# AbstractBootstrap类

public B channel(Class<? extends C> channelClass) {
    if (channelClass == null) {
        throw new NullPointerException("channelClass");
    }
    return channelFactory(new ReflectiveChannelFactory<C>(channelClass));
}

channelFactory.newChannel()会调用ReflectiveChannelFactory的newChannel方法,进而调用constructor.newInstance(),而该constructor正好是NioServerSocketChannel类;所以new的对象就是NioServerSocketChannel
服务端NioServerSocketChannel进行初始化
1、设置感兴趣事件为连接事件OP_ACCEPT
2、设置channel为非阻塞
3、初始化服务端pipeline

# NioServerSocketChannel类

public NioServerSocketChannel(ServerSocketChannel channel) {
    // 将感兴趣的事件设置为连接事件OP_ACCEPT
    super(null, channel, SelectionKey.OP_ACCEPT);
    config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}

// 父类初始化方法 ch 即为NioServerSocketChannel
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
    super(parent);
    this.ch = ch;
    this.readInterestOp = readInterestOp;
    // 设置为非阻塞
    ch.configureBlocking(false);
}

// 父类的父类中初始化pipeline,此时只有HeadContext和TailContext
protected AbstractChannel(Channel parent) {
    this.parent = parent;
    id = newId();
    unsafe = newUnsafe();
    pipeline = newChannelPipeline();
}

init(channel)

调用ServerBootstrap.init方法,向服务端NioServerSocketChannel的pipeline中添加hander处理器ChannelInitializer;此时服务端pipeline链表中的hander如下
在这里插入图片描述

# ServerBootstrap 类

void init(Channel channel) throws Exception {
    ChannelPipeline p = channel.pipeline();
    //向 pipeline中添加hander处理器ChannelInitializer
    p.addLast(new ChannelInitializer<Channel>() {
        @Override
        public void initChannel(final Channel ch) throws Exception {
            final ChannelPipeline pipeline = ch.pipeline();
            ChannelHandler handler = config.handler();
            if (handler != null) {
                pipeline.addLast(handler);
            }
            ch.eventLoop().execute(new Runnable() {
                @Override
                public void run() {
                    pipeline.addLast(new ServerBootstrapAcceptor(
                        ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                }
            });
        }
    });
}

config().group().register(channel)

bootstrap.group(bossGroup, workerGroup)构造时,将group设置为bossGroup,childGroup设置为workerGroup; config().group().register(channel)会调用bossGroup的register方法,从bossGroup的MultithreadEventLoopGroup线程组中取一个线程SingleThreadEventLoop进行调用register方法

register注册逻辑

服务端的NioServerSocketChannel和客户端的NioSocketChannel都会调用此方法进行注册
1、服务启动时,NioServerSocketChannel注册到selector上,对客户端OP_ACCEPT操作感兴趣
2、当有客户端连接时,通过NioServerSocketChannel的accept()得到每个客户端的NioSocketChannel,将其注册到selector上,对客户端OP_READ操作感兴趣

# SingleThreadEventLoop extends SingleThreadEventExecutor 类

public ChannelFuture register(final ChannelPromise promise) {
    promise.channel().unsafe().register(this, promise);
    return promise;
}

调用AbstractChannel的register方法,创建一个注册的task交给EventLoop线程处理

# AbstractChannel 类

public final void register(EventLoop eventLoop, final ChannelPromise promise) {
    .......
    AbstractChannel.this.eventLoop = eventLoop;
    .......
    // 1、处理连接事件时,用的是bossGroup里的NioEventLoop
    // 2、处理读写事件时,用的是workGroup里的NioEventLoop
    eventLoop.execute(new Runnable() {
        @Override
        public void run() {
            register0(promise);
        }
    });
}

private void register0(ChannelPromise promise) {
    doRegister();
    // 1、NioServerSocketChannel 处理逻辑
        // 调用NioServerSocketChannel服务端pipeline中hander的handlerAdded方法
        // 此时会调用到ChannelInitializer的handlerAdded,然后调用其initChannel,该方法中
        // 会向服务端pipeline中加入ServerBootstrapAcceptor
        // 调用服务端pipeline中hander的channelRegistered方法
        // 调用服务端pipeline中hander的ChannelActive方法
    // 2、NioSocketChannel 处理逻辑 调用我们自定义hander中的方法
        // 调用客户端pipeline中hander的handlerAdded方法
        // 调用客户端pipeline中hander的channelRegistered方法
        // 调用客户端pipeline中hander的ChannelActive方法,我们自定义hander的ChannelActive在此调用
    pipeline.invokeHandlerAddedIfNeeded();
    pipeline.fireChannelRegistered();
    pipeline.fireChannelActive();

}
// doRegister()逻辑由子类AbstractNioChannel实现
protected void doRegister() throws Exception {
    boolean selected = false;
    for (;;) {
        try {
            // 将channel注册到Selector上
            // 1、NioServerSocketChannel注册到Selector上
            // 2、NioSocketChannel注册到Selector上
            selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
            return;
        } catch (CancelledKeyException e) {
        }
    }
}

eventLoop.execute就是调用SingleThreadEventExecutor#execute

# SingleThreadEventExecutor 类
@Override
public void execute(Runnable task) {
    // 将注册register0逻辑加入队列taskQueue
    addTask(task);
    // 开启线程循环监听事件,会调用SingleThreadEventExecutor.run方法
	// 最终调用子类NioEventLoop的run()方法
    startThread();
}

死循环执行 selector.select方法,直到监听到事件或者超时,才会执行processSelectedKeys逻辑
1、服务端启动后,NioServerSocketChannel若监听到客户端OP_ACCEPT操作,则会执行processSelectedKeys逻辑,若超时,则继续下一次循环
2、客户端连接成功后,NioSocketChannel若监听到客户端OP_READ操作,则会执行processSelectedKeys逻辑,若超时,则继续下一次循环

# NioEventLoop 类
@Override
protected void run() {
    for (;;) {
        try {
            try {
                switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
                    ....
                    case SelectStrategy.SELECT:
                        // 该方法监听到事件(OP_ACCEPT|OP_READ)时才会返回
                        select(wakenUp.getAndSet(false));
                    default:
                }
            } catch (IOException e) {
            	.....
            }
            // 监听到事件执行
            try {
                // 1、获取SelectionKey处理事件
                processSelectedKeys();
            } finally {
                // 2、执行taskQueue中其他的注册方法register0
                runAllTasks();
            }
            
        }
   }
}   

private void select(boolean oldWakenUp) throws IOException {
    // 一直循环遍历
    int selectCnt = 0;
    for (;;) {
        // 根据注册的定时任务,获取本次select的阻塞时间
        long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
    	// 没有监听到事件或没有超时,则一直阻塞(会让出cpu资源)
        int selectedKeys = selector.select(timeoutMillis);
        selectCnt ++;
        if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {
	        // 正常场景
            // 当有连接|读写操作或者selector被唤醒了,则直接返回
            break;
        }

        long time = System.nanoTime();
        if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {
            // 正常场景
            // 说明没有监听到事件,而是超时了,则重置selectCnt
            selectCnt = 1;
        } else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
                selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
            // 异常场景  select 空轮询bug修复
            // 若空轮询次数超过SELECTOR_AUTO_REBUILD_THRESHOLD配置
            // 则关闭老的select,建立新的select
            selector = selectRebuildSelector(selectCnt);
            selectCnt = 1;
            break;
        }
        currentTimeNanos = time;
    }
}

private void processSelectedKeysOptimized() {
    // 遍历所有的selectedKeys进行处理
    for (int i = 0; i < selectedKeys.size; ++i) {
         processSelectedKey(k, (AbstractNioChannel) a);
    }
}
            
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
    final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
    int readyOps = k.readyOps();
    if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
        // 连接|读写操作会调用该方法
        // 1、连接操作调用NioMessageUnsafe的read方法
        // 2、读写操作调用NioByteUnsafe的read方法
        unsafe.read();
    }
}

OP_ACCEPT连接操作处理
1、为每个客户端创建NioSocketChannel,并进行初始化
1.1、设置感兴趣事件为OP_READ
1.2、设置channel为非阻塞
1.3、初始化客户端pipeline
2、调用服务端NioServerSocketChannel的pipeline,将客户端的NioSocketChannel作为参数传过去,最终会调用到ServerBootstrapAcceptor,将NioSocketChannel注册到workGroup上

# NioMessageUnsafe 类
public void read() {
    final ChannelPipeline pipeline = pipeline();
    // 创建每个客户端的NioSocketChannel
    doReadMessages(readBuf);
    int size = readBuf.size();
    // readBuf为NioSocketChannel,遍历客户端所有的NioSocketChannel
    // 执行服务端NioServerSocketChannel的pipeline,循环执行fireChannelRead,
    // 最终会调用服务端hander的ChannelRead方法,此处会调用到ServerBootstrapAcceptor的ChannelRead方法
    for (int i = 0; i < size; i ++) {
        readPending = false;
        pipeline.fireChannelRead(readBuf.get(i));
    }
    // 调用服务端pipeline的读完成方法
    pipeline.fireChannelReadComplete();

}

protected abstract int doReadMessages(List<Object> buf) throws Exception;
// 调用子类NioServerSocketChannel#doReadMessages
protected int doReadMessages(List<Object> buf) throws Exception {
    // 获取客户端的连接得到SocketChannel,每个客户端在服务端都有一个对应的SocketChannel
    SocketChannel ch = SocketUtils.accept(javaChannel());
    try {
        if (ch != null) {
            // NioSocketChannel处理方式同NioServerSocketChannel
            // 1、设置感兴趣事件为连接事件OP_READ
            // 2、设置channel为非阻塞
            // 3、初始化客户端NioSocketChannel的pipeline
            buf.add(new NioSocketChannel(this, ch));
            return 1;
        }
    } catch (Throwable t) {

    }
    return 0;
}

将我们自定义的hander添加到NioSocketChannel的pipeline上,然后将NioSocketChannel注册到workGroup上,此时客户端pipeline链表中的hander如下
在这里插入图片描述

# ServerBootstrapAcceptor 类

public void channelRead(ChannelHandlerContext ctx, Object msg) {
    // 传过来的NioSocketChannel
    final Channel child = (Channel) msg;
    // 将我们手动添加的Hander添加到pipeline
    child.pipeline().addLast(childHandler);
    try {
        // 将NioSocketChannel注册workGroup的一个线程的selector上,
        // 方式同NioServerSocketChannel,执行register注册逻辑
        childGroup.register(child).addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                .....
            }
        });
    } catch (Throwable t) {
    }
}

OP_READ操作处理
进行数据读写,并执行pipeline中自定义的hander

# NioByteUnsafe类

// 接受到客户端OP_READ事件时调用
public void read() {
    // 获取客户端NioSocketChannel的pipeline
    final ChannelPipeline pipeline = pipeline();
    do {
        // 数据读写
        // 调用pipeline.fireChannelRead时会依次调用pipeline中hander的ChannelRead方法
        // 我们自定义的hander的ChannelRead方法就会在此处调用
        byteBuf = allocHandle.allocate(allocator);
        allocHandle.lastBytesRead(doReadBytes(byteBuf));
        pipeline.fireChannelRead(byteBuf);
    } while (allocHandle.continueReading());
    allocHandle.readComplete();
    // 调用pipeline的读完成方法
    pipeline.fireChannelReadComplete();
}

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Netty源码解读 的相关文章

随机推荐

  • Ubuntu 20.04 上安装 Node.js 和 npm 的三种方法

    主要介绍三种在 Ubuntu 20 04 上安装 Node js 和 npm 的方法 xff1a 通过Ubuntu标准软件库 这是最简单的安装方法 xff0c 并且适用大多数场景 但是标准软件库中最高版本只有 v10 19 0 root 6
  • android databinding 数据绑定错误 错误:任务':app:compileDebugJavaWithJavac' 的执行失败

    今天到公司照常打开项目 xff0c 突然运行不了显示databinding错误 Error Execution failed for task 39 app compileDebugJavaWithJavac 39 gt android d
  • 解决idea新建Module的奇怪路径问题

    问题由来 xff1a 在部署SpringCloud的时候想新建一个module来快速创建 xff0c 结果被创建出来的目录结构搞得一脸懵逼 xff0c 新建的module的根目录跑到了 xff0c 项目的src目录下 xff0c 整个看起来
  • ThingsBoard源码解析-数据订阅与规则链数据处理

    前言 结合本篇对规则链的执行过程进行探讨 根据之前对MQTT源码的学习 xff0c 我们由消息的处理入手 org thingsboard server transport mqtt MqttTransportHandler void pro
  • Thingsboard使用gateway网关

    简介 xff1a 本次是想测试一下thingsboard网关的使用 xff0c 实现通过网关 43 mqtt 43 thingsboard 43 emqx 实现间接设备创建和数据传输 前期准备 xff1a thingsboard平台 thi
  • Thingsboard(2.4 postgresql版)数据库表结构说明

    本文描述的表结构是根据thingsboard2 4 xff08 postgresql版 xff09 数据库中整理出来的 xff0c 不一定完整 xff0c 后续有新的发现再补充文档 一 数据库E R关系 Thingsboard2 4社区版共
  • ThingsBoard—自定义规则节点

    一般的功能 xff0c 可以使用现有的节点来完成 但如果有比较复杂 xff0c 或有自己特殊业务需求的 xff0c 可能就需要自定义了 按官方教程来基本就可以入门 xff0c 如果需要深入 xff0c 可以参考ThingsBoard自有节点
  • Thingsboard 报错 Cannot resolve symbol ‘TransportProtos‘

    本人idea 版本为 2021 1 xff0c 顺利编译 thingsboard 打开进行源码阅读时 xff0c 发现报 Cannot resolve symbol 39 TransportProtos 39 xff0c 如下图 xff1a
  • ThingsBoard 规则引擎-邮件通知

    之前我们已经学习了Thingsboard安装 设备接入 简单的数据可视化内容 xff0c 今天来继续学习下thingsboard其他特性 规则引擎 应用场景 ThingsBoard规则引擎是一个支持高度可定制复杂事件处理的框架 xff0c
  • ThingsBoard编译报错:Failure to find org.gradle:gradle-tooling-api:jar:6.3

    删除本地仓库未下载完成的缓存文件 xff08 删除像图片显示这样以 lastUpdated结尾的文件 xff09 执行mvn v确保maven命令可以正常执行执行下面命令 xff0c 将下载的jar安装到本地仓库 注意 xff1a 将 Df
  • Thingsboard3.4-OTA升级

    背景 在做设备端对接thingsboard平台得时候 xff0c 去研究设备端对接平台的过程中 xff0c 花了不少时间 xff0c 在此之前也没有找到相关的文档 xff0c 于是出于减少大家去研究的时间 xff0c 写了这篇博客 xff0
  • PyCharm更换pip源为国内源、模块安装、PyCharm依赖包导入导出教程

    一 更换pip为国内源 1 使用PyCharm创建一个工程 2 通过File gt Setting 选择解释器为本工程下的Python解释器 3 单击下图中添加 43 xff0c 4 单击下图中的 Manage Repositories 按
  • Pycharm没有找到manage repositories按钮解决方案

    问题描述 xff1a 不知道是因为版本原因还是其他 xff0c pycharm没有找到manage repositories按钮 xff0c 无法更改下载源 xff0c 导致安装库的速度会很慢 解决办法 xff1a 1 点击左下角的pyth
  • 通过改变JVM参数配置降低内存消耗

    有个项目 xff0c 其服务器端原本内存占用很大 xff0c 16G内存几乎都用光了 原先的JVM参数配置是这样的 xff1a Xms16384m Xmx16384m XX PermSize 61 64m XX MaxPermSize 61
  • NodeJS yarn 或 npm如何切换淘宝或国外镜像源

    一 查看当前的镜像源 npm config get registry 或 yarn config get registry 二 设置为淘宝镜像源 xff08 全局设置 xff09 npm config set registry https
  • Centos7 部署InfluxDB

    因为目前网络上关于InfluxDB的资料并不多 xff0c 所以这里建议多参考官网 官网 xff1a Home InfluxData 点击此处的Docs xff1a 这里选择 InfluxDB OSS xff1a 使用文档时根据需求选择查看
  • SpringBoot 集成 Emqx 发布/订阅数据

    最近项目中用到Emqx发布 订阅数据 xff0c 特此记录便于日后查阅 ThingsboardEmqxTransportApplication Copyright 2016 2023 The Thingsboard Authors lt p
  • Centos7部署Minio集群

    1 地址规划 minio1 span class token number 10 0 span 0 200 minio2 span class token number 10 0 span 0 201 minio3 span class t
  • Centos7 部署单机 Minio 对象存储服务

    MinIO 是一款基于 Go 语言发开的高性能 分布式的对象存储系统 xff0c 客户端支持 Java xff0c Net xff0c Python xff0c Javacript xff0c Golang语言 MinIO 的主要目标是作为
  • Netty源码解读

    Netty源码解读 Netty线程模型 1 定义了两组线程池BossGroup和WorkerGroup xff0c BossGroup专门负责接收客户端的连接 WorkerGroup专门负责网络的读写 2 BossGroup和WorkerG