Spring异步请求与异步调用

异步请求

在Servlet 3.0之前,Servlet采用Thread-Per-Request的方式处理请求,即每一次Http请求都由某一个线程从头到尾负责处理。如果一个请求需要进行IO操作,比如访问数据库、调用第三方服务接口等,那么其所对应的线程将同步地等待****IO操作完成, 而IO操作是非常慢的,所以此时的线程并不能及时地释放回线程池以供后续使用,在并发量越来越大的情况下,这将带来严重的性能问题。其请求流程大致为:


image.png

而在Servlet3.0发布后,提供了一个新特性:异步处理请求。可以先释放容器分配给请求的线程与相关资源,减轻系统负担,释放了容器所分配线程的请求,其响应将被延后,可以在耗时处理完成(例如长时间的运算)时再对客户端进行响应。其请求流程为:


image.png

在Servlet 3.0后,我们可以从HttpServletRequest对象中获得一个AsyncContext对象,该对象构成了异步处理的上下文,Request和Response对象都可从中获取。AsyncContext可以从当前线程传给另外的线程,并在新的线程中完成对请求的处理并返回结果给客户端,初始线程便可以还回给容器线程池以处理更多的请求。如此,通过将请求从一个线程传给另一个线程处理的过程便构成了Servlet 3.0中的异步处理。

随着Spring5发布,提供了一个响应式Web框架:Spring WebFlux。之后可能就不需要Servlet容器的支持了。以下是其先后对比图:


image.png

左侧是传统的基于Servlet的Spring Web MVC框架,右侧是5.0版本新引入的基于Reactive Streams的Spring WebFlux框架,从上到下依次是Router Functions,WebFlux,Reactive Streams三个新组件。

原生异步请求API说明
在编写实际代码之前,我们来了解下一些关于异步请求的api的调用说明。

获取AsyncContext:根据HttpServletRequest对象获取。
AsyncContext asyncContext = request.startAsync();

设置监听器:可设置其开始、完成、异常、超时等事件的回调处理
其监听器的接口代码:

public interface AsyncListener extends EventListener {
    void onComplete(AsyncEvent event) throws IOException;
    void onTimeout(AsyncEvent event) throws IOException;
    void onError(AsyncEvent event) throws IOException;
    void onStartAsync(AsyncEvent event) throws IOException;
}

说明:

  • onStartAsync:异步线程开始时调用
  • onError:异步线程出错时调用
  • onTimeout:异步线程执行超时调用
  • onComplete:异步执行完毕时调用

一般上,我们在超时或者异常时,会返回给前端相应的提示,比如说超时了,请再次请求等等,根据各业务进行自定义返回。同时,在异步调用完成时,一般需要执行一些清理工作或者其他相关操作。

需要注意的是只有在调用request.startAsync前将监听器添加到AsyncContext,监听器的onStartAsync方法才会起作用,而调用startAsync前AsyncContext还不存在,所以第一次调用startAsync是不会被监听器中的onStartAsync方法捕获的,只有在超时后又重新开始的情况下onStartAsync方法才会起作用。

设置超时:通过setTimeout方法设置,单位:毫秒。
一定要设置超时时间,不能无限等待下去,不然和正常的请求就一样了。。

Servlet方式实现异步请求

前面已经提到,可通过HttpServletRequest对象中获得一个AsyncContext对象,该对象构成了异步处理的上下文。所以,我们来实际操作下。

1、编写一个简单控制层

/**
 * 使用servlet方式进行异步请求
 *
 */
@Slf4j
@RestController
public class ServletController {
    
    @RequestMapping("/servlet/orig")
    public void todo(HttpServletRequest request,HttpServletResponse response) throws Exception {
        //这里来个休眠
        Thread.sleep(100);
        response.getWriter().println("这是【正常】的请求返回");
    }
    
    @RequestMapping("/servlet/async")
    public void todoAsync(HttpServletRequest request,HttpServletResponse response) {
        AsyncContext asyncContext = request.startAsync();
        asyncContext.addListener(new AsyncListener() {
            
            @Override
            public void onTimeout(AsyncEvent event) throws IOException {
                log.info("超时了:");
                //做一些超时后的相关操作
            }
            
            @Override
            public void onStartAsync(AsyncEvent event) throws IOException {
                // TODO Auto-generated method stub
                log.info("线程开始");
            }
            
            @Override
            public void onError(AsyncEvent event) throws IOException {
                log.info("发生错误:",event.getThrowable());
            }
            
            @Override
            public void onComplete(AsyncEvent event) throws IOException {
                log.info("执行完成");
                //这里可以做一些清理资源的操作
                
            }
        });
        //设置超时时间
        asyncContext.setTimeout(200);
        //也可以不使用start 进行异步调用
//        new Thread(new Runnable() {
//            @Override
//            public void run() {
//                编写业务逻辑
//                
//            }
//        }).start();
        
        asyncContext.start(new Runnable() {            
            @Override
            public void run() {
                try {
                    Thread.sleep(100);
                    log.info("内部线程:" + Thread.currentThread().getName());
                    asyncContext.getResponse().setCharacterEncoding("utf-8");
                    asyncContext.getResponse().setContentType("text/html;charset=UTF-8");
                    asyncContext.getResponse().getWriter().println("这是【异步】的请求返回");
                } catch (Exception e) {
                    log.error("异常:",e);
                }
                //异步请求完成通知
                //此时整个请求才完成
                //其实可以利用此特性 进行多条消息的推送 把连接挂起。。
                asyncContext.complete();
            }
        });
        //此时之类 request的线程连接已经释放了
        log.info("线程:" + Thread.currentThread().getName());
    }
}

注意:异步请求时,可以利用ThreadPoolExecutor自定义个线程池。

1.启动下应用,查看控制台输出就可以获悉是否在同一个线程里面了。同时,可设置下等待时间,之后就会调用超时回调方法了。

使用过滤器时,需要加入asyncSupported为true配置,开启异步请求支持。

@WebServlet(urlPatterns = "/okong", asyncSupported = true )  
public  class AsyncServlet extends HttpServlet ...

题外话:其实我们可以利用在未执行asyncContext.complete()方法时请求未结束这特性,可以做个简单的文件上传进度条之类的功能。但注意请求是会超时的,需要设置超时的时间下。

Spring方式实现异步请求

在Spring中,有多种方式实现异步请求,比如callable、DeferredResult或者WebAsyncTask。每个的用法略有不同,可根据不同的业务场景选择不同的方式。以下主要介绍一些常用的用法

Callable

使用很简单,直接返回的参数包裹一层callable即可。

用法

    @RequestMapping("/callable")
    public Callable<String> callable() {
        log.info("外部线程:" + Thread.currentThread().getName());
        return new Callable<String>() {
 
            @Override
            public String call() throws Exception {
                log.info("内部线程:" + Thread.currentThread().getName());
                return "callable!";
            }
        };
    }

超时、自定义线程设置

从控制台可以看见,异步响应的线程使用的是名为:MvcAsync1的线程。第一次再访问时,就是MvcAsync2了。若采用默认设置,会无限的创建新线程去处理异步请求,所以正常都需要配置一个线程池及超时时间。

编写一个配置类

@Configuration
public class JavaConfig {
    /**
     * 配置线程池
     * @return
     */
    @Bean(name = "asyncPoolTaskExecutor")
    public ThreadPoolTaskExecutor getAsyncThreadPoolTaskExecutor() {
        ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
        //此方法返回可用处理器的虚拟机的最大数量; 不小于1
        int core = Runtime.getRuntime().availableProcessors();
        taskExecutor.setCorePoolSize(core);
        taskExecutor.setMaxPoolSize(core*2 + 1);
        taskExecutor.setQueueCapacity(25);
        taskExecutor.setKeepAliveSeconds(200);
        taskExecutor.setThreadNamePrefix("callable-");//线程名称前缀
        // 线程池对拒绝任务(无线程可用)的处理策略,目前只支持AbortPolicy、CallerRunsPolicy;默认为后者
        taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        taskExecutor.initialize();
        return taskExecutor;
    }
}

DeferredResult

相比于callable,DeferredResult可以处理一些相对复杂一些的业务逻辑,最主要还是可以在另一个线程里面进行业务处理及返回,即可在两个完全不相干的线程间的通信。

/**
 * 线程池
 */
public static ExecutorService FIXED_THREAD_POOL = Executors.newFixedThreadPool(30);
 
@RequestMapping("/deferredresult")
public DeferredResult<String> deferredResult(){
    log.info("外部线程:" + Thread.currentThread().getName());
    //设置超时时间
    DeferredResult<String> result = new DeferredResult<String>(60*1000L);
    //处理超时事件 采用委托机制
    result.onTimeout(new Runnable() {
 
        @Override
        public void run() {
            log.error("DeferredResult超时");
            result.setResult("超时了!");
        }
    });
    result.onCompletion(new Runnable() {
 
        @Override
        public void run() {
            //完成后
            log.info("调用完成");
        }
    });
    FIXED_THREAD_POOL.execute(new Runnable() {
 
        @Override
        public void run() {
            //处理业务逻辑
            log.info("内部线程:" + Thread.currentThread().getName());
            //返回结果
            result.setResult("DeferredResult!!");
        }
    });
    return result;
}

注意:返回结果时记得调用下setResult方法。

题外话:利用DeferredResult可实现一些长连接的功能,比如当某个操作是异步时,我们可以保存这个DeferredResult对象,当异步通知回来时,我们在找回这个DeferredResult对象,之后在setResult会结果即可。提高性能。

WebAsyncTask

使用方法都类似,只是WebAsyncTask是直接返回了。

@RequestMapping("/webAsyncTask")
    public WebAsyncTask<String> webAsyncTask() {
        log.info("外部线程:" + Thread.currentThread().getName());
        WebAsyncTask<String> result = new WebAsyncTask<String>(60*1000L, new Callable<String>() {
 
            @Override
            public String call() throws Exception {
                log.info("内部线程:" + Thread.currentThread().getName());
                return "WebAsyncTask!!!";
            }
        });
        result.onTimeout(new Callable<String>() {
            
            @Override
            public String call() throws Exception {
                // TODO Auto-generated method stub
                return "WebAsyncTask超时!!!";
            }
        });
        result.onCompletion(new Runnable() {
            
            @Override
            public void run() {
                //超时后 也会执行此方法
                log.info("WebAsyncTask执行结束");
            }
        });
        return result;
    }

异步调用

开启异步支持

@Configuration
@EnableAsync
public class SpringAsyncConfig {
}

@EnableAsync检测Spring的@Async注释和EJB 3.1 javax. EJB异步,还可用于检测其他用户定义注解。

自定义线程池:

@Slf4j
@Configuration
public class ThreadPoolConfiguration {

    @Bean(name = "defaultThreadPoolExecutor", destroyMethod = "shutdown")
    public ThreadPoolExecutor systemCheckPoolExecutorService() {
        return new ThreadPoolExecutor(3, 10, 60, TimeUnit.SECONDS,
                new LinkedBlockingQueue<Runnable>(10000),
                new ThreadFactoryBuilder().setNameFormat("default-executor-%d").build(),
                (r, executor) -> log.error("system pool is full! "));
    }
}

在异步处理的方法上添加注解 @Async ,当对 execute 方法 调用时,通过自定义的线程池 defaultThreadPoolExecutor 异步化执行 execute 方法

@Service
public class AsyncServiceImpl implements AsyncService {

    @Async("defaultThreadPoolExecutor")
    public Boolean execute(Integer num) {
        log.info("线程:" + Thread.currentThread().getName() + " , 任务:" + num);
        return true;
    }
}

用 @Async 注解标记的方法,称为异步方法。在SB应用中使用 @Async 很简单:

调用异步方法类上或启动类加上注解 @EnableAsync
在需要被异步调用的方法外加上 @Async
所使用的 @Async 注解方法的类对象应该是Spring容器管理的bean对象;
@Async使用

无返回值

@Async
@Slf4j
public void returnVoid() {
}

有返回值

@Async
@Slf4j
public Future<String> returnFuture() {
    try {
        Thread.sleep(1000);
        return new AsyncResult<String>("hello");
    } catch (InterruptedException e) {
    }
    return null;
}

执行器

Spring默认使用SimpleAsyncTaskExecutor线程池去执行这些异步方法,此执行器没有限制线程数,实际上此线程池不是真正意义上的线程池,线程并没有重用,每次调用都会创建一个新的线程。可从两个层级进行覆盖:

方法级别覆盖

@Async("threadPoolTaskExecutor")
public void asyncMethodWithConfiguredExecutor() {
}

应用级别覆盖
自定义配置类实现AsyncConfigurer接口,重写getAsyncExecutor()方法:

@Configuration
@EnableAsync
public class SpringAsyncConfig implements AsyncConfigurer {
    @Override
    public Executor getAsyncExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.initialize();
        executor.setCorePoolSize(5);
        executor.setMaxPoolSize(10);
        executor.setQueueCapacity(25);
        return executor;
    }
}

异常处理

当方法返回值是Future时,异常捕获是没问题的,Future.get()方法会抛出异常。但如果返回类型是Void,异常在当前线程就捕获不到,需要添加额外的配置来处理异常。

实现AsyncUncaughtExceptionHandler接口来自定义异常处理类,重写handleUncaughtException()方法,存在任何未捕获的异步异常时调用:

@Slf4j
public class CustomAsyncExceptionHandler implements AsyncUncaughtExceptionHandler {
    @Override
    public void handleUncaughtException (Throwable throwable, Method method, Object... obj) {
        log.info("Exception message - " + throwable.getMessage() + "Method name - " + method.getName());
        for (Object param : obj) {
            log.info("Parameter value - " + param);
        }
    }
}

由configuration类实现的AsyncConfigurer接口。作为其中的一部分,还需要覆盖getAsyncUncaughtExceptionHandler()方法来返回自定义的异步异常处理程序:

@Override
public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
    return new CustomAsyncExceptionHandler();
}

失效

调用的异步方法,不能为同一个类的方法(包括同一个类的内部类),简单来说,因为Spring在启动扫描时会为其创建一个代理类,而同类调用时,还是调用本身的代理类的,所以和平常调用是一样的。其他注解如@Cache等也是如此,由于Spring的代理机制。在开发中最好把异步服务单独抽出一个类来管理。

导致@Async异步方法失效的几种情况:

调用同一个类下注有@Async异步方法:在Spring中像@Async,@Transactional,@Cache等注解本质使用的是动态代理,Spring容器在初始化时,会将含有AOP注解的类对象替换为代理对象。注解失效的原因,就是因为调用方法的是对象本身而不是代理对象,因为没有经过Spring容器,解决方法也会沿着这个思路来解决。
调用static方法
调用private方法
解决方法
上面的情况2,3很好解决,仅考虑情况1。

将要异步执行的方法单独抽取成一个类,原理就是当你把执行异步的方法单独抽取成一个类的时候,这个类肯定是被Spring管理的,其他Spring组件需要调用时肯定会注入进去,这时候实际上注入进去的就是代理类。
其实注入对象都是从Spring容器中给当前Spring组件进行成员变量的赋值,由于某些类使用AOP注解,那么实际上在Spring容器中实际存在的是它的代理对象。那么就可以通过上下文获取自己的代理对象调用异步方法。

@Controller
public class EmailController {
    @Autowired
    private ApplicationContext applicationContext;

    @RequestMapping(value = "/asyncCall", method = GET)
    @ResponseBody
    public void asyncCall () {
        try {
            // 调用同类下的异步方法是不起作用的
            // this.testAsyncTask();
            // 通过上下文获取自己的代理对象调用异步方法
            EmailController controller = (EmailController)applicationContext.getBean(EmailController.class);
            controller.testAsyncTask();
        } catch (Exception e) {
        }
    }

    @Async
    public void testAsyncTask() throws InterruptedException {
        Thread.sleep(10000);
        log.info("异步任务执行完成!");
    }
}

开启cglib代理,手动获取Spring代理类,从而调用同类下的异步方法。在启动类上加上@EnableAspectJAutoProxy(exposeProxy = true)注解:

@Service
@Transactional(value = "transactionManager", readOnly = false, propagation = Propagation.REQUIRED, rollbackFor = Throwable.class)
public class EmailService {
    @Autowired
    private ApplicationContext applicationContext;

    @Async
    public void testSyncTask() throws InterruptedException {
        Thread.sleep(10000);
        log.info("异步任务执行完成!");
    }

    public void asyncCallTwo() throws InterruptedException {
        //this.testSyncTask();
//        EmailService emailService = (EmailService)applicationContext.getBean(EmailService.class);
//        emailService.testSyncTask();
        boolean isAop = AopUtils.isAopProxy(EmailController.class);//是否是代理对象;
        boolean isCglib = AopUtils.isCglibProxy(EmailController.class);  //是否是CGLIB方式的代理对象;
        boolean isJdk = AopUtils.isJdkDynamicProxy(EmailController.class);  //是否是JDK动态代理方式的代理对象;
        EmailService emailService = (EmailService)applicationContext.getBean(EmailService.class);
        EmailService proxy = (EmailService) AopContext.currentProxy();
        log.info(emailService == proxy ? true : false);
        proxy.testSyncTask();
    }
}

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,254评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,875评论 3 387
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,682评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,896评论 1 285
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,015评论 6 385
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,152评论 1 291
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,208评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,962评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,388评论 1 304
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,700评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,867评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,551评论 4 335
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,186评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,901评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,142评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,689评论 2 362
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,757评论 2 351

推荐阅读更多精彩内容