对象池说明
在项目中,我们经常听到连接池,例如数据库连接池,jedis连接池等等。
apache提供了一个公共连接池pool2包提供了一个通用的对象池技术实现。可以很方便的基于它来实现自己的对象池,比如DBCP和Jedis他们的内部对象池的实现就是依赖于Common-pool2。
对于有些对象来说,其创建的代价还是比较昂贵的,比如线程、tcp连接、数据库连接等对象,因此对象池技术还是有其存在的意义。
Common-pool2由三大模块组成:ObjectPool、PooledObject和PooledObjectFactory。
- ObjectPool:提供所有对象的存取管理。
- PooledObject:池化的对象,是对对象的一个包装,加上了对象的一些其他信息,包括对象的状态(已用、空闲),对象的创建时间等。
- PooledObjectFactory:工厂类,负责池化对象的创建,对象的初始化,对象状态的销毁和对象状态的验证。
另外,还有一个比较重要的GenericObjectPoolConfig
- GenericObjectPoolConfig:提供对象池的配置信息。
使用说明
直接上案例.先看一个demo案例
@Data
@AllArgsConstructor
@NoArgsConstructor
public class Computer {
private String name;
}
public class ComputerPoolFactory implements PooledObjectFactory<Computer> {
public PooledObject<Computer> makeObject() {
return new DefaultPooledObject<>(new Computer("zcj"));
}
public void destroyObject(PooledObject<Computer> pooledObject) {
Computer object = pooledObject.getObject();
object = null;
}
public boolean validateObject(PooledObject<Computer> pooledObject) {
return false;
}
public void activateObject(PooledObject<Computer> pooledObject) {
}
public void passivateObject(PooledObject<Computer> pooledObject) {
}
}
@Slf4j
public class ComputerPoolTest {
private static GenericObjectPool<Computer> computerPool;
public static void main(String[] args) {
// 注册对象池
GenericObjectPoolConfig<Computer> poolConfig = new GenericObjectPoolConfig<>();
poolConfig.setMaxTotal(3);
poolConfig.setJmxEnabled(false);
computerPool = new GenericObjectPool<>(new ComputerPoolFactory(), poolConfig);
// 模拟多线程从对象池获取Computer进行业务处理
ExecutorService executorService = Executors.newFixedThreadPool(10);
for (int i = 0; i < 10; i++) {
executorService.execute(ComputerPoolTest::doSomethingByComputer);
}
executorService.shutdown();
}
private static void doSomethingByComputer() {
Computer computer = null;
try {
// 从对象池获取对象
computer = computerPool.borrowObject();
System.out.println(computer);
TimeUnit.SECONDS.sleep(1);
} catch (Exception e) {
e.printStackTrace();
} finally {
// 使用完返回到对象池
computerPool.returnObject(computer);
}
}
}
运行结果
image.png
下面是支付宝客户端池化案例
@Component
@Slf4j
public class AlipayClientPoolTemplate {
// 定义池对象类型
private GenericObjectPool<AlipayClient> pool;
@Autowired
private AlipayProperties alipayProperties;
@PostConstruct
public void init() {
GenericObjectPoolConfig poolConfig = new GenericObjectPoolConfig();
// 设置池对象数量
poolConfig.setMaxTotal(10);
poolConfig.setJmxEnabled(false);
pool = new GenericObjectPool<>(new BasePooledObjectFactory<AlipayClient>() {
@Override
public AlipayClient create() {
// 返回对象实例
return new DefaultAlipayClient(alipayProperties.getServerUrl(),
alipayProperties.getAppid(),
alipayProperties.getPrivateKey(),
"JSON",
"utf-8",
alipayProperties.getAlipayPublicKey(),
"RSA2");
}
@Override
public PooledObject<AlipayClient> wrap(AlipayClient obj) {
return new DefaultPooledObject<>(obj);
}
}, poolConfig);
}
/**
* 池获取对象执行方法
*/
public <T extends AlipayResponse> T execute(AlipayRequest<T> request) {
AlipayClient alipayClient = null;
try {
alipayClient = pool.borrowObject();
T response = alipayClient.execute(request);
log.info("调用支付宝接口,response->{},bizModel->{}", JSON.toJSONString(response), JSON.toJSONString(request.getBizModel()));
return response;
} catch (Exception e) {
log.error("调用支付宝接口失败,bizModel->{},exception:", JSON.toJSONString(request.getBizModel()), e);
return null;
} finally {
if (alipayClient != null) {
pool.returnObject(alipayClient);
}
}
}
}
下面是RestTemplate的案例
池化前
@Configuration
public class RestTemplateConfig {
@Bean
public RestTemplate restTemplate(ClientHttpRequestFactory requestFactory){
return new RestTemplate(requestFactory);
}
@Bean
public ClientHttpRequestFactory simpleClientHttpRequestFactory(){
SimpleClientHttpRequestFactory factory = new SimpleClientHttpRequestFactory();
factory.setReadTimeout(10000);
factory.setConnectTimeout(10000);
return factory;
}
}
池化后
@Configuration
public class RestTemplateConfig {
@Bean
public RestTemplate restTemplate(ClientHttpRequestFactory requestFactory){
return new RestTemplate(requestFactory);
}
@Bean
public ClientHttpRequestFactory httpRequestFactory(){
return new HttpComponentsClientHttpRequestFactory(httpClient());
}
@Bean
public HttpClient httpClient(){
Registry<ConnectionSocketFactory> registry = RegistryBuilder.<ConnectionSocketFactory>create()
.register("http", PlainConnectionSocketFactory.getSocketFactory())
.register("https", SSLConnectionSocketFactory.getSocketFactory())
.build();
PoolingHttpClientConnectionManager connectionManager = new PoolingHttpClientConnectionManager(registry);
//设置连接池最大是500个连接
connectionManager.setMaxTotal(500);
//MaxPerRoute是对maxtotal的细分,每个主机的并发最大是300,route是指域名
connectionManager.setDefaultMaxPerRoute(300);
RequestConfig requestConfig = RequestConfig.custom()
//返回数据的超时时间
.setSocketTimeout(20000)
//连接上服务器的超时时间
.setConnectTimeout(10000)
//从连接池中获取连接的超时时间
.setConnectionRequestTimeout(1000)
.build();
CloseableHttpClient closeableHttpClient = HttpClientBuilder.create().setDefaultRequestConfig(requestConfig)
.setConnectionManager(connectionManager)
.build();
return closeableHttpClient;
}
}
下面是对于es的RestHighLevelClient的池化案例
es的属性配置类EsClientParams
@Configuration
@ConfigurationProperties(prefix = "elasticsearch")
@Data
public class EsClientParams {
private List<Map<String, String>> ips;
public static final String HOSTNAME = "host";
public static final String PORT = "port";
private Integer poolSize ;
public List<HttpHost> getHosts() {
List<HttpHost> hosts = new ArrayList<>();
for (Map<String, String> ip : ips) {
HttpHost host = new HttpHost(ip.get(HOSTNAME), Integer.parseInt(ip.get(PORT)));
hosts.add(host);
}
return hosts;
}
}
PooledObjectFactory<RestHighLevelClient>的配置
public class EsClientPoolFactory implements PooledObjectFactory<RestHighLevelClient> {
private static Logger logger = LoggerFactory.getLogger(EsClientPoolFactory.class);
private HttpHost[] httpHosts;
public EsClientPoolFactory(HttpHost[] httpHosts) {
this.httpHosts = httpHosts;
}
/**
* 初始化对象
* @param arg0
* @throws Exception
*/
@Override
public void activateObject(PooledObject<RestHighLevelClient> arg0){
}
/**
* 销毁对象
*/
@Override
public void destroyObject(PooledObject<RestHighLevelClient> pooledObject) throws Exception {
RestHighLevelClient highLevelClient = pooledObject.getObject();
highLevelClient.close();
}
/**
* 生产对象
*/
@Override
public PooledObject<RestHighLevelClient> makeObject(){
RestHighLevelClient client = null;
try {
client = new RestHighLevelClient(RestClient.builder(httpHosts));
} catch (Exception e) {
logger.error("es客户端连接池创建新客户端出错:{}", e.getMessage());
}
return new DefaultPooledObject<>(client);
}
/**
* 对象实例返还对象池
* @param arg0
* @throws Exception
*/
@Override
public void passivateObject(PooledObject<RestHighLevelClient> arg0){
}
/**
* 校验对象
* @param arg0
* @return
*/
@Override
public boolean validateObject(PooledObject<RestHighLevelClient> arg0) {
return true;
}
}
GenericObjectPool<RestHighLevelClient>的配置
@Configuration
public class EsConfiguration {
@Autowired
private EsClientParams esClientConfig;
@Bean(name = "esClientPool")
public GenericObjectPool<RestHighLevelClient> getEsClientPool() {
return new GenericObjectPool<>(getEsClientPoolFactory(), getGenericObjectPoolConfig());
}
// 利用对象工厂类和配置类生成对象池
@Bean
public EsClientPoolFactory getEsClientPoolFactory() {
return new EsClientPoolFactory(getHttpHostArr());
}
@Bean
public GenericObjectPoolConfig getGenericObjectPoolConfig() {
// 对象池配置类,不写也可以,采用默认配置
GenericObjectPoolConfig poolConfig = new GenericObjectPoolConfig();
poolConfig.setMaxTotal(esClientConfig.getPoolSize());
poolConfig.setJmxEnabled(false);
return poolConfig;
}
/**
* 构建es连接 httpHost
* @return
*/
private HttpHost[] getHttpHostArr() {
List<Map<String, String>> ips = esClientConfig.getIps();
HttpHost[] httpHostsArr = new HttpHost[ips.size()];
for(int i = 0; i < ips.size(); i++) {
Map<String, String> ip = ips.get(i);
httpHostsArr[i] = new HttpHost(ip.get(EsClientParams.HOSTNAME), Integer.parseInt(ip.get(EsClientParams.PORT)), "http");
}
return httpHostsArr;
}
}
GenericObjectPool<RestHighLevelClient>的封装
@Component
public class ElasticSearchPool {
// 利用对象工厂类和配置类生成对象池
@Resource(name = "esClientPool")
private GenericObjectPool<RestHighLevelClient> clientPool;
/**
* 获得对象
*
* @return
* @throws Exception
*/
public RestHighLevelClient getClient() throws Exception {
// 从池中取一个对象
RestHighLevelClient client = clientPool.borrowObject();
return client;
}
/**
* 归还对象
*
* @param client
*/
public void returnClient(RestHighLevelClient client) {
// 使用完毕之后,归还对象
if (clientPool != null) {
clientPool.returnObject(client);
}
}
}
使用案例
@Component
public class ESClient {
private static final Logger logger = LoggerFactory.getLogger(ESClient.class);
@Autowired
private ElasticSearchPool elasticSearchPool;
public SearchResponseParser search(SearchRequest searchRequest) throws Exception {
RestHighLevelClient client = null;
try {
// 对象池中获取对象
client = elasticSearchPool.getClient();
logger.info("ES request:\n{}", searchRequest.toString());
SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);
if (!RestStatus.OK.equals(searchResponse.status()) || searchResponse.isTimedOut()) {
logger.error("ES FAIL ,response :\n{}", StringUtil.prettyJsonString(searchResponse));
throw HsyException.of(CommonErrorEnum.REMOTE_ERROR, "es");
}
logger.info("ES SUCCESS, response:\n{}", searchResponse);
return new SearchResponseParser(searchResponse);
} catch (Exception e) {
logger.error("获取esClient失败或查询失败, exception:" , e);
throw e;
} finally {
if (client != null) {
elasticSearchPool.returnClient(client);
}
}
}
public SearchResponseParser scorllSearch(SearchScrollRequest searchScrollRequest) {
SearchResponse searchResponse = null;
RestHighLevelClient client = null;
try {
client = elasticSearchPool.getClient();
logger.info("ES request:\n{}", JSONObject.toJSONString(searchScrollRequest));
searchResponse = client.scroll(searchScrollRequest, RequestOptions.DEFAULT);
if (!RestStatus.OK.equals(searchResponse.status()) || searchResponse.isTimedOut()) {
logger.error(">>> es请求失败 返回结果:{}", searchResponse);
}
} catch (Exception e) {
logger.error("获取esClient失败或查询失败,exception:" : , e);
} finally {
if (client != null) {
elasticSearchPool.returnClient(client);
}
}
return new SearchResponseParser(searchResponse);
}
public ClearScrollResponse clearScroll(ClearScrollRequest clearScrollRequest) throws Exception {
RestHighLevelClient client = null;
try {
client = elasticSearchPool.getClient();
ClearScrollResponse clearScrollResponse = client.clearScroll(clearScrollRequest, RequestOptions.DEFAULT);
return clearScrollResponse;
} finally {
if (client != null) {
elasticSearchPool.returnClient(client);
}
}
}
}