jsch的sftp在多线程下的问题及处理办法

jsch的sftp在多线程下的问题及处理办法

作者 时间
雨中星辰 2022-02-09

jsch的sftp(ChannelSftp、Session)是不能在多线程下进行公用的,如果希望在多线程下操作sftp,那么ChannelSftp、Session需要放在ThreadLocal中。

单线程示例

SftpUtil2:

import com.jcraft.jsch.*;
import lombok.extern.slf4j.Slf4j;

import java.lang.reflect.Field;


/**
 * @author star
 * @descripton sftp工具类(非线程安全的)
 * @date 2021/6/10
 **/
@Slf4j
public class SftpUtil2 {
    Session session;
    ChannelSftp channel;

    public String username;
    public String password;
    public String remoteHost;
    public Integer remotePort;
    public String charset;

    public SftpUtil2(String remoteHost, Integer remotePort, String username, String password, String charset) throws JSchException, SftpException {
        this.remoteHost = remoteHost;
        this.remotePort = remotePort;
        this.username = username;
        this.password = password;
        this.charset = charset;
        connect();
    }

    public void uploadFile(ChannelSftp channel, String src, String dest) throws SftpException {
        channel.put(src, dest);
    }

    public void uploadFile(String src, String dest) throws SftpException, JSchException {
        uploadFile(channel, src, dest);
    }

    public void connect() throws JSchException, SftpException {
        JSch jSch = new JSch();
        session = jSch.getSession(username, remoteHost, remotePort);
        session.setPassword(password);
        session.setConfig("PreferredAuthentications", "password");
        session.setConfig("StrictHostKeyChecking", "no");// 为session重新设置参数
        session.connect();
        channel = (ChannelSftp) session.openChannel("sftp");
        channel.connect(5000);
        if (!"UTF-8".equalsIgnoreCase(charset)) {
            Class cl = ChannelSftp.class;
            Field f;
            try {
                f = cl.getDeclaredField("server_version");
                f.setAccessible(true);
                f.set(channel, 2);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        channel.setFilenameEncoding(charset);
    }

    public void disconnectAll() {
        if (channel != null && channel.isConnected()) {
            channel.disconnect();
        }
        if (session != null && session.isConnected()) {
            session.disconnect();
        }
    }

}

测试程序:

import com.jcraft.jsch.JSchException;
import com.jcraft.jsch.SftpException;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.filefilter.TrueFileFilter;

import java.io.File;
import java.util.Collection;

/**
 * @author star
 * @date 2022/2/9 4:22 PM
 */
//1458个文件 每个500kb 耗时68807
//1458个文件 每个500kb 耗时68676
//1458个文件 每个500kb 耗时68714
public class Test1 {
    public static void main(String[] args) throws JSchException, SftpException {
        Collection<File> files = FileUtils.listFiles(new File("/tmp/test"), TrueFileFilter.INSTANCE, TrueFileFilter.INSTANCE);
        long start = System.currentTimeMillis();
        SftpUtil2 sftpUtil = new SftpUtil2("192.168.40.37", 22, "root", "R0ck9","UTF-8");
        for (File file : files) {
            sftpUtil.uploadFile(file.getPath(),"/tmp/aa");
        }
        System.out.println("上传完毕,耗时:" + (System.currentTimeMillis() - start));
        sftpUtil.disconnectAll();
    }
}

多线程错误示例

import com.jcraft.jsch.JSchException;
import com.jcraft.jsch.SftpException;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.filefilter.TrueFileFilter;

import java.io.File;
import java.util.Collection;

/**
 * @author star
 * @date 2022/2/9 4:22 PM
 */
public class Test2 {
    public static void main(String[] args) throws JSchException, SftpException {
        Collection<File> files = FileUtils.listFiles(new File("/tmp/test"), TrueFileFilter.INSTANCE, TrueFileFilter.INSTANCE);
        SftpUtil2 sftpUtil = new SftpUtil2("192.168.40.37", 22, "root", "R0ck9","UTF-8");
        files.parallelStream().forEach(file -> {
            try {
                sftpUtil.uploadFile(file.getPath(),"/tmp/aa");
            } catch (SftpException | JSchException e) {
                e.printStackTrace();
            }
        });

        sftpUtil.disconnectAll();
    }
}

该程序使用多线程并发操作sftp,在多线程中共用一个ChannelSftp,就会出异常,具体如下:

Caused by: java.io.IOException: Pipe closed
    at java.io.PipedInputStream.read(PipedInputStream.java:307)
    at java.io.PipedInputStream.read(PipedInputStream.java:377)
    at com.jcraft.jsch.ChannelSftp.fill(ChannelSftp.java:2909)
    at com.jcraft.jsch.ChannelSftp.header(ChannelSftp.java:2935)
    at com.jcraft.jsch.ChannelSftp._put(ChannelSftp.java:583)
    ... 15 more
4: java.io.IOException: Pipe closed
    at com.jcraft.jsch.ChannelSftp._put(ChannelSftp.java:697)
    at com.jcraft.jsch.ChannelSftp.put(ChannelSftp.java:475)
    at com.jcraft.jsch.ChannelSftp.put(ChannelSftp.java:365)
    at SftpUtil2.uploadFile(SftpUtil2.java:33)
    at SftpUtil2.uploadFile(SftpUtil2.java:37)
    at Test2.lambda$main$0(Test2.java:19)
    at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:184)
    at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382)
    at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
    at java.util.stream.ForEachOps$ForEachTask.compute(ForEachOps.java:291)
    at java.util.concurrent.CountedCompleter.exec(CountedCompleter.java:731)
    at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
    at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
    at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
    at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
Caused by: java.io.IOException: Pipe closed
    at java.io.PipedInputStream.read(PipedInputStream.java:307)
    at java.io.PipedInputStream.read(PipedInputStream.java:377)
    at com.jcraft.jsch.ChannelSftp.fill(ChannelSftp.java:2909)
    at com.jcraft.jsch.ChannelSftp.header(ChannelSftp.java:2935)
    at com.jcraft.jsch.ChannelSftp._put(ChannelSftp.java:583)
    ... 14 more
4: java.io.IOException: Pipe closed
    at com.jcraft.jsch.ChannelSftp._put(ChannelSftp.java:697)
    at com.jcraft.jsch.ChannelSftp.put(ChannelSftp.java:475)
    at com.jcraft.jsch.ChannelSftp.put(ChannelSftp.java:365)
    at SftpUtil2.uploadFile(SftpUtil2.java:33)
    at SftpUtil2.uploadFile(SftpUtil2.java:37)
    at Test2.lambda$main$0(Test2.java:19)
    at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:184)
    at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382)
    at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
    at java.util.stream.ForEachOps$ForEachTask.compute(ForEachOps.java:291)
    at java.util.concurrent.CountedCompleter.exec(CountedCompleter.java:731)
    at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
    at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
    at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
    at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
Caused by: java.io.IOException: Pipe closed
    at java.io.PipedInputStream.read(PipedInputStream.java:307)
    at java.io.PipedInputStream.read(PipedInputStream.java:377)
    at com.jcraft.jsch.ChannelSftp.fill(ChannelSftp.java:2909)
    at com.jcraft.jsch.ChannelSftp.header(ChannelSftp.java:2935)
    at com.jcraft.jsch.ChannelSftp._put(ChannelSftp.java:583)
    ... 14 more

多线程正确示例

SftpUtil中使用ThreadLocal包装ChannelSftp、Session保证线程安全。

Test3中使用parallelStream().forEach完成多线程并发操作sftp

SftpUtil

import com.jcraft.jsch.*;
import lombok.extern.slf4j.Slf4j;

import java.lang.reflect.Field;


/**
 * @author star
 * @descripton sftp工具类,线程安全的
 * @date 2021/6/10
 **/
@Slf4j
public class SftpUtil {

    ThreadLocal<ChannelSftp> channelThreadLocal = new ThreadLocal<>();
    ThreadLocal<Session> sessionThreadLocal = new ThreadLocal<>();

    public String username;
    public String password;
    public String remoteHost;
    public Integer remotePort;
    public String charset;

    public SftpUtil(String remoteHost, Integer remotePort, String username, String password, String charset) throws JSchException, SftpException {
        this.remoteHost = remoteHost;
        this.remotePort = remotePort;
        this.username = username;
        this.password = password;
        this.charset = charset;
        connect();
    }

    public void uploadFile(ChannelSftp channel, String src, String dest) throws SftpException {
        channel.put(src, dest);
    }

    public void uploadFile(String src, String dest) throws SftpException, JSchException {
        uploadFile(getChannel(), src, dest);
    }

    public void connect() throws JSchException, SftpException {
        JSch jSch = new JSch();
        Session session = jSch.getSession(username, remoteHost, remotePort);
        session.setPassword(password);
        session.setConfig("PreferredAuthentications","password");
        session.setConfig("StrictHostKeyChecking", "no");// 为session重新设置参数
        session.connect();
        ChannelSftp channel = (ChannelSftp) session.openChannel("sftp");
        channel.connect(5000);
        if (!"UTF-8".equalsIgnoreCase(charset)) {
            Class cl = ChannelSftp.class;
            Field f;
            try {
                f = cl.getDeclaredField("server_version");
                f.setAccessible(true);
                f.set(channel, 2);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        channel.setFilenameEncoding(charset);
        channelThreadLocal.set(channel);
        sessionThreadLocal.set(session);
    }

    public void disconnectAll() throws JSchException, SftpException {
        ChannelSftp channel = getChannel();
        if (channel != null && channel.isConnected()) {
            channel.disconnect();
        }
        Session session = getSession();
        if (session != null && session.isConnected()) {
            session.disconnect();
        }
    }

    public ChannelSftp getChannel() throws JSchException, SftpException {
        ChannelSftp channelSftp = channelThreadLocal.get();
        if (channelSftp == null) {
            connect();
        }
        return channelThreadLocal.get();
    }

    public Session getSession() throws JSchException, SftpException {
        Session session = sessionThreadLocal.get();
        if (session == null) {
            connect();
        }
        return sessionThreadLocal.get();
    }
}

Test3:

import com.jcraft.jsch.JSchException;
import com.jcraft.jsch.SftpException;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.filefilter.TrueFileFilter;

import java.io.File;
import java.util.Collection;

/**
 * @author star
 * @date 2022/2/9 4:22 PM
 */
//1458个文件 每个500kb 耗时67506
//1458个文件 每个500kb 耗时66652
//1458个文件 每个500kb 耗时67193

public class Test3 {
    public static ThreadLocal<SftpUtil> sftpUtilThreadLocal = new ThreadLocal<>();

    public static void main(String[] args) throws JSchException, SftpException {
        Collection<File> files = FileUtils.listFiles(new File("/tmp/test"), TrueFileFilter.INSTANCE, TrueFileFilter.INSTANCE);
        long start = System.currentTimeMillis();
        SftpUtil sftpUtil = new SftpUtil("192.168.40.37", 22, "root", "R0ck9", "UTF-8");
        files.parallelStream().forEach(file -> {
            try {
                sftpUtil.uploadFile(file.getPath(), "/tmp/aa");
            } catch (SftpException | JSchException e) {
                e.printStackTrace();
            }
        });

        System.out.println("上传完毕,耗时:" + (System.currentTimeMillis() - start));
        sftpUtil.disconnectAll();
    }
}

性能测试

使用单线程上传1458个文件,每个500kb,进行三次测试,平均耗时为:68732毫秒

使用多线程上传1458个文件,每个500kb,进行三次测试,平均耗时为:67117毫秒

最佳实践

通过测试,可以看到使用多线程操作确实能提升效率,但是,其提升非常有限,却带来了较高的复杂性,在使用中更加建议通过单线程的方式。如果,需要将sftp封装成工具类供他人使用,需要提醒多线程并发的问题,或为了保险起见,也可以在sftp工具类中使用ThreadLocal包装ChannelSftpSession

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 215,463评论 6 497
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,868评论 3 391
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 161,213评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,666评论 1 290
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,759评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,725评论 1 294
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,716评论 3 415
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,484评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,928评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,233评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,393评论 1 345
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,073评论 5 340
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,718评论 3 324
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,308评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,538评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,338评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,260评论 2 352

推荐阅读更多精彩内容