DolphinScheduler Secondary Development


Official website: https://dolphinscheduler.apache.org/zh-cn

Features modified:

1. Add a new task type

  1. Find the TaskType.java file and add a new task type:
    /*
     * Licensed to the Apache Software Foundation (ASF) under one or more
     * contributor license agreements.  See the NOTICE file distributed with
     * this work for additional information regarding copyright ownership.
     * The ASF licenses this file to You under the Apache License, Version 2.0
     * (the "License"); you may not use this file except in compliance with
     * the License.  You may obtain a copy of the License at
     *
     *    http://www.apache.org/licenses/LICENSE-2.0
     *
     * Unless required by applicable law or agreed to in writing, software
     * distributed under the License is distributed on an "AS IS" BASIS,
     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     * See the License for the specific language governing permissions and
     * limitations under the License.
     */
    package com.grgbanking.ds.datazone.common.enums;
    
    import com.baomidou.mybatisplus.annotation.EnumValue;
    
    /**
     * task node type
     */
    public enum TaskType {
        /**
         * 0 SHELL
         * 1 SQL
         * 2 SUB_PROCESS
         * 3 PROCEDURE
         * 4 MR
         * 5 SPARK
         * 6 PYTHON
         * 7 DEPENDENT
         * 8 FLINK
         * 9 HTTP
         * 10 DATAX
         * 11 CONDITIONS
         * 12 SQOOP
         * 13 SYNC (newly added)
         */
        SHELL(0, "shell"),
        SQL(1, "sql"),
        SUB_PROCESS(2, "sub_process"),
        PROCEDURE(3, "procedure"),
        MR(4, "mr"),
        SPARK(5, "spark"),
        PYTHON(6, "python"),
        DEPENDENT(7, "dependent"),
        FLINK(8, "flink"),
        HTTP(9, "http"),
        DATAX(10, "datax"),
        CONDITIONS(11, "conditions"),
        SQOOP(12, "sqoop"),
        SYNC(13, "sync");
    
        TaskType(int code, String descp){
            this.code = code;
            this.descp = descp;
        }
    
        @EnumValue
        private final int code;
        private final String descp;
    
        public static boolean typeIsNormalTask(String typeName) {
            TaskType taskType = TaskType.valueOf(typeName);
            return !(taskType == TaskType.SUB_PROCESS || taskType == TaskType.DEPENDENT);
        }
    
        public int getCode() {
            return code;
        }
    
        public String getDescp() {
            return descp;
        }
    }
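
    To sanity-check the helper, the enum's lookup logic can be exercised standalone. The sketch below is a trimmed re-creation (a few representative constants, no MyBatis-Plus `@EnumValue` annotation), not the production class:

    ```java
    // Minimal stand-in for TaskType, runnable without MyBatis-Plus.
    public class TaskTypeDemo {

        enum TaskType {
            SHELL(0, "shell"),
            SUB_PROCESS(2, "sub_process"),
            DEPENDENT(7, "dependent"),
            SYNC(13, "sync");

            private final int code;
            private final String descp;

            TaskType(int code, String descp) {
                this.code = code;
                this.descp = descp;
            }

            // SUB_PROCESS and DEPENDENT are control-flow nodes, not "normal" tasks
            static boolean typeIsNormalTask(String typeName) {
                TaskType t = TaskType.valueOf(typeName);
                return !(t == TaskType.SUB_PROCESS || t == TaskType.DEPENDENT);
            }
        }

        public static void main(String[] args) {
            System.out.println(TaskType.typeIsNormalTask("SYNC"));        // true
            System.out.println(TaskType.typeIsNormalTask("SUB_PROCESS")); // false
        }
    }
    ```

    `valueOf` throws `IllegalArgumentException` for unknown names, so callers of `typeIsNormalTask` must pass an exact enum constant name.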
    
  2. Define your parameter class, used for parsing the task parameters from JSON:
    /*
     * Licensed to the Apache Software Foundation (ASF) under one or more
     * contributor license agreements.  See the NOTICE file distributed with
     * this work for additional information regarding copyright ownership.
     * The ASF licenses this file to You under the Apache License, Version 2.0
     * (the "License"); you may not use this file except in compliance with
     * the License.  You may obtain a copy of the License at
     *
     *    http://www.apache.org/licenses/LICENSE-2.0
     *
     * Unless required by applicable law or agreed to in writing, software
     * distributed under the License is distributed on an "AS IS" BASIS,
     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     * See the License for the specific language governing permissions and
     * limitations under the License.
     */
    package com.grgbanking.ds.datazone.common.task.sync;
    
    
    import com.grgbanking.ds.datazone.common.process.ResourceInfo;
    import com.grgbanking.ds.datazone.common.task.AbstractParameters;
    
    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;
    
    import lombok.Data;
    
    /**
     * data sync parameters
     */
    @Data
    public class SyncParameters extends AbstractParameters {

        /**
         * source host
         */
        private String sourceHost;

        /**
         * source port
         */
        private String sourcePort;

        /**
         * source database type
         */
        private String sourceType;

        /**
         * source database name
         */
        private String sourceDatabase;

        /**
         * source schema
         */
        private String sourceSchema;

        /**
         * source user
         */
        private String sourceUser;

        /**
         * source password
         */
        private String sourcePassword;

        /**
         * source datasource id
         */
        private int sourceId;

        /**
         * target host
         */
        private String targetHost;

        /**
         * target port
         */
        private String targetPort;

        /**
         * target database type
         */
        private String targetType;

        /**
         * target database name
         */
        private String targetDatabase;

        /**
         * target schema
         */
        private String targetSchema;

        /**
         * target user
         */
        private String targetUser;

        /**
         * target password
         */
        private String targetPassword;

        /**
         * target datasource id
         */
        private int targetId;

        /**
         * list of items to synchronize
         */
        private List<Map<String, String>> syncList;
    
    
        @Override
        public boolean checkParameters() {
            return syncList != null && !syncList.isEmpty();
        }
    
        @Override
        public List<ResourceInfo> getResourceFilesList() {
            return new ArrayList<>();
        }
    }
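
    The `checkParameters` guard above is what the worker uses to refuse an empty configuration before running the task. A minimal stand-in (plain setters instead of Lombok's `@Data`) shows its behavior:

    ```java
    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    // Minimal stand-in for SyncParameters: the task is only valid
    // when at least one sync entry has been configured.
    public class SyncParametersDemo {
        private List<Map<String, String>> syncList;

        public void setSyncList(List<Map<String, String>> syncList) {
            this.syncList = syncList;
        }

        public boolean checkParameters() {
            return syncList != null && !syncList.isEmpty();
        }

        public static void main(String[] args) {
            SyncParametersDemo p = new SyncParametersDemo();
            System.out.println(p.checkParameters()); // false: no syncList yet

            // the entry keys here are illustrative, not the real payload schema
            List<Map<String, String>> list = new ArrayList<>();
            Map<String, String> entry = new HashMap<>();
            entry.put("sourceTable", "t_user");
            entry.put("targetTable", "t_user");
            list.add(entry);
            p.setSyncList(list);
            System.out.println(p.checkParameters()); // true
        }
    }
    ```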
    

  3. Add the new type in AbstractTask.java.
  4. Under the worker module, go to the task directory, create a sync folder, and add SyncTask.java.
  5. SyncTask.java is shown below and can be used as a reference:
package com.grgbanking.ds.datazone.server.worker.task.sync;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.grgbanking.ds.datazone.common.Constants;
import com.grgbanking.ds.datazone.common.task.AbstractParameters;
import com.grgbanking.ds.datazone.common.task.sync.SyncParameters;
import com.grgbanking.ds.datazone.common.utils.JSONUtils;
import com.grgbanking.ds.datazone.server.entity.TaskExecutionContext;
import com.grgbanking.ds.datazone.server.worker.config.WorkerConfig;
import com.grgbanking.ds.datazone.server.worker.task.AbstractTask;
import com.grgbanking.ds.datazone.service.bean.SpringApplicationContext;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.springframework.http.HttpHeaders;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
import org.springframework.web.client.HttpClientErrorException;
import org.springframework.web.client.ResourceAccessException;
import org.springframework.web.client.RestTemplate;


public class SyncTask extends AbstractTask {

    /**
     * data sync parameters
     */
    private SyncParameters syncParameters;

    /**
     * taskExecutionContext
     */
    private TaskExecutionContext taskExecutionContext;

    /**
     *  worker config
     */
    private final WorkerConfig workerConfig;

    /**
     * constructor
     *
     * @param taskExecutionContext taskExecutionContext
     * @param logger               logger
     */
    protected SyncTask(TaskExecutionContext taskExecutionContext, Logger logger) {
        super(taskExecutionContext, logger);

        this.taskExecutionContext = taskExecutionContext;
        this.workerConfig = SpringApplicationContext.getBean(WorkerConfig.class);
    }


    @Override
    public void init() {

        logger.info("sync task params {}", taskExecutionContext.getTaskParams());
        this.syncParameters = JSONUtils.parseObject(taskExecutionContext.getTaskParams(), SyncParameters.class);

        if (!syncParameters.checkParameters()) {
            throw new RuntimeException("sync task params is not valid");
        }
    }

    @Override
    public void handle() throws Exception {

        String threadLoggerInfoName = String.format(Constants.TASK_LOG_INFO_FORMAT, taskExecutionContext.getTaskAppId());
        Thread.currentThread().setName(threadLoggerInfoName);

        try{
            RestTemplate restTemplate = new RestTemplate();
            // build the request headers
            HttpHeaders headers = new HttpHeaders();
            headers.setContentType(MediaType.APPLICATION_JSON);
            headers.add("API_KEY", workerConfig.getApiKey());
            headers.add("API_SECRET", workerConfig.getApiSecret());
            int migrateTaskId = createTask(restTemplate);
            if (migrateTaskId == -1) {
                setExitStatusCode(-1);
                throw new Exception("Failed to communicate with the data sync server");
            }

            long startTime = System.currentTimeMillis();
            while(true){

                int taskStatus = queryTask(restTemplate, migrateTaskId);
                // assume status 2 means the task has finished
                if (taskStatus == 2) {
                    break;
                }
                if (System.currentTimeMillis() - startTime >= 3 * 60 * 1000) {
                    exitStatusCode = -1;
                    throw new Exception("Timed out while querying task status");
                }
                Thread.sleep(3000);
            }
            JSONObject logResult = queryLogMsg(restTemplate, migrateTaskId);
            setExitStatusCode(Constants.EXIT_CODE_SUCCESS);
            // process the log output
            //...
        } catch (Exception e){
            exitStatusCode = -1;
            throw e;
        }
    }

    @Override
    public AbstractParameters getParameters() {
        return syncParameters;
    }

    private int createTask(RestTemplate restTemplate){
        // returns the created task id, used later to poll the task status;
        // getTargetUrl()/getSourceUrl() are accessors not shown in the
        // SyncParameters listing above
        JSONObject json = new JSONObject();
        json.put("destDetails", syncParameters.getTargetUrl());
        json.put("sourceDetails", syncParameters.getSourceUrl());
        try {
            String requestUrl = workerConfig.getApiUrl() + "/MigrateTask";
            ResponseEntity<String> responseEntity = restTemplate.postForEntity(requestUrl, json, String.class);
            String resultStr = responseEntity.getBody();
            JSONObject result = JSON.parseObject(resultStr, JSONObject.class);
            if (!Boolean.TRUE.equals(result.getBoolean("success"))) {
                logger.error("service response: {}", resultStr);
            }
            return result.getInteger("data");

        } catch (ResourceAccessException e) {
            logger.error(e.getMessage(), e);
            return -1;
        } catch (HttpClientErrorException e) {
            logger.error("request failed", e);
            return -1;
        } catch (Exception e) {
            logger.error(e.getMessage(), e);
            return -1;
        }
    }

    private int queryTask(RestTemplate restTemplate, int migrateTaskId){
        try {
            String requestUrl = workerConfig.getApiUrl() + "/MigrateTask/" + migrateTaskId;
            ResponseEntity<String> responseEntity = restTemplate.getForEntity(requestUrl, String.class);
            String resultStr = responseEntity.getBody();
            JSONObject result = JSON.parseObject(resultStr, JSONObject.class);
            if (!Boolean.TRUE.equals(result.getBoolean("success"))) {
                logger.error("service response: {}", resultStr);
            }
            // placeholder: the real implementation should return the status
            // parsed from the response, e.g. result.getInteger("data")
            return 2;

        } catch (ResourceAccessException e) {
            logger.error(e.getMessage(), e);
            return -1;
        } catch (HttpClientErrorException e) {
            logger.error("request failed", e);
            return -1;
        } catch (Exception e) {
            logger.error(e.getMessage(), e);
            return -1;
        }
    }

    private JSONObject queryLogMsg(RestTemplate restTemplate, int migrateTaskId){
        try {
            String requestUrl = workerConfig.getApiUrl() + "/MigrateTask/log/" + migrateTaskId;
            ResponseEntity<String> responseEntity = restTemplate.getForEntity(requestUrl, String.class);
            String resultStr = responseEntity.getBody();
            JSONObject result = JSON.parseObject(resultStr, JSONObject.class);
            if (!Boolean.TRUE.equals(result.getBoolean("success"))) {
                logger.error("service response: {}", resultStr);
            }
            return result;

        } catch (ResourceAccessException e) {
            logger.error(e.getMessage(), e);
            throw e;
        } catch (HttpClientErrorException e) {
            logger.error("request failed", e);
            if (StringUtils.contains(e.getMessage(), "404")) {
                throw new RuntimeException("404");
            }
            throw e;
        } catch (Exception e) {
            logger.error(e.getMessage(), e);
            throw e;
        }
    }
}
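
The heart of `handle()` is a poll-until-finished loop with a hard 3-minute timeout. The same pattern, isolated into a runnable sketch (the `IntSupplier` stands in for the REST status query; names here are illustrative):

```java
import java.util.function.IntSupplier;

// Poll-until-done with a hard timeout, mirroring the loop in handle():
// status 2 means finished; give up once the deadline passes.
public class PollDemo {

    static final int STATUS_FINISHED = 2;

    static boolean pollUntilFinished(IntSupplier queryStatus,
                                     long timeoutMs,
                                     long intervalMs) throws InterruptedException {
        long start = System.currentTimeMillis();
        while (true) {
            if (queryStatus.getAsInt() == STATUS_FINISHED) {
                return true;                       // task completed
            }
            if (System.currentTimeMillis() - start >= timeoutMs) {
                return false;                      // deadline exceeded
            }
            Thread.sleep(intervalMs);              // back off before the next poll
        }
    }

    public static void main(String[] args) throws InterruptedException {
        // Simulated remote task: reports "running" (1) twice, then "finished" (2).
        int[] calls = {0};
        IntSupplier fake = () -> (++calls[0] < 3) ? 1 : STATUS_FINISHED;

        System.out.println(pollUntilFinished(fake, 5_000, 10)); // true
    }
}
```

Returning a boolean instead of throwing keeps the sketch self-contained; the real task translates the timeout into an exception and a failure exit code, as shown above.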

 6. Find TaskManager.java and register the new task type there.
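
In DolphinScheduler, TaskManager's `newTask` method switches on the task type to construct the concrete task object, so the new SYNC branch goes there. The sketch below is a self-contained, hypothetical mock of that dispatch pattern; the class names and method signature are stand-ins, not the real TaskManager API (which takes a TaskExecutionContext and a Logger):

```java
// Self-contained sketch of TaskManager-style dispatch: a switch on the
// task type string that constructs the matching task object.
public class TaskManagerDemo {

    interface Task { String name(); }

    static class ShellTask implements Task { public String name() { return "shell"; } }
    static class SyncTask  implements Task { public String name() { return "sync"; } }

    static Task newTask(String taskType) {
        switch (taskType) {
            case "SHELL":
                return new ShellTask();
            case "SYNC":              // the newly registered type
                return new SyncTask();
            default:
                throw new IllegalArgumentException("unsupported task type: " + taskType);
        }
    }

    public static void main(String[] args) {
        System.out.println(newTask("SYNC").name()); // sync
    }
}
```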


2. Modify login/authorization to integrate a third-party authentication center

3. Integrate the API with Nacos for declarative service invocation

4. Build a base image that bundles Python 3, Python 2, and DataX

Dockerfile:

FROM openjdk:8-jre-slim-buster

# 1. install command/library/software
# If installs are slow, you can replace debian's mirror with a faster one, for example:
RUN { \
 echo "deb http://mirrors.tuna.tsinghua.edu.cn/debian/ buster main contrib non-free"; \
 echo "deb http://mirrors.tuna.tsinghua.edu.cn/debian/ buster-updates main contrib non-free"; \
 echo "deb http://mirrors.tuna.tsinghua.edu.cn/debian/ buster-backports main contrib non-free"; \
 echo "deb http://mirrors.tuna.tsinghua.edu.cn/debian-security buster/updates main contrib non-free"; \
} > /etc/apt/sources.list

RUN apt-get update && \
    apt-get install -y --no-install-recommends tzdata dos2unix python3 python supervisor procps psmisc netcat sudo tini curl iputils-ping wget && \
    echo "Asia/Shanghai" > /etc/timezone && \
    rm -f /etc/localtime && \
    dpkg-reconfigure tzdata && \
    rm -rf /var/lib/apt/lists/* /tmp/* && \
    wget http://datax-opensource.oss-cn-hangzhou.aliyuncs.com/datax.tar.gz && \
    mkdir /opt/soft && \
    tar -zxvf datax.tar.gz -C /opt/soft && \
    rm -rf datax.tar.gz && \
    rm -rf /opt/soft/datax/plugin/writer/._* && \
    rm -rf /opt/soft/datax/plugin/reader/._*

After building this base image, change the original openjdk:8-jre-slim-buster base image to your custom image.

5. Unified datasource center: embed Feign remote calls to the datasource center for strong consistency, with scheduled synchronization for eventual consistency.

Pitfalls encountered:

1. The master failed to start because it could not read its configuration file

2. Pitfalls when integrating DataX

DataX threw an exception when running the self-test script:

2022-02-18 09:04:50.468 [main] WARN  ConfigParser - 插件[streamreader,streamwriter]加载失败,1s后重试... Exception:Code:[Common-00], Describe:[您提供的配置文件存在错误信息,请检查您的作业配置 .] - 配置信息错误,您提供的配置文件[/Users/chenweifeng/Downloads/datax/plugin/reader/.DS_Store/plugin.json]不存在. 请检查您的配置文件. 
2022-02-18 09:04:51.486 [main] ERROR Engine - 

经DataX智能分析,该任务最可能的错误原因是:
com.alibaba.datax.common.exception.DataXException: Code:[Common-00], Describe:[您提供的配置文件存在错误信息,请检查您的作业配置 .] - 配置信息错误,您提供的配置文件[/Users/chenweifeng/Downloads/datax/plugin/reader/.DS_Store/plugin.json]不存在. 请检查您的配置文件.
	at com.alibaba.datax.common.exception.DataXException.asDataXException(DataXException.java:26)
	at com.alibaba.datax.common.util.Configuration.from(Configuration.java:95)
	at com.alibaba.datax.core.util.ConfigParser.parseOnePluginConfig(ConfigParser.java:153)
	at com.alibaba.datax.core.util.ConfigParser.parsePluginConfig(ConfigParser.java:125)
	at com.alibaba.datax.core.util.ConfigParser.parse(ConfigParser.java:63)
	at com.alibaba.datax.core.Engine.entry(Engine.java:137)
	at com.alibaba.datax.core.Engine.main(Engine.java:204)

The explanation found online is that macOS leaves hidden metadata files (`._*` and `.DS_Store`) inside the plugin directories, and DataX tries to load them as plugins, as the `/plugin/reader/.DS_Store/plugin.json` path in the error message shows. So I followed the suggested fix:

rm -rf ./plugin/reader/._*

rm -rf ./plugin/writer/._*

and then it worked.
