privatevoidpromoteCalls(){ if (runningAsyncCalls.size() >= maxRequests) return; // Already running max capacity. if (readyAsyncCalls.isEmpty()) return; // No ready calls to promote. for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) { AsyncCall call = i.next();
if (runningCallsForHost(call) < maxRequestsPerHost) { i.remove(); runningAsyncCalls.add(call); executorService().execute(call); } if (runningAsyncCalls.size() >= maxRequests) return; // Reached max capacity. } }
Response getResponse(Request request, boolean forWebSocket)throws IOException { ...省略 // Create the initial HTTP engine. Retries and redirects need new engine for each attempt. engine = new HttpEngine(client, request, false, false, forWebSocket, null, null, null);
int followUpCount = 0; while (true) { if (canceled) { engine.releaseStreamAllocation(); thrownew IOException("Canceled"); }
boolean releaseConnection = true; try { engine.sendRequest(); engine.readResponse(); releaseConnection = false; } catch (RequestException e) { // The attempt to interpret the request failed. Give up. throw e.getCause(); } catch (RouteException e) { // The attempt to connect via a route failed. The request will not have been sent. ...省略 } }
if (writeRequestHeadersEagerly()) { long contentLength = OkHeaders.contentLength(request); if (bufferRequestBody) { if (contentLength > Integer.MAX_VALUE) { thrownew IllegalStateException("Use setFixedLengthStreamingMode() or " + "setChunkedStreamingMode() for requests larger than 2 GiB."); }
if (contentLength != -1) { // Buffer a request body of a known length. httpStream.writeRequestHeaders(networkRequest); requestBodyOut = new RetryableSink((int) contentLength); } else { // Buffer a request body of an unknown length. Don't write request headers until the // entire body is ready; otherwise we can't set the Content-Length header correctly. requestBodyOut = new RetryableSink(); } } else { httpStream.writeRequestHeaders(networkRequest); requestBodyOut = httpStream.createRequestBody(networkRequest, contentLength); } } success = true; } finally { // If we're crashing on I/O or otherwise, don't leak the cache body. if (!success && cacheCandidate != null) { closeQuietly(cacheCandidate.body()); } } }
// If we have a cache response too, then we're doing a conditional get. if (cacheResponse != null) { //检查缓存是否可用,如果可用。那么就用当前缓存的Response,关闭网络连接,释放连接。 if (validate(cacheResponse, networkResponse)) { userResponse = cacheResponse.newBuilder() .request(userRequest) .priorResponse(stripBody(priorResponse)) .headers(combine(cacheResponse.headers(), networkResponse.headers())) .cacheResponse(stripBody(cacheResponse)) .networkResponse(stripBody(networkResponse)) .build(); networkResponse.body().close(); releaseStreamAllocation();
// Update the cache after combining headers but before stripping the // Content-Encoding header (as performed by initContentStream()). InternalCache responseCache = Internal.instance.internalCache(client); responseCache.trackConditionalCacheHit(); // 更新缓存 responseCache.update(cacheResponse, stripBody(userResponse)); userResponse = unzip(userResponse); return; } else { closeQuietly(cacheResponse.body()); } }
privatestaticbooleanvalidate(Response cached, Response network){ //如果服务器返回304则缓存有效 if (network.code() == HTTP_NOT_MODIFIED) { returntrue; } //通过缓存和网络请求响应中的Last-Modified来计算是否是最新数据,如果是则缓存有效 Date lastModified = cached.headers().getDate("Last-Modified"); if (lastModified != null) { Date networkLastModified = network.headers().getDate("Last-Modified"); if (networkLastModified != null && networkLastModified.getTime() < lastModified.getTime()) { returntrue; } } returnfalse; }
如缓存果过期或者强制放弃缓存,在此情况下,缓存策略全部交给服务器判断,客户端只用发送条件get请求即可,如果缓存是有效的,则返回304 Not Modifiled,否则直接返回body。条件get请求有两种方式一种是Last-Modified-Date,一种是 ETag。这里采用了Last-Modified-Date,通过缓存和网络请求响应中的Last-Modified来计算是否是最新数据,如果是则缓存有效。
5.失败重连
最后我们再回到RealCall的getResponse方法:
Response getResponse(Request request, boolean forWebSocket)throws IOException { ...省略 boolean releaseConnection = true; try { engine.sendRequest(); engine.readResponse(); releaseConnection = false; } catch (RequestException e) { // The attempt to interpret the request failed. Give up. throw e.getCause(); } catch (RouteException e) { // The attempt to connect via a route failed. The request will not have been sent. HttpEngine retryEngine = engine.recover(e.getLastConnectException(), null); if (retryEngine != null) { releaseConnection = false; engine = retryEngine; continue; } // Give up; recovery is not possible. throw e.getLastConnectException(); } catch (IOException e) { // An attempt to communicate with a server failed. The request may have been sent. HttpEngine retryEngine = engine.recover(e, null); if (retryEngine != null) { releaseConnection = false; engine = retryEngine; continue; }
// Give up; recovery is not possible. throw e; } finally { // We're throwing an unchecked exception. Release any resources. if (releaseConnection) { StreamAllocation streamAllocation = engine.close(); streamAllocation.release(); } } ...省略 engine = new HttpEngine(client, request, false, false, forWebSocket, streamAllocation, null, response); } }
public HttpEngine recover(IOException e, Sink requestBodyOut){ if (!streamAllocation.recover(e, requestBodyOut)) { returnnull; }
if (!client.retryOnConnectionFailure()) { returnnull; }
StreamAllocation streamAllocation = close();
// For failure recovery, use the same route selector with a new connection. returnnew HttpEngine(client, userRequest, bufferRequestBody, callerWritesRequestBody, forWebSocket, streamAllocation, (RetryableSink) requestBodyOut, priorResponse); }