前言
由于工作需要使用到ftp服务,一开始是每次建立ftp连接,上传文件成功后,再释放连接,后来发现这个方法太浪费资源和时间了,就想到了使用ftp连接池的方式实现,这样,预先创建好ftp连接池,需要上传的时候从池子取一个连接,上传成功后再放回池子即可,省下了创建和释放ftp连接的时间。
实现
ftp服务的配置文件
在
config.properties
配置好ftp服务
ftp.ip=127.0.0.1
ftp.username=root
ftp.password=root
ftp.port=21
FtpClientConfig
FtpClientConfig
是用于读取config.properties
的一个实体类
public class FtpClientConfig {
private String host;
private int port;
private String username;
private String password;
...
FtpClientFactory
FtpClientFactory
可以理解为一个工厂类,用于生成ftp连接、销毁ftp连接以及检测ftp连接是否有效。
- 生成ftp连接
在生成ftp连接的时候,我们可以设定连接的超时时间等,ftp有主动模式和被动模式两种模式。
- 主动模式:FTP客户端随机开启一个大于1024的端口N向服务器的21号端口发起连接,然后开放N+1号端口进行监听,并向服务器发出PORT N+1命令。服务器接收到命令后,会用其本地的FTP数据端口(通常是20)来连接客户端指定的端口N+1,进行数据传输
- 被动模式:FTP客户端随机开启一个大于1024的端口N向服务器的21号端口发起连接,同时会开启N+1号端口。然后向服务器发送PASV命令,通知服务器自己处于被动模式。服务器收到命令后,会开放一个大于1024的端口P进行监听,然后用PORT P命令通知客户端,自己的数据端口是P。客户端收到命令后,会通过N+1号端口连接服务器的端口P,然后在两个端口之间进行数据传输。
public FTPClient makeClient() throws Exception{
FTPClient ftpClient = new FTPClient();
ftpClient.setConnectTimeout(1000 * 10);
try {
ftpClient.connect(config.getHost(), config.getPort());
boolean result = ftpClient.login(config.getUsername(), config.getPassword());
if(!result) {
log.info("ftp登录失败,username: {}",config.getUsername());
return null;
}
ftpClient.setControlEncoding(encode);
ftpClient.setFileType(FTPClient.BINARY_FILE_TYPE);
//被动模式 被动模式是客户端向服务端发送PASV命令,服务端随机开启一个端口并通知客户端,客户端根据该端口与服务端建立连接,然后发送数据。服务端是两种模式的,
//使用哪种模式取决于客户端,同时关键点在于网络环境适合用哪种模式,比如客户端在防火墙内,则最好选择被动模式
//在mac下测试用被动模式没问题,用主动模式则报错,在linux服务器上则相反
//ftpClient.enterLocalPassiveMode();
ftpClient.enterLocalActiveMode();
} catch (Exception e) {
log.error("makeClient exception",e);
destroyClient(ftpClient);
throw e;
}
return ftpClient;
}
- 销毁ftp连接
public void destroyClient(FTPClient ftpClient) {
try {
if(ftpClient != null && ftpClient.isConnected()) {
ftpClient.logout();
}
} catch (Exception e) {
log.error("ftpClient logout exception",e);
} finally {
try {
if(ftpClient != null) {
ftpClient.disconnect();
}
} catch (Exception e2) {
log.error("ftpClient disconnect exception",e2);
}
}
}
- 检测ftp连接
public boolean validateClient(FTPClient ftpClient) {
try {
return ftpClient.sendNoOp();
} catch (Exception e) {
log.error("ftpClient validate exception",e);
}
return false;
}
FtpClientPool
FtpClientPool
就是我们真正使用的类,我们使用了BlockingQueue
阻塞对列来实现连接池的效果,如果需要进行ftp连接,就从连接池获取一个连接,完成后就把连接归还到池子里。使用阻塞对列是为了防止多线程时多个线程同时获取了同一个ftp连接导致失败。
private static final int DEFAULT_POOL_SIZE = 16;
private BlockingQueue<FTPClient> pool;
private FtpClientFactory factory;
public FtpClientPool(FtpClientFactory factory) {
this(factory, DEFAULT_POOL_SIZE);
}
public FtpClientPool(FtpClientFactory factory,int size) {
this.factory = factory;
this.pool = new ArrayBlockingQueue<>(size);
initPool(size);
}
- 初始化
private void initPool(int maxPoolSize) {
try {
int count = 0;
while (count < maxPoolSize) {
pool.offer(factory.makeClient(),10,TimeUnit.SECONDS);
count ++;
}
} catch (Exception e) {
log.error("ftp连接池初始化失败",e);
}
}
- 从阻塞对列获取一个ftp连接
public FTPClient borrowClient() throws Exception{
FTPClient client = pool.take();
if(client == null) {
client = factory.makeClient();
//addClient(client);
returnClient(client);
}else if(!factory.validateClient(client)) {
invalidateClient(client);
client = factory.makeClient();
//addClient(client);
returnClient(client);
}
return client;
}
- 归还一个ftp连接
public void returnClient(FTPClient ftpClient) throws Exception{
try {
if(ftpClient != null && !pool.offer(ftpClient, 10, TimeUnit.SECONDS)) {
factory.destroyClient(ftpClient);
}
} catch (Exception e) {
log.error("归还对象失败",e);
throw e;
}
}
FtpClientKeepAlive
如果服务器设置了ftp连接在一段时间内不使用会自动断开连接,就会导致我们的连接超过时间就会失败,为了避免一直重复创建连接,这里使用了长连接,
FtpClientKeepAlive
负责保持长连接,如果连接失效,就重新创建连接。
根据服务器超时时间设置长连接保持的时间,每隔一段时间,从阻塞对列获取连接来进行验证。
public class FtpClientKeepAlive {
private static final Logger log = LoggerFactory.getLogger(FtpClientKeepAlive.class);
private KeepAliveThread keepAliveThread;
@Autowired
private FtpClientPool ftpClientPool;
public void init() {
// 启动心跳检测线程
if (keepAliveThread == null) {
keepAliveThread = new KeepAliveThread();
Thread thread = new Thread(keepAliveThread);
thread.start();
}
}
class KeepAliveThread implements Runnable {
@Override
public void run() {
FTPClient ftpClient = null;
while (true) {
try {
BlockingQueue<FTPClient> pool = ftpClientPool.getPool();
if (pool != null && pool.size() > 0) {
Iterator<FTPClient> it = pool.iterator();
while (it.hasNext()) {
ftpClient = it.next();
boolean result = ftpClient.sendNoOp();
log.info("心跳结果: {}",result);
if (!result) {
ftpClientPool.invalidateClient(ftpClient);
}
}
}
} catch (Exception e) {
log.error("ftp心跳检测异常", e);
ftpClientPool.invalidateClient(ftpClient);
}
// 每30s发送一次心跳,服务器超时时间为60s
try {
Thread.sleep(1000 * 30);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
log.error("ftp休眠异常", e);
}
}
}
}
}
spring-ftp.xml
由于项目是使用spring的,所以在xml配置文件里进行bean的配置。
<!-- 省略了spring beans头部配置-->
<bean id="propertyConfigurer" class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
<property name="location" value="classpath:config.properties" />
</bean>
<bean id="ftpClientConfig" class="com.zeromk.study.util.FtpClientConfig">
<property name="host" value="${ftp.ip}"/>
<property name="port" value="${ftp.port}"/>
<property name="username" value="${ftp.username}"/>
<property name="password" value="${ftp.password}"/>
</bean>
<bean id="ftpClientFactory" class="com.zeromk.study.util.FtpClientFactory">
<constructor-arg index="0" ref="ftpClientConfig"/>
</bean>
<bean id="ftpClientPool" class="com.zeromk.study.util.FtpClientPool">
<constructor-arg index="0" ref="ftpClientFactory"/>
<constructor-arg index="1" value="8"/>
</bean>
<bean id="ftpClientKeepAlive" class="com.zeromk.study.util.FtpClientKeepAlive" init-method="init">
</bean>
源码
详细代码请参考github。
参考
https://github.com/jimiyi/ftpService
http://blog.51cto.com/11010174/1983978