热门搜索 :
考研考公
您的当前位置:首页正文

【Android源码伴读】OkHttp源码探索

来源:东饰资讯网

OkHttp的使用

OkHttp源码探索

废话少说, 我们现在就开始源码探索. 在探索之前, 先对OkHttp工作流, 也就是它的模型有一个整体的认识, 见下图:

okhttp-workflow

OkHttp请求分发类Dispatcher

以下我们将介绍OkHttp中的类源码, 重点来介绍OkHttp为了优化网络请求做了哪些工作.
在发起一个请求的时候, 是调用OkHttpClient.newCall().enqueue(Callback)来入队一个异步请求, 其中调用Dispatcher.enqueue(AsyncCall)执行真正的入队操作. 我们来看看Dispatcher的注释:


/**
 * Policy on when async requests are executed.
 *
 * <p>Each dispatcher uses an {@link ExecutorService} to run calls internally. If you supply your
 * own executor, it should be able to run {@linkplain #getMaxRequests the configured maximum} number
 * of calls concurrently.
 */

这个分发器负责调度和并发执行请求, 看描述我们大致能猜到它内部使用了ExecutorService去执行请求队列. 打开文件发现它是直接new ThreadPoolExecutor来初始化的, 不限制线程池容量, 使用SynchronousQueue作为底层结构, 自行维护一个maxRequests来限制正在执行的请求的最大数量. 调用enqueue()将准备好的请求加入到正在执行的请求队列中并调用executorService().execute(call)执行(我们知道, 这里面就是把要执行的Runnable加入到队列中,等待被调度到的时候调用Runnable.run()接口).

AsyncCallRealCall的内部类, 在enqueue的时候实际上就是将一个AsyncCall实例入队到ExecutorService的队列中, 反而没RealCall什么事. AsyncCall继承自NamedRunnable, 而NamedRunnable显然继承自Runnable, 并且在它里面把run()方法实现为调用execute()方法. 这就使得ExecutorService通过调用run()方法最终会调用到AsyncCall.execute()方法. 那外层的RealCall又有什么用呢? 其实它提供了一个同步的execute()方法可以让RealCall实例立即被加入到执行队列中, 并等待返回.
下面我们来看RealCall的代码.


 // in file okhttp3/RealCall.java
 
  @Override public Response execute() throws IOException {
    synchronized (this) {
      if (executed) throw new IllegalStateException("Already Executed");
      executed = true;
    }
    captureCallStackTrace();
    try {
      client.dispatcher().executed(this);
      Response result = getResponseWithInterceptorChain();
      if (result == null) throw new IOException("Canceled");
      return result;
    } finally {
      client.dispatcher().finished(this);
    }
  }
  
  @Override public void enqueue(Callback responseCallback) {
    synchronized (this) {
      if (executed) throw new IllegalStateException("Already Executed");
      executed = true;
    }
    captureCallStackTrace();
    client.dispatcher().enqueue(new AsyncCall(responseCallback));
  }
  
    final class AsyncCall extends NamedRunnable {
      private final Callback responseCallback;

      AsyncCall(Callback responseCallback) {
        this.responseCallback = responseCallback;
      }
      
      ......

      @Override protected void execute() {
        boolean signalledCallback = false;
        try {
          Response response = getResponseWithInterceptorChain();
          if (retryAndFollowUpInterceptor.isCanceled()) {
            signalledCallback = true;
            responseCallback.onFailure(RealCall.this, new IOException("Canceled"));
          } else {
            signalledCallback = true;
            responseCallback.onResponse(RealCall.this, response);
          }
        } catch (IOException e) {
          if (signalledCallback) {
            // Do not signal the callback twice!
            Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e);
          } else {
            responseCallback.onFailure(RealCall.this, e);
          }
        } finally {
          client.dispatcher().finished(this);
        }
      }
    }

OkHttp请求执行和响应

现在我们了解了请求的并发执行, 我们再来看连接. 上述过程中好像没有看到哪里有管理连接的内容? 别急, 我们一步一步来, 先找到第一个返回Response的地方. 经过上述观察我们发现在getResponseWithInterceptorChain方法第一次得到Response. 在这个方法里面, 用了一系列拦截器(Interceptor)来初始化一个RealInterceptorChain实例, 并通过该实例的proceed()方法获得了Response. 打开RealInterceptorChain.proceed可以发现它是对RealInterceptorChain链上的Interceptor逐个地使用同一套连接参数去初始化和调用intercept()方法, 那么具体拦截下来做什么, 就得看具体intercept中的行为了. 这里可以看到拦截器链首先是添加了Client上的拦截器, 然后加上retryAndFollowUpInterceptor和另外三个拦截器. 我们看看默认的拦截器都有哪些:


  OkHttpClient(Builder builder) {
    this.dispatcher = builder.dispatcher;
    this.proxy = builder.proxy;
    this.protocols = builder.protocols;
    this.connectionSpecs = builder.connectionSpecs;
    this.interceptors = Util.immutableList(builder.interceptors);// 初始化拦截器
     = 
    this.proxySelector = builder.proxySelector;
    this.cookieJar = builder.cookieJar;
    this.cache = builder.cache;
    this.internalCache = builder.internalCache;
    this.socketFactory = builder.socketFactory;

    boolean isTLS = false;
    for (ConnectionSpec spec : connectionSpecs) {
      isTLS = isTLS || spec.isTls();
    }

    if (builder.sslSocketFactory != null || !isTLS) {
      this.sslSocketFactory = builder.sslSocketFactory;
      this.certificateChainCleaner = builder.certificateChainCleaner;
    } else {
      X509TrustManager trustManager = systemDefaultTrustManager();
      this.sslSocketFactory = systemDefaultSslSocketFactory(trustManager);
      this.certificateChainCleaner = CertificateChainCleaner.get(trustManager);
    }

    this.hostnameVerifier = builder.hostnameVerifier;
    this.certificatePinner = builder.certificatePinner.withCertificateChainCleaner(
        certificateChainCleaner);
    this.proxyAuthenticator = builder.proxyAuthenticator;
    this.authenticator = builder.authenticator;
    this.connectionPool = builder.connectionPool;
    this.dns = builder.dns;
    this.followSslRedirects = builder.followSslRedirects;
    this.followRedirects = builder.followRedirects;
    this.retryOnConnectionFailure = builder.retryOnConnectionFailure;
    this.connectTimeout = builder.connectTimeout;
    this.readTimeout = builder.readTimeout;
    this.writeTimeout = builder.writeTimeout;
    this.pingInterval = builder.pingInterval;
  }

直接用的Builder的拦截器, 看一下发现是个空的ArrayList, 好吧, 啥都没有. 那第一个被调用intercept方法的就是retryAndFollowUpInterceptor了. 打开看看发现他是首先分配了一个StreamAllocation, 在流不断开的情况下不断调用传入的拦截器获得响应结果, 并与前面得到的响应结果组装起来, 在无后续响应结果的情况下返回响应.
看来我们还得追踪下一个拦截器, 我估计追踪到最后一个就是实际的连接创建了. 果不其然, BridgeInterceptor作为桥, 将用户请求Request重新构造了一个Request并交给传入的拦截器处理, 对返回的Response也做了处理, 如存Cookie, 构造新的Response等. 它的下一个拦截器是CacheInterceptor, 这个拦截器就是处理缓存的了, 根据我们的缓存策略在缓存表中试图获取缓存的响应并判断是否只需要返回缓存就行了. 如果缓存不适用, 则调用传入的拦截器处理请求, 并根据返回的情况更新缓存时间或重写缓存. 我个人觉得需要注意它代码中多处地方都非常注意关闭缓存, 以防止内存泄漏. 下一个拦截器是ConnectInterceptor, 这应该就是真正发起请求的地方了, 它的代码比意料中少:


  // in file okhttp3/internal/connection/ConnectInterceptor.java
  @Override public Response intercept(Chain chain) throws IOException {
    RealInterceptorChain realChain = (RealInterceptorChain) chain;
    Request request = realChain.request();
    StreamAllocation streamAllocation = realChain.streamAllocation();// #1

    // We need the network to satisfy this request. Possibly for validating a conditional GET.
    boolean doExtensiveHealthChecks = !request.method().equals("GET");
    HttpCodec httpCodec = streamAllocation.newStream(client, doExtensiveHealthChecks); // #2
    RealConnection connection = streamAllocation.connection();// #3

    return realChain.proceed(request, streamAllocation, httpCodec, connection);// #4 注意它虽然调用的是`proceed`而不是`intercept`, 但由于`RealInterceptorChain`中`proceed`的实现就是调用当前`intercept.intercept()`, 所以最终还是走到`intercept()`
  }

这里关键步骤出来了: 分配IO Stream, Connect, Proceed. 这里传入的拦截器是CallServerInterceptor. 打开分别看看这三步对应的代码. streamAllocation返回的是此次连接的StreamAllocation, connection()返回的也是此次连接, 这两个值从上文可以看到是在retryAndFollowUpInterceptor中就已经初始化了. proceed最终走到CallServerInterceptor.intercept, 代码注释如下:


  // in file okhttp3/internal/http/CallServerInterceptor.java
  @Override public Response intercept(Chain chain) throws IOException {
    HttpCodec httpCodec = ((RealInterceptorChain) chain).httpStream();
    StreamAllocation streamAllocation = ((RealInterceptorChain) chain).streamAllocation();
    Request request = chain.request();

    long sentRequestMillis = System.currentTimeMillis();
    httpCodec.writeRequestHeaders(request);

    Response.Builder responseBuilder = null;
    if (HttpMethod.permitsRequestBody(request.method()) && request.body() != null) {
      // If there's a "Expect: 100-continue" header on the request, wait for a "HTTP/1.1 100
      // Continue" response before transmitting the request body. If we don't get that, return what
      // we did get (such as a 4xx response) without ever transmitting the request body.
      ...

      // Write the request body, unless an "Expect: 100-continue" expectation failed.
      ...
    }

    httpCodec.finishRequest();

    if (responseBuilder == null) {
      responseBuilder = httpCodec.readResponseHeaders(false);
    }

    Response response = responseBuilder
        .request(request)
        .handshake(streamAllocation.connection().handshake())
        .sentRequestAtMillis(sentRequestMillis)
        .receivedResponseAtMillis(System.currentTimeMillis())
        .build();

    int code = response.code();
    if (forWebSocket && code == 101) {
      // Connection is upgrading, but we need to ensure interceptors see a non-null response body.
      response = response.newBuilder()
          .body(Util.EMPTY_RESPONSE)
          .build();
    } else {
      response = response.newBuilder()
          .body(httpCodec.openResponseBody(response))
          .build();
    }

    if ("close".equalsIgnoreCase(response.request().header("Connection"))
        || "close".equalsIgnoreCase(response.header("Connection"))) {
      streamAllocation.noNewStreams();
    }

    if ((code == 204 || code == 205) && response.body().contentLength() > 0) {
      throw new ProtocolException(
          "HTTP " + code + " had non-zero Content-Length: " + response.body().contentLength());
    }

    return response;
  }

厉害了, 原来最后用的是HttpCodec的实例来完成的请求发送和响应接收, 查看它的继承类可以看到Http1CodecHttp2Codec, 明显就是对应两种协议了, 前面在ConnectInterceptor#2的时候实例化了这个编码解码器(实例化过程中通过连接池获取一个可用的连接).
这一条路径下来, 请求构造->连接构造->请求发送->响应接收->响应解析都完成了, 不得不说大神的代码看起来有点吃力但是非常舒服(拦截器链这一块, 原先曾经是HttpEngine.java完成上文三个默认拦截器的工作, 由Square的另一个大神分解为三个单独的职责分明的拦截器), 逻辑清晰层次分明, 而且还有够用的注释, 吃力主要还是因为自己的抽象能力不足, 对于网络连接的场景需要做什么在心里没有一个完整的模型.
代码中可以看到通过连接池对连接进行了复用, 具体的数据读写使用了Okio(也是Square家的项目).

OkHttp缓存Cache

下面看看最后一个重点: 缓存Cache.
上文提到, 在CacheInterceptor.intercept中, 会在InternalCache cache中寻找当前请求的缓存, 并获取当前请求的缓存策略(CacheStrategy.Factory(time, request, cacheCandidate).get()), 如果cache不为null调用cache.trackResponse, 其余的判断是否请求被禁止或是否需要缓存等内容我们略过不提, 单看缓存更新cache.update(cachedResponse, networkResponse)cacheWritingResponse(cacheRequest, response), 这两种情况都已经访问了网络并获得了响应, 前者的响应表示未更改(状态码304), 客户端应直接读缓存, 所以我们只需要更新缓存的部分内容即可, 后者则需要更新整个缓存的cacheResponse. 两者代码分别如下:


// in file okhttp3/Cache.java
void update(Response cached, Response network) {
  Entry entry = new Entry(network);
  DiskLruCache.Snapshot snapshot = ((CacheResponseBody) cached.body()).snapshot;
  DiskLruCache.Editor editor = null;
  try {
    editor = snapshot.edit(); // Returns null if snapshot is not current.
    if (editor != null) {
      entry.writeTo(editor);
       // 类似map, 写入缓存. 缓存的默认实现是DiskLruCache  
    }
  } catch (IOException e) {
    abortQuietly(editor);
  }
}

// in file okhttp3/internal/cache/CacheInterceptor.java

  /**
   * Returns a new source that writes bytes to {@code cacheRequest} as they are read by the source
   * consumer. This is careful to discard bytes left over when the stream is closed; otherwise we
   * may never exhaust the source stream and therefore not complete the cached response.
   */
  private Response cacheWritingResponse(final CacheRequest cacheRequest, Response response)
      throws IOException {
    // Some apps return a null body; for compatibility we treat that like a null cache request.
    if (cacheRequest == null) return response;
    Sink cacheBodyUnbuffered = cacheRequest.body();
    if (cacheBodyUnbuffered == null) return response;

    final BufferedSource source = response.body().source();
    final BufferedSink cacheBody = Okio.buffer(cacheBodyUnbuffered);

    Source cacheWritingSource = new Source() {
      boolean cacheRequestClosed;

      @Override public long read(Buffer sink, long byteCount) throws IOException {
        long bytesRead;
        try {
          bytesRead = source.read(sink, byteCount);
        } catch (IOException e) {
          if (!cacheRequestClosed) {
            cacheRequestClosed = true;
            cacheRequest.abort(); // Failed to write a complete cache response.
          }
          throw e;
        }

        if (bytesRead == -1) {
          if (!cacheRequestClosed) {
            cacheRequestClosed = true;
            cacheBody.close(); // The cache response is complete!
          }
          return -1;
        }

        sink.copyTo(cacheBody.buffer(), sink.size() - bytesRead, bytesRead);
        cacheBody.emitCompleteSegments();
        return bytesRead;
      }

      @Override public Timeout timeout() {
        return source.timeout();
      }

      @Override public void close() throws IOException {
        if (!cacheRequestClosed
            && !discard(this, HttpCodec.DISCARD_STREAM_TIMEOUT_MILLIS, MILLISECONDS)) {
          cacheRequestClosed = true;
          cacheRequest.abort();
        }
        source.close();
      }
    }; // 实现一个读取流, 读取Okio.BufferedSource接口的实例`response.body().source()`

    return response.newBuilder()
        .body(new RealResponseBody(response.headers(), Okio.buffer(cacheWritingSource)))
        .build(); // 使用以上实现构造RealResponseBody, 传入当前响应, 构造新的响应拷贝
  }

其中cache的值是在RealCall中传入new CacheInterceptor(client.internalCache())时初始化的, 其值默认是OkHttpClient.cache.internalCache, 除非cache为空, 彼时会使用OkHttpClient.internalCache. 换句话说, 如果我们初始化OkHttpClient时把cache置空则可以通过设置internalCache使用我们自己的缓存. 事实上OkHttpClient.Builder中这两个缓存正是互斥的:


/** Sets the response cache to be used to read and write cached responses. */
void setInternalCache(InternalCache internalCache) {
  this.internalCache = internalCache;
  this.cache = null;
}

/** Sets the response cache to be used to read and write cached responses. */
public Builder cache(Cache cache) {
  this.cache = cache;
  this.internalCache = null;
  return this;
}

参考

Top