有时候,仅仅把池中的所有对象都当成一类对象实现PooledObjectFactory对象池并不能解决所有问题,有时候我们需要根据一些参数,比如key的值去查找某些指定的池中对象,比如netty根据一个address获取一个channel连接。此时我们需要实现KeyedPoolableObjectFactory。
1. 引入依赖
<dependencies>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.30.Final</version>
</dependency>
<!-- Objenesis -->
<dependency>
<groupId>org.objenesis</groupId>
<artifactId>objenesis</artifactId>
<version>2.1</version>
</dependency>
<dependency>
<groupId>commons-pool</groupId>
<artifactId>commons-pool</artifactId>
<version>1.6</version>
</dependency>
</dependencies>
2. 代码实现
netty通信的简单demo详细代码见本文。该文主要改造client端,netty的channel实现对象池统一管理。根据不同的地址来管理channel。
- 设置Bootstrap的详细配置。
/**
* netty的客户端。主要目的是生成Bootstrap
*/
public class NettyClient {
private Bootstrap bootstrap = new Bootstrap();
{
EventLoopGroup group = new NioEventLoopGroup();
bootstrap.group(group);
bootstrap.channel(NioSocketChannel.class);
bootstrap.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel channel) throws Exception {
ChannelPipeline pipeline = channel.pipeline();
pipeline.addLast(new RpcDecoder(Response.class));
pipeline.addLast(new RpcEncoder(Request.class));
pipeline.addLast(new NettyClientHandler());
}
});
bootstrap.option(ChannelOption.TCP_NODELAY, true);
}
public NettyClient() {
}
/**
* 返回
*/
public Bootstrap getBootstrap() {
return bootstrap;
}
}
- 定义channel的处理器
/**
* channel的处理器
*/
@Slf4j
public class NettyClientHandler extends SimpleChannelInboundHandler<Response> {
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, Response response) throws Exception {
log.info("输出数据:" + JSON.toJSONString(response));
}
}
- 对象池的key信息
/**
* 对象池的key信息
*/
@Data
public class NettyPoolKey {
private String address;
}
2.1 定义对象池
实现KeyedPoolableObjectFactory接口。重写makeObject
方法,产生对象
import com.tellme.dto.NettyPoolKey;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.pool.KeyedPoolableObjectFactory;
import java.net.InetSocketAddress;
import java.util.concurrent.TimeUnit;
@Slf4j
public class NettyPoolableFactory implements KeyedPoolableObjectFactory<NettyPoolKey, Channel> {
private Bootstrap bootstrap = new NettyClient().getBootstrap();
public NettyPoolableFactory() {
}
@Override
public Channel makeObject(NettyPoolKey key) throws Exception {
/**
* 将地址转换为socket地址
*/
InetSocketAddress address = toInetSocketAddress(key.getAddress());
return getChannel(address);
}
private Channel getChannel(InetSocketAddress address) {
Channel channel;
ChannelFuture future = bootstrap.connect(address);
//获取连接
try {
future.await(30, TimeUnit.MILLISECONDS);
if (future.isCancelled()) {
throw new RuntimeException("连接被取消");
} else if (!future.isSuccess()) {
throw new RuntimeException("连接获取失败");
} else {
channel = future.channel();
}
} catch (Exception e) {
throw new RuntimeException("连接获取异常", e);
}
return channel;
}
/**
* 销毁channel
*/
@Override
public void destroyObject(NettyPoolKey key, Channel channel) throws Exception {
if (channel != null) {
channel.disconnect();
channel.close();
}
}
/**
* 校验channel
*/
@Override
public boolean validateObject(NettyPoolKey key, Channel channel) {
if (channel != null && channel.isActive()) {
return true;
}
return false;
}
@Override
public void activateObject(NettyPoolKey key, Channel obj) throws Exception {
}
@Override
public void passivateObject(NettyPoolKey key, Channel obj) throws Exception {
}
/**
* To inet socket address inet socket address.
*
* @param address the address
* @return the inet socket address
*/
public static InetSocketAddress toInetSocketAddress(String address) {
int i = address.indexOf(':');
String host;
int port;
if (i > -1) {
host = address.substring(0, i);
port = Integer.parseInt(address.substring(i + 1));
} else {
host = address;
port = 0;
}
return new InetSocketAddress(host, port);
}
}
2.2 对象池的使用
对象池使用时,需要borrowObject
获取对象,当使用完对象后,需要调用returnObject
回收资源。
public class NettyTest {
public static void main(String[] args) throws Exception {
NettyPoolableFactory nettyPoolableFactory = new NettyPoolableFactory();
//创建对象池
GenericKeyedObjectPool<NettyPoolKey, Channel> pool = new GenericKeyedObjectPool<>(nettyPoolableFactory);
NettyPoolKey nettyPoolKey = new NettyPoolKey();
nettyPoolKey.setAddress("127.0.0.1:20000");
//线程池获取资源
Channel channel = pool.borrowObject(nettyPoolKey);
Request request = new Request();
request.setRequestId(UUID.randomUUID().toString());
request.setParameter("Hello Server !");
channel.writeAndFlush(request).sync();
channel.closeFuture().sync();
//回收资源
pool.returnObject(nettyPoolKey, channel);
Thread.sleep(10000);
}
}