Preface

Following the Glide and RxJava2 posts, here is another source-code analysis. As before, the key points are listed first and then analyzed one step at a time~

  • The complete OkHttp request flow (Client, Request, Call, Interceptor, Response)
  • OkHttp interceptor analysis (RetryAndFollowUp, Bridge, Cache, Connect, CallServer)
  • OkHttp caching (CacheStrategy, ConnectionPool, Deque)

The Complete OkHttp Request Flow

A complete OkHttp request can be roughly summarized as:

  • Build an OkHttpClient
  • Build a Request
  • Issue the request synchronously or asynchronously, passing it through the Interceptors
  • Receive the Response (a minimal end-to-end sketch of these four steps follows below)
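
Before diving into the source, here is a minimal end-to-end sketch of those four steps (the URL is a placeholder and error handling is omitted):

OkHttpClient client = new OkHttpClient.Builder().build();
Request request = new Request.Builder()
    .url("https://example.com/api")
    .build();

// Synchronous: blocks the calling thread until the Response is available
Response response = client.newCall(request).execute();
System.out.println(response.code());
response.close();

// Asynchronous: the Dispatcher runs the call on its own thread pool
client.newCall(request).enqueue(new Callback() {
  @Override public void onFailure(Call call, IOException e) {
    e.printStackTrace();
  }

  @Override public void onResponse(Call call, Response response) throws IOException {
    System.out.println(response.body().string()); // string() also closes the body
  }
});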

Now let's go through it step by step, starting with creating the OkHttpClient:

@Singleton
@Provides
OkHttpClient provideClient(OkHttpClient.Builder builder) {
  if (BuildConfig.DEBUG) {
    HttpLoggingInterceptor loggingInterceptor = new HttpLoggingInterceptor();
    loggingInterceptor.setLevel(HttpLoggingInterceptor.Level.BASIC);
    builder.addInterceptor(loggingInterceptor);
  }
  File cacheFile = new File(Constants.PATH_CACHE);
  Cache cache = new Cache(cacheFile, 1024 * 1024 * 50);
  Interceptor cacheInterceptor = new Interceptor() {
    @Override
    public Response intercept(Chain chain) throws IOException {
      Request request = chain.request();
      if (!SystemUtil.isNetworkConnected()) {
        request = request.newBuilder()
            .cacheControl(CacheControl.FORCE_CACHE)
            .build();
      }
      Response response = chain.proceed(request);
      if (SystemUtil.isNetworkConnected()) {
        // With a network connection: don't cache, max-age is 0
        // (note: reassign response so the rewritten headers actually take effect)
        int maxAge = 0;
        response = response.newBuilder()
            .header("Cache-Control", "public, max-age=" + maxAge)
            .removeHeader("Pragma")
            .build();
      } else {
        // Without a network connection: allow stale responses up to 4 weeks old
        int maxStale = 60 * 60 * 24 * 28;
        response = response.newBuilder()
            .header("Cache-Control", "public, only-if-cached, max-stale=" + maxStale)
            .removeHeader("Pragma")
            .build();
      }
      return response;
    }
  };
  // Configure caching
  builder.addNetworkInterceptor(cacheInterceptor);
  builder.addInterceptor(cacheInterceptor);
  builder.cache(cache);
  // Configure timeouts
  builder.connectTimeout(10, TimeUnit.SECONDS);
  builder.readTimeout(20, TimeUnit.SECONDS);
  builder.writeTimeout(20, TimeUnit.SECONDS);
  // Retry on connection failure
  builder.retryOnConnectionFailure(true);
  return builder.build();
}

The OkHttpClient source is fairly long, but the class has no particular logic of its own. It is essentially a carrier that records everything we configured at initialization time, such as the cache policy, the read/write/connect timeouts and retry behavior, custom interceptors, and so on. Because it implements the Call.Factory interface, it can also turn a Request into a Call. According to the Javadoc, a Call is a request that has been prepared and can be executed at some point in the future; its concrete implementation is RealCall.

@Override public Call newCall(Request request) {
  return new RealCall(this, request, false /* for web socket */);
}

The Request class is likewise a pure carrier that wraps the request information (URL, method, headers, body), so I won't paste its source here.
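
For reference, here is a minimal sketch of building a Request via its Builder (the URL, header, and JSON body are placeholders):

MediaType json = MediaType.parse("application/json; charset=utf-8");
Request request = new Request.Builder()
    .url("https://example.com/api/user")          // placeholder URL
    .header("Accept", "application/json")
    .post(RequestBody.create(json, "{\"id\":1}")) // omit post() for a plain GET
    .build();

With a client and a request in place, the call is then executed either synchronously or asynchronously: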

// Synchronous
call.execute();
// Asynchronous
call.enqueue(new Callback() {
  @Override
  public void onFailure(Call call, IOException e) {
  }

  @Override
  public void onResponse(Call call, Response response) throws IOException {
  }
});

The asynchronous case is a bit more involved, so let's analyze it first. Enter the enqueue method:

//RealCall.java
@Override public void enqueue(Callback responseCallback) {
  synchronized (this) {
    if (executed) throw new IllegalStateException("Already Executed");
    executed = true;
  }
  captureCallStackTrace();
  client.dispatcher().enqueue(new AsyncCall(responseCallback));
}

It first grabs the client's dispatcher, which holds both a thread pool and the asynchronous task queues and executes tasks according to its scheduling policy. AsyncCall is an inner class of RealCall that holds a reference to it, effectively wrapping the RealCall. It extends NamedRunnable, so it is essentially a Runnable that the thread pool can execute.
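
For context, NamedRunnable roughly looks like the following (a simplified paraphrase of the OkHttp class; the real one formats the name via an internal Util helper), which is why run() ends up calling execute():

public abstract class NamedRunnable implements Runnable {
  protected final String name;

  public NamedRunnable(String format, Object... args) {
    this.name = String.format(format, args);
  }

  @Override public final void run() {
    String oldName = Thread.currentThread().getName();
    Thread.currentThread().setName(name); // name the worker thread after the call
    try {
      execute();                          // AsyncCall implements its logic here
    } finally {
      Thread.currentThread().setName(oldName);
    }
  }

  protected abstract void execute();
}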

//Dispatcher.java
public final class Dispatcher {
  private int maxRequests = 64;
  private int maxRequestsPerHost = 5;
  private @Nullable Runnable idleCallback;
  // Thread pool that actually executes the AsyncCalls
  private @Nullable ExecutorService executorService;
  // Queue of asynchronous calls waiting to run
  private final Deque<AsyncCall> readyAsyncCalls = new ArrayDeque<>();
  // Queue of asynchronous calls currently running
  private final Deque<AsyncCall> runningAsyncCalls = new ArrayDeque<>();
  // Queue of synchronous calls currently running
  private final Deque<RealCall> runningSyncCalls = new ArrayDeque<>();

  public Dispatcher(ExecutorService executorService) {
    this.executorService = executorService;
  }

  public Dispatcher() {
  }

  // The thread pool
  public synchronized ExecutorService executorService() {
    if (executorService == null) {
      executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
          new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp Dispatcher", false));
    }
    return executorService;
  }

  // Adds a new asynchronous call
  synchronized void enqueue(AsyncCall call) {
    if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) {
      runningAsyncCalls.add(call);
      executorService().execute(call);
    } else {
      readyAsyncCalls.add(call);
    }
  }

  // Called when a call finishes
  void finished(AsyncCall call) {
    finished(runningAsyncCalls, call, true);
  }

  // Promotes calls from the ready queue to the running queue
  private void promoteCalls() {
    if (runningAsyncCalls.size() >= maxRequests) return; // Already running max capacity.
    if (readyAsyncCalls.isEmpty()) return; // No ready calls to promote.
    for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {
      AsyncCall call = i.next();
      if (runningCallsForHost(call) < maxRequestsPerHost) {
        i.remove();
        runningAsyncCalls.add(call);
        executorService().execute(call);
      }
      if (runningAsyncCalls.size() >= maxRequests) return; // Reached max capacity.
    }
  }
}

The enqueue method first checks whether the configured concurrency limits have been reached (by default at most 64 requests overall and 5 per host). If not, the call is handed to the thread pool and added to runningAsyncCalls; otherwise it is placed in readyAsyncCalls to wait. Every time an asynchronous call finishes, promoteCalls is invoked to try to move calls from the ready queue into the running queue. As noted above, AsyncCall is essentially a Runnable, so when the thread pool executes it, its run method fires. Let's look at the AsyncCall source:

//AsyncCall.java
final class AsyncCall extends NamedRunnable {
  private final Callback responseCallback;

  AsyncCall(Callback responseCallback) {
    super("OkHttp %s", redactedUrl());
    this.responseCallback = responseCallback;
  }

  String host() {
    return originalRequest.url().host();
  }

  Request request() {
    return originalRequest;
  }

  RealCall get() {
    return RealCall.this;
  }

  @Override protected void execute() {
    boolean signalledCallback = false;
    try {
      Response response = getResponseWithInterceptorChain();
      if (retryAndFollowUpInterceptor.isCanceled()) {
        signalledCallback = true;
        responseCallback.onFailure(RealCall.this, new IOException("Canceled"));
      } else {
        signalledCallback = true;
        responseCallback.onResponse(RealCall.this, response);
      }
    } catch (IOException e) {
      if (signalledCallback) {
        Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e);
      } else {
        responseCallback.onFailure(RealCall.this, e);
      }
    } finally {
      // Invokes Dispatcher.finished, which in turn triggers promoteCalls to schedule the next task
      client.dispatcher().finished(this);
    }
  }
}

NamedRunnable is a thin wrapper around Runnable whose run method ends up calling execute. So the heart of the whole request is the getResponseWithInterceptorChain method, which produces the Response that is then delivered to the caller through the callback. Finally, the finally block calls the dispatcher's finished method, which is what triggers the promoteCalls scheduling we saw earlier after every asynchronous task completes.

Response getResponseWithInterceptorChain() throws IOException {
  // Build a full stack of interceptors.
  List<Interceptor> interceptors = new ArrayList<>();
  interceptors.addAll(client.interceptors());
  interceptors.add(retryAndFollowUpInterceptor);
  interceptors.add(new BridgeInterceptor(client.cookieJar()));
  interceptors.add(new CacheInterceptor(client.internalCache()));
  interceptors.add(new ConnectInterceptor(client));
  if (!forWebSocket) {
    interceptors.addAll(client.networkInterceptors());
  }
  interceptors.add(new CallServerInterceptor(forWebSocket));

  Interceptor.Chain chain = new RealInterceptorChain(
      interceptors, null, null, null, 0, originalRequest);
  return chain.proceed(originalRequest);
}

Interceptors are the core component of OkHttp. They carry out the important steps of the request flow and are arranged in a chain-of-responsibility design: each step may produce a Response, and if it cannot, it passes the Request on to the next Interceptor and waits for the Response to come back from it. Chaining the Interceptors together this way is similar to Android's touch-event dispatch, which also follows the chain-of-responsibility pattern.

The order of the Interceptors matters a great deal. The Interceptor that ultimately fetches data over the network must come last, and the ones that try to serve data from the cache or configure request headers must come before it. The code above adds the Interceptors to a List in exactly that order and hands the list to the first Chain. Note the 0 passed along with it: it is the index used to look up the Interceptor that corresponds to the current Chain. Let's look at the RealInterceptorChain source:

public final class RealInterceptorChain implements Interceptor.Chain {
  private final List<Interceptor> interceptors;
  private final StreamAllocation streamAllocation;
  private final HttpCodec httpCodec;
  private final RealConnection connection;
  private final int index;
  private final Request request;
  private int calls;

  public RealInterceptorChain(List<Interceptor> interceptors, StreamAllocation streamAllocation,
      HttpCodec httpCodec, RealConnection connection, int index, Request request) {
    this.interceptors = interceptors;
    this.connection = connection;
    this.streamAllocation = streamAllocation;
    this.httpCodec = httpCodec;
    this.index = index;
    this.request = request;
  }

  @Override public Connection connection() {
    return connection;
  }

  public StreamAllocation streamAllocation() {
    return streamAllocation;
  }

  public HttpCodec httpStream() {
    return httpCodec;
  }

  @Override public Request request() {
    return request;
  }

  @Override public Response proceed(Request request) throws IOException {
    return proceed(request, streamAllocation, httpCodec, connection);
  }

  public Response proceed(Request request, StreamAllocation streamAllocation, HttpCodec httpCodec,
      RealConnection connection) throws IOException {
    if (index >= interceptors.size()) throw new AssertionError();
    calls++;
    // A large amount of code omitted
    // This is where the chaining happens
    RealInterceptorChain next = new RealInterceptorChain(
        interceptors, streamAllocation, httpCodec, connection, index + 1, request);
    Interceptor interceptor = interceptors.get(index);
    Response response = interceptor.intercept(next);
    return response;
  }
}

Part of the code is omitted. As the name Chain suggests, this class only takes care of the chaining; the actual logic lives in the Interceptors. The key to the chaining is the proceed method:

  • First it creates the next Chain to follow the current one, incrementing index by 1 so that the next Chain can find its corresponding Interceptor
  • Then it looks up its own Interceptor using the current index
  • Finally it calls intercept, passing the next Chain into the current Interceptor
  • Inside intercept, if the current Interceptor can produce the target Response it returns it directly; if not, it calls proceed on the next Chain it was given, delegating the job of obtaining the Response to the next link, and waits for the result to come back
  • This process then repeats
  • The request travels as: current Chain -> current Interceptor -> next Chain -> next Interceptor

With that, the whole request path is clear. The Request is passed from the start to the end (the end is not necessarily CallServerInterceptor; it is whichever Interceptor actually runs last) through every Interceptor, and the Response then travels back in the opposite direction, with each Interceptor free to process both along the way.
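
To make the pattern concrete, here is a minimal sketch of a custom application Interceptor written against this contract: do some work on the way in, delegate to the next link via chain.proceed(), then do some work on the way out (the X-Elapsed-Millis header is purely illustrative):

Interceptor timingInterceptor = new Interceptor() {
  @Override public Response intercept(Chain chain) throws IOException {
    Request request = chain.request();          // on the way in
    long start = System.nanoTime();
    Response response = chain.proceed(request); // hand off to the next link and wait
    long tookMs = (System.nanoTime() - start) / 1000000L;
    return response.newBuilder()                // on the way out
        .header("X-Elapsed-Millis", Long.toString(tookMs))
        .build();
  }
};
// Registered via OkHttpClient.Builder#addInterceptor (or addNetworkInterceptor)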

The next section looks at how each Interceptor handles its specific logic inside its intercept method.

OkHttp Interceptor Analysis

Here are the Interceptors added in getResponseWithInterceptorChain again, with a summary of what each one is responsible for:

  • client.interceptors(): user-defined Interceptors that can intercept every request
  • RetryAndFollowUpInterceptor: handles retrying failed connections and following redirects
  • BridgeInterceptor: fills in request headers such as Keep-Alive, gzip, and Cookie, which help optimize the request
  • CacheInterceptor: manages caching, using DiskLruCache for local storage and CacheStrategy to decide the caching policy
  • ConnectInterceptor: establishes the connection to the target server and obtains a RealConnection
  • client.networkInterceptors(): user-defined Interceptors that only take effect when an actual network request is made
  • CallServerInterceptor: where the network request is actually sent to the server

RetryAndFollowUpInterceptor

//RetryAndFollowUpInterceptor.java
@Override public Response intercept(Chain chain) throws IOException {
  Request request = chain.request();
  streamAllocation = new StreamAllocation(
      client.connectionPool(), createAddress(request.url()), callStackTrace);
  int followUpCount = 0;
  Response priorResponse = null;
  while (true) { // Loops so the request can be issued again as long as no response is returned
    if (canceled) {
      streamAllocation.release();
      throw new IOException("Canceled");
    }
    Response response = null;
    boolean releaseConnection = true;
    try { // Get the response handed back by the next Interceptor
      response = ((RealInterceptorChain) chain).proceed(request, streamAllocation, null, null);
      releaseConnection = false;
    } catch (RouteException e) {
      if (!recover(e.getLastConnectException(), false, request)) {
        throw e.getLastConnectException();
      }
      releaseConnection = false;
      continue;
    } catch (IOException e) {
      boolean requestSendStarted = !(e instanceof ConnectionShutdownException);
      if (!recover(e, requestSendStarted, request)) throw e;
      releaseConnection = false;
      continue;
    } finally {
      if (releaseConnection) {
        streamAllocation.streamFailed(null);
        streamAllocation.release();
      }
    }
    if (priorResponse != null) {
      response = response.newBuilder()
          .priorResponse(priorResponse.newBuilder()
              .body(null)
              .build())
          .build();
    }
    // A non-null request is returned when the request needs to be issued again;
    // followUpRequest decides whether a redirect or retry is required
    Request followUp = followUpRequest(response);
    if (followUp == null) {
      if (!forWebSocket) {
        streamAllocation.release();
      }
      return response;
    }
    closeQuietly(response.body());
    if (++followUpCount > MAX_FOLLOW_UPS) {
      streamAllocation.release();
      throw new ProtocolException("Too many follow-up requests: " + followUpCount);
    }
    if (followUp.body() instanceof UnrepeatableRequestBody) {
      streamAllocation.release();
      throw new HttpRetryException("Cannot retry streamed HTTP body", response.code());
    }
    if (!sameConnection(response, followUp.url())) {
      streamAllocation.release();
      streamAllocation = new StreamAllocation(
          client.connectionPool(), createAddress(followUp.url()), callStackTrace);
    } else if (streamAllocation.codec() != null) {
      throw new IllegalStateException("Closing the body of " + response
          + " didn't close its backing stream. Bad interceptor?");
    }
    request = followUp;
    priorResponse = response;
  }
}
  • The streamAllocation object is created in this Interceptor
  • followUpRequest decides whether a redirect or retry is needed; if so, it returns a new request
  • If that request is null, no follow-up is needed and the response is returned directly; otherwise the while (true) loop runs chain.proceed again and the request flow starts over (the client-side switches controlling this behavior are sketched below)
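
As a side note, the behavior of this Interceptor can be restricted from the client side via the corresponding OkHttpClient.Builder switches; a small configuration sketch:

OkHttpClient client = new OkHttpClient.Builder()
    .retryOnConnectionFailure(false) // don't silently retry when the connection fails
    .followRedirects(false)          // don't follow HTTP 3xx redirects
    .followSslRedirects(false)       // don't follow redirects between HTTP and HTTPS
    .build();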

BridgeInterceptor

//BridgeInterceptor.java
@Override public Response intercept(Chain chain) throws IOException {
  Request userRequest = chain.request();
  Request.Builder requestBuilder = userRequest.newBuilder();

  RequestBody body = userRequest.body();
  if (body != null) {
    MediaType contentType = body.contentType();
    if (contentType != null) {
      requestBuilder.header("Content-Type", contentType.toString());
    }
    long contentLength = body.contentLength();
    if (contentLength != -1) {
      requestBuilder.header("Content-Length", Long.toString(contentLength));
      requestBuilder.removeHeader("Transfer-Encoding");
    } else {
      requestBuilder.header("Transfer-Encoding", "chunked");
      requestBuilder.removeHeader("Content-Length");
    }
  }

  if (userRequest.header("Host") == null) {
    requestBuilder.header("Host", hostHeader(userRequest.url(), false));
  }

  if (userRequest.header("Connection") == null) {
    requestBuilder.header("Connection", "Keep-Alive");
  }

  // If we add an "Accept-Encoding: gzip" header field we're responsible for also decompressing
  // the transfer stream.
  boolean transparentGzip = false;
  if (userRequest.header("Accept-Encoding") == null && userRequest.header("Range") == null) {
    transparentGzip = true;
    requestBuilder.header("Accept-Encoding", "gzip");
  }

  List<Cookie> cookies = cookieJar.loadForRequest(userRequest.url());
  if (!cookies.isEmpty()) {
    requestBuilder.header("Cookie", cookieHeader(cookies));
  }

  if (userRequest.header("User-Agent") == null) {
    requestBuilder.header("User-Agent", Version.userAgent());
  }

  Response networkResponse = chain.proceed(requestBuilder.build());

  HttpHeaders.receiveHeaders(cookieJar, userRequest.url(), networkResponse.headers());

  Response.Builder responseBuilder = networkResponse.newBuilder()
      .request(userRequest);

  if (transparentGzip
      && "gzip".equalsIgnoreCase(networkResponse.header("Content-Encoding"))
      && HttpHeaders.hasBody(networkResponse)) {
    GzipSource responseBody = new GzipSource(networkResponse.body().source());
    Headers strippedHeaders = networkResponse.headers().newBuilder()
        .removeAll("Content-Encoding")
        .removeAll("Content-Length")
        .build();
    responseBuilder.headers(strippedHeaders);
    responseBuilder.body(new RealResponseBody(strippedHeaders, Okio.buffer(responseBody)));
  }

  return responseBuilder.build();
}
  • As you can see, this Interceptor does little more than add request headers, so there isn't much to analyze
  • It enables Keep-Alive connections, attaches cookies, and enables gzip compression with transparent decompression, effectively optimizing the request (note the gzip caveat in the comment above; see the sketch below)
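
One practical caveat, shown as a hedged sketch: transparentGzip is only set when you did not add Accept-Encoding yourself. If you set it manually, you must decompress the body on your own, for example with Okio's GzipSource (the URL is a placeholder):

Request request = new Request.Builder()
    .url("https://example.com/data")
    .header("Accept-Encoding", "gzip") // disables BridgeInterceptor's transparent decompression
    .build();
Response response = client.newCall(request).execute();

BufferedSource body;
if ("gzip".equalsIgnoreCase(response.header("Content-Encoding"))) {
  body = Okio.buffer(new GzipSource(response.body().source())); // decompress manually
} else {
  body = response.body().source();
}
System.out.println(body.readUtf8());
response.close();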

CacheInterceptor

//CacheInterceptor.java
@Override public Response intercept(Chain chain) throws IOException {
  // First look up a cached response for this request
  Response cacheCandidate = cache != null
      ? cache.get(chain.request())
      : null;

  long now = System.currentTimeMillis();

  // Let the cache strategy decide the resulting request and response
  CacheStrategy strategy = new CacheStrategy.Factory(now, chain.request(), cacheCandidate).get();
  Request networkRequest = strategy.networkRequest;
  Response cacheResponse = strategy.cacheResponse;

  if (cache != null) {
    cache.trackResponse(strategy);
  }

  if (cacheCandidate != null && cacheResponse == null) {
    closeQuietly(cacheCandidate.body());
  }

  // If the strategy forbids using the network (only-if-cached) and there is no usable cache,
  // the request fails
  if (networkRequest == null && cacheResponse == null) {
    return new Response.Builder()
        .request(chain.request())
        .protocol(Protocol.HTTP_1_1)
        .code(504)
        .message("Unsatisfiable Request (only-if-cached)")
        .body(Util.EMPTY_RESPONSE)
        .sentRequestAtMillis(-1L)
        .receivedResponseAtMillis(System.currentTimeMillis())
        .build();
  }

  // If no network request is needed, return the cached response directly
  if (networkRequest == null) {
    return cacheResponse.newBuilder()
        .cacheResponse(stripBody(cacheResponse))
        .build();
  }

  Response networkResponse = null;
  try {
    // Load from the network
    networkResponse = chain.proceed(networkRequest);
  } finally {
    if (networkResponse == null && cacheCandidate != null) {
      closeQuietly(cacheCandidate.body());
    }
  }

  // If there is already a cached response, update the cache with the latest network data
  if (cacheResponse != null) {
    if (networkResponse.code() == HTTP_NOT_MODIFIED) {
      Response response = cacheResponse.newBuilder()
          .headers(combine(cacheResponse.headers(), networkResponse.headers()))
          .sentRequestAtMillis(networkResponse.sentRequestAtMillis())
          .receivedResponseAtMillis(networkResponse.receivedResponseAtMillis())
          .cacheResponse(stripBody(cacheResponse))
          .networkResponse(stripBody(networkResponse))
          .build();
      networkResponse.body().close();
      cache.trackConditionalCacheHit();
      cache.update(cacheResponse, response);
      return response;
    } else {
      closeQuietly(cacheResponse.body());
    }
  }

  Response response = networkResponse.newBuilder()
      .cacheResponse(stripBody(cacheResponse))
      .networkResponse(stripBody(networkResponse))
      .build();

  // Write the result of this request into the cache via a CacheRequest
  if (cache != null) {
    if (HttpHeaders.hasBody(response) && CacheStrategy.isCacheable(response, networkRequest)) {
      // Offer this request to the cache.
      CacheRequest cacheRequest = cache.put(response);
      return cacheWritingResponse(cacheRequest, response);
    }

    if (HttpMethod.invalidatesCache(networkRequest.method())) {
      try {
        cache.remove(networkRequest);
      } catch (IOException ignored) {
        // The cache cannot be written.
      }
    }
  }

  return response;
}
  • First, a cached response is looked up for the request
  • The request and the cached response are fed into CacheStrategy, which applies the caching policy (network only, cache only, freshness limits, and so on) and produces the resulting networkRequest and cacheResponse (the caller can also force either decision per request, as sketched below)
  • If the policy requires the cache only and the cache misses, the request fails (with a synthetic 504)
  • If no network request is needed, the cached response is returned directly
  • Otherwise the job of obtaining the response is handed to the next Chain and the network request is executed
  • Once the network result is back, an existing cached response is refreshed with the latest data (on a 304)
  • Finally, the resulting response is written into the cache via a CacheRequest
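
From the caller's side, the decision can be forced per request with the built-in CacheControl constants (a small sketch; the URL is a placeholder):

// Serve from cache only; per the code above this yields a synthetic 504 if nothing usable is cached
Request cacheOnly = new Request.Builder()
    .url("https://example.com/api")
    .cacheControl(CacheControl.FORCE_CACHE)
    .build();

// Skip the cache entirely and always go to the network
Request networkOnly = new Request.Builder()
    .url("https://example.com/api")
    .cacheControl(CacheControl.FORCE_NETWORK)
    .build();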

ConnectInterceptor

//ConnectInterceptor.java
@Override public Response intercept(Chain chain) throws IOException {
  RealInterceptorChain realChain = (RealInterceptorChain) chain;
  Request request = realChain.request();
  StreamAllocation streamAllocation = realChain.streamAllocation();

  boolean doExtensiveHealthChecks = !request.method().equals("GET");
  // newStream establishes the connection to the server
  HttpCodec httpCodec = streamAllocation.newStream(client, doExtensiveHealthChecks);
  RealConnection connection = streamAllocation.connection();

  return realChain.proceed(request, streamAllocation, httpCodec, connection);
}
  • This Interceptor mainly lays the groundwork for the actual network request in the next step: it obtains the HttpCodec and the RealConnection, completing the set of parameters the network request needs, and passes them all to the next Chain
  • newStream first tries to find an existing connection in the ConnectionPool to reuse; if none matches, it creates a new connection and performs the handshake with the server
  • Once connected, the Socket is wrapped via Okio into a BufferedSource and a BufferedSink, which are handed to the HttpCodec and used in the next step
//RealConnection.java
source = Okio.buffer(Okio.source(rawSocket));
sink = Okio.buffer(Okio.sink(rawSocket));

CallServerInterceptor

//CallServerInterceptor.java
@Override public Response intercept(Chain chain) throws IOException {
  RealInterceptorChain realChain = (RealInterceptorChain) chain;
  HttpCodec httpCodec = realChain.httpStream();
  StreamAllocation streamAllocation = realChain.streamAllocation();
  RealConnection connection = (RealConnection) realChain.connection();
  Request request = realChain.request();

  long sentRequestMillis = System.currentTimeMillis();
  httpCodec.writeRequestHeaders(request); // Send the request headers

  Response.Builder responseBuilder = null;
  if (HttpMethod.permitsRequestBody(request.method()) && request.body() != null) {
    if ("100-continue".equalsIgnoreCase(request.header("Expect"))) {
      httpCodec.flushRequest();
      responseBuilder = httpCodec.readResponseHeaders(true);
    }
    if (responseBuilder == null) {
      Sink requestBodyOut = httpCodec.createRequestBody(request, request.body().contentLength());
      BufferedSink bufferedRequestBody = Okio.buffer(requestBodyOut);
      request.body().writeTo(bufferedRequestBody); // Send the request body
      bufferedRequestBody.close();
    } else if (!connection.isMultiplexed()) {
      streamAllocation.noNewStreams();
    }
  }

  httpCodec.finishRequest();

  if (responseBuilder == null) {
    responseBuilder = httpCodec.readResponseHeaders(false); // Read the response headers
  }

  Response response = responseBuilder
      .request(request)
      .handshake(streamAllocation.connection().handshake())
      .sentRequestAtMillis(sentRequestMillis)
      .receivedResponseAtMillis(System.currentTimeMillis())
      .build();

  int code = response.code();
  if (forWebSocket && code == 101) {
    response = response.newBuilder()
        .body(Util.EMPTY_RESPONSE)
        .build();
  } else {
    response = response.newBuilder()
        .body(httpCodec.openResponseBody(response)) // Read the response body
        .build();
  }

  if ("close".equalsIgnoreCase(response.request().header("Connection"))
      || "close".equalsIgnoreCase(response.header("Connection"))) {
    streamAllocation.noNewStreams();
  }

  if ((code == 204 || code == 205) && response.body().contentLength() > 0) {
    throw new ProtocolException(
        "HTTP " + code + " had non-zero Content-Length: " + response.body().contentLength());
  }

  return response;
}
  • The four key points are marked in the source above: sending the request headers, sending the request body, reading the response headers, and reading the response body; the bodies are optional and are only sent or read when present
  • The concrete work is delegated to HttpCodec, an abstraction over the HTTP protocol with two implementations, Http1Codec for HTTP/1.1 and Http2Codec for HTTP/2
  • Its methods are all reads and writes because the final request and response go through the BufferedSource and BufferedSink wrapped in the previous step: the sink is the output stream that hands written data to the socket, and the source is the input stream that reads the response data back from the socket (see the hand-rolled sketch below)
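
To give a feel for what this read/write layer does, here is a hand-rolled sketch (not OkHttp code) that writes an HTTP/1.1 request through a BufferedSink and reads the status line back from a BufferedSource over a raw java.net.Socket; the host is a placeholder:

Socket socket = new Socket("example.com", 80);
BufferedSink sink = Okio.buffer(Okio.sink(socket));
BufferedSource source = Okio.buffer(Okio.source(socket));

// Roughly what writeRequestHeaders/finishRequest boil down to for HTTP/1.1
sink.writeUtf8("GET / HTTP/1.1\r\n")
    .writeUtf8("Host: example.com\r\n")
    .writeUtf8("Connection: close\r\n\r\n");
sink.flush();

// Roughly what readResponseHeaders starts with
String statusLine = source.readUtf8LineStrict(); // e.g. "HTTP/1.1 200 OK"
System.out.println(statusLine);
socket.close();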

OkHttp Caching

This section covers a few topics related to caching in OkHttp:

  • The cache strategy, CacheStrategy
  • The connection pool, ConnectionPool
  • The task queues, Deque<AsyncCall>

Cache Strategy

CacheStrategy came up in the CacheInterceptor section but was not covered in detail; here is a closer look. CacheStrategy relies on the local Cache and on the HTTP header cache directives.

Cache uses DiskLruCache as its container, storing and reading responses keyed by request.url. This is a very common caching approach, so for reasons of space it won't be analyzed here.

The HTTP headers defined by the protocol, such as Cache-Control, Expires, ETag, Last-Modified, and Date, are exchanged with the server, and these fields decide whether the cache can be used. Their exact meanings are defined in the HTTP specification and won't be repeated here.
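
As a quick illustration of how ETag and If-None-Match drive revalidation, here is a hedged sketch of issuing a conditional request by hand (the URL is a placeholder, and savedEtag is assumed to have been captured from an earlier response):

Request conditional = new Request.Builder()
    .url("https://example.com/resource")
    .header("If-None-Match", savedEtag) // savedEtag from a previous response's ETag header
    .build();
Response response = client.newCall(conditional).execute();
if (response.code() == 304) {
  // Not modified: the locally cached copy is still valid, serve it
} else {
  // Modified: use response.body() and remember the new response.header("ETag")
}
response.close();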

Let's look at the CacheStrategy source:

public final class CacheStrategy {
  public final @Nullable Request networkRequest;
  public final @Nullable Response cacheResponse;

  CacheStrategy(Request networkRequest, Response cacheResponse) {
    this.networkRequest = networkRequest;
    this.cacheResponse = cacheResponse;
  }

  // The Factory takes the response found in the Cache and parses the relevant HTTP headers
  public Factory(long nowMillis, Request request, Response cacheResponse) {
    this.nowMillis = nowMillis;
    this.request = request;
    this.cacheResponse = cacheResponse;

    if (cacheResponse != null) {
      this.sentRequestMillis = cacheResponse.sentRequestAtMillis();
      this.receivedResponseMillis = cacheResponse.receivedResponseAtMillis();
      Headers headers = cacheResponse.headers();
      for (int i = 0, size = headers.size(); i < size; i++) {
        String fieldName = headers.name(i);
        String value = headers.value(i);
        if ("Date".equalsIgnoreCase(fieldName)) {
          servedDate = HttpDate.parse(value);
          servedDateString = value;
        } else if ("Expires".equalsIgnoreCase(fieldName)) {
          expires = HttpDate.parse(value);
        } else if ("Last-Modified".equalsIgnoreCase(fieldName)) {
          lastModified = HttpDate.parse(value);
          lastModifiedString = value;
        } else if ("ETag".equalsIgnoreCase(fieldName)) {
          etag = value;
        } else if ("Age".equalsIgnoreCase(fieldName)) {
          ageSeconds = HttpHeaders.parseSeconds(value, -1);
        }
      }
    }
  }

  public CacheStrategy get() {
    CacheStrategy candidate = getCandidate();
    if (candidate.networkRequest != null && request.cacheControl().onlyIfCached()) {
      // The request header contains only-if-cached (use the cache only) but no cache was usable,
      // so both networkRequest and cacheResponse are set to null
      return new CacheStrategy(null, null);
    }
    return candidate;
  }

  private CacheStrategy getCandidate() {
    if (cacheResponse == null) {
      // Cache miss: go straight to the network
      return new CacheStrategy(request, null);
    }

    // The cached response is missing the TLS handshake: go straight to the network
    if (request.isHttps() && cacheResponse.handshake() == null) {
      return new CacheStrategy(request, null);
    }

    // Based on the cached response's status code, HTTP headers, and so on, decide whether
    // the cached response is usable at all; if not, go straight to the network
    if (!isCacheable(cacheResponse, request)) {
      return new CacheStrategy(request, null);
    }

    // If the request carries no-cache, If-Modified-Since, or If-None-Match,
    // go straight to the network
    CacheControl requestCaching = request.cacheControl();
    if (requestCaching.noCache() || hasConditions(request)) {
      return new CacheStrategy(request, null);
    }

    long ageMillis = cacheResponseAge();
    long freshMillis = computeFreshnessLifetime();
    if (requestCaching.maxAgeSeconds() != -1) {
      freshMillis = Math.min(freshMillis, SECONDS.toMillis(requestCaching.maxAgeSeconds()));
    }
    long minFreshMillis = 0;
    if (requestCaching.minFreshSeconds() != -1) {
      minFreshMillis = SECONDS.toMillis(requestCaching.minFreshSeconds());
    }
    long maxStaleMillis = 0;
    CacheControl responseCaching = cacheResponse.cacheControl();
    if (!responseCaching.mustRevalidate() && requestCaching.maxStaleSeconds() != -1) {
      maxStaleMillis = SECONDS.toMillis(requestCaching.maxStaleSeconds());
    }

    // If the cached response is still within its freshness lifetime, return it
    if (!responseCaching.noCache() && ageMillis + minFreshMillis < freshMillis + maxStaleMillis) {
      Response.Builder builder = cacheResponse.newBuilder();
      if (ageMillis + minFreshMillis >= freshMillis) {
        builder.addHeader("Warning", "110 HttpURLConnection \"Response is stale\"");
      }
      long oneDayMillis = 24 * 60 * 60 * 1000L;
      if (ageMillis > oneDayMillis && isFreshnessLifetimeHeuristic()) {
        builder.addHeader("Warning", "113 HttpURLConnection \"Heuristic expiration\"");
      }
      return new CacheStrategy(null, builder.build());
    }

    // Decide whether this network request should carry a conditional header
    String conditionName;
    String conditionValue;
    if (etag != null) {
      conditionName = "If-None-Match";
      conditionValue = etag;
    } else if (lastModified != null) {
      conditionName = "If-Modified-Since";
      conditionValue = lastModifiedString;
    } else if (servedDate != null) {
      conditionName = "If-Modified-Since";
      conditionValue = servedDateString;
    } else {
      // No condition needed: return the plain request
      return new CacheStrategy(request, null);
    }

    Headers.Builder conditionalRequestHeaders = request.headers().newBuilder();
    Internal.instance.addLenient(conditionalRequestHeaders, conditionName, conditionValue);

    // A condition is needed: return the request with the conditional header attached
    Request conditionalRequest = request.newBuilder()
        .headers(conditionalRequestHeaders.build())
        .build();
    return new CacheStrategy(conditionalRequest, cacheResponse);
  }
}
  • The Factory constructor first gathers the inputs mentioned earlier: the response retrieved from the Cache and the HTTP header configuration
  • The core method, getCandidate, then runs those inputs through a series of checks and returns different CacheStrategy objects
  • In essence it simply returns different combinations of networkRequest and cacheResponse, so the caller only needs to look at those two fields to know what to do next; all the complicated decisions are encapsulated inside CacheStrategy and invisible to the outside, and the individual checks are annotated in the code above

Connection Pool

OkHttp uses a connection pool to reuse connections, avoiding repeated handshakes, and it is also able to reclaim connections at the right time. This is one of the nicer aspects of OkHttp's design. Let's look at the ConnectionPool source:

public final class ConnectionPool {
  // Thread pool dedicated to running the connection-cleanup task
  private static final Executor executor = new ThreadPoolExecutor(0 /* corePoolSize */,
      Integer.MAX_VALUE /* maximumPoolSize */, 60L /* keepAliveTime */, TimeUnit.SECONDS,
      new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp ConnectionPool", true));

  private final int maxIdleConnections;   // Maximum number of idle connections
  private final long keepAliveDurationNs; // How long a keep-alive connection is kept

  // The cleanup task: an endless loop that exits when waitNanos is -1,
  // otherwise it runs another cleanup pass after waiting for waitMillis
  private final Runnable cleanupRunnable = new Runnable() {
    @Override public void run() {
      while (true) {
        long waitNanos = cleanup(System.nanoTime());
        if (waitNanos == -1) return;
        if (waitNanos > 0) {
          long waitMillis = waitNanos / 1000000L;
          waitNanos -= (waitMillis * 1000000L);
          synchronized (ConnectionPool.this) {
            try {
              ConnectionPool.this.wait(waitMillis, (int) waitNanos);
            } catch (InterruptedException ignored) {
            }
          }
        }
      }
    }
  };

  // The pool's actual container is a Deque
  private final Deque<RealConnection> connections = new ArrayDeque<>();
  final RouteDatabase routeDatabase = new RouteDatabase();
  boolean cleanupRunning;

  // By default at most 5 idle connections, kept alive for 5 minutes
  public ConnectionPool() {
    this(5, 5, TimeUnit.MINUTES);
  }

  public ConnectionPool(int maxIdleConnections, long keepAliveDuration, TimeUnit timeUnit) {
    this.maxIdleConnections = maxIdleConnections;
    this.keepAliveDurationNs = timeUnit.toNanos(keepAliveDuration);
    if (keepAliveDuration <= 0) {
      throw new IllegalArgumentException("keepAliveDuration <= 0: " + keepAliveDuration);
    }
  }

  // Takes a connection from the pool whose address and route match the current request so it
  // can be reused, and increments that connection's reference count
  @Nullable RealConnection get(Address address, StreamAllocation streamAllocation, Route route) {
    assert (Thread.holdsLock(this));
    for (RealConnection connection : connections) {
      if (connection.isEligible(address, route)) { // Pick an eligible connection
        streamAllocation.acquire(connection);      // Increment its reference count
        return connection;
      }
    }
    return null;
  }

  // Adding a new connection to the pool kicks off the cleanup task if it isn't already running
  void put(RealConnection connection) {
    assert (Thread.holdsLock(this));
    if (!cleanupRunning) {
      cleanupRunning = true;
      executor.execute(cleanupRunnable);
    }
    connections.add(connection);
  }

  // The method that actually performs the cleanup
  long cleanup(long now) {
    int inUseConnectionCount = 0;
    int idleConnectionCount = 0;
    RealConnection longestIdleConnection = null;
    long longestIdleDurationNs = Long.MIN_VALUE;

    synchronized (this) {
      // Walk the pool
      for (Iterator<RealConnection> i = connections.iterator(); i.hasNext(); ) {
        RealConnection connection = i.next();

        // A reference count greater than 0 means the connection is in use, so skip it
        if (pruneAndGetAllocationCount(connection, now) > 0) {
          inUseConnectionCount++;
          continue;
        }

        idleConnectionCount++;

        // Track the connection that has been idle the longest; it is the eviction candidate
        long idleDurationNs = now - connection.idleAtNanos;
        if (idleDurationNs > longestIdleDurationNs) {
          longestIdleDurationNs = idleDurationNs;
          longestIdleConnection = connection;
        }
      }

      // If the longest idle time exceeds the keep-alive duration,
      // or the number of idle connections exceeds the idle limit,
      // evict that connection from the pool
      if (longestIdleDurationNs >= this.keepAliveDurationNs
          || idleConnectionCount > this.maxIdleConnections) {
        connections.remove(longestIdleConnection);
      } else if (idleConnectionCount > 0) {
        // Otherwise, if there are idle connections, none has hit its limits yet;
        // return how long until the candidate reaches the keep-alive limit, and the
        // cleanupRunnable loop will wait that long before calling cleanup again
        return keepAliveDurationNs - longestIdleDurationNs;
      } else if (inUseConnectionCount > 0) {
        // Otherwise all connections are in use;
        // return the keep-alive duration and clean up again after waiting that long
        return keepAliveDurationNs;
      } else {
        // Otherwise there are no connections at all: break out of the cleanupRunnable loop
        // and end the cleanup task
        cleanupRunning = false;
        return -1;
      }
    }

    closeQuietly(longestIdleConnection.socket());
    return 0;
  }

  private int pruneAndGetAllocationCount(RealConnection connection, long now) {
    // The references held by this connection (its reference count)
    List<Reference<StreamAllocation>> references = connection.allocations;
    // Walk the connection's reference list
    for (int i = 0; i < references.size(); ) {
      Reference<StreamAllocation> reference = references.get(i);
      // If the reference is still live (the connection is still in use by it), skip it
      if (reference.get() != null) {
        i++;
        continue;
      }
      // Otherwise remove the reference from the list
      references.remove(i);
      connection.noNewStreams = true;
      // If the list is now empty, nobody is using this connection and it is idle,
      // so backdate its idle time to the keep-alive limit and report a count of 0
      // so that it becomes eligible for eviction
      if (references.isEmpty()) {
        connection.idleAtNanos = now - keepAliveDurationNs;
        return 0;
      }
    }
    return references.size();
  }
}

The code above is long, but the comments cover the details. In summary:

  • The pool is governed by two parameters, maxIdleConnections (the maximum number of idle connections) and keepAliveDurationNs (how long a keep-alive connection may stay idle), which decide whether a connection can be reclaimed
  • The pool itself is essentially a Deque, plus a dedicated executor thread pool that runs the cleanup task cleanupRunnable
  • When a new connection is put into the pool and no cleanup task is running, the cleanup task is submitted to the executor; whenever a connection with a matching address and route is taken from the pool for reuse, its reference count is incremented
  • A connection's reference count is a List<Reference<StreamAllocation>>: each request that uses the connection adds an entry, and the entry is removed when the request finishes, so a count of 0 means the connection is idle
  • cleanupRunnable loops forever and only exits once the pool no longer holds any connections; otherwise it waits for the amount of time returned by cleanup and then runs cleanup again
  • cleanup is where the actual reclamation happens: it walks the pool, finds the connection that has been idle the longest, and uses the maxIdleConnections and keepAliveDurationNs limits to decide whether it can be reclaimed
    • If it can, the connection is evicted and 0 is returned, so the loop immediately runs cleanup again
    • If it can't and not all connections are active, it computes how long until the candidate hits the keep-alive limit and returns that interval; the cleanupRunnable thread waits that long before running cleanup again
    • If all connections are active, it returns the keep-alive duration and the thread waits that long before the next pass
    • Finally, if there are no connections at all, it returns -1, the loop exits, and the cleanupRunnable ends

That is how the connection pool works.
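
Both limits are configurable when building the client; per the source above, the defaults are 5 idle connections kept alive for 5 minutes (a minimal sketch):

ConnectionPool pool = new ConnectionPool(10, 5, TimeUnit.MINUTES); // 10 idle connections, 5-minute keep-alive
OkHttpClient client = new OkHttpClient.Builder()
    .connectionPool(pool)
    .build();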

Task Queues

This was already covered in the Dispatcher.java analysis in the first section on the complete request flow, including the use of Deque as the task-queue container, how calls are moved between the ready and running queues, and how the thread pool executes them, so it won't be repeated here.
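
As a small addendum, the limits mentioned there (64 concurrent calls overall, 5 per host) can be tuned through a custom Dispatcher (a minimal sketch):

Dispatcher dispatcher = new Dispatcher();
dispatcher.setMaxRequests(32);       // overall concurrency cap (default 64)
dispatcher.setMaxRequestsPerHost(8); // per-host concurrency cap (default 5)
OkHttpClient client = new OkHttpClient.Builder()
    .dispatcher(dispatcher)
    .build();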

Final Thoughts

That wraps up the OkHttp post ٩(ˊᗜˋ*)و. It's another very long one that touches on every point I could think of. The designs most worth learning from are the Interceptor and ConnectionPool modules: the former is easy to extend, the latter performs well. Stay tuned for the next post on Retrofit~

