这样做,就可以修改spring中任务的调度时间了

紧接上一篇文章,我们一下来思考一下,下面两个问题:

1.Spring中定时任务是谁来执行的?

2.如何动态调整调度策略。

接下来我们详细分析一下这两个问题。

为@Scheduled定义一个线程池

spring在执行调度任务前,会按照好一定的策略,寻找一个可用的线程池来执行调度任务,寻找这个线程池的过程如下:

private void finishRegistration() {
  if (this.scheduler != null) {
   this.registrar.setScheduler(this.scheduler);
  }

  if (this.beanFactory instanceof ListableBeanFactory) {
   Map<String, SchedulingConfigurer> beans =
     ((ListableBeanFactory) this.beanFactory).getBeansOfType(SchedulingConfigurer.class);
   List<SchedulingConfigurer> configurers = new ArrayList<>(beans.values());
   AnnotationAwareOrderComparator.sort(configurers);
   // 1.使用SchedulingConfigurer进行配置
   for (SchedulingConfigurer configurer : configurers) {
    configurer.configureTasks(this.registrar);
   }
  }

  if (this.registrar.hasTasks() && this.registrar.getScheduler() == null) {
   Assert.state(this.beanFactory != null, "BeanFactory must be set to find scheduler by type");
   try {
    // Search for TaskScheduler bean... // 2.寻找一个类型为 TaskScheduler 的bean
    this.registrar.setTaskScheduler(resolveSchedulerBean(this.beanFactory, TaskScheduler.class, false));
   }
   catch (NoUniqueBeanDefinitionException ex) {
    if (logger.isTraceEnabled()) {
     logger.trace("Could not find unique TaskScheduler bean - attempting to resolve by name: " +
       ex.getMessage());
    }
    try {
     this.registrar.setTaskScheduler(resolveSchedulerBean(this.beanFactory, TaskScheduler.class, true));
    }
    catch (NoSuchBeanDefinitionException ex2) {
     if (logger.isInfoEnabled()) {
      logger.info("More than one TaskScheduler bean exists within the context, and " +
        "none is named 'taskScheduler'. Mark one of them as primary or name it 'taskScheduler' " +
        "(possibly as an alias); or implement the SchedulingConfigurer interface and call " +
        "ScheduledTaskRegistrar#setScheduler explicitly within the configureTasks() callback: " +
        ex.getBeanNamesFound());
     }
    }
   }
   catch (NoSuchBeanDefinitionException ex) {
    if (logger.isTraceEnabled()) {
     logger.trace("Could not find default TaskScheduler bean - attempting to find ScheduledExecutorService: " +
       ex.getMessage());
    }
    // Search for ScheduledExecutorService bean next...
    try {
     this.registrar.setScheduler(resolveSchedulerBean(this.beanFactory, ScheduledExecutorService.class, false));
    }
    catch (NoUniqueBeanDefinitionException ex2) {
     if (logger.isTraceEnabled()) {
      logger.trace("Could not find unique ScheduledExecutorService bean - attempting to resolve by name: " +
        ex2.getMessage());
     }
     try {
      this.registrar.setScheduler(resolveSchedulerBean(this.beanFactory, ScheduledExecutorService.class, true));
     }
     catch (NoSuchBeanDefinitionException ex3) {
      if (logger.isInfoEnabled()) {
       logger.info("More than one ScheduledExecutorService bean exists within the context, and " +
         "none is named 'taskScheduler'. Mark one of them as primary or name it 'taskScheduler' " +
         "(possibly as an alias); or implement the SchedulingConfigurer interface and call " +
         "ScheduledTaskRegistrar#setScheduler explicitly within the configureTasks() callback: " +
         ex2.getBeanNamesFound());
      }
     }
    }
    catch (NoSuchBeanDefinitionException ex2) {
     if (logger.isTraceEnabled()) {
      logger.trace("Could not find default ScheduledExecutorService bean - falling back to default: " +
        ex2.getMessage());
     }
     // Giving up -> falling back to default scheduler within the registrar...
     logger.info("No TaskScheduler/ScheduledExecutorService bean found for scheduled processing");
    }
   }
  }
  // 3.在afterPropertiesSet中创建一个默认的线程池
  this.registrar.afterPropertiesSet();
 }

在上面的代码中,有三处可以完成对 线程池的设置:

1.通过自定义的 SchedulingConfigurer 的实现类,实现对 registrar 中线程池的配置。

if (this.beanFactory instanceof ListableBeanFactory) {
 Map<String, SchedulingConfigurer> beans =
   ((ListableBeanFactory) this.beanFactory).getBeansOfType(SchedulingConfigurer.class);
 List<SchedulingConfigurer> configurers = new ArrayList<>(beans.values());
 AnnotationAwareOrderComparator.sort(configurers);
 for (SchedulingConfigurer configurer : configurers) {
  configurer.configureTasks(this.registrar);
 }
}

2.如果第一步没有找到可用的线程池,那么就会从上下文中获取一个类型为 TaskScheduler 的bean作为可用线程池.

这个也是比较坑的地方:有可能你在业务代码中定义了一个 TaskScheduler 的bean,结果被spring的调度器给使用了,这样就会产生,资源的竞争。

如果你在上下文中没有定义或者定义了多个 TaskScheduler 的话,那么 spring会抛出一个找不到bean或者bean不唯一的异常,然后输出相关的日志提醒。

if (this.registrar.hasTasks() && this.registrar.getScheduler() == null) {
 Assert.state(this.beanFactory != null, "BeanFactory must be set to find scheduler by type");
 try {
  // Search for TaskScheduler bean...
  this.registrar.setTaskScheduler(resolveSchedulerBean(this.beanFactory, TaskScheduler.class, false));
 }
 catch (NoUniqueBeanDefinitionException ex) {
  if (logger.isTraceEnabled()) {
   logger.trace("Could not find unique TaskScheduler bean - attempting to resolve by name: " +
     ex.getMessage());
  }
  try {
   this.registrar.setTaskScheduler(resolveSchedulerBean(this.beanFactory, TaskScheduler.class, true));
  }
  catch (NoSuchBeanDefinitionException ex2) {
   ...
  }
 }
 catch (NoSuchBeanDefinitionException ex) {
  if (logger.isTraceEnabled()) {
   logger.trace("Could not find default TaskScheduler bean - attempting to find ScheduledExecutorService: " +
     ex.getMessage());
  }
  // Search for ScheduledExecutorService bean next...
  try {
   this.registrar.setScheduler(resolveSchedulerBean(this.beanFactory, ScheduledExecutorService.class, false));
  }
  catch (NoUniqueBeanDefinitionException ex2) {
   if (logger.isTraceEnabled()) {
    logger.trace("Could not find unique ScheduledExecutorService bean - attempting to resolve by name: " +
      ex2.getMessage());
   }
   try {
    this.registrar.setScheduler(resolveSchedulerBean(this.beanFactory, ScheduledExecutorService.class, true));
   }
   catch (NoSuchBeanDefinitionException ex3) {
    ....
   }
  }
  catch (NoSuchBeanDefinitionException ex2) {
   ...
  }
 }
}

3.如果第二步也没有找到可用线程池的话,就会创建一个默认的线程池。

this.registrar.afterPropertiesSet();

 public void afterPropertiesSet() {
  scheduleTasks();
 }

 protected void scheduleTasks() {
  if (this.taskScheduler == null) {
  // 创建一个线程池
   this.localExecutor = Executors.newSingleThreadScheduledExecutor();
   this.taskScheduler = new ConcurrentTaskScheduler(this.localExecutor);
  }
  if (this.triggerTasks != null) {
   for (TriggerTask task : this.triggerTasks) {
    addScheduledTask(scheduleTriggerTask(task));
   }
  }
  if (this.cronTasks != null) {
   for (CronTask task : this.cronTasks) {
    addScheduledTask(scheduleCronTask(task));
   }
  }
  if (this.fixedRateTasks != null) {
   for (IntervalTask task : this.fixedRateTasks) {
    addScheduledTask(scheduleFixedRateTask(task));
   }
  }
  if (this.fixedDelayTasks != null) {
   for (IntervalTask task : this.fixedDelayTasks) {
    addScheduledTask(scheduleFixedDelayTask(task));
   }
  }
 }

默认会创建一个 newSingleThreadScheduledExecutor 作为默认的线程池。这个线程池的 最大线程池个数是 Interger 的最大值,队里大小也是 Integer 的最大值。这种线程池在业务开发中是不推荐,会导致系统资源被过多占用。

那么如何给spring提供一个自定义的线程池呢?按照上面spring获取线程池的流程,我们可以在第一步的时候,定义一个 SchedulingConfigurer  来完成对 register的配置,具体代码如下:

 @Configuration
public class SchedulerConfig implements SchedulingConfigurer {
 
    @Override
    public void configureTasks(ScheduledTaskRegistrar taskRegistrar) 
{
        taskRegistrar.setTaskScheduler(threadPoolTaskScheduler());
    }
 
    @Bean(destroyMethod="shutdown")
    public ThreadPoolTaskScheduler threadPoolTaskScheduler() {
        ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
        scheduler.setPoolSize(10); // 设置线程池大小
        scheduler.setThreadNamePrefix("my-scheduled-task-"); // 设置线程名前缀
        scheduler.initialize();
        return scheduler;
    }
}

实现可动态修改的调度任务

使用spring的@Scheduled实现的定时任务可以动态修改调度策略吗?

其实是不可以的

看过上面的执行流程,可以发现,定时调度策略、定时执行的业务逻辑,是在spring容器启动的时候就设置好的,而且spring并没有提供相关的接口,让我们来修改。

所以要想动态修改 定时调度的策略,那么就只能自己实现一套简单的小框架。

当然不是直接使用jdk中原生的api,而是使用spring提供的现有工具,为了对现有使用 @Scheduled 方式产生最小影响,以下实现方案也是采用注解的方式来实现,具体实现流程分为4步。

1.自定义注解

@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
public @interface DynamicScheduled {
    String taskId()// 执行任务的唯一标识符

    String cron();  // 调度的cron表示,这里暂时只支持一种
}

2.解析自定义注解

@Component
public class DynamicScheduler {

    private ThreadPoolTaskScheduler taskScheduler ;

    @Autowired
    private ApplicationContext applicationContext;

    private Map<String, ScheduledFuture<?>> scheduledTasks = new ConcurrentHashMap<>();


    @Autowired
    public DynamicScheduler(ApplicationContext applicationContext) 
{
        this.applicationContext = applicationContext;
        this.taskScheduler = createTaskScheduler();
    }

// 自定义任务执行线程池
    @Bean
    public ThreadPoolTaskScheduler createTaskScheduler() 
{
        ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();
        taskScheduler.setPoolSize(10);
        taskScheduler.setThreadNamePrefix("DynamicTaskScheduler-");
        taskScheduler.initialize();
        return taskScheduler;
    }


// 监听上下文刷新事件
    @EventListener(ContextRefreshedEvent.class)
    public void init() {
        Map<String, Object> beans = applicationContext.getBeansWithAnnotation(Component.class);
        for (Object bean : beans.values()) {
            Method[] methods = bean.getClass().getDeclaredMethods();
            for (Method method : methods) {
                if (method.isAnnotationPresent(DynamicScheduled.class)) {
                    DynamicScheduled dynamicScheduled = method.getAnnotation(DynamicScheduled.class);
                    String taskId = dynamicScheduled.taskId();
                    String cron = dynamicScheduled.cron();
                    Runnable task = () -> {
                        try {
                            method.invoke(bean);
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                    };
                    ScheduledFuture<?> scheduledFuture = taskScheduler.schedule(task, new CronTrigger(cron));
                    scheduledTasks.put(taskId, scheduledFuture);
                }
            }
        }
    }

// 更新调度策略
    public void updateCron(String taskId, String newCron) {
        ScheduledFuture<?> scheduledFuture = scheduledTasks.get(taskId);
        if (scheduledFuture != null) {
            scheduledFuture.cancel(true);
            scheduledTasks.remove(taskId);
        }

        Map<String, Object> beans = applicationContext.getBeansWithAnnotation(Component.class);
        for (Object bean : beans.values()) {
            Method[] methods = bean.getClass().getDeclaredMethods();
            for (Method method : methods) {
                if (method.isAnnotationPresent(DynamicScheduled.class)) {
                    DynamicScheduled dynamicScheduled = method.getAnnotation(DynamicScheduled.class);
                    if (taskId.equals(dynamicScheduled.taskId())) {
                        Runnable task = () -> {
                            try {
                                method.invoke(bean);
                            } catch (Exception e) {
                                e.printStackTrace();
                            }
                        };
                        ScheduledFuture<?> newScheduledFuture = taskScheduler.schedule(task, new CronTrigger(newCron));
                        scheduledTasks.put(taskId, newScheduledFuture);
                        return;
                    }
                }
            }
        }
    }
}

3.修改调度策略

@PostMapping("/update-cron")
public String updateCron(@RequestParam("taskId") String taskId, @RequestParam("cron") String cronExpression) {
    dynamicScheduler.updateCron(taskId, cronExpression);
    return "Cron expression for task " + taskId + " updated to: " + cronExpression;
}

4.使用自定义的注解

//  @Scheduled(cron="0/1 * * * * ?")
    @DynamicScheduled(taskId = "retryFailJob",cron = "0/1 * * * * ?"// 每个一秒
    public void schedule(){
        System.out.println("scheduler" + new SimpleDateFormat("YYYY-MM-dd HH-mm:ss").format(new Date()) + "  :  "+Thread.currentThread().getName());
    }

项目启动以后,可以通过 /update-cron 接口在不停服的情况下更新任务的调度时间,如果有需要的小伙伴可以直接复制代码直接使用

今日分享如果对你有帮助,帮忙点个在看

原文始发于微信公众号(小李哥编程):这样做,就可以修改spring中任务的调度时间了

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/253873.html

(0)
Java朝阳的头像Java朝阳

相关推荐

发表回复

登录后才能评论
极客之音——专业性很强的中文编程技术网站,欢迎收藏到浏览器,订阅我们!