【手把手】分布式定时任务调度解析之Elastic-Job

导读:本篇文章讲解 【手把手】分布式定时任务调度解析之Elastic-Job,希望对大家有帮助,欢迎收藏,转发!站点地址:www.bmabk.com

1、这货怎么没怎么听过

经常使用Quartz或者Spring Task的小伙伴们,或多或少都会遇到几个痛点,比如:
1、不敢轻易跟着应用服务多节点部署,可能会重复多次执行而引发系统逻辑的错误;
2、Quartz的集群仅仅只是用来HA,节点数量的增加并不能给我们的每次执行效率带来提升,即不能实现水平扩展;

在当当的ddframe框架中,需要一个任务调度系统。实现的话有两种思路,一个是修改开源产品,一种是基于开源产品搭建,也就是封装。当当选择了后者,最开始这个调度系统叫做dd-job。它是一个无中心化的分布式调度框架。因为数据库缺少分布式协调功能(比如选主),替换为Zookeeper后,增加了弹性扩容和数据分片的功能。Elastic-Job是ddframe中的dd-job作业模块分离出来的作业框架,基于Quartz和Curator开发,在2015年开源。

【手把手】分布式定时任务调度解析之Elastic-Job

Elastic-Job是当当网架构师张亮、曹昊和江树建基于Zookepper和Quartz开发并开源的一个Java分布式定时任务(跟大名鼎鼎的ElasticSearch没有半毛钱关系),解决了Quartz不支持分布式的弊端。Elastic-Job主要的功能有支持弹性扩容,通过Zookepper集中管理和监控job,支持失效转移等。最开始只有一个elastic-job-core的项目,在2.X版本以后主要分为Elastic-Job-Lite和Elastic-Job-Cloud两个子项目。其中,Elastic-Job-Lite定位为轻量级无中心化解决方案,使用jar包的形式提供分布式任务的协调服务。而Elastic-Job-Cloud使用Mesos + Docker的解决方案,额外提供资源治理、应用分发以及进程隔离等服务(跟Lite的区别只是部署方式不同,使用相同的API,只要开发一次)。

之所以感觉这货没什么人用,是因为这个项目在2020年捐给了Apache之后,将原生的编程方式基本删光了,现在在网上很难再找到原生的编程方式。更离谱的是,官网给出的Demo还漏了很多,也就是说你跟着官网一步一步写下来,结果还构建不起来…..

那为啥我还要写这么一篇文章出来,按道理说一个连官网都放弃的东西,费这些精力干啥。因为Elastic-Job中有一个很关键的概念:分片,也就是任务分片策略,这也正是它和Quartz之间最大的区别所在。而现在的Elastic-Job已经成为了一个二级子项目了,它的注册中心依赖于Zookeeper,所以如果要使用Elastic-Job的话,前提得先安装Zookeeper服务。

2、Quartz遗留的问题

① 假设在一个Quartz集群中有多个正在运行的节点,要如何决定哪些任务在哪些节点上运行呢?Quartz的处理方法非常简单粗暴,就是随机的。通过数据库中存储的信息,去抢占下一个即将触发的触发器所绑定的任务权限,并不支持对任务的执行节点进行协调;

② 当处理一个非常复杂的任务的时候,某一个节点的性能始终是有限的。如果可以将一个复杂的任务拆分成多个子任务,分别交由不同的节点协同处理,效率上必定事半功倍;

③ Quartz本身并不支持图形化管理页面,对于任务的管理非常的不方便;

3、初试Elastic-Job

以上的这些问题,Elastic-Job统统都可以解决,在弥补Quartz不足的这方面,Elastic-Job是认真的,但必须承认的是,它真的也没有特别好用。很遗憾,现在去Elastic-Job的官网已经访问不了了,这个官网早些年前被买给了快橙(一个VPN工具),现在它的官网已经沦落为shardingsphere下的一个二级网站:https://shardingsphere.apache.org/elasticjob/index_zh.html

启动Zookeeper服务

因为Elastic-Job是依赖于Zookeeper,所以先确保相关的Zookeeper服务启动成功。这里方便演示,我就不再专门弄个虚拟机搭一个Zookeeper服务,直接在本地启动Zookeeper服务。有几个点需要注意一下:

1、Zookeeper的压缩包解压缩之后,进入目录总,新建data目录存放数据,新建logs目录存放日志;

【手把手】分布式定时任务调度解析之Elastic-Job

2、 进入conf目录中,复制zoo_sample.cfg一份,并从命名为zoo.cfg作为Zookeeper的配置文件,需要进行一些基础的修改;

【手把手】分布式定时任务调度解析之Elastic-Job

3、进入Zookeeper中的bin目录下,执行:zkServer.cmd

【手把手】分布式定时任务调度解析之Elastic-Job

看到日志输出 ZooKeeper audit is enabled.  则说明Zookeeper服务启动成功。

引入Elastic-Job相关依赖

<dependency>
    <groupId>org.apache.shardingsphere.elasticjob</groupId>
    <artifactId>elasticjob-lite-core</artifactId>
    <version>3.0.1</version>
</dependency>

目前Maven中央仓库最新的版本是3.0.2,更新于22年10月23日,而3.0.1是目前使用最多的版本,更新于21年10月11日。好家伙,一年一更,看这个维护的频率就知道这玩意儿没什么人用。

SimpleJob

自定义需执行的任务,实现SimpleJob接口

【手把手】分布式定时任务调度解析之Elastic-Job

构建注册中心, 配置任务作业

【手把手】分布式定时任务调度解析之Elastic-Job

启动Elastic-Job服务

可以看到,指定的7个分片进行任务的执行。这里只是简单的打印一句话,在实际的业务中,可以通过代码判断,控制不同的分片执行不同的任务,就可以进行任务的拆分执行。

【手把手】分布式定时任务调度解析之Elastic-Job

官网的教程甚至都没有给出如何构建作业配置…..

【手把手】分布式定时任务调度解析之Elastic-Job

DataFlowJob

上面演示的是最简单的任务模式,Elastic-Job还提供了另外一种数据流模式:DataFlowJob,用于处理数据流。必须实现fetchData()和processData()的方法,一个用来获取数据,一个用来处理获取到的数据,其它跟SimpleJob没有区别。

实现DataFlowJob接口

【手把手】分布式定时任务调度解析之Elastic-Job

构建注册中心, 配置任务作业

【手把手】分布式定时任务调度解析之Elastic-Job

启动Elastic-Job服务

可以看到,定义的3个分片进行任务执行

【手把手】分布式定时任务调度解析之Elastic-Job

ScriptJob

Script类型作业意为脚本类型作业,支持shell,python,perl等所有类型脚本。说白了,这种类型的Job就是定时去执行脚本文件。把需要执行的脚本文件写好,告诉它在哪里,什么名字,到点了它自动去给你跑了就算完事儿。

【手把手】分布式定时任务调度解析之Elastic-Job

使用ScriptJob这种类型的时候,有几个点需要注意一下:
1、在构建调度器Scheduler时,参数elasticJobType固定写死就是”SCRIPT”;
2、在构建作业配置时,.setProperty方法的第一个参数也是固定写死”script.command.line”,第二个参数是脚本文件的绝对路径;

脚本内容就是很简单的一句话:@echo ——【脚本任务】Sharding Context: %*

【手把手】分布式定时任务调度解析之Elastic-Job

4、通过SPI的方式实现自定义作业类型

自定义任务接口,继承ElasticJob接口

【手把手】分布式定时任务调度解析之Elastic-Job

实现自定义的任务接口

【手把手】分布式定时任务调度解析之Elastic-Job

自定义任务执行器接口,继承ClassedJobItemExecutor接口

【手把手】分布式定时任务调度解析之Elastic-Job

实现自定义的任务执行器接口

【手把手】分布式定时任务调度解析之Elastic-Job

创建配置文件

在 resources.META-INF.services 目录下,以任务执行器接口的全路径名称创建一个配置文件,内容就是自定义的任务执行器实现类的全路径名

【手把手】分布式定时任务调度解析之Elastic-Job

执行自定义的作业

package com.feenix.elasticjob.service;

import com.feenix.elasticjob.service.impl.FeenixJobImpl;
import org.apache.shardingsphere.elasticjob.api.JobConfiguration;
import org.apache.shardingsphere.elasticjob.lite.api.bootstrap.impl.ScheduleJobBootstrap;
import org.apache.shardingsphere.elasticjob.reg.base.CoordinatorRegistryCenter;
import org.apache.shardingsphere.elasticjob.reg.zookeeper.ZookeeperConfiguration;
import org.apache.shardingsphere.elasticjob.reg.zookeeper.ZookeeperRegistryCenter;

public class Application {

    public static void main(String[] args) {
        new ScheduleJobBootstrap(createRegistryCenter(),
                                 new FeenixJobImpl(),
                                 createJobConfiguration())
                                 .schedule();
    }

    // 构建Zookeeper注册中心
    private static CoordinatorRegistryCenter createRegistryCenter() {
        ZookeeperConfiguration zkConfiguration = new ZookeeperConfiguration("192.168.0.31:2181",
                                                                           "feenix-job");
        zkConfiguration.setConnectionTimeoutMilliseconds(100000);
        zkConfiguration.setMaxRetries(10);

        ZookeeperRegistryCenter zkRegistryCenter = new ZookeeperRegistryCenter(zkConfiguration);
        zkRegistryCenter.init();
        return zkRegistryCenter;
    }

    // 作业配置
    private static JobConfiguration createJobConfiguration() {
        String jobs = "0=宋远桥,1=俞莲舟,2=俞岱岩,3=张松溪,4=张翠山,5=殷梨亭,6=莫声谷";
        return JobConfiguration.newBuilder("FeenixJobImpl", 7)
                               .cron("0/3 * * * * ?")
                               .shardingItemParameters(jobs)
                               // 使用自定义的作业分片策略
                               /*.jobSharding   StrategyType("Shuffle")*/
                               // 允许客户端配置覆盖注册中心
                               .overwrite(true)
                               // 故障转移
                               .failover(true)
                               .build();
    }

}

【手把手】分布式定时任务调度解析之Elastic-Job

5、分片

Elastic-Job中任务分片的概念,使得任务可以在分布式环境下运行,每台任务服务器只运行分配给该服务器的分片。随着服务器的增加或宕机,Elastic-Job会近乎实时的感知服务器的数量变更,从而重新为分布式服务器分配更合理的任务分片项,是的任务可以随着资源的增加而提升效率。

【手把手】分布式定时任务调度解析之Elastic-Job

不过在上文中有提到过,Elastic-Job并不直接提供数据处理的功能,而是将分片项分配至各个运行中的作业服务器,开发者需要自行处理分片项与业务的对应关系。分片项为数字,从0开始。例如:按照地区水平拆分数据库,数据库 A 是北京的数据;数据库 B 是上海的数据;数据库 C 是广州的数据。 如果仅按照分片项配置,开发者需要了解 0 表示北京;1 表示上海;2 表示广州。 合理使用个性化参数可以让代码更可读,如果配置为 0=北京,1=上海,2=广州,那么代码中直接使用北京,上海,广州的枚举值即可完成分片项和业务逻辑的对应关系。

AverageAllocationJobShardingStrategy

基于平均分配算法的分片策略,也是默认的分片策略。如果分片不能整除,则余下的分片依次追加到顺序序号的服务器:
1、3台服务器,分成9片。第1台为[0,1,2]、第2台为[3,4,5]、第3台为[6,7,8];
2、3台服务器,分成8片。第1台为[0,1,6]、第2台为[2,3,7]、第3台为[4,5];
3、3台服务器,分成10片。第1台为[0,1,2,9]、第2台为[3,4,5]、第3台为[6,7,8];

OdevitySortByNameJobShardingStrategy

根据作业名哈希值的奇偶数决定IP升降序算法的分片策略。奇数则IP升序,偶数则IP降序,用于不同的作业平均分配负载至不同的服务器。

自定义分片策略

实现JobShardingStrategy接口

package com.feenix.elasticjob.strategy;

import org.apache.shardingsphere.elasticjob.infra.handler.sharding.JobInstance;
import org.apache.shardingsphere.elasticjob.infra.handler.sharding.JobShardingStrategy;

import java.util.*;

public class CustomJobShardingStrategy implements JobShardingStrategy {
    @Override
    public Map<JobInstance, List<Integer>> sharding(List<JobInstance> jobInstances, String jobName, int shardingTotalCount) {
        // 作业分片加入容器
        ArrayList<Integer> customShardingList = new ArrayList<>();
        for (int i = 0; i < shardingTotalCount; i++) {
            customShardingList.add(i);
        }

        // 将容器中的作业分片项顺序打乱
        Collections.shuffle(customShardingList);

        // 模拟AverageAllocationJobShardingStrategy算法
        Map<JobInstance, List<Integer>> result = shardingCustom(jobInstances, shardingTotalCount, customShardingList);
        addCustom(jobInstances, shardingTotalCount, result, customShardingList);
        return result;
    }

    private Map<JobInstance, List<Integer>> shardingCustom(final List<JobInstance> shardingUnits,
                                                           final int shardingTotalCount,
                                                           final ArrayList<Integer> customShardingList) {
        Map<JobInstance, List<Integer>> result = new LinkedHashMap<>(shardingUnits.size(), 1);
        int itemCountPerSharding = shardingTotalCount / shardingUnits.size();
        int count = 0;
        for (JobInstance each : shardingUnits) {
            // 每个作业服务器申请的作业分片项列表,容量是itemCountPersharding+1,为每个作业最大的分片项
            List<Integer> shardingItems = new ArrayList<>(itemCountPerSharding + 1);
            for (int i = count * itemCountPerSharding; i < (count + 1) * itemCountPerSharding; i++) {
                shardingItems.add(customShardingList.get(i));
            }

            result.put(each, shardingItems);
            count++;
        }

        return result;
    }

    private void addCustom(final List<JobInstance> shardingUnits,
                           final int shardingTotalCount,
                           final Map<JobInstance, List<Integer>> shardingResults,
                           final ArrayList<Integer> customShardingList) {
        int aliquant = shardingTotalCount % shardingUnits.size();
        int count = 0;
        for (Map.Entry<JobInstance, List<Integer>> entry : shardingResults.entrySet()) {
            if (count < aliquant) {
                entry.getValue().add(customShardingList.get(shardingTotalCount / shardingUnits.size() * shardingUnits.size() + count));
            }
            count++;
        }
    }

    @Override
    public String getType() {
        return "CUSTOM";
    }
}

创建配置文件

在 resources.META-INF.services 目录下,以自定义策略实现类的接口的全路径名称创建一个文件,内容就是自定义策略实现类的全路径

【手把手】分布式定时任务调度解析之Elastic-Job

使用IDEA开启两个不同的服务实例

package com.feenix.elasticjob.service;

import com.feenix.elasticjob.service.impl.FeenixJobImpl;
import org.apache.shardingsphere.elasticjob.api.JobConfiguration;
import org.apache.shardingsphere.elasticjob.lite.api.bootstrap.impl.ScheduleJobBootstrap;
import org.apache.shardingsphere.elasticjob.reg.base.CoordinatorRegistryCenter;
import org.apache.shardingsphere.elasticjob.reg.zookeeper.ZookeeperConfiguration;
import org.apache.shardingsphere.elasticjob.reg.zookeeper.ZookeeperRegistryCenter;

public class Application {

    public static void main(String[] args) {
        new ScheduleJobBootstrap(createRegistryCenter(), new FeenixJobImpl(), createJobConfiguration()).schedule();
    }

    // 构建Zookeeper注册中心
    private static CoordinatorRegistryCenter createRegistryCenter() {
        ZookeeperConfiguration zkConfiguration = new ZookeeperConfiguration("192.168.0.31:2181",
                                                                           "feenix-job");
        zkConfiguration.setConnectionTimeoutMilliseconds(100000);
        zkConfiguration.setMaxRetries(10);

        ZookeeperRegistryCenter zkRegistryCenter = new ZookeeperRegistryCenter(zkConfiguration);
        zkRegistryCenter.init();
        return zkRegistryCenter;
    }

    // 作业配置
    private static JobConfiguration createJobConfiguration() {
        String jobs = "0=宋远桥,1=俞莲舟,2=俞岱岩,3=张松溪,4=张翠山,5=殷梨亭,6=莫声谷";
        return JobConfiguration.newBuilder("FeenixJobImpl", 7)
                               .cron("0/3 * * * * ?")
                               .shardingItemParameters(jobs)
                               // 使用自定义的作业分片策略
                               .jobShardingStrategyType("CUSTOM")
                               // 允许客户端配置覆盖注册中心
                               .overwrite(true)
                               // 故障转移
                               .failover(true)
                               .build();
    }

}

jobShardingStrategyType的值要设为”CUSTOM”,因为在自定义分片策略的时候,已经将自定义的策略类型写死为”CUSTOM”。

可以看到,在同时开启两个实例的情况下,根据自定义的策略作业被分配到两个不同的实例上执行

【手把手】分布式定时任务调度解析之Elastic-Job【手把手】分布式定时任务调度解析之Elastic-Job

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/111887.html

(0)
Java光头强的头像Java光头强

相关推荐

发表回复

登录后才能评论
极客之音——专业性很强的中文编程技术网站,欢迎收藏到浏览器,订阅我们!