本文首发于微信公众号「后厂技术官」

前言

Android网络编程系列是我从2016年开始写的,这个系列的总访问量在几十万,其中Android网络编程(六)OkHttp3用法全解析这篇文章在CSDN和本博客的阅读总量就有了14万的阅读量。随着时间的推移,有些内容不可避免的过时了,就OkHttp来说,最近几年经历了OkHttp2到OkHttp4。关于OkHttp4的用法和OkHttp3大同小异,本文来解析OkHttp4的源码,主要介绍OkHttp的网络请求流程和拦截器链。整体来说,OkHttp4在主要逻辑上和OkHttp3差别不大,主要的区别就是源码由Java实现变为了Kotlin实现。

1.OkHttpClient的创建

一般来说,我们项目的OkHttpClient是单例,创建OkHttpClient有两种方式,一种是new,一种是使用建造者模式为其设置一些参数,无论是哪一种都是使用建造者模式来完成OkHttpClient的初始化,OkHttpClient的构造器如下所示:
okhttp/src/main/java/okhttp3/OkHttpClient.kt

constructor() : this(Builder())

Builder的构造器如下所示。
okhttp/src/main/java/okhttp3/OkHttpClient.kt

class Builder constructor() {
//任务调度器,控制并发的请求
internal var dispatcher: Dispatcher = Dispatcher()
//连接池
internal var connectionPool: ConnectionPool = ConnectionPool()
//应用拦截器集合
internal val interceptors: MutableList<Interceptor> = mutableListOf()
//网络拦截器集合
internal val networkInterceptors: MutableList<Interceptor> = mutableListOf()
//事件监听工厂
internal var eventListenerFactory: EventListener.Factory = EventListener.NONE.asFactory()
//连接失败时,是否重连
internal var retryOnConnectionFailure = true
internal var authenticator: Authenticator = Authenticator.NONE
//是否跟随重定向
internal var followRedirects = true
//是否跟随ssl重定向
internal var followSslRedirects = true
//CookieJar机制,定制Cookie
internal var cookieJar: CookieJar = CookieJar.NO_COOKIES
//磁盘缓存
internal var cache: Cache? = null
internal var dns: Dns = Dns.SYSTEM
internal var proxy: Proxy? = null
//代理选择器
internal var proxySelector: ProxySelector? = null
...
internal var callTimeout = 0
internal var connectTimeout = 10_000
internal var readTimeout = 10_000
internal var writeTimeout = 10_000
//ping的间隔时长
internal var pingInterval = 0
internal var routeDatabase: RouteDatabase? = null

2.创建RealCall

当要请求网络的时候需要用OkHttpClient.newCall(request)进行execute或者enqueue操作,OkHttpClient的newCall方法如下所示。

okhttp/src/main/java/okhttp3/OkHttpClient.kt

override fun newCall(request: Request): Call = RealCall(this, request, forWebSocket = false)

OkHttpClient的newCall方法会得到一个RealCall,它是Call接口的实现类。RealCall有三个参数,第一个是OkHttpClient,第二个是发送的请求,第三个是否使用WebSocket,默认值为false,可以看出RealCall是对请求的一个封装。

3.Dispatcher任务调度

网络请求主要分为同步请求和异步请求,在讲到请求之前,需要先了解一个类,那就是Dispatcher,它主要用于控制并发的请求,无论是同步请求还是异步请求,都会通过Dispatcher来处理。Dispatcher主要维护了以下变量。
okhttp/src/main/java/okhttp3/Dispatcher.kt

class Dispatcher constructor() {
//最大任务请求数
@get:Synchronized var maxRequests = 64
...
//每个主句的最大任务请求数
@get:Synchronized var maxRequestsPerHost = 5
...
//执行异步请求的线程池
private var executorServiceOrNull: ExecutorService? = null
...
//准备运行的异步请求队列
private val readyAsyncCalls = ArrayDeque<AsyncCall>()
//正在运行的异步请求队列
private val runningAsyncCalls = ArrayDeque<AsyncCall>()
//正在运行的同步请求队列
private val runningSyncCalls = ArrayDeque<RealCall>()
...

接下来查看Dispatcher的构造方法,如下所示。
okhttp/src/main/java/okhttp3/Dispatcher.kt

constructor(executorService: ExecutorService) : this() {
this.executorServiceOrNull = executorService
}

@get:JvmName("executorService") val executorService: ExecutorService
get() {
if (executorServiceOrNull == null) {
executorServiceOrNull = ThreadPoolExecutor(0, Int.MAX_VALUE, 60, TimeUnit.SECONDS,
SynchronousQueue(), threadFactory("$okHttpName Dispatcher", false))
}
return executorServiceOrNull!!
}

Dispatcher的构造方法可以设定线程池,如果没有设定线程池,会通过executorService方法来创建默认的线程池。

4.异步请求

了解了异步请求后,同步请求也很好理解,因此这里只介绍异步请求。异步请求会调用RealCall的enqueue方法,代码如下所示。
okhttp/src/main/java/okhttp3/internal/connection/RealCall.kt

override fun enqueue(responseCallback: Callback) {
synchronized(this) {
check(!executed) { "Already Executed" }//1
executed = true
}
callStart()
client.dispatcher.enqueue(AsyncCall(responseCallback))//2
}

注释1处用于检查是否重复调用enqueue方法。注释2处调用了dispatcher的enqueue方法,并将AsyncCall传进去。AsyncCall是RealCall的内部类,实现了Runnable接口,后面会再次提到它。
dispatcher的enqueue方法如下所示。
okhttp/src/main/java/okhttp3/Dispatcher.kt

internal fun enqueue(call: AsyncCall) {
synchronized(this) {
readyAsyncCalls.add(call)//1
if (!call.call.forWebSocket) {
val existingCall = findExistingCallWithHost(call.host)
if (existingCall != null) call.reuseCallsPerHostFrom(existingCall)
}
}
promoteAndExecute()
}

注释1处会将AsyncCall添加到准备运行的异步请求队列readyAsyncCalls中。接着来查看注释2处的promoteAndExecute方法:
okhttp/src/main/java/okhttp3/Dispatcher.kt

private fun promoteAndExecute(): Boolean {
this.assertThreadDoesntHoldLock()

val executableCalls = mutableListOf<AsyncCall>()
val isRunning: Boolean
synchronized(this) {
val i = readyAsyncCalls.iterator()
while (i.hasNext()) {
val asyncCall = i.next()

if (runningAsyncCalls.size >= this.maxRequests) break //1
if (asyncCall.callsPerHost.get() >= this.maxRequestsPerHost) continue //2
i.remove()
asyncCall.callsPerHost.incrementAndGet()
//添加到可执行任务集合
executableCalls.add(asyncCall)//3
//正在运行的异步请求队列
runningAsyncCalls.add(asyncCall)//4
}
isRunning = runningCallsCount() > 0
}

for (i in 0 until executableCalls.size) {
val asyncCall = executableCalls[i]
asyncCall.executeOn(executorService)
}

return isRunning
}

首先会遍历readyAsyncCalls,取出每个asyncCall。
在注释1处判断如果正在运行的异步请求队列runningAsyncCalls大于最大请求数则break。
注释2处用于判断asyncCall中的主机数是否超过同一主机最大任务数。
如果上面的条件都通过则将调用asyncCall的executeOn方法,代码如下所示。
okhttp/src/main/java/okhttp3/internal/connection/RealCall.kt

internal inner class AsyncCall(
private val responseCallback: Callback
) : Runnable {
...
fun executeOn(executorService: ExecutorService) {
client.dispatcher.assertThreadDoesntHoldLock()
var success = false
try {
//开启异步任务
executorService.execute(this)//1
success = true
} catch (e: RejectedExecutionException) {
val ioException = InterruptedIOException("executor rejected")
ioException.initCause(e)
noMoreExchanges(ioException)
//网络回调失败
responseCallback.onFailure(this@RealCall, ioException)
} finally {
if (!success) {
//如果任务失败,从runningAsyncCalls将当前AsyncCall移除
client.dispatcher.finished(this)
}
}
}

override fun run() {
threadName("OkHttp ${redactedUrl()}") {
var signalledCallback = false
timeout.enter()
try {
//通过拦截器链来得到网络响应
val response = getResponseWithInterceptorChain()//2
signalledCallback = true
//网络响应成功回调
responseCallback.onResponse(this@RealCall, response)
} catch (e: IOException) {
if (signalledCallback) {
Platform.get().log("Callback failure for ${toLoggableString()}", Platform.INFO, e)
} else {
//网络响应失败回调
responseCallback.onFailure(this@RealCall, e)
}
} catch (t: Throwable) {
cancel()
if (!signalledCallback) {
val canceledException = IOException("canceled due to $t")
canceledException.addSuppressed(t)
responseCallback.onFailure(this@RealCall, canceledException)
}
throw t
} finally {
//从runningAsyncCalls将当前AsyncCall移除
client.dispatcher.finished(this)//3
}
}
}
}

注释1处,将AsyncCall添加到线程池executorService中,执行异步任务。注释2处,通过拦截器链来得到网络响应。无论响应成功还是失败都会在注释3处的dispatcher的finished方法,它的代码如下所示。
okhttp/src/main/java/okhttp3/Dispatcher.kt

internal fun finished(call: AsyncCall) {
call.callsPerHost.decrementAndGet()
finished(runningAsyncCalls, call)
}

private fun <T> finished(calls: Deque<T>, call: T) {
val idleCallback: Runnable?
synchronized(this) {
if (!calls.remove(call)) throw AssertionError("Call wasn't in-flight!")//1
idleCallback = this.idleCallback
}

val isRunning = promoteAndExecute()//2

if (!isRunning && idleCallback != null) {
idleCallback.run()
}
}

注释1处,将AsyncCall从runningAsyncCalls队列中移除,注释2处接着调用promoteAndExecute方法,
接着处理请求。

5.拦截器链

拦截器链是Okhttp的核心逻辑,也是面试经常问到的知识点。
在AsyncCall的run方法中调用了getResponseWithInterceptorChain方法,如下所示。
okhttp/src/main/java/okhttp3/internal/connection/RealCall.kt

@Throws(IOException::class)
internal fun getResponseWithInterceptorChain(): Response {
//创建拦截器集合
val interceptors = mutableListOf<Interceptor>()//1
//添加用户设置的应用拦截器
interceptors += client.interceptors
//负责重试和重定向
interceptors += RetryAndFollowUpInterceptor(client)
//用于桥接应用层和网络层的请求数据
interceptors += BridgeInterceptor(client.cookieJar)
//用于处理缓存
interceptors += CacheInterceptor(client.cache)
//网络连接拦截器,用于获取一个连接
interceptors += ConnectInterceptor
if (!forWebSocket) {
//添加用户设置的网络拦截器
interceptors += client.networkInterceptors
}
//用于请求网络并获取网络响应
interceptors += CallServerInterceptor(forWebSocket)
//创建职责链
val chain = RealInterceptorChain(//1
call = this,
interceptors = interceptors,
index = 0,
exchange = null,
request = originalRequest,
connectTimeoutMillis = client.connectTimeoutMillis,
readTimeoutMillis = client.readTimeoutMillis,
writeTimeoutMillis = client.writeTimeoutMillis
)

var calledNoMoreExchanges = false
try {
//启动职责链
val response = chain.proceed(originalRequest)//2
if (isCanceled()) {
response.closeQuietly()
throw IOException("Canceled")
}
return response
} catch (e: IOException) {
calledNoMoreExchanges = true
throw noMoreExchanges(e) as Throwable
} finally {
if (!calledNoMoreExchanges) {
noMoreExchanges(null)
}
}
}

getResponseWithInterceptorChain方法有点长,主要做了两件事:

1.创建拦截器集合,并将所有拦截器添加进去
2.创建职责链,并启动。

现在分别介绍下各个拦截器的作用:

  • interceptor:应用拦截器,通过client设置。
  • RetryAndFollowUpInterceptor:重试拦截器,负责网络请求中的重试和重定向,比如网络请求过程中出现异常,就会重试请求。
  • BridgeInterceptor:桥接拦截器,用于桥接应用层和网络层的数据,请求时将应用层的数据类型转换为网络层的数据类型,响应时则将网络层返回的数据类型转换为应用层的数据类型。
  • CacheInterceptor:缓存拦截器,负责读取和更新缓存,可以配置自定义的缓存拦截器。
  • ConnectInterceptor:网络连接拦截器,其内部会获取一个连接。
  • networkInterceptor:网络拦截器,通过client设置。
  • CallServerInterceptor:请求服务拦截器,拦截器链的最后的拦截器,用于向服务端发送数据并获取响应。

注释1处创建了职责链,听名称就知道采用的是职责链模式,使得每一个拦截器都有机会处理请求,这些拦截器形成了拦截器链,网络请求经过拦截器链的处理,然后发送出去,同样的,网络响应也经过拦截器的处理返回给应用层。

注释2处启动了职责链,如下所示。
okhttp/src/main/java/okhttp3/internal/http/RealInterceptorChain.kt

@Throws(IOException::class)
override fun proceed(request: Request): Response {
check(index < interceptors.size)
calls++
if (exchange != null) {
check(exchange.connection.supportsUrl(request.url)) {
"network interceptor ${interceptors[index - 1]} must retain the same host and port"
}
check(calls == 1) {
"network interceptor ${interceptors[index - 1]} must call proceed() exactly once"
}
}
val next = copy(index = index + 1, request = request)//1
val interceptor = interceptors[index]//2

@Suppress("USELESS_ELVIS")
val response = interceptor.intercept(next) ?: throw NullPointerException(//3
"interceptor $interceptor returned null")

if (exchange != null) {
check(index + 1 >= interceptors.size || next.calls == 1) {
"network interceptor $interceptor must call proceed() exactly once"
}
}
check(response.body != null) { "interceptor $interceptor returned a response with no body" }
return response
}

注释1处调用了RealInterceptorChain的copy方法,其内部会新建一个RealInterceptorChain,通过参数index + 1来循环interceptors中的拦截器。
注释2处用于获取当前要执行的拦截器。
注释3处运行当前的拦截器,并设置了下个拦截器。其内部的逻辑通常是:当前拦截器处理完成后会接着执行下个拦截器的proceed方法。