概述
异步包括异步请求和异步调用。
异步请求与异步调用的区别
两者的使用场景不同,异步请求用来解决并发请求对服务器造成的压力,从而提高对请求的吞吐量;而异步调用是用来做一些非主线流程且不需要实时计算和响应的任务,如同步日志到kafka中做日志分析等。
异步请求是会一直等待response响应,需要返回结果给客户端;而异步调用往往会马上返回给客户端响应,完成这次整个的请求,至于异步调用的任务后台自己慢慢跑就行,客户端不会关心。
异步请求
异步请求与同步请求
特点:
可以先释放容器分配给请求的线程与相关资源,减轻系统负担,释放容器所分配线程的请求,其响应将被延后,可以在耗时处理完成后再响应客户端。
一句话:增加服务器对客户端请求的吞吐量(实际生产环境里,并发请求量很大时,会通过nginx把请求负载到集群服务的各个节点上来分摊请求压力,当然还可以通过消息队列来做请求的缓冲)。
实现
- Servlet方式实现异步请求
@Slf4j
@RequestMapping(value = "/email/servletReq", method = GET)
public void servletReq (HttpServletRequest request, HttpServletResponse response) {
AsyncContext asyncContext = request.startAsync();
// 设置监听器:可设置其开始、完成、异常、超时等事件的回调处理
asyncContext.addListener(new AsyncListener() {
@Override
public void onTimeout(AsyncEvent event) throws IOException {
// 超时后的相关操作...
}
@Override
public void onStartAsync(AsyncEvent event) throws IOException {
log.info("线程开始");
}
@Override
public void onError(AsyncEvent event) throws IOException {
}
@Override
public void onComplete(AsyncEvent event) throws IOException {
log.info("执行完成");
// 清理资源的操作...
}
});
// 设置超时时间
asyncContext.setTimeout(20000);
asyncContext.start(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(10000);
log.info("内部线程:" + Thread.currentThread().getName());
asyncContext.getResponse().setCharacterEncoding("utf-8");
asyncContext.getResponse().setContentType("text/html;charset=UTF-8");
asyncContext.getResponse().getWriter().println("这是异步的请求返回");
} catch (Exception e) {
}
// 异步请求完成通知, 此时整个请求才完成
asyncContext.complete();
}
});
// 此时之类request的线程连接已经释放
log.info("主线程:" + Thread.currentThread().getName());
}
- 直接返回的参数包裹一层callable即可,可以继承WebMvcConfigurerAdapter类来设置默认线程池和超时处理
@RequestMapping(value = "/email/callableReq", method = GET)
@ResponseBody
public Callable<String> callableReq () {
log.info("外部线程:" + Thread.currentThread().getName());
return new Callable<String>() {
@Override
public String call() throws Exception {
Thread.sleep(10000);
log.info("内部线程:" + Thread.currentThread().getName());
return "callable!";
}
};
}
@Configuration
public class RequestAsyncPoolConfig extends WebMvcConfigurerAdapter {
@Resource
private ThreadPoolTaskExecutor myThreadPoolTaskExecutor;
@Override
public void configureAsyncSupport(final AsyncSupportConfigurer configurer) {
// 处理callable超时
configurer.setDefaultTimeout(60*1000);
configurer.setTaskExecutor(myThreadPoolTaskExecutor);
configurer.registerCallableInterceptors(timeoutCallableProcessingInterceptor());
}
@Bean
public TimeoutCallableProcessingInterceptor timeoutCallableProcessingInterceptor() {
return new TimeoutCallableProcessingInterceptor();
}
}
- 和方式2差不多,在Callable外包一层,给WebAsyncTask设置一个超时回调,即可实现超时处理
@RequestMapping(value = "/email/webAsyncReq", method = GET)
@ResponseBody
public WebAsyncTask<String> webAsyncReq () {
log.info("外部线程:" + Thread.currentThread().getName());
Callable<String> result = () -> {
log.info("内部线程开始:" + Thread.currentThread().getName());
try {
TimeUnit.SECONDS.sleep(4);
} catch (Exception e) {
}
log.info("副线程返回");
log.info("内部线程返回:" + Thread.currentThread().getName());
return "success";
};
WebAsyncTask<String> wat = new WebAsyncTask<String>(3000L, result);
wat.onTimeout(new Callable<String>() {
@Override
public String call() throws Exception {
return "超时";
}
});
return wat;
}
- DeferredResult可以处理一些相对复杂一些的业务逻辑,最主要还是可以在另一个线程里面进行业务处理及返回,即可在两个完全不相干的线程间的通信。
@RequestMapping(value = "/email/deferredResultReq", method = GET)
@ResponseBody
public DeferredResult<String> deferredResultReq () {
log.info("外部线程:" + Thread.currentThread().getName());
// 设置超时时间
DeferredResult<String> result = new DeferredResult<String>(60*1000L);
// 处理超时事件 采用委托机制
result.onTimeout(new Runnable() {
@Override
public void run() {
log.info("DeferredResult超时");
result.setResult("超时!");
}
});
result.onCompletion(new Runnable() {
@Override
public void run() {
log.info("调用完成");
}
});
myThreadPoolTaskExecutor.execute(new Runnable() {
@Override
public void run() {
// 处理业务逻辑
log.info("内部线程:" + Thread.currentThread().getName());
// 返回结果
result.setResult("DeferredResult!");
}
});
return result;
}
异步调用
开启异步支持
@Configuration
@EnableAsync
public class SpringAsyncConfig {
}
@EnableAsync检测Spring的@Async注释和EJB 3.1 javax. EJB异步,还可用于检测其他用户定义注解。
自定义线程池:
@Slf4j
@Configuration
public class ThreadPoolConfiguration {
@Bean(name = "defaultThreadPoolExecutor", destroyMethod = "shutdown")
public ThreadPoolExecutor systemCheckPoolExecutorService() {
return new ThreadPoolExecutor(3, 10, 60, TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(10000),
new ThreadFactoryBuilder().setNameFormat("default-executor-%d").build(),
(r, executor) -> log.error("system pool is full! "));
}
}
在异步处理的方法上添加注解 @Async ,当对 execute 方法 调用时,通过自定义的线程池 defaultThreadPoolExecutor 异步化执行 execute 方法
@Service
public class AsyncServiceImpl implements AsyncService {
@Async("defaultThreadPoolExecutor")
public Boolean execute(Integer num) {
log.info("线程:" + Thread.currentThread().getName() + " , 任务:" + num);
return true;
}
}
用 @Async 注解标记的方法,称为异步方法。在SB应用中使用 @Async 很简单:
- 调用异步方法类上或启动类加上注解 @EnableAsync
- 在需要被异步调用的方法外加上 @Async
- 所使用的 @Async 注解方法的类对象应该是Spring容器管理的bean对象;
@Async使用
- 无返回值
@Async
@Slf4j
public void returnVoid() {
}
- 有返回值
@Async
@Slf4j
public Future<String> returnFuture() {
try {
Thread.sleep(1000);
return new AsyncResult<String>("hello");
} catch (InterruptedException e) {
}
return null;
}
执行器
Spring默认使用SimpleAsyncTaskExecutor线程池去执行这些异步方法,此执行器没有限制线程数,实际上此线程池不是真正意义上的线程池,线程并没有重用,每次调用都会创建一个新的线程。可从两个层级进行覆盖:
- 方法级别覆盖
@Async("threadPoolTaskExecutor")
public void asyncMethodWithConfiguredExecutor() {
}
- 应用级别覆盖
自定义配置类实现AsyncConfigurer接口,重写getAsyncExecutor()方法:
@Configuration
@EnableAsync
public class SpringAsyncConfig implements AsyncConfigurer {
@Override
public Executor getAsyncExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.initialize();
executor.setCorePoolSize(5);
executor.setMaxPoolSize(10);
executor.setQueueCapacity(25);
return executor;
}
}
异常处理
当方法返回值是Future时,异常捕获是没问题的,Future.get()
方法会抛出异常。但如果返回类型是Void,异常在当前线程就捕获不到,需要添加额外的配置来处理异常。
实现AsyncUncaughtExceptionHandler接口来自定义异常处理类,重写handleUncaughtException()方法,存在任何未捕获的异步异常时调用:
@Slf4j
public class CustomAsyncExceptionHandler implements AsyncUncaughtExceptionHandler {
@Override
public void handleUncaughtException (Throwable throwable, Method method, Object... obj) {
log.info("Exception message - " + throwable.getMessage() + "Method name - " + method.getName());
for (Object param : obj) {
log.info("Parameter value - " + param);
}
}
}
由configuration类实现的AsyncConfigurer接口。作为其中的一部分,还需要覆盖getAsyncUncaughtExceptionHandler()方法来返回自定义的异步异常处理程序:
@Override
public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
return new CustomAsyncExceptionHandler();
}
失效
调用的异步方法,不能为同一个类的方法(包括同一个类的内部类),简单来说,因为Spring在启动扫描时会为其创建一个代理类,而同类调用时,还是调用本身的代理类的,所以和平常调用是一样的。其他注解如@Cache等也是如此,由于Spring的代理机制。在开发中最好把异步服务单独抽出一个类来管理。
导致@Async异步方法失效的几种情况:
- 调用同一个类下注有@Async异步方法:在Spring中像@Async,@Transactional,@Cache等注解本质使用的是动态代理,Spring容器在初始化时,会将含有AOP注解的类对象替换为代理对象。注解失效的原因,就是因为调用方法的是对象本身而不是代理对象,因为没有经过Spring容器,解决方法也会沿着这个思路来解决。
- 调用static方法
- 调用private方法
解决方法
上面的情况2,3很好解决,仅考虑情况1。
- 将要异步执行的方法单独抽取成一个类,原理就是当你把执行异步的方法单独抽取成一个类的时候,这个类肯定是被Spring管理的,其他Spring组件需要调用时肯定会注入进去,这时候实际上注入进去的就是代理类。
其实注入对象都是从Spring容器中给当前Spring组件进行成员变量的赋值,由于某些类使用AOP注解,那么实际上在Spring容器中实际存在的是它的代理对象。那么就可以通过上下文获取自己的代理对象调用异步方法。
@Controller
public class EmailController {
@Autowired
private ApplicationContext applicationContext;
@RequestMapping(value = "/asyncCall", method = GET)
@ResponseBody
public void asyncCall () {
try {
// 调用同类下的异步方法是不起作用的
// this.testAsyncTask();
// 通过上下文获取自己的代理对象调用异步方法
EmailController controller = (EmailController)applicationContext.getBean(EmailController.class);
controller.testAsyncTask();
} catch (Exception e) {
}
}
@Async
public void testAsyncTask() throws InterruptedException {
Thread.sleep(10000);
log.info("异步任务执行完成!");
}
}
- 开启cglib代理,手动获取Spring代理类,从而调用同类下的异步方法。在启动类上加上
@EnableAspectJAutoProxy(exposeProxy = true)
注解:
@Service
@Transactional(value = "transactionManager", readOnly = false, propagation = Propagation.REQUIRED, rollbackFor = Throwable.class)
public class EmailService {
@Autowired
private ApplicationContext applicationContext;
@Async
public void testSyncTask() throws InterruptedException {
Thread.sleep(10000);
log.info("异步任务执行完成!");
}
public void asyncCallTwo() throws InterruptedException {
//this.testSyncTask();
// EmailService emailService = (EmailService)applicationContext.getBean(EmailService.class);
// emailService.testSyncTask();
boolean isAop = AopUtils.isAopProxy(EmailController.class);//是否是代理对象;
boolean isCglib = AopUtils.isCglibProxy(EmailController.class); //是否是CGLIB方式的代理对象;
boolean isJdk = AopUtils.isJdkDynamicProxy(EmailController.class); //是否是JDK动态代理方式的代理对象;
EmailService emailService = (EmailService)applicationContext.getBean(EmailService.class);
EmailService proxy = (EmailService) AopContext.currentProxy();
log.info(emailService == proxy ? true : false);
proxy.testSyncTask();
}
}
参考
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/142195.html