第六章 – 分布式定时任务框架ElasticJob之SpringBoot整合DataflowJob流式作业

导读:本篇文章讲解 第六章 – 分布式定时任务框架ElasticJob之SpringBoot整合DataflowJob流式作业,希望对大家有帮助,欢迎收藏,转发!站点地址:www.bmabk.com

系列文章目录

第一章 – 分布式定时任务框架ElasticJob之JavaApi整合Simple作业
第二章 – 分布式定时任务框架ElasticJob之JavaApi整合DataflowJob作业
第三章 – 分布式定时任务框架ElasticJob之Spring框架整合SimpleJob作业
第四章 – 分布式定时任务框架ElasticJob之Spring框架整合DataflowJob作业
第五章 – 分布式定时任务框架ElasticJob之SpringBoot整合SimpleJob作业(实战一)
第五章 – 分布式定时任务框架ElasticJob之SpringBoot整合SimpleJob作业(实战二)



前言

本章节介绍SpringBoot整合DataflowJob流式作业,这种类型的定时任务相对于SimpleJob来讲还是后者使用更频繁,这种作业我们了解就行,它主要新建任务类时需要继承DataflowJob接口,实现它的两个方法,一个是取数据的方法,一个是处理数据的方法,下文将详细说明一个流式作业的创建到运行。


一、新建MyDataflowJob

需要继承DataflowJob需要指明要处理的数据类型,fetchData是取数据的方法,取完数据,给processData方法去执行,这种流式作业就是一个方法取数据,另一个方法处理数据。

在这里插入图片描述

先定义好等会需要用到的数据:

private List<Integer> list = new ArrayList<>();
{
    list.add(0);
    list.add(1);
    list.add(2);
    list.add(3);
    list.add(4);
    list.add(5);
    list.add(6);
    list.add(7);
    list.add(8);
    list.add(9);
}

抓取数据方法:

public List fetchData(ShardingContext shardingContext) {
    List<Integer> rtnList = new ArrayList<>();
    // 取数据规则: 数字 % 分片总数 == 当前分片项
    for (Integer index : list){
        if (index % shardingContext.getShardingTotalCount() == shardingContext.getShardingItem()){
            rtnList.add(index);
            // 每次只获取一个
            break;
        }
    }
    // 模拟数据抓取过程,耗时3s
    try {
        Thread.sleep(3000);
    } catch (InterruptedException e) {
        throw new RuntimeException(e);
    }
    log.info("我是分片项:" + shardingContext.getShardingItem() + ", 我抓取的数据是:" + rtnList);
    return rtnList;
}

处理数据方法:

public void processData(ShardingContext shardingContext, List data) {
    // 数据处理
    list.removeAll(data);
    // 模拟数据处理
    try {
        Thread.sleep(5000);
    } catch (InterruptedException e) {
        throw new RuntimeException(e);
    }
    log.info("我是分片项:" + shardingContext.getShardingItem() + ",我移除的数据是:" + data);
}

二、新建注解@ElasticDataflowJob

在MyDataflowJob类上加上这个注解,streamingProcess是否流式处理

在这里插入图片描述
在这里插入图片描述

新建DataflowJobAutoConfig–流式作业自动装配类,该类的意义也是通过反射将 DataflowJob作业自动注册进JobScheduler

  • @Configuration : 定义一个配置类
  • @ConditionalOnBean(CoordinatorRegistryCenter.class) :启动条件,发现协调注册中心后启动
  • @AutoConfigureAfter(ZookeeperAutoConfig.class) :在zookeeper注册中心配置类之后启动

在这里插入图片描述

注册进JobScheduler

public void initDataflow(){
    Map<String, Object> beans = applicationContext.getBeansWithAnnotation(ElasticDataflowJob.class);
    for (Map.Entry<String, Object> entry : beans.entrySet()){
        Object instance = entry.getValue();
        Class<?>[] interfaces = instance.getClass().getInterfaces();
        for (Class<?> superInterface : interfaces){
            // 如果继承的接口是DataflowJob,就将他注册进JobScheduler,实现自动启动
            if (superInterface == DataflowJob.class){
                ElasticDataflowJob annotation = instance.getClass().getAnnotation(ElasticDataflowJob.class);
                String jobName = annotation.jobName();
                String cron = annotation.cron();
                int shardingTotalCount = annotation.shardingTotalCount();
                boolean overwrite = annotation.overwrite();
                boolean streamingProcess = annotation.streamingProcess();

                //第一步job核心配置
                JobCoreConfiguration jcc = JobCoreConfiguration
                        .newBuilder(jobName, cron, shardingTotalCount)
                        .build();

                //第二步job类型配置
                JobTypeConfiguration jtc = new DataflowJobConfiguration(jcc,
                        instance.getClass().getCanonicalName(), streamingProcess);

                //第三步job根的配置
                LiteJobConfiguration ljc = LiteJobConfiguration
                        .newBuilder(jtc)
                        .overwrite(overwrite)
                        .build();
                // 注册进JobScheduler
                new JobScheduler(zkCenter, ljc).init();
            }
        }
    }
}

三、启动SpringBoot项目

DataflowJob类型定时任务已经跑起来了

在这里插入图片描述
此外需要注意的是,我们也要将DataflowAutoConfig类配置到spring.factories配置文件中,让spring发现它。
在这里插入图片描述


总结

SpringBoot整合流式作业就介绍到这儿,比较简单,代码点击此处下载

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/71701.html

(0)
小半的头像小半

相关推荐

极客之音——专业性很强的中文编程技术网站,欢迎收藏到浏览器,订阅我们!