jsch的sftp在多线程下的问题及处理办法
| 作者 | 时间 |
|---|---|
| 雨中星辰 | 2022-02-09 |
jsch的sftp(ChannelSftp、Session)不能在多个线程之间共用。如果希望在多线程下操作sftp,需要把ChannelSftp、Session放在ThreadLocal中,让每个线程使用各自独立的连接。
单线程示例
SftpUtil2
:
import com.jcraft.jsch.*;
import lombok.extern.slf4j.Slf4j;
import java.lang.reflect.Field;
/**
 * SFTP helper built on JSch that owns exactly one session and one SFTP channel.
 *
 * <p>Not thread-safe: the single {@link ChannelSftp} held by an instance must not be
 * used by several threads at the same time.
 *
 * @author star
 * @date 2021/6/10
 **/
@Slf4j
public class SftpUtil2 {
    Session session;
    ChannelSftp channel;
    public String username;
    public String password;
    public String remoteHost;
    public Integer remotePort;
    public String charset;

    /**
     * Stores the connection settings and connects immediately.
     *
     * @param remoteHost host name or address of the SFTP server
     * @param remotePort port of the SFTP server
     * @param username   login name
     * @param password   login password
     * @param charset    file-name encoding passed to the channel
     * @throws JSchException if the session or channel cannot be established
     * @throws SftpException if the file-name encoding is rejected
     */
    public SftpUtil2(String remoteHost, Integer remotePort, String username, String password, String charset) throws JSchException, SftpException {
        this.remoteHost = remoteHost;
        this.remotePort = remotePort;
        this.username = username;
        this.password = password;
        this.charset = charset;
        connect();
    }

    /**
     * Uploads a local file through the given channel.
     *
     * @param channel connected SFTP channel to use
     * @param src     local source path
     * @param dest    remote destination path
     * @throws SftpException if the transfer fails
     */
    public void uploadFile(ChannelSftp channel, String src, String dest) throws SftpException {
        channel.put(src, dest);
    }

    /**
     * Uploads a local file through this instance's own channel.
     *
     * @param src  local source path
     * @param dest remote destination path
     * @throws SftpException if the transfer fails
     * @throws JSchException declared for callers that already catch it
     */
    public void uploadFile(String src, String dest) throws SftpException, JSchException {
        uploadFile(channel, src, dest);
    }

    /**
     * Opens a password-authenticated session plus an SFTP channel and stores both.
     *
     * <p>If any step fails, whatever was opened so far is closed again before the
     * exception propagates, so a failed attempt leaves no connection behind. A
     * connection held from an earlier call is closed once the new one is ready.
     *
     * @throws JSchException if the session or channel cannot be established
     * @throws SftpException if the file-name encoding is rejected
     */
    public void connect() throws JSchException, SftpException {
        Session newSession = new JSch().getSession(username, remoteHost, remotePort);
        ChannelSftp newChannel = null;
        try {
            newSession.setPassword(password);
            newSession.setConfig("PreferredAuthentications", "password");
            // NOTE(review): host-key verification is switched off here; acceptable for a demo,
            // but it should be reconsidered before talking to hosts that are not trusted.
            newSession.setConfig("StrictHostKeyChecking", "no");
            newSession.connect();
            newChannel = (ChannelSftp) newSession.openChannel("sftp");
            newChannel.connect(5000);
            applyFilenameEncoding(newChannel);
        } catch (JSchException | SftpException | RuntimeException e) {
            close(newChannel, newSession);
            throw e;
        }

        Session oldSession = session;
        ChannelSftp oldChannel = channel;
        session = newSession;
        channel = newChannel;
        // A repeated connect() must not orphan the connection it replaces.
        close(oldChannel, oldSession);
    }

    /** Closes this instance's channel and session if they are connected. */
    public void disconnectAll() {
        close(channel, session);
    }

    /**
     * Applies {@link #charset} as the channel's file-name encoding.
     *
     * <p>For any charset other than UTF-8 the channel's private {@code server_version}
     * field is first forced to 2 via reflection; presumably this gets past a version check
     * inside {@code setFilenameEncoding} (confirm against the JSch version in use). The
     * override is best effort: if it fails and the encoding call then fails too, the
     * reflection error is attached as a suppressed exception so the root cause is not lost.
     */
    private void applyFilenameEncoding(ChannelSftp target) throws SftpException {
        Exception overrideFailure = null;
        if (!"UTF-8".equalsIgnoreCase(charset)) {
            try {
                Field serverVersion = ChannelSftp.class.getDeclaredField("server_version");
                serverVersion.setAccessible(true);
                serverVersion.set(target, 2);
            } catch (ReflectiveOperationException | RuntimeException e) {
                overrideFailure = e;
            }
        }
        try {
            target.setFilenameEncoding(charset);
        } catch (SftpException | RuntimeException e) {
            if (overrideFailure != null) {
                e.addSuppressed(overrideFailure);
            }
            throw e;
        }
    }

    /** Disconnects the given channel and session when present and still connected. */
    private static void close(ChannelSftp channel, Session session) {
        if (channel != null && channel.isConnected()) {
            channel.disconnect();
        }
        if (session != null && session.isConnected()) {
            session.disconnect();
        }
    }
}
测试程序:
import com.jcraft.jsch.JSchException;
import com.jcraft.jsch.SftpException;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.filefilter.TrueFileFilter;
import java.io.File;
import java.util.Collection;
/**
 * Single-threaded benchmark: uploads every file under /tmp/test over one SFTP connection
 * and prints the elapsed time in milliseconds.
 *
 * @author star
 * @date 2022/2/9 4:22 PM
 */
// 1458 files, 500 KB each: took 68807 ms
// 1458 files, 500 KB each: took 68676 ms
// 1458 files, 500 KB each: took 68714 ms
public class Test1 {
    /**
     * Runs the upload benchmark.
     *
     * @param args unused
     * @throws JSchException if connecting fails
     * @throws SftpException if an upload fails
     */
    public static void main(String[] args) throws JSchException, SftpException {
        Collection<File> files = FileUtils.listFiles(new File("/tmp/test"), TrueFileFilter.INSTANCE, TrueFileFilter.INSTANCE);
        long start = System.currentTimeMillis();
        SftpUtil2 sftpUtil = new SftpUtil2("192.168.40.37", 22, "root", "R0ck9", "UTF-8");
        try {
            for (File file : files) {
                sftpUtil.uploadFile(file.getPath(), "/tmp/aa");
            }
            System.out.println("上传完毕,耗时:" + (System.currentTimeMillis() - start));
        } finally {
            // Close the connection even when an upload throws, so it is never left open.
            sftpUtil.disconnectAll();
        }
    }
}
多线程错误示例
import com.jcraft.jsch.JSchException;
import com.jcraft.jsch.SftpException;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.filefilter.TrueFileFilter;
import java.io.File;
import java.util.Collection;
/**
 * Deliberately incorrect multi-threaded example: one {@code SftpUtil2} instance is shared by
 * all threads of a parallel stream, and every thread uploads through it concurrently.
 * It exists to reproduce the failure, not as a usage pattern.
 *
 * @author star
 * @date 2022/2/9 4:22 PM
 */
public class Test2 {
    /**
     * Uploads every file under /tmp/test in parallel through a single shared client.
     *
     * @param args unused
     * @throws JSchException if connecting fails
     * @throws SftpException if setting up the connection fails
     */
    public static void main(String[] args) throws JSchException, SftpException {
        File sourceDir = new File("/tmp/test");
        Collection<File> files = FileUtils.listFiles(sourceDir, TrueFileFilter.INSTANCE, TrueFileFilter.INSTANCE);
        SftpUtil2 sharedClient = new SftpUtil2("192.168.40.37", 22, "root", "R0ck9", "UTF-8");
        // Every worker thread of the parallel stream calls into the same client instance.
        files.parallelStream().forEach(file -> upload(sharedClient, file));
        sharedClient.disconnectAll();
    }

    /** Uploads one file to /tmp/aa; a failure is printed and the remaining files continue. */
    private static void upload(SftpUtil2 client, File file) {
        try {
            client.uploadFile(file.getPath(), "/tmp/aa");
        } catch (SftpException | JSchException e) {
            e.printStackTrace();
        }
    }
}
该程序使用多线程并发操作sftp,由于多个线程共用同一个ChannelSftp,上传时就会抛出异常,具体如下:
Caused by: java.io.IOException: Pipe closed
at java.io.PipedInputStream.read(PipedInputStream.java:307)
at java.io.PipedInputStream.read(PipedInputStream.java:377)
at com.jcraft.jsch.ChannelSftp.fill(ChannelSftp.java:2909)
at com.jcraft.jsch.ChannelSftp.header(ChannelSftp.java:2935)
at com.jcraft.jsch.ChannelSftp._put(ChannelSftp.java:583)
... 15 more
4: java.io.IOException: Pipe closed
at com.jcraft.jsch.ChannelSftp._put(ChannelSftp.java:697)
at com.jcraft.jsch.ChannelSftp.put(ChannelSftp.java:475)
at com.jcraft.jsch.ChannelSftp.put(ChannelSftp.java:365)
at SftpUtil2.uploadFile(SftpUtil2.java:33)
at SftpUtil2.uploadFile(SftpUtil2.java:37)
at Test2.lambda$main$0(Test2.java:19)
at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:184)
at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382)
at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
at java.util.stream.ForEachOps$ForEachTask.compute(ForEachOps.java:291)
at java.util.concurrent.CountedCompleter.exec(CountedCompleter.java:731)
at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
Caused by: java.io.IOException: Pipe closed
at java.io.PipedInputStream.read(PipedInputStream.java:307)
at java.io.PipedInputStream.read(PipedInputStream.java:377)
at com.jcraft.jsch.ChannelSftp.fill(ChannelSftp.java:2909)
at com.jcraft.jsch.ChannelSftp.header(ChannelSftp.java:2935)
at com.jcraft.jsch.ChannelSftp._put(ChannelSftp.java:583)
... 14 more
4: java.io.IOException: Pipe closed
at com.jcraft.jsch.ChannelSftp._put(ChannelSftp.java:697)
at com.jcraft.jsch.ChannelSftp.put(ChannelSftp.java:475)
at com.jcraft.jsch.ChannelSftp.put(ChannelSftp.java:365)
at SftpUtil2.uploadFile(SftpUtil2.java:33)
at SftpUtil2.uploadFile(SftpUtil2.java:37)
at Test2.lambda$main$0(Test2.java:19)
at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:184)
at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382)
at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
at java.util.stream.ForEachOps$ForEachTask.compute(ForEachOps.java:291)
at java.util.concurrent.CountedCompleter.exec(CountedCompleter.java:731)
at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
Caused by: java.io.IOException: Pipe closed
at java.io.PipedInputStream.read(PipedInputStream.java:307)
at java.io.PipedInputStream.read(PipedInputStream.java:377)
at com.jcraft.jsch.ChannelSftp.fill(ChannelSftp.java:2909)
at com.jcraft.jsch.ChannelSftp.header(ChannelSftp.java:2935)
at com.jcraft.jsch.ChannelSftp._put(ChannelSftp.java:583)
... 14 more
多线程正确示例
在SftpUtil中使用ThreadLocal包装ChannelSftp、Session,让每个线程使用各自的连接,从而保证线程安全。
在Test3中使用parallelStream().forEach完成多线程并发操作sftp。
SftpUtil
import com.jcraft.jsch.*;
import lombok.extern.slf4j.Slf4j;
import java.lang.reflect.Field;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
/**
 * SFTP helper built on JSch that gives every calling thread its own session and channel.
 *
 * <p>Connections are kept in {@link ThreadLocal}s, so two threads never share a
 * {@link ChannelSftp}. Every session is additionally recorded in a shared queue so that
 * {@link #disconnectAll()} can close the connections opened by all threads, not only the
 * caller's.
 *
 * @author star
 * @date 2021/6/10
 **/
@Slf4j
public class SftpUtil {
    ThreadLocal<ChannelSftp> channelThreadLocal = new ThreadLocal<>();
    ThreadLocal<Session> sessionThreadLocal = new ThreadLocal<>();
    /** Sessions opened by any thread through this instance and not yet closed by it. */
    private final Queue<Session> openSessions = new ConcurrentLinkedQueue<>();
    public String username;
    public String password;
    public String remoteHost;
    public Integer remotePort;
    public String charset;

    /**
     * Stores the connection settings and connects on behalf of the constructing thread.
     *
     * @param remoteHost host name or address of the SFTP server
     * @param remotePort port of the SFTP server
     * @param username   login name
     * @param password   login password
     * @param charset    file-name encoding passed to each channel
     * @throws JSchException if the session or channel cannot be established
     * @throws SftpException if the file-name encoding is rejected
     */
    public SftpUtil(String remoteHost, Integer remotePort, String username, String password, String charset) throws JSchException, SftpException {
        this.remoteHost = remoteHost;
        this.remotePort = remotePort;
        this.username = username;
        this.password = password;
        this.charset = charset;
        connect();
    }

    /**
     * Uploads a local file through the given channel.
     *
     * @param channel connected SFTP channel to use
     * @param src     local source path
     * @param dest    remote destination path
     * @throws SftpException if the transfer fails
     */
    public void uploadFile(ChannelSftp channel, String src, String dest) throws SftpException {
        channel.put(src, dest);
    }

    /**
     * Uploads a local file through the calling thread's channel, connecting first if needed.
     *
     * @param src  local source path
     * @param dest remote destination path
     * @throws SftpException if the transfer fails
     * @throws JSchException if a connection has to be opened and that fails
     */
    public void uploadFile(String src, String dest) throws SftpException, JSchException {
        uploadFile(getChannel(), src, dest);
    }

    /**
     * Opens a session and SFTP channel for the calling thread and stores them in the
     * thread-locals.
     *
     * <p>If any step fails, whatever was opened so far is closed before the exception
     * propagates. A connection the calling thread held before is closed once the new one
     * is ready.
     *
     * @throws JSchException if the session or channel cannot be established
     * @throws SftpException if the file-name encoding is rejected
     */
    public void connect() throws JSchException, SftpException {
        Session previousSession = sessionThreadLocal.get();
        ChannelSftp previousChannel = channelThreadLocal.get();

        Session session = new JSch().getSession(username, remoteHost, remotePort);
        ChannelSftp channel = null;
        try {
            session.setPassword(password);
            session.setConfig("PreferredAuthentications", "password");
            // NOTE(review): host-key verification is switched off here; acceptable for a demo,
            // but it should be reconsidered before talking to hosts that are not trusted.
            session.setConfig("StrictHostKeyChecking", "no");
            session.connect();
            channel = (ChannelSftp) session.openChannel("sftp");
            channel.connect(5000);
            applyFilenameEncoding(channel);
        } catch (JSchException | SftpException | RuntimeException e) {
            close(channel, session);
            throw e;
        }

        channelThreadLocal.set(channel);
        sessionThreadLocal.set(session);
        openSessions.add(session);

        // Replace, rather than orphan, the connection this thread held before.
        if (previousSession != null) {
            openSessions.remove(previousSession);
        }
        close(previousChannel, previousSession);
    }

    /**
     * Closes every connection opened through this instance, whichever thread opened it.
     *
     * <p>Never opens a connection: a thread that has none simply has nothing of its own to
     * close. Intended to be called once all uploads have finished; a thread that uses this
     * instance afterwards transparently gets a fresh connection from {@link #getChannel()}.
     *
     * @throws JSchException declared for callers that already handle it
     * @throws SftpException declared for callers that already handle it
     */
    public void disconnectAll() throws JSchException, SftpException {
        close(channelThreadLocal.get(), sessionThreadLocal.get());
        channelThreadLocal.remove();
        sessionThreadLocal.remove();

        // Other threads' sessions are reachable only through the shared queue. Closing a
        // session is relied on to tear down the channel opened on it (JSch Session#disconnect).
        Session tracked;
        while ((tracked = openSessions.poll()) != null) {
            if (tracked.isConnected()) {
                tracked.disconnect();
            }
        }
    }

    /**
     * Returns the calling thread's channel, (re)connecting when the thread has none or its
     * channel is no longer connected.
     *
     * @return a channel owned by the calling thread
     * @throws JSchException if a connection has to be opened and that fails
     * @throws SftpException if the file-name encoding is rejected while connecting
     */
    public ChannelSftp getChannel() throws JSchException, SftpException {
        ChannelSftp channel = channelThreadLocal.get();
        if (channel == null || !channel.isConnected()) {
            connect();
            channel = channelThreadLocal.get();
        }
        return channel;
    }

    /**
     * Returns the calling thread's session, (re)connecting when the thread has none or its
     * session is no longer connected.
     *
     * @return a session owned by the calling thread
     * @throws JSchException if a connection has to be opened and that fails
     * @throws SftpException if the file-name encoding is rejected while connecting
     */
    public Session getSession() throws JSchException, SftpException {
        Session session = sessionThreadLocal.get();
        if (session == null || !session.isConnected()) {
            connect();
            session = sessionThreadLocal.get();
        }
        return session;
    }

    /**
     * Applies {@link #charset} as the channel's file-name encoding.
     *
     * <p>For any charset other than UTF-8 the channel's private {@code server_version}
     * field is first forced to 2 via reflection; presumably this gets past a version check
     * inside {@code setFilenameEncoding} (confirm against the JSch version in use). The
     * override is best effort: if it fails and the encoding call then fails too, the
     * reflection error is attached as a suppressed exception so the root cause is not lost.
     */
    private void applyFilenameEncoding(ChannelSftp target) throws SftpException {
        Exception overrideFailure = null;
        if (!"UTF-8".equalsIgnoreCase(charset)) {
            try {
                Field serverVersion = ChannelSftp.class.getDeclaredField("server_version");
                serverVersion.setAccessible(true);
                serverVersion.set(target, 2);
            } catch (ReflectiveOperationException | RuntimeException e) {
                overrideFailure = e;
            }
        }
        try {
            target.setFilenameEncoding(charset);
        } catch (SftpException | RuntimeException e) {
            if (overrideFailure != null) {
                e.addSuppressed(overrideFailure);
            }
            throw e;
        }
    }

    /** Disconnects the given channel and session when present and still connected. */
    private static void close(ChannelSftp channel, Session session) {
        if (channel != null && channel.isConnected()) {
            channel.disconnect();
        }
        if (session != null && session.isConnected()) {
            session.disconnect();
        }
    }
}
Test3
:
import com.jcraft.jsch.JSchException;
import com.jcraft.jsch.SftpException;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.filefilter.TrueFileFilter;
import java.io.File;
import java.util.Collection;
/**
 * Multi-threaded benchmark: uploads every file under /tmp/test from a parallel stream
 * through one {@code SftpUtil} instance and prints the elapsed time in milliseconds.
 *
 * @author star
 * @date 2022/2/9 4:22 PM
 */
// 1458 files, 500 KB each: took 67506 ms
// 1458 files, 500 KB each: took 66652 ms
// 1458 files, 500 KB each: took 67193 ms
public class Test3 {
    // Not referenced anywhere in this class; kept so existing references keep compiling.
    public static ThreadLocal<SftpUtil> sftpUtilThreadLocal = new ThreadLocal<>();

    /**
     * Runs the parallel upload benchmark.
     *
     * @param args unused
     * @throws JSchException if connecting or disconnecting fails
     * @throws SftpException if connecting or disconnecting fails
     */
    public static void main(String[] args) throws JSchException, SftpException {
        Collection<File> files = FileUtils.listFiles(new File("/tmp/test"), TrueFileFilter.INSTANCE, TrueFileFilter.INSTANCE);
        long start = System.currentTimeMillis();
        SftpUtil sftpUtil = new SftpUtil("192.168.40.37", 22, "root", "R0ck9", "UTF-8");
        try {
            files.parallelStream().forEach(file -> {
                try {
                    sftpUtil.uploadFile(file.getPath(), "/tmp/aa");
                } catch (SftpException | JSchException e) {
                    // One failed file is reported and the remaining uploads continue.
                    e.printStackTrace();
                }
            });
            System.out.println("上传完毕,耗时:" + (System.currentTimeMillis() - start));
        } finally {
            // Disconnect even if the stream aborts with an unchecked exception.
            sftpUtil.disconnectAll();
        }
    }
}
性能测试
使用单线程上传1458个文件,每个500kb,进行三次测试,平均耗时为:68732毫秒。
使用多线程上传1458个文件,每个500kb,进行三次测试,平均耗时为:67117毫秒。
最佳实践
通过测试可以看到,使用多线程操作确实能提升效率,但提升非常有限(平均耗时仅减少约2.3%),却带来了较高的复杂性,因此在实际使用中更建议采用单线程的方式。如果需要将sftp封装成工具类供他人使用,需要提醒使用者注意多线程并发的问题;为了保险起见,也可以在sftp工具类中使用ThreadLocal包装ChannelSftp和Session。