Android网络编程(十二)源码解析Okhttp4.x

本文首发于微信公众号「后厂技术官」

前言

Android网络编程系列是我从2016年开始写的,这个系列的总访问量在几十万,其中Android网络编程(六)OkHttp3用法全解析这篇文章在CSDN和本博客的阅读总量就有了14万的阅读量。随着时间的推移,有些内容不可避免的过时了,就OkHttp来说,最近几年经历了OkHttp2到OkHttp4。关于OkHttp4的用法和OkHttp3大同小异,本文来解析OkHttp4的源码,主要介绍OkHttp的网络请求流程和拦截器链。整体来说,OkHttp4在主要逻辑上和OkHttp3差别不大,主要的区别就是源码由Java实现变为了Kotlin实现。

1.OkHttpClient的创建

一般来说,我们项目的OkHttpClient是单例,创建OkHttpClient有两种方式,一种是new,一种是使用建造者模式为其设置一些参数,无论是哪一种都是使用建造者模式来完成OkHttpClient的初始化,OkHttpClient的构造器如下所示:
okhttp/src/main/java/okhttp3/OkHttpClient.kt

constructor() : this(Builder())

Builder的构造器如下所示。
okhttp/src/main/java/okhttp3/OkHttpClient.kt

class Builder constructor() {
  //任务调度器,控制并发的请求
  internal var dispatcher: Dispatcher = Dispatcher()
  //连接池
  internal var connectionPool: ConnectionPool = ConnectionPool()
  //应用拦截器集合
  internal val interceptors: MutableList<Interceptor> = mutableListOf()
  //网络拦截器集合
  internal val networkInterceptors: MutableList<Interceptor> = mutableListOf()
  //事件监听工厂
  internal var eventListenerFactory: EventListener.Factory = EventListener.NONE.asFactory()
  //连接失败时,是否重连
  internal var retryOnConnectionFailure = true
  internal var authenticator: Authenticator = Authenticator.NONE
  //是否跟随重定向
  internal var followRedirects = true
  //是否跟随ssl重定向
  internal var followSslRedirects = true
  //CookieJar机制,定制Cookie
  internal var cookieJar: CookieJar = CookieJar.NO_COOKIES
  //磁盘缓存
  internal var cache: Cache? = null
  internal var dns: Dns = Dns.SYSTEM
  internal var proxy: Proxy? = null
  //代理选择器
  internal var proxySelector: ProxySelector? = null
  ...
  internal var callTimeout = 0
  internal var connectTimeout = 10_000
  internal var readTimeout = 10_000
  internal var writeTimeout = 10_000
  //ping的间隔时长
  internal var pingInterval = 0
  internal var routeDatabase: RouteDatabase? = null

2.创建RealCall

当要请求网络的时候需要用OkHttpClient.newCall(request)进行execute或者enqueue操作,OkHttpClient的newCall方法如下所示。

okhttp/src/main/java/okhttp3/OkHttpClient.kt

override fun newCall(request: Request): Call = RealCall(this, request, forWebSocket = false)

OkHttpClient的newCall方法会得到一个RealCall,它是Call接口的实现类。RealCall有三个参数,第一个是OkHttpClient,第二个是发送的请求,第三个是否使用WebSocket,默认值为false,可以看出RealCall是对请求的一个封装。

3.Dispatcher任务调度

网络请求主要分为同步请求和异步请求,在讲到请求之前,需要先了解一个类,那就是Dispatcher,它主要用于控制并发的请求,无论是同步请求还是异步请求,都会通过Dispatcher来处理。Dispatcher主要维护了以下变量。
okhttp/src/main/java/okhttp3/Dispatcher.kt

class Dispatcher constructor() {
 //最大任务请求数
  @get:Synchronized var maxRequests = 64
  ...
  //每个主句的最大任务请求数
  @get:Synchronized var maxRequestsPerHost = 5
  ...
  //执行异步请求的线程池
  private var executorServiceOrNull: ExecutorService? = null
  ...
  //准备运行的异步请求队列
  private val readyAsyncCalls = ArrayDeque<AsyncCall>()
  //正在运行的异步请求队列
  private val runningAsyncCalls = ArrayDeque<AsyncCall>()
  //正在运行的同步请求队列
  private val runningSyncCalls = ArrayDeque<RealCall>()
  ...

接下来查看Dispatcher的构造方法,如下所示。
okhttp/src/main/java/okhttp3/Dispatcher.kt

constructor(executorService: ExecutorService) : this() {
  this.executorServiceOrNull = executorService
}

  @get:JvmName("executorService") val executorService: ExecutorService
  get() {
    if (executorServiceOrNull == null) {
      executorServiceOrNull = ThreadPoolExecutor(0, Int.MAX_VALUE, 60, TimeUnit.SECONDS,
          SynchronousQueue(), threadFactory("$okHttpName Dispatcher", false))
    }
    return executorServiceOrNull!!
  }

Dispatcher的构造方法可以设定线程池,如果没有设定线程池,会通过executorService方法来创建默认的线程池。

4.异步请求

了解了异步请求后,同步请求也很好理解,因此这里只介绍异步请求。异步请求会调用RealCall的enqueue方法,代码如下所示。
okhttp/src/main/java/okhttp3/internal/connection/RealCall.kt

override fun enqueue(responseCallback: Callback) {
  synchronized(this) {
    check(!executed) { "Already Executed" }//1
    executed = true
  }
  callStart()
  client.dispatcher.enqueue(AsyncCall(responseCallback))//2
}

注释1处用于检查是否重复调用enqueue方法。注释2处调用了dispatcher的enqueue方法,并将AsyncCall传进去。AsyncCall是RealCall的内部类,实现了Runnable接口,后面会再次提到它。
dispatcher的enqueue方法如下所示。
okhttp/src/main/java/okhttp3/Dispatcher.kt

internal fun enqueue(call: AsyncCall) {
  synchronized(this) {
    readyAsyncCalls.add(call)//1
    if (!call.call.forWebSocket) {
      val existingCall = findExistingCallWithHost(call.host)
      if (existingCall != null) call.reuseCallsPerHostFrom(existingCall)
    }
  }
  promoteAndExecute()
}

注释1处会将AsyncCall添加到准备运行的异步请求队列readyAsyncCalls中。接着来查看注释2处的promoteAndExecute方法:
okhttp/src/main/java/okhttp3/Dispatcher.kt

private fun promoteAndExecute(): Boolean {
  this.assertThreadDoesntHoldLock()

  val executableCalls = mutableListOf<AsyncCall>()
  val isRunning: Boolean
  synchronized(this) {
    val i = readyAsyncCalls.iterator()
    while (i.hasNext()) {
      val asyncCall = i.next()

      if (runningAsyncCalls.size >= this.maxRequests) break //1
      if (asyncCall.callsPerHost.get() >= this.maxRequestsPerHost) continue //2
      i.remove()
      asyncCall.callsPerHost.incrementAndGet()
      //添加到可执行任务集合
      executableCalls.add(asyncCall)//3
      //正在运行的异步请求队列
      runningAsyncCalls.add(asyncCall)//4
    }
    isRunning = runningCallsCount() > 0
  }

  for (i in 0 until executableCalls.size) {
    val asyncCall = executableCalls[i]
    asyncCall.executeOn(executorService)
  }

  return isRunning
}

首先会遍历readyAsyncCalls,取出每个asyncCall。
在注释1处判断如果正在运行的异步请求队列runningAsyncCalls大于最大请求数则break。
注释2处用于判断asyncCall中的主机数是否超过同一主机最大任务数。
如果上面的条件都通过则将调用asyncCall的executeOn方法,代码如下所示。
okhttp/src/main/java/okhttp3/internal/connection/RealCall.kt

internal inner class AsyncCall(
    private val responseCallback: Callback
  ) : Runnable {
   ...
    fun executeOn(executorService: ExecutorService) {
      client.dispatcher.assertThreadDoesntHoldLock()
      var success = false
      try {
        //开启异步任务
        executorService.execute(this)//1
        success = true
      } catch (e: RejectedExecutionException) {
        val ioException = InterruptedIOException("executor rejected")
        ioException.initCause(e)
        noMoreExchanges(ioException)
        //网络回调失败
        responseCallback.onFailure(this@RealCall, ioException)
      } finally {
        if (!success) {
        //如果任务失败,从runningAsyncCalls将当前AsyncCall移除
          client.dispatcher.finished(this)
        }
      }
    }

    override fun run() {
      threadName("OkHttp ${redactedUrl()}") {
        var signalledCallback = false
        timeout.enter()
        try {
          //通过拦截器链来得到网络响应
          val response = getResponseWithInterceptorChain()//2
          signalledCallback = true
          //网络响应成功回调
          responseCallback.onResponse(this@RealCall, response)
        } catch (e: IOException) {
          if (signalledCallback) {
            Platform.get().log("Callback failure for ${toLoggableString()}", Platform.INFO, e)
          } else {
            //网络响应失败回调
            responseCallback.onFailure(this@RealCall, e)
          }
        } catch (t: Throwable) {
          cancel()
          if (!signalledCallback) {
            val canceledException = IOException("canceled due to $t")
            canceledException.addSuppressed(t)
            responseCallback.onFailure(this@RealCall, canceledException)
          }
          throw t
        } finally {
          //从runningAsyncCalls将当前AsyncCall移除
          client.dispatcher.finished(this)//3
        }
      }
    }
  }

注释1处,将AsyncCall添加到线程池executorService中,执行异步任务。注释2处,通过拦截器链来得到网络响应。无论响应成功还是失败都会在注释3处的dispatcher的finished方法,它的代码如下所示。
okhttp/src/main/java/okhttp3/Dispatcher.kt

internal fun finished(call: AsyncCall) {
  call.callsPerHost.decrementAndGet()
  finished(runningAsyncCalls, call)
}

private fun <T> finished(calls: Deque<T>, call: T) {
  val idleCallback: Runnable?
  synchronized(this) {
    if (!calls.remove(call)) throw AssertionError("Call wasn't in-flight!")//1
    idleCallback = this.idleCallback
  }

  val isRunning = promoteAndExecute()//2

  if (!isRunning && idleCallback != null) {
    idleCallback.run()
  }
}

注释1处,将AsyncCall从runningAsyncCalls队列中移除,注释2处接着调用promoteAndExecute方法,
接着处理请求。

5.拦截器链

拦截器链是Okhttp的核心逻辑,也是面试经常问到的知识点。
在AsyncCall的run方法中调用了getResponseWithInterceptorChain方法,如下所示。
okhttp/src/main/java/okhttp3/internal/connection/RealCall.kt

@Throws(IOException::class)
internal fun getResponseWithInterceptorChain(): Response {
  //创建拦截器集合
  val interceptors = mutableListOf<Interceptor>()//1
  //添加用户设置的应用拦截器
  interceptors += client.interceptors
  //负责重试和重定向
  interceptors += RetryAndFollowUpInterceptor(client)
  //用于桥接应用层和网络层的请求数据
  interceptors += BridgeInterceptor(client.cookieJar)
  //用于处理缓存
  interceptors += CacheInterceptor(client.cache)
  //网络连接拦截器,用于获取一个连接
  interceptors += ConnectInterceptor
  if (!forWebSocket) {
  //添加用户设置的网络拦截器
    interceptors += client.networkInterceptors
  }
  //用于请求网络并获取网络响应
  interceptors += CallServerInterceptor(forWebSocket)
  //创建职责链
  val chain = RealInterceptorChain(//1
      call = this,
      interceptors = interceptors,
      index = 0,
      exchange = null,
      request = originalRequest,
      connectTimeoutMillis = client.connectTimeoutMillis,
      readTimeoutMillis = client.readTimeoutMillis,
      writeTimeoutMillis = client.writeTimeoutMillis
  )

  var calledNoMoreExchanges = false
  try {
  //启动职责链
    val response = chain.proceed(originalRequest)//2
    if (isCanceled()) {
      response.closeQuietly()
      throw IOException("Canceled")
    }
    return response
  } catch (e: IOException) {
    calledNoMoreExchanges = true
    throw noMoreExchanges(e) as Throwable
  } finally {
    if (!calledNoMoreExchanges) {
      noMoreExchanges(null)
    }
  }
}

getResponseWithInterceptorChain方法有点长,主要做了两件事:

1.创建拦截器集合,并将所有拦截器添加进去
2.创建职责链,并启动。

现在分别介绍下各个拦截器的作用:

  • interceptor:应用拦截器,通过client设置。
  • RetryAndFollowUpInterceptor:重试拦截器,负责网络请求中的重试和重定向,比如网络请求过程中出现异常,就会重试请求。
  • BridgeInterceptor:桥接拦截器,用于桥接应用层和网络层的数据,请求时将应用层的数据类型转换为网络层的数据类型,响应时则将网络层返回的数据类型转换为应用层的数据类型。
  • CacheInterceptor:缓存拦截器,负责读取和更新缓存,可以配置自定义的缓存拦截器。
  • ConnectInterceptor:网络连接拦截器,其内部会获取一个连接。
  • networkInterceptor:网络拦截器,通过client设置。
  • CallServerInterceptor:请求服务拦截器,拦截器链的最后的拦截器,用于向服务端发送数据并获取响应。

注释1处创建了职责链,听名称就知道采用的是职责链模式,使得每一个拦截器都有机会处理请求,这些拦截器形成了拦截器链,网络请求经过拦截器链的处理,然后发送出去,同样的,网络响应也经过拦截器的处理返回给应用层。

注释2处启动了职责链,如下所示。
okhttp/src/main/java/okhttp3/internal/http/RealInterceptorChain.kt

@Throws(IOException::class)
  override fun proceed(request: Request): Response {
    check(index < interceptors.size)
    calls++
    if (exchange != null) {
      check(exchange.connection.supportsUrl(request.url)) {
        "network interceptor ${interceptors[index - 1]} must retain the same host and port"
      }
      check(calls == 1) {
        "network interceptor ${interceptors[index - 1]} must call proceed() exactly once"
      }
    }
    val next = copy(index = index + 1, request = request)//1
    val interceptor = interceptors[index]//2

    @Suppress("USELESS_ELVIS")
    val response = interceptor.intercept(next) ?: throw NullPointerException(//3
        "interceptor $interceptor returned null")

    if (exchange != null) {
      check(index + 1 >= interceptors.size || next.calls == 1) {
        "network interceptor $interceptor must call proceed() exactly once"
      }
    }
    check(response.body != null) { "interceptor $interceptor returned a response with no body" }
    return response
  }

注释1处调用了RealInterceptorChain的copy方法,其内部会新建一个RealInterceptorChain,通过参数index + 1来循环interceptors中的拦截器。
注释2处用于获取当前要执行的拦截器。
注释3处运行当前的拦截器,并设置了下个拦截器。其内部的逻辑通常是:当前拦截器处理完成后会接着执行下个拦截器的proceed方法。

分享到 评论

分享前沿技术、技术资讯、行业秘闻,技术管理,助力10万+程序员成长进阶。

---