开源库源码分析:Okhttp源码分析(一)

2023-10-27

开源库源码分析:OkHttp源码分析

在这里插入图片描述

导言

接下来就要开始分析一些常用开源库的源码了,作为最常用的网络请求库,OkHttp以其强大的功能深受Android开发者的喜爱(比如说我),还有对该库进行二次封装而成的热门库,比如说Retrofit。本文我们将从源码入手看看OkHttp是如何运作的。注意本文解析的是OkHttp3库,该库是用Kotlin写的,需要大家有一些Kotlin基础。

OkHttp的最佳使用

这个问题是在OkHttp3的OkHttpClient中的注释中发现的:
在这里插入图片描述
这段注释中提到了OkHttp最好是用单例的OkHttpClient来实现请求,我们可以对该单例进行复用。这是因为每一个OkHttpClient都会持有一个连接池和线程池,都称之为池了,那么其作用肯定就是为了复用。通过复用这些连接和线程我们可以显著地降低延迟和节约内存使用。

实际上这段注释中已经提到了OkHttp中的一些复用机制了,连接复用和线程复用。

调度类Dispatcher

调度类的线程池

OkHttp中核心类之一就是这个调度类Dispatcher,所有的网络请求类Call都需要通过这个调度类来执行任务。这个调度类是OkHttpClient类的成员变量,我们是在调度类中执行请求的,那我们就顺便再来介绍一下客户端类中的成员变量:

//调度类
@get:JvmName("dispatcher") val dispatcher: Dispatcher = builder.dispatcher
//连接池
@get:JvmName("connectionPool") val connectionPool: ConnectionPool = builder.connectionPool
//应用拦截器
@get:JvmName("interceptors") val interceptors: List<Interceptor> = builder.interceptors.toImmutableList()
//网络拦截器
@get:JvmName("networkInterceptors") val networkInterceptors: List<Interceptor> =
      builder.networkInterceptors.toImmutableList()
//事件监听工厂
@get:JvmName("eventListenerFactory") val eventListenerFactory: EventListener.Factory =
      builder.eventListenerFactory

这几个应当是最核心的几个成员变量,我们将在后面再接触它们。回归正题,我们继续看调度类,既然要执行Call,那么这个调度器类就一定会有线程池,这个线程池有两个手段可以设置:

  private var executorServiceOrNull: ExecutorService? = null//线程池

  @get:Synchronized //在调用get时设置线程池为一个默认线程池,类似于一个CachedPool
  @get:JvmName("executorService") val executorService: ExecutorService
    get() {
      if (executorServiceOrNull == null) {
        executorServiceOrNull = ThreadPoolExecutor(0, Int.MAX_VALUE, 60, TimeUnit.SECONDS,
            SynchronousQueue(), threadFactory("$okHttpName Dispatcher", false))
      }
      return executorServiceOrNull!!
    }

  constructor(executorService: ExecutorService) : this() { //通过构造方法设置线程池
    this.executorServiceOrNull = executorService
  }

可以看到,方法一是调用构造函数来设置线程池,第二个方法就是在调用get方法时会自动设置一个效果类似于CachedPool的线程池。

调度类执行请求

我们使用OkHttp时一般是两种方式,同步请求execute异步请求enqueue,我们以异步请求为例来分析调度类是如何执行网络请求的。一般我们发送异步请求是这样的形式:

    public static void main(String[] args) {
        OkHttpClient client = new OkHttpClient();
        Request r = new Request.Builder().build();
        Call call = client.newCall(r);
        call.enqueue(new Callback() {
            @Override
            public void onFailure(@NonNull Call call, @NonNull IOException e) {
                
            }

            @Override
            public void onResponse(@NonNull Call call, @NonNull Response response) throws IOException {

            }
        });
    }

首先通过RequestBuilder类来构建了一个请求类,然后将该请求传入newCall方法中新创建出了一个Call

override fun newCall(request: Request): Call = RealCall(this, request, forWebSocket = false)

可以看到返回的是RealCall类,该类实现了Call接口。接下来就从Call的enqueue方法入手,这里实际上是RealCall类中:

  override fun enqueue(responseCallback: Callback) {
    check(executed.compareAndSet(false, true)) { "Already Executed" }

    callStart() //开启事件监听
    client.dispatcher.enqueue(AsyncCall(responseCallback)) //通过调度类入队
  }

这其中callStart方法会开启事件监听,我们先不管这个事件监听,看最后一句通过调度器来入队,这个AsyncCall()类是异步请求类,实现了Runnable接口,这里实际上就是将我们传入的实现了Callback的回调给包装成了AsyncCall。我们继续看Dispatcher中是如何入队的:

  internal fun enqueue(call: AsyncCall) {
    synchronized(this) {
      readyAsyncCalls.add(call)

      // Mutate the AsyncCall so that it shares the AtomicInteger of an existing running call to
      // the same host.
      if (!call.call.forWebSocket) {
        val existingCall = findExistingCallWithHost(call.host)
        if (existingCall != null) call.reuseCallsPerHostFrom(existingCall)
      }
    }
    promoteAndExecute()
  }

首先为了确保线程安全,调度器先将其上锁然后将之前包装成的AsyncCall加入到了readyAsyncCall就绪队列中。中间这一段判断是用于处理与主机(host)相关的异步调用复用。如果 call 不是用于 WebSocket 调用,它会尝试查找是否已经存在一个与相同主机(host)相关的异步调用任务。如果找到了已经存在的异步调用,它会尝试共享这两个异步调用任务之间的状态。这是为了复用已经建立的连接,以提高性能和减少资源消耗。

最后会调用promoteAndExecute方法执行,这个方法是执行请求的核心之一:

 private fun promoteAndExecute(): Boolean {
    this.assertThreadDoesntHoldLock()

    val executableCalls = mutableListOf<AsyncCall>()//可执行的Calls
    val isRunning: Boolean
    synchronized(this) {
      val i = readyAsyncCalls.iterator()//获得一个迭代器
      while (i.hasNext()) { //开始迭代
        val asyncCall = i.next()

        if (runningAsyncCalls.size >= this.maxRequests) break // Max capacity.
        if (asyncCall.callsPerHost.get() >= this.maxRequestsPerHost) continue // Host max capacity.

        i.remove()
        asyncCall.callsPerHost.incrementAndGet()
        executableCalls.add(asyncCall)
        runningAsyncCalls.add(asyncCall)
      }
      isRunning = runningCallsCount() > 0
    }

    for (i in 0 until executableCalls.size) {
      val asyncCall = executableCalls[i]
      asyncCall.executeOn(executorService)
    }

    return isRunning
  }

这段代码的核心逻辑就是while循环之中,这很显然是一个通过迭代器开始遍历就绪队列readyAsyncCall的过程,它会将就绪队列中的call移入到可执行队列executableCalls和正在执行异步队列runningAsyncCalls中。其中还有两句判断条件:

        if (runningAsyncCalls.size >= this.maxRequests) break // Max capacity.
        if (asyncCall.callsPerHost.get() >= this.maxRequestsPerHost) continue // Host max capacity.

一句是判断正在运行队列没有超过最大Size,一句是判断每一句的最大执行数不超过最大值,这两个值在Dispatcher中都有定义。最后会跳入到for循环中并调用AsyncCall的executeOn方法,并将线程池传入,也就是接下来就是在线程池中执行该异步请求了。我们最后来看这个executeOn方法:

fun executeOn(executorService: ExecutorService) {
      client.dispatcher.assertThreadDoesntHoldLock()

      var success = false
      try {
        executorService.execute(this)
        success = true
      } catch (e: RejectedExecutionException) {
        val ioException = InterruptedIOException("executor rejected")
        ioException.initCause(e)
        noMoreExchanges(ioException)
        responseCallback.onFailure(this@RealCall, ioException)
      } finally {
        if (!success) {
          client.dispatcher.finished(this) // This call is no longer running!
        }
      }
    }

这个方法真是开门见山。在try语句的第一句处就通过线程池执行了该任务,还记得这个AsyncCall是由我们传入的Call回调包装而成吗?接下来就会回调到我们的函数之中去了。众所周知,由于executorService.execute(this)这一句,线程池接下来会执行AsyncCall的run方法了:

    override fun run() {
      threadName("OkHttp ${redactedUrl()}") {
        var signalledCallback = false
        timeout.enter()
        try {
          val response = getResponseWithInterceptorChain() //1--通过拦截器链来获得响应
          signalledCallback = true
          responseCallback.onResponse(this@RealCall, response)//2--请求成功,回调到我们传入的onResponse方法中
        } catch (e: IOException) {
          if (signalledCallback) {
            // Do not signal the callback twice!
            Platform.get().log("Callback failure for ${toLoggableString()}", Platform.INFO, e)
          } else {
            responseCallback.onFailure(this@RealCall, e) //3--请求失败,回调到我们传入的onFailure方法之中去
          }
        } catch (t: Throwable) {
          cancel()
          if (!signalledCallback) {
            val canceledException = IOException("canceled due to $t")
            canceledException.addSuppressed(t)
            responseCallback.onFailure(this@RealCall, canceledException)
          }
          throw t
        } finally {
          client.dispatcher.finished(this)//结束本次请求
        }
      }
    }

我已经在代码之中加入一些注释了,在注释一处会通过拦截器链来获得响应,这个拦截器机制我们之后再将,这里只要知道它在这里获得了响应即可。在注释二处我们就可以看到这里显然是调用了我们传入的Call回调了,请求成功调用onResponse回调,要是请求失败的话则回调到onFailure方法之中去。最后在finally块中将本次请求结束,实际上这个finished方法还会开启下一次循环:

  internal fun finished(call: AsyncCall) {
    call.callsPerHost.decrementAndGet()
    finished(runningAsyncCalls, call)
  }
  
  private fun <T> finished(calls: Deque<T>, call: T) {
    val idleCallback: Runnable?
    synchronized(this) {
      if (!calls.remove(call)) throw AssertionError("Call wasn't in-flight!")
      idleCallback = this.idleCallback
    }

    val isRunning = promoteAndExecute()

    if (!isRunning && idleCallback != null) {
      idleCallback.run()
    }
  }

由第一个函数会调转到第二个函数,从第二个函数来看,它首先会将本次的call从runningAsyncCalls正在运行队列中移除,然后会继续调用我们之前提到过的promoteAndExecute方法开启下一次循环,如果没有任务了的话并且空闲处理程序不为空的话,还会执行空闲处理程序。

拦截器链

之前提到网络请求的相应是通过拦截器链获得的:

val response = getResponseWithInterceptorChain()

首先,什么是拦截器呢?

在计算机编程领域,拦截器(Interceptor)是一种常见的设计模式,用于在软件系统的不同组件之间添加或修改功能。拦截器的主要目的是拦截请求或操作,允许在请求进入目标组件之前或之后执行自定义逻辑。

简单来说就是有许多个拦截器,一个请求的执行需要经过这一整条拦截器链,在一个请求到达一个拦截器的时候我们就可以判断是否要拦截这个请求并进行处理,简单来说就是类似于旅行途中的设置关口。Android中这种模式也运用,比如说广播View的事件分发和处理,这都是拦截器模式的运用。

好了,言归正传,我们继续回到上面的方法上:

internal fun getResponseWithInterceptorChain(): Response {
    // Build a full stack of interceptors.
    val interceptors = mutableListOf<Interceptor>() //创建拦截器集合
    interceptors += client.interceptors //添加用户设置的应用拦截器
    interceptors += RetryAndFollowUpInterceptor(client) //负责重试和重定向 
    interceptors += BridgeInterceptor(client.cookieJar) //用于桥接应用层和网络层的请求数据
    interceptors += CacheInterceptor(client.cache) //用于处理缓存
    interceptors += ConnectInterceptor //网络连接拦截器,用于获取一个链接
    if (!forWebSocket) {
      //添加用户设置的网络拦截器
      interceptors += client.networkInterceptors
    }
    interceptors += CallServerInterceptor(forWebSocket)//用于请求网络并获取网络响应
     //创建责任链
    val chain = RealInterceptorChain(
        call = this,
        interceptors = interceptors,
        index = 0,
        exchange = null,
        request = originalRequest,
        connectTimeoutMillis = client.connectTimeoutMillis,
        readTimeoutMillis = client.readTimeoutMillis,
        writeTimeoutMillis = client.writeTimeoutMillis
    )

    var calledNoMoreExchanges = false
    try { //启动责任链
      val response = chain.proceed(originalRequest)
      if (isCanceled()) {
        response.closeQuietly()
        throw IOException("Canceled")
      }
      return response
    } catch (e: IOException) {
      calledNoMoreExchanges = true
      throw noMoreExchanges(e) as Throwable
    } finally {
      if (!calledNoMoreExchanges) {
        noMoreExchanges(null)
      }
    }
  }

具体代码的含义已经写在注释中了,首先要提醒大家上面的+=号是Kotlin的重载,就是相当于添加集合。其实把上面的方法切分一下的话主要就是做了两件事:创建和整理拦截器集合创建并启动责任链。下面是一张从官方文档中截取的图片

在这里插入图片描述
我们看右下部分就是说明网络响应是通过网络拦截器来层层向上传递的,并且拦截器是HTTP工作真正发生的地方。
下面再来介绍各个拦截器的作用:

  • interceptor:应用拦截器,通过Client设置
  • RetryAndFollowUpInterceptor:重试拦截器,负责网络请求中的重试重定向。比如网络请求过程中出现异常的时候就需要进行重试。
  • BridgeInterceptor:桥接拦截器,用于桥接应用层和网络层的数据。请求时将应用层的数据类型转化为网络层的数据类型,响应时则将网络层的数据类型转化为应用层的数据类型。
  • CacheInterceptor:缓存拦截器,负责读取和更新缓存。可以配置自定义的缓存拦截器。
  • ConnectInterceptor:网络连接拦截器,其内部会获取一个连接。
  • networkInterceptor:网络拦截器,通过Client设置。
  • CallServerInterceptor:请求服务拦截器。它是拦截器中处于末尾的拦截器,用于向服务端发送数据并获取响应

我们回到上面的方法中,在try块中的第一句就是启动责任链chain.proceed(),这个方法将会启动责任链并获取响应:

override fun proceed(request: Request): Response {
    check(index < interceptors.size)
	......
    // Call the next interceptor in the chain.
    val next = copy(index = index + 1, request = request)
    val interceptor = interceptors[index]

    @Suppress("USELESS_ELVIS")
    val response = interceptor.intercept(next) ?: throw NullPointerException(
        "interceptor $interceptor returned null")
	.......

    return response
  }

这里面的核心就是上面的这一小段,首先通过copy方法获得下一个拦截器:

internal fun copy(
    index: Int = this.index,
    exchange: Exchange? = this.exchange,
    request: Request = this.request,
    connectTimeoutMillis: Int = this.connectTimeoutMillis,
    readTimeoutMillis: Int = this.readTimeoutMillis,
    writeTimeoutMillis: Int = this.writeTimeoutMillis
  ) = RealInterceptorChain(call, interceptors, index, exchange, request, connectTimeoutMillis,
      readTimeoutMillis, writeTimeoutMillis)

这个copy本质还是调用了构造函数,这这里它调用更改的唯一参数就是index参数,它将原来的index+1,显然是想要达到遍历的效果,那么这个index参数到底会影响什么呢?我们再将其与一开始的构造相比较:

     //创建责任链
    val chain = RealInterceptorChain(
        call = this,
        interceptors = interceptors,
        index = 0,
        exchange = null,
        request = originalRequest,
        connectTimeoutMillis = client.connectTimeoutMillis,
        readTimeoutMillis = client.readTimeoutMillis,
        writeTimeoutMillis = client.writeTimeoutMillis
    )

一开始这个index的值是0,通过查找整个类可以发现唯一有意义的就是我们上面提到的proceed方法:

val interceptor = interceptors[index]
val response = interceptor.intercept(next) ?: throw NullPointerException(
        "interceptor $interceptor returned null")

就是用来从拦截器集合中获取拦截器的。之后又会用我们获取到的拦截器调用其intercept方法,并将之前copy出来的对象传进去(也就是新的责任链,只不过index向后指了一格)。这个intercept方法是一个接口方法,实现该接口的类有很多,我们以比较简单的ConnectInterceptor类为例:

  override fun intercept(chain: Interceptor.Chain): Response {
    val realChain = chain as RealInterceptorChain
    val exchange = realChain.call.initExchange(chain)
    val connectedChain = realChain.copy(exchange = exchange)
    return connectedChain.proceed(realChain.request)
  }

这里它做的唯一就是初始化了一个exchange参数然后将其设置进了责任链中,最后又会调用新设置好的责任链的proceed方法中。这就又回到了之前的方法中。这样看来,这个责任链的大体行为模式还是很好懂的,首先启动责任链,责任链调用proceed方法启动,proceed方法中会获得下一个拦截器并且调用下一个拦截器中的intercept拦截方法,在这个拦截方法中首先会进行该拦截器的一些拦截逻辑,拦截逻辑完成之后会再次调用proceed方法继续获得下一个拦截器,然后再调用它的拦截器方法,以此类推直到整个拦截器链上的拦截器方法都执行一遍,最后返回出一个Response响应。整个过程差不多如下所示:
在这里插入图片描述

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

开源库源码分析:Okhttp源码分析(一) 的相关文章

随机推荐

  • Linux配置sar服务

    Linux配置sar服务 下面以suse 为例 其他系统类似 检查是否有安装sar服务 sar服务包含在sysstat软件包中 rpm qa grep sysstat 如果没有安装执行下面的命令安装 zypper install sysst
  • OCR-APP开发总结(一)

    一 解决删除 运行模式 布局的问题 1 直接将对应的xml部分删除 2 将 Java中MainActivity中引用对应的ID部分删除 并将运行模式的值默认 3 效果图 二 裁剪方法 方法1 android自带的crop进行裁剪 缺点 裁剪
  • Spring MVC类型转换的相关说明

    转自 Spring MVC类型转换的相关说明 下文讲采用示例的方式 讲述相关类型说明 如下所示 insert jsp页面的代码如下
  • C++ 命名空间 & 模板

    命名空间 为了区分不同库中相同名称的函数 类 变量等 引入概念 命名空间 它可作为附加信息来帮助区分它们 使用了命名空间即定义了上下文 本质上就是定义了一个范围 定义命名空间 命名空间的定义使用关键字 namespace 后跟命名空间的名称
  • python实现从零搭建图书管理系统

    学习目标 tornado入门学习 异步服务器学习 异步数据库操作 一 虚拟环境 1 1 虚拟环境的创建 mkvirtualenv tornado py3 p python3 1 2 安装tornado pip install tornado
  • QT学习(一)—— 第一个QT项目

    1 1 新建一个项目 添加项目名称 选择Qwidget类 暂时不选择Generate form那个 补充 这里介绍了一下widget mainWindow dialog三种类的关系 编译器用QT自带的 创建完毕 1 2 项目介绍 1 2 1
  • python爬取内容剔除nbsp_Python正则获取、过滤或者替换HTML标签的方法

    本文实例介绍了Python通过正则表达式获取 去除 过滤 或者替换HTML标签的几种方法 具体内容如下 python正则表达式关键内容 python正则表达式转义符 匹配除换行符以外的任意字符 w 匹配字母或数字或下划线或汉字 s 匹配任意
  • CVPR2023:BiFormer阅读笔记

    目录 前言 1 模型的特点 2 双层路由注意力机制 BRA 3 BiFormer的结构 前言 BiFormer是CVPR2023的一篇文章 文中提出自注意力机制作为Transformer的核心模块 可以帮助网络捕捉长距离上下文依赖 但是这种
  • 基于MFC-MSComm控件的PC与单片机串口通信编程

    作者 uedsr542 来源 51hei 使用工具Visual C 6 0 9针RS232串口线一条 P0口带上拉的LED的51最小系统版 打开软件 新建 gt MFC AppWizard exe 再填写工程名和路径 gt 基于对话框 不需
  • Java项目数据迁移怎么做的

    Java项目数据迁移怎么做的 1 A表到B表找字段映射 即两个不同库表先做好数据字段的对应和补齐 2 代码程序 java 做功能 从一个数据库表中读出数据 然后写到另一个数据库表中 技术历练点 多线程 使用线程池 确定核心线程池的数量 使用
  • Android沉浸式和状态栏颜色的修改

    一 前言 关于沉浸式实现的方法网上已经有很多了 但是也有点良莠不齐 在遇到实际项目中的问题之后还是需要查阅大量的博客来对问题进行查证 本博客主要是在解决沉浸式的几个问题之后 对沉浸式的一点总结方便后面再次遇到同样问题时 能够很快的查阅而不至
  • 深度优先找出图中顶点U到顶点V的所有简单路径【C/C++】

    目录 前言 步骤如下 1 对于邻接矩阵 1 1 创建邻接矩阵 1 2 初始化path数组和找到顶点U和顶点V的下标 1 3 FindPath 函数 1 4 测试结果 1 5 改进代码 感谢 果冻的光滑 的指导和帮助 2 对于邻接表 2 1
  • java消息的确认模式

    1 AUTO ACKNOWLEDGE 从消息生产者角度 发送消息后就开始阻塞 直到从消息服务器收到回复 期间如发生异常则认为消息未被传送 从消息服务器角度 非持久消息在接受到消息后通知生产者 并将消息存入内存 持久性消息在接受道消息后先存入
  • mac系统安装搭载Windows系统虚拟机方法教程

    我们都知道macOS系统虽然相对windows系统而言更稳定 但macOS系统中可使用的软件数量较windows系统而言要少很多 对于macOS系统应用少的问题 我们可以使用虚拟机来解决 那么 苹果虚拟机好用吗 整体而言是可以的 苹果虚拟机
  • excel 导出:

    导出 1 模板导出 1 所需jar
  • 【Pytorch with fastai】第 4 章 :底层训练数字分类器

    大家好 我是Sonhhxg 柒 希望你看完之后 能对你有所帮助 不足请指正 共同学习交流 个人主页 Sonhhxg 柒的博客 CSDN博客 欢迎各位 点赞 收藏 留言 系列专栏 机器学习 ML 自然语言处理 NLP 深度学习 DL fore
  • !important的使用

    前端css样式 使用的是vue 框架 本身的 css样式中与自定义的名称重复 导致样式被覆盖 原自定义样式 修改后自定义样式 其中 import 具有优先权 提升指定样式规则的应用优先权 参考文章 https www xuebuyuan c
  • 利用ESP8266_01做一个远程控制的智能插座

    手头有一块ESP8266 01WIFI模块 是前段时间在网上买arduino套件时送的 套件中还有一个单路继电器 反正这些东西折腾完了以后也没什么用 就将网上的资料汇总了一下 做了一个远程控制的智能排插 功能非常简单 就是通过手机远程控制排
  • 在Windows上搭建Go开发环境

    Go语言是由谷歌开发的一个高性能 跨平台的编程语言 安装Go 首先先来安装一下Go语言的SDK 目前Go语言的最新版本为Go 1 8 3 Go下载页面列出了各种操作系统的安装包 如果选择Windows MSI安装包的话 会将Go安装到C G
  • 开源库源码分析:Okhttp源码分析(一)

    开源库源码分析 OkHttp源码分析 导言 接下来就要开始分析一些常用开源库的源码了 作为最常用的网络请求库 OkHttp以其强大的功能深受Android开发者的喜爱 比如说我 还有对该库进行二次封装而成的热门库 比如说Retrofit 本