Client creation
```java
CloseableHttpClient httpClient = HttpClientBuilder.create()
        // ... builder configuration ...
        .build();
```
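Connection pool
Pool creation in the builder
When the builder creates the HttpClient and no custom connection manager has been set, it creates a `PoolingHttpClientConnectionManager` by default. The `Registry` maps each scheme (`http`, `https`) to the socket factory that creates its sockets, and `dnsResolver` performs domain name resolution.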
HttpClientBuilder::build()
```java
// line: 984
final PoolingHttpClientConnectionManager poolingmgr = new PoolingHttpClientConnectionManager(
        RegistryBuilder.<ConnectionSocketFactory>create()
            .register("http", PlainConnectionSocketFactory.getSocketFactory())
            .register("https", sslSocketFactoryCopy)
            .build(),
        null,
        null,
        dnsResolver,
        connTimeToLive,
        connTimeToLiveTimeUnit != null ? connTimeToLiveTimeUnit : TimeUnit.MILLISECONDS);
```
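Pooling connection manager
The manager builds a `CPool` from an internal connection factory and the connection-related parameters. The `InternalConnectionFactory` is passed in so that the pool can call it to produce connections. By default the pool allows at most 20 connections in total and 2 connections per route.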
PoolingHttpClientConnectionManager constructor
```java
// line: 175
/**
 * @since 4.4
 */
public PoolingHttpClientConnectionManager(
        final HttpClientConnectionOperator httpClientConnectionOperator,
        final HttpConnectionFactory<HttpRoute, ManagedHttpClientConnection> connFactory,
        final long timeToLive, final TimeUnit timeUnit) {
    super();
    this.configData = new ConfigData();
    this.pool = new CPool(new InternalConnectionFactory(
            this.configData, connFactory), 2, 20, timeToLive, timeUnit);
    this.pool.setValidateAfterInactivity(2000);
    this.connectionOperator = Args.notNull(httpClientConnectionOperator, "HttpClientConnectionOperator");
    this.isShutDown = new AtomicBoolean(false);
}
```
If no custom connection factory is supplied, the default instance is used:
```java
// line: 610
static class InternalConnectionFactory implements ConnFactory<HttpRoute, ManagedHttpClientConnection> {
    ...
    InternalConnectionFactory(
            final ConfigData configData,
            final HttpConnectionFactory<HttpRoute, ManagedHttpClientConnection> connFactory) {
        ...
        this.connFactory = connFactory != null ? connFactory :
                ManagedHttpClientConnectionFactory.INSTANCE;
    }
    ...
}
```
Managed connection factory
The default factory instance is set up with the components that handle I/O: the request writer, the response parser, and the content-length strategies for incoming and outgoing messages. Once constructed, the factory is handed to the pool, which uses it to produce connections based on `DefaultBHttpClientConnection`.
ManagedHttpClientConnectionFactory constructor
```java
// line: 74
/**
 * @since 4.4
 */
public ManagedHttpClientConnectionFactory(
        final HttpMessageWriterFactory<HttpRequest> requestWriterFactory,
        final HttpMessageParserFactory<HttpResponse> responseParserFactory,
        final ContentLengthStrategy incomingContentStrategy,
        final ContentLengthStrategy outgoingContentStrategy) {
    super();
    this.requestWriterFactory = requestWriterFactory != null ? requestWriterFactory :
            DefaultHttpRequestWriterFactory.INSTANCE;
    this.responseParserFactory = responseParserFactory != null ? responseParserFactory :
            DefaultHttpResponseParserFactory.INSTANCE;
    this.incomingContentStrategy = incomingContentStrategy != null ? incomingContentStrategy :
            LaxContentLengthStrategy.INSTANCE;
    this.outgoingContentStrategy = outgoingContentStrategy != null ? outgoingContentStrategy :
            StrictContentLengthStrategy.INSTANCE;
}
```
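Abstract connection pool
Most of the pool's functionality is implemented in the abstract class `AbstractConnPool`:
- it implements the lease and release operations of the `ConnPool` interface
- it implements the pool-size and pool-statistics operations of the `ConnPoolControl` interface
```java
class CPool extends AbstractConnPool<HttpRoute, ManagedHttpClientConnection, CPoolEntry> { ... }
```
Type parameters:
- T, the route type: `HttpRoute`
- C, the connection type: `ManagedHttpClientConnection`
- E, the pool entry type: `CPoolEntry`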
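Pool structure
- `PoolEntry<HttpRoute, ManagedHttpClientConnection>`: an entry in the pool. It holds its timestamps and the connection, and a socket is bound to the connection when it is opened.
- `ConnFactory<HttpRoute, ManagedHttpClientConnection> connFactory`: the factory that creates connections. It is wired up as `HttpClientBuilder::build()` -> `new PoolingHttpClientConnectionManager(...)` -> `new CPool(InternalConnectionFactory, ...)` -> `ManagedHttpClientConnectionFactory.INSTANCE`.
- `HashSet<PoolEntry> leased`: connections that are currently leased out.
  - a connection is taken directly from the pool: leased + 1
  - none is available but the limit has not been reached, so a new connection is created: leased + 1
  - a connection is returned: leased - 1
- `LinkedList<PoolEntry> available`: idle connections.
  - on lease, an idle connection is taken, is found unusable and discarded, or is closed to make room: available - 1
  - on return, if the connection is reusable: available + 1
- `LinkedList<PoolEntryFuture> pending`: the futures of threads waiting for a connection.
  - on lease, a thread that does not obtain a connection joins this queue and waits until it is woken or the wait times out
  - on return, a waiting future is taken from the queue; once it has been removed from the queue, all waiting threads are woken to try leasing again
- `Map<HttpRoute, RouteSpecificPool<T, C, E>> routeToPool`: one sub-pool per route. Each route-specific pool also has its own `leased`, `available` and `pending`, which keeps the connection counts of different routes isolated.
- `Lock lock`: the pool lock. Any read or modification of pool state must acquire it first, for exclusive access.
- `Condition condition`: coordinates the competing threads.
  - releasing a connection, or cancelling a future, wakes all waiters
  - a thread that fails to obtain a connection waits on the condition until it is woken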
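To make the bookkeeping above concrete, here is a minimal, stdlib-only sketch of a pool built on the same idea: a `leased` set, an `available` list, per-route counts, and one lock with one condition that every release signals. All names (`ToyPool`, `Entry`, `PoolTimeoutException`) are hypothetical. Connections are plain objects, and the `pending` queue, validation and expiry are left out, so treat it as a teaching model rather than `AbstractConnPool`'s actual code.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical toy pool: leased set, available list, per-route counts, one lock + condition.
class ToyPool {
    static final class Entry {
        final String route;
        final int id;
        Entry(String route, int id) { this.route = route; this.id = id; }
    }

    static final class PoolTimeoutException extends RuntimeException {
        PoolTimeoutException(String msg) { super(msg); }
    }

    private final int maxPerRoute;
    private final int maxTotal;
    private final Set<Entry> leased = new HashSet<>();
    private final LinkedList<Entry> available = new LinkedList<>();
    private final Map<String, Integer> perRoute = new HashMap<>(); // leased + available per route
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition condition = lock.newCondition();
    private int nextId;

    ToyPool(int maxPerRoute, int maxTotal) { this.maxPerRoute = maxPerRoute; this.maxTotal = maxTotal; }

    // timeoutMillis <= 0 means wait without a deadline.
    Entry lease(String route, long timeoutMillis) {
        final long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        lock.lock();
        try {
            for (;;) {
                // 1. Reuse an idle connection of the same route (available - 1, leased + 1).
                for (Iterator<Entry> it = available.iterator(); it.hasNext(); ) {
                    Entry e = it.next();
                    if (e.route.equals(route)) { it.remove(); leased.add(e); return e; }
                }
                if (perRoute.getOrDefault(route, 0) < maxPerRoute) {
                    // 2. Pool full: drop the least recently returned idle entry of another route.
                    if (leased.size() + available.size() >= maxTotal && !available.isEmpty()) {
                        Entry victim = available.removeLast();
                        perRoute.merge(victim.route, -1, Integer::sum);
                    }
                    // 3. Below the total limit: "create" a new connection (leased + 1).
                    if (leased.size() + available.size() < maxTotal) {
                        Entry e = new Entry(route, ++nextId);
                        perRoute.merge(route, 1, Integer::sum);
                        leased.add(e);
                        return e;
                    }
                }
                // 4. Nothing usable: wait until release() signals the condition.
                if (timeoutMillis <= 0) {
                    condition.await();
                } else {
                    long remaining = deadline - System.nanoTime();
                    if (remaining <= 0) throw new PoolTimeoutException("Timeout waiting for connection from pool");
                    condition.awaitNanos(remaining);
                }
            }
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for a connection", ex);
        } finally {
            lock.unlock();
        }
    }

    void release(Entry e, boolean reusable) {
        lock.lock();
        try {
            if (!leased.remove(e)) return;                        // not leased: ignore
            if (reusable) available.addFirst(e);                  // available + 1
            else perRoute.merge(e.route, -1, Integer::sum);       // treated as closed
            condition.signalAll();                                // wake all waiters
        } finally {
            lock.unlock();
        }
    }

    int leasedCount() { lock.lock(); try { return leased.size(); } finally { lock.unlock(); } }
    int availableCount() { lock.lock(); try { return available.size(); } finally { lock.unlock(); } }
}
```

Waking every waiter with `signalAll` keeps the model simple. Each woken thread re-checks the pool under the lock, and any thread that still finds nothing goes back to waiting.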
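Obtaining a pool entry
Pool entries are instances of `E extends PoolEntry<T, C>`.
Fields: ID, route, managed connection, creation time, last-update time, time to live, expiry time, and connection state.
Time to live and expiry time differ in one way. The time-to-live deadline is fixed once the entry is created. The expiry time is recomputed from the last-update time and a new expiry duration, but it never goes past the time-to-live deadline.
The managed connection is built by the managed connection factory when `AbstractConnPool` leases a connection.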
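The difference between the fixed time-to-live deadline and the movable expiry time can be shown in a small sketch. `EntryDeadlines` is a hypothetical class. The clock is passed in as a plain `long` in milliseconds so the behaviour is deterministic, and the update rule (a new expiry, capped by the validity deadline) is written to match the description above rather than copied from `PoolEntry`.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical model of a pool entry's two deadlines; "now" is passed in explicitly.
class EntryDeadlines {
    final long created;
    final long validityDeadline; // fixed at creation: created + timeToLive, never updated
    long updated;
    long expiry;                 // recomputed by updateExpiry(), capped by validityDeadline

    EntryDeadlines(long now, long timeToLive, TimeUnit unit) {
        this.created = now;
        this.updated = now;
        this.validityDeadline = timeToLive > 0 ? now + unit.toMillis(timeToLive) : Long.MAX_VALUE;
        this.expiry = this.validityDeadline;
    }

    // e.g. called when a connection goes back to the pool with a keep-alive duration
    void updateExpiry(long now, long keepAlive, TimeUnit unit) {
        this.updated = now;
        long newExpiry = keepAlive > 0 ? now + unit.toMillis(keepAlive) : Long.MAX_VALUE;
        this.expiry = Math.min(newExpiry, this.validityDeadline);
    }

    boolean isExpired(long now) {
        return now >= this.expiry;
    }
}
```

A keep-alive hint can shorten an entry's life, but it can never extend the entry past its time to live.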
Leasing a connection
The `Future<CPoolEntry>` comes from the abstract connection pool. Each client thread gets its own future, through which it obtains the resulting pool entry.
AbstractConnPool::lease
```java
// line: 193
public Future<E> lease(final T route, final Object state, final FutureCallback<E> callback) {
    ...
    return new Future<E>() {
        ...
        public E get(final long timeout, final TimeUnit timeUnit) throws InterruptedException, ExecutionException, TimeoutException {
            for (;;) {
                ...
                final E leasedEntry = getPoolEntryBlocking(route, state, timeout, timeUnit, this);
                ...
```
The internal connection factory, which delegates to the managed connection factory, takes the route as its argument and builds the connection instance:
```java
// line: 310
private E getPoolEntryBlocking(
        final T route, final Object state,
        final long timeout, final TimeUnit timeUnit,
        final Future<E> future) throws IOException, InterruptedException, ExecutionException, TimeoutException {
    ...
    final C conn = this.connFactory.create(route);
    entry = pool.add(conn);
    this.leased.add(entry);
    return entry;
    ...
```
Requesting a connection
The pooling connection manager hands out a connection request:
PoolingHttpClientConnectionManager::requestConnection
```java
// line: 261
public ConnectionRequest requestConnection(
        final HttpRoute route,
        final Object state) {
    ...
    final Future<CPoolEntry> future = this.pool.lease(route, state, null);
    return new ConnectionRequest() {
        ...
        public HttpClientConnection get(
                final long timeout,
                final TimeUnit timeUnit) throws InterruptedException, ExecutionException, ConnectionPoolTimeoutException {
            final HttpClientConnection conn = leaseConnection(future, timeout, timeUnit);
            ...
```
The `leaseConnection` method calls the future to obtain the connection:
```java
// line: 300
protected HttpClientConnection leaseConnection(
        final Future<CPoolEntry> future,
        final long timeout,
        final TimeUnit timeUnit) throws InterruptedException, ExecutionException, ConnectionPoolTimeoutException {
    final CPoolEntry entry;
    try {
        entry = future.get(timeout, timeUnit);
        ...
        return CPoolProxy.newProxy(entry);
    ...
```
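Getting the connection
The connection is taken from the pooling connection manager. What comes back here is `CPoolProxy`, a proxy for `CPoolEntry` that implements the `HttpClientConnection` interface.
- `requestConnection` returns an anonymous `ConnectionRequest` object that implements the retrieval on top of the pool's `Future`.
- `connRequest.get` takes a connection from the pool. If the pool is exhausted, the program blocks at this step.
- `timeout` is the `connectionRequestTimeout` setting, which defaults to -1 (no timeout). As the code shows, any value that is not positive is passed on as 0, and the wait then has no deadline.

MainClientExec::execute
```java
// line: 176
final ConnectionRequest connRequest = connManager.requestConnection(route, userToken);
...
final HttpClientConnection managedConn;
...
// line: 190
managedConn = connRequest.get(timeout > 0 ? timeout : 0, TimeUnit.MILLISECONDS);
...
// line: 233
if (!managedConn.isOpen()) {
    this.log.debug("Opening connection " + route);
    try {
        establishRoute(proxyAuthState, managedConn, route, request, context);
        ...
```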
Establishing the connection
Establishing the route
The route is established from the connection object, the route object, the timeout and the context.
MainClientExec::establishRoute
```java
// line: 173
/**
 * Establishes the target route.
 */
void establishRoute(
        final AuthState proxyAuthState,
        final HttpClientConnection managedConn,
        final HttpRoute route,
        final HttpRequest request,
        final HttpClientContext context) throws HttpException, IOException {
    ...
        case HttpRouteDirector.CONNECT_TARGET:
            this.connManager.connect(
                    managedConn,
                    route,
                    timeout > 0 ? timeout : 0,
                    context);
    ...
```
Opening the connection
The connection that the abstract pool's internal connection factory produced earlier is taken back out of the pool entry:
PoolingHttpClientConnectionManager::connect
```java
// line: 357
public void connect(
        final HttpClientConnection managedConn,
        final HttpRoute route,
        final int connectTimeout,
        final HttpContext context) throws IOException {
    Args.notNull(managedConn, "Managed Connection");
    Args.notNull(route, "HTTP route");
    final ManagedHttpClientConnection conn;
    synchronized (managedConn) {
        final CPoolEntry entry = CPoolProxy.getPoolEntry(managedConn);
        conn = entry.getConnection();
    }
    ...
    this.connectionOperator.connect(
            conn, host, route.getLocalSocketAddress(), connectTimeout, resolveSocketConfig(host), context);
}
```
Connecting
The work is delegated to the default client connection operator, which resolves the host name and binds the socket:
DefaultHttpClientConnectionOperator::connect
```java
// line: 98
public void connect(
        final ManagedHttpClientConnection conn,
        final HttpHost host,
        final InetSocketAddress localAddress,
        final int connectTimeout,
        final SocketConfig socketConfig,
        final HttpContext context) throws IOException {
    ...
    final InetAddress[] addresses = host.getAddress() != null ?
            new InetAddress[] { host.getAddress() } : this.dnsResolver.resolve(host.getHostName());
    ...
    Socket sock = sf.createSocket(context);
    ...
    conn.bind(sock);
```
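The two steps shown here, resolving the host to possibly several addresses and then connecting a socket, can be sketched with plain `java.net`. `MultiAddressConnector` is a hypothetical helper that tries each resolved address in turn and returns the first socket that connects. It is a simplification. The real operator also goes through the scheme's socket factory, applies the socket configuration and binds the socket to the connection.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper: resolve a host, then try each address until one connects.
class MultiAddressConnector {
    static Socket connect(String host, int port, int connectTimeoutMillis) throws IOException {
        // DNS lookup (a literal IP address is returned as-is); throws if nothing resolves
        InetAddress[] addresses = InetAddress.getAllByName(host);
        IOException last = null;
        for (InetAddress address : addresses) {
            Socket sock = new Socket();
            try {
                // a timeout of 0 means wait indefinitely, as with java.net.Socket
                sock.connect(new InetSocketAddress(address, port), connectTimeoutMillis);
                return sock; // the caller would now bind this socket to its connection object
            } catch (IOException ex) {
                sock.close();
                last = ex; // remember the failure and fall through to the next address
            }
        }
        throw last != null ? last : new IOException("No addresses for " + host);
    }
}
```

Trying the addresses one by one means a host with both an unreachable and a reachable address still gets a connection. The cost is that each failed attempt can use up to the full connect timeout.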
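IO
Sending
- client executes
- closeable client executes
- internal client executes
- main client exec runs
- request executor runs
- connection flushes data
- session output buffer flushes data
- socket output stream writes and flushes data
```
httpClient.execute() ->
```
Receiving
- socket input stream reads data
- session input buffer reads the stream
- response parser converts the data
- connection receives the response header
- request executor gets the response
- main client exec gets the response
- internal client gets the response
- closeable client gets the response
- client gets the response
```
SocketInputStream.socketRead ->
```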
Releasing the connection
Response proxy
The response entity is checked first. If there is no entity, or the entity is not streaming, the connection is released immediately. Otherwise `HttpResponseProxy` takes over and releases the connection on the caller's behalf.
MainClientExec::execute
```java
// line: 335
// check for entity, release connection if possible
final HttpEntity entity = response.getEntity();
if (entity == null || !entity.isStreaming()) {
    // connection not needed and (assumed to be) in re-usable state
    connHolder.releaseConnection();
    return new HttpResponseProxy(response, null);
}
return new HttpResponseProxy(response, connHolder);
```
HttpResponseProxy constructor
```java
// line: 53
public HttpResponseProxy(final HttpResponse original, final ConnectionHolder connHolder) {
    this.original = original;
    this.connHolder = connHolder;
    ResponseEntityProxy.enchance(original, connHolder);
}
```
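Response entity proxy
`ResponseEntityProxy` releases the connection that backs the response entity:
- `enchance`: wraps the original response entity in a proxy that holds the connection
- `releaseConnection`: releases the connection
- `getContent`: the wrapper method. It builds the `EofSensorInputStream` when the content is fetched. The stream holds the wrapped content and has the proxy installed as its EOF watcher, so `eofDetected` is triggered to release the connection once EOF is observed
- `eofDetected`: watcher method, called when EOF is detected
- `streamClosed`: watcher method, called when the stream is closed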
ResponseEntityProxy::enchance
```java
// line: 50
public static void enchance(final HttpResponse response, final ConnectionHolder connHolder) {
    final HttpEntity entity = response.getEntity();
    if (entity != null && entity.isStreaming() && connHolder != null) {
        response.setEntity(new ResponseEntityProxy(entity, connHolder));
    }
}

// line: 74
public void releaseConnection() {
    if (this.connHolder != null) {
        this.connHolder.releaseConnection();
    }
}

// line: 85
public InputStream getContent() throws IOException {
    return new EofSensorInputStream(this.wrappedEntity.getContent(), this);
}

// line: 113
public boolean eofDetected(final InputStream wrapped) throws IOException {
    try {
        // there may be some cleanup required, such as
        // reading trailers after the response body:
        if (wrapped != null) {
            wrapped.close();
        }
        releaseConnection();
    ...
}

// line: 134
public boolean streamClosed(final InputStream wrapped) throws IOException {
    try {
        final boolean open = connHolder != null && !connHolder.isReleased();
        // this assumes that closing the stream will
        // consume the remainder of the response body:
        try {
            if (wrapped != null) {
                wrapped.close();
            }
            releaseConnection();
    ...
```
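EOF-sensing input stream
`EofSensorInputStream`
- constructor:
  - the watcher is the `ResponseEntityProxy`
  - the input stream is the wrapped response stream held by `ResponseEntityProxy`. It is used to close the original stream inside the wrapper, which is set up along this chain: `new DecompressingEntity(...)` -> `new LazyDecompressingInputStream(...)` -> `ResponseContentEncoding::process` -> `response.setEntity(DecompressingEntity)` -> `response.getEntity()`
- `close`: closes the stream from the outside
- `checkClose`: checks whether the conditions for closing are met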
EofSensorInputStream
```java
// line: 80
public EofSensorInputStream(final InputStream in,
                            final EofSensorWatcher watcher) {
    Args.notNull(in, "Wrapped stream");
    wrappedStream = in;
    selfClosed = false;
    eofWatcher = watcher;
}

// line: 168
public void close() throws IOException {
    // tolerate multiple calls to close()
    selfClosed = true;
    checkClose();
}

// line: 221
protected void checkClose() throws IOException {
    final InputStream toCloseStream = wrappedStream;
    if (toCloseStream != null) {
        try {
            boolean scws = true; // should close wrapped stream?
            if (eofWatcher != null) {
                scws = eofWatcher.streamClosed(toCloseStream);
            }
    ...
```
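The watcher pattern of `EofSensorInputStream` can be reduced to a small sketch: a `FilterInputStream` that notifies a callback exactly once, either when a read returns -1 or when the caller closes the stream first. `EofWatcher` and `EofSensingStream` are hypothetical names. In HttpClient the two callbacks are `eofDetected` and `streamClosed`, and both release the connection. The real class also decides whether to close the wrapped stream, which this sketch always does.

```java
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;

// Hypothetical watcher, loosely modeled on EofSensorWatcher.
interface EofWatcher {
    void onEof();    // body fully read: a good moment to release the connection
    void onClose();  // caller closed the stream, possibly before EOF
}

// Simplified EOF-sensing stream: notifies the watcher exactly once.
class EofSensingStream extends FilterInputStream {
    private final EofWatcher watcher;
    private boolean notified;

    EofSensingStream(InputStream in, EofWatcher watcher) {
        super(in);
        this.watcher = watcher;
    }

    @Override
    public int read() throws IOException {
        int b = super.read();
        if (b == -1) notifyEof();
        return b;
    }

    @Override
    public int read(byte[] buf, int off, int len) throws IOException {
        int n = super.read(buf, off, len); // read(byte[]) also ends up here
        if (n == -1) notifyEof();
        return n;
    }

    @Override
    public void close() throws IOException {
        if (!notified) {
            notified = true;
            watcher.onClose();
        }
        super.close();
    }

    private void notifyEof() {
        if (!notified) {
            notified = true;
            watcher.onEof();
        }
    }
}
```

Because the stream notifies only once, reading to EOF and then calling `close()`, which is the pattern `EntityUtils.toString` follows, triggers a single release.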
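Reading the response content
- When the input stream is obtained with `entity.getContent()`, `ResponseEntityProxy` is called first. It builds the EOF-sensing stream and installs itself as the watcher.
- While `EofSensorInputStream` is being read, it checks whether EOF has been reached. When reading ends, the stream is closed and the connection is released at the same time.
```java
EntityUtils.toString(response.getEntity(), "UTF-8");
```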
EntityUtils::toString
```java
// line: 198
private static String toString(
        final HttpEntity entity,
        final ContentType contentType) throws IOException {
    final InputStream inStream = entity.getContent();
    ...
        while((l = reader.read(tmp)) != -1) {
            ...
    } finally {
        inStream.close();
    }
```
Release sequence
- inStream.close() ->
- EofSensorInputStream::close ->
- EofSensorInputStream::checkClose ->
- EofSensorWatcher::streamClosed ->
- ResponseEntityProxy::streamClosed ->
- ResponseEntityProxy::releaseConnection ->
- ConnectionHolder::releaseConnection ->
- CPoolProxy::close ->
- CPoolEntry::close ->
- BHttpConnectionBase::close ->
- Socket::shutdownInput/shutdownOutput
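Case study
https://github.com/BurningBright/poc/tree/master/httpclient
- Request configuration: a maximum of 1 connection, and a connection-request timeout of 2 s.
- Bytecode injection: the `DecompressingEntity` wrapper returned by `response.getEntity()` is replaced with a `BufferedHttpEntity` buffering wrapper. The agent is attached with `-javaagent:/xxx/agent-1.0-SNAPSHOT-jar-with-dependencies.jar`, and the injected code is:
```java
org.apache.http.entity.BufferedHttpEntity newEntity = new org.apache.http.entity.BufferedHttpEntity(this.entity);
```
The request is sent twice, and the details of leasing and releasing the connection are observed after each one.
The first request obtains a connection from the pool normally, then sends the request and receives the response.
While the buffered entity is being constructed, the data is read from the response stream into a buffer. The entity therefore becomes non-streaming, and the response entity is not wrapped by the proxy: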
ResponseEntityProxy::enchance
```java
// line: 50
public static void enchance(final HttpResponse response, final ConnectionHolder connHolder) {
    final HttpEntity entity = response.getEntity();
    if (entity != null && entity.isStreaming() && connHolder != null) {
        response.setEntity(new ResponseEntityProxy(entity, connHolder));
    }
}
```
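This also means that after the first read finishes, only the response stream is closed. The connection is never released through the proxy's `EofSensorInputStream`.
The second request therefore waits in vain for a connection to be released, and fails with a connection-pool timeout exception.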
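The effect on the second request can be simulated without HttpClient. In this hypothetical sketch, a `Semaphore` with one permit stands in for a pool with a maximum of one connection: leasing acquires the permit and releasing returns it. The sketch does not reproduce the bytecode injection, only its consequence. If the first lease is never returned, the second lease can only time out.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

// Hypothetical simulation: one permit = a pool with maxTotal of 1.
class LeakDemo {
    // Returns true if a "connection" was obtained within timeoutMillis.
    static boolean lease(Semaphore pool, long timeoutMillis) {
        try {
            return pool.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    // Runs two "requests"; releaseAfterFirst says whether the first
    // response's connection is handed back to the pool.
    static boolean secondRequestSucceeds(boolean releaseAfterFirst, long timeoutMillis) {
        Semaphore pool = new Semaphore(1);
        if (!lease(pool, timeoutMillis)) {
            throw new IllegalStateException("first lease failed");
        }
        // ... the first request/response would happen here ...
        if (releaseAfterFirst) {
            pool.release();
        }
        // in HttpClient, a failed wait here surfaces as a connection-pool timeout
        return lease(pool, timeoutMillis);
    }
}
```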
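Configuration
Socket
Property | Default | Description |
---|---|---|
soTimeout | 0 | Timeout in ms for blocking socket reads (SO_TIMEOUT); 0 means wait indefinitely |
soReuseAddress | false | Whether SO_REUSEADDR is enabled, which allows a local address to be bound again while an old connection on it is still in TIME_WAIT |
soLinger | -1 | SO_LINGER timeout in seconds for close(); -1 leaves it disabled, while 0 makes close() reset the connection with a TCP RST |
soKeepAlive | false | Whether TCP keep-alive probes (SO_KEEPALIVE) are enabled, so that a connection to a crashed peer does not tie up resources |
tcpNoDelay | true | Whether TCP_NODELAY is set, i.e. Nagle's algorithm is disabled and small segments are sent without waiting for earlier data to be acknowledged |
sndBufSize | 0 | Size of the underlying send buffer (SO_SNDBUF); 0 keeps the system default |
rcvBufSize | 0 | Size of the underlying receive buffer (SO_RCVBUF); 0 keeps the system default |
backlogSize | 0 | Maximum length of the queue of pending incoming connection requests |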
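Most of the socket properties above correspond to standard `java.net.Socket` options. The sketch below applies them to a plain socket. `SocketSettings` is a hypothetical holder whose defaults are copied from the table. Skipping "unset" values (buffer sizes of 0, a negative linger) is an assumption, chosen so that the operating-system defaults are kept. `backlogSize` is left out because it only applies to server sockets.

```java
import java.io.IOException;
import java.net.Socket;

// Hypothetical holder mirroring the socket table; defaults copied from it.
class SocketSettings {
    int soTimeout = 0;             // ms; 0 = blocking reads never time out
    boolean soReuseAddress = false;
    int soLinger = -1;             // seconds; negative = SO_LINGER stays disabled
    boolean soKeepAlive = false;
    boolean tcpNoDelay = true;     // true = Nagle's algorithm off
    int sndBufSize = 0;            // 0 = keep the OS default
    int rcvBufSize = 0;            // 0 = keep the OS default

    // Applies the settings to a socket; "unset" values are skipped.
    void applyTo(Socket sock) throws IOException {
        sock.setSoTimeout(soTimeout);
        sock.setReuseAddress(soReuseAddress);
        sock.setTcpNoDelay(tcpNoDelay);
        sock.setKeepAlive(soKeepAlive);
        if (rcvBufSize > 0) {
            sock.setReceiveBufferSize(rcvBufSize);
        }
        if (sndBufSize > 0) {
            sock.setSendBufferSize(sndBufSize);
        }
        if (soLinger >= 0) {
            sock.setSoLinger(true, soLinger);
        }
    }
}
```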
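Connection
Property | Default | Description |
---|---|---|
expectContinueEnabled | false | Whether the 'Expect: 100-continue' handshake is enabled |
proxy | null | Proxy for requests: remote host, port and scheme |
localAddress | null | Local address to send from; on multi-homed hosts it selects the source of the request |
staleConnectionCheckEnabled | false | Whether stale connections are checked before each request (deprecated in favor of validateAfterInactivity) |
cookieSpec | null | Name of the cookie specification used for HTTP state management |
redirectsEnabled | true | Whether redirects are followed automatically |
relativeRedirectsAllowed | true | Whether relative redirects are allowed |
circularRedirectsAllowed | false | Whether circular redirects are allowed |
maxRedirects | 50 | Maximum number of redirects |
authenticationEnabled | true | Whether authentication is handled automatically |
targetPreferredAuthSchemes | null | Preference order of authentication schemes for the target host |
proxyPreferredAuthSchemes | null | Preference order of authentication schemes for the proxy host |
connectionRequestTimeout | -1 | Timeout for obtaining a connection from the connection manager |
connectTimeout | -1 | Timeout for establishing a connection |
socketTimeout | -1 | Same as SO_TIMEOUT |
contentCompressionEnabled | true | Whether compressed content is requested and decompressed automatically (replaces decompressionEnabled, which was deprecated in 4.5) |
normalizeUri | true | Whether request URIs are normalized (e.g. dot segments removed) |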
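Connection pool
Property | Default | Description |
---|---|---|
maxTotal | 20 | Maximum number of connections in the pool |
defaultMaxPerRoute | 2 | Maximum number of connections per route |
timeToLive | -1 | Maximum lifetime of a connection; -1 means unlimited |
timeUnit | MILLISECONDS | Unit of timeToLive |
validateAfterInactivity | 2000 | Idle time in ms after which a pooled connection is validated before it is leased again |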