Greenplum: Spark Script Development with the Greenplum-Spark Connector, and the Pitfalls Encountered


Reference posts:

"Greenplum-Spark Connector 介绍", Greenplum 中文社区 blog, CSDN

"比pgload更快更方便写入大数据量至Greenplum的Greenplum-Spark Connector", 秣码一盏's blog, CSDN

1. Background

The official documentation recommends several ways to write external data into Greenplum: jdbc, pgcopy, gpfdist, and the Pivotal Greenplum-Spark Connector. According to the docs:

  • jdbc: writing large volumes is slow, and it is the least recommended option for bulk loads (see the baseline sketch after this list);
  • pgcopy: faster than jdbc, but it consumes resources on the master node;
  • gpfdist: does not burden the master; it writes directly to the segments and can write in parallel, but it requires installing client-side dependencies, including gpfdist itself;
  • Greenplum-Spark Connector: writes to Greenplum in parallel on top of Spark's parallel processing, and also exposes a parallel read interface. The write test below is based on this component. Download page: Download VMware Tanzu™ Greenplum® on VMware Tanzu Network.
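
For a sense of what the first bullet means in practice, the plain JDBC path is just Spark's built-in jdbc data source. Below is a minimal, hedged sketch of that baseline; the target table public.jdbc_baseline_test is a placeholder invented for illustration, and the connection details mirror the masked ones used later in this post:

import java.util.Properties

import org.apache.spark.sql.{SaveMode, SparkSession}

object JdbcBaselineSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("jdbc-baseline")
      .master("local[*]")
      .getOrCreate()
    import spark.implicits._

    // Any DataFrame will do; a tiny in-memory one keeps the sketch self-contained.
    val df = Seq((1, "a"), (2, "b")).toDF("id", "name")

    // Plain JDBC write: every row funnels through the Greenplum master as
    // batched INSERTs, which is why this is the slowest option for bulk loads.
    // Requires the PostgreSQL JDBC driver on the classpath.
    val props = new Properties()
    props.setProperty("user", "gpadmin")
    props.setProperty("password", "******")

    df.write
      .mode(SaveMode.Append)
      .jdbc("jdbc:postgresql://10.***.**.3:54432/pgbenchdb", "public.jdbc_baseline_test", props)

    spark.stop()
  }
}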

2. Test Code

2.1 Core test class

package com.greenplum.spark.gsc

import org.apache.log4j.Logger
import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}

/**
 * @Description: Round-trip test: read a Greenplum table via the connector, then write it back to a second table
 * @Author: chenweifeng
 * @Date: 2022-08-16 4:00 PM
 **/
object GreenplumSparkTest {
  // Global logger for this class
  val LOGGER = Logger.getLogger(this.getClass)

  def main(args: Array[String]): Unit = {

    val conf = new SparkConf().setAppName("gsc-greenplum-test").setMaster("local")
    val spark = SparkSession.builder().config(conf).getOrCreate()
    spark.sparkContext.setLogLevel("INFO")
    println("spark-version:" + spark.version)

    // Read from Greenplum with Spark
    val gscReadOptionMap = Map(
      "url" -> "jdbc:postgresql://10.***.**.3:54432/pgbenchdb",
      "user" -> "gpadmin",
      "password" -> "******",
      "dbschema" -> "public",
      "dbtable" -> "test_datax_gp_spark"
    )

    val gpdf: DataFrame = spark.read.format("greenplum")
      .options(gscReadOptionMap)
      .load()

    gpdf.show()

    // Write back to Greenplum with Spark
    val gscWriteOptionMap = Map(
      "url" -> "jdbc:postgresql://10.***.**.3:54432/pgbenchdb",
      "user" -> "gpadmin",
      "password" -> "******",
      "dbschema" -> "public",
      "dbtable" -> "test_datax_gp_spark_w"
    )

    gpdf.write.format("greenplum")
      .mode(SaveMode.Append)
      .options(gscWriteOptionMap)
      .save()

    spark.stop()
  }
}
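
The connector's parallel read can also be steered explicitly. Per the connector documentation, the read side accepts a partitionColumn option (an integer-type column in the source table) and a partitions count. Below is a hedged sketch meant to sit inside main above; it assumes test_datax_gp_spark has an integer column named id, which the original post does not confirm:

// Hedged sketch: explicit parallel-read options for the connector.
// Assumes an integer column named "id" exists; adjust to your schema.
val gscParallelReadOptionMap = Map(
  "url" -> "jdbc:postgresql://10.***.**.3:54432/pgbenchdb",
  "user" -> "gpadmin",
  "password" -> "******",
  "dbschema" -> "public",
  "dbtable" -> "test_datax_gp_spark",
  "partitionColumn" -> "id", // integer column used to split the read across Spark tasks
  "partitions" -> "8"        // number of read partitions; per the docs, defaults to the segment count
)

val parallelDf = spark.read.format("greenplum")
  .options(gscParallelReadOptionMap)
  .load()

println("read partitions: " + parallelDf.rdd.getNumPartitions)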

2.2 pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.greenplum.spark</groupId>
    <artifactId>gsc-scala-test</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <spark.version>2.4.5</spark.version>
        <scala.version>2.12</scala.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_${scala.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_${scala.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_${scala.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
<!--        <dependency>-->
<!--            <groupId>org.apache.spark</groupId>-->
<!--            <artifactId>spark-hive_${scala.version}</artifactId>-->
<!--            <version>${spark.version}</version>-->
<!--        </dependency>-->
<!--        <dependency>-->
<!--            <groupId>org.apache.spark</groupId>-->
<!--            <artifactId>spark-mllib_${scala.version}</artifactId>-->
<!--            <version>${spark.version}</version>-->
<!--        </dependency>-->
        <dependency>
            <groupId>io.pivotal.greenplum.spark</groupId>
            <artifactId>greenplum-spark_${scala.version}</artifactId>
            <version>2.1.0</version>
        </dependency>
        <dependency>
            <groupId>com.pivotal</groupId>
            <artifactId>greenplum-jdbc</artifactId>
            <version>5.1.4</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>8.0.27</version>
        </dependency>
        <dependency>
            <groupId>org.postgresql</groupId>
            <artifactId>postgresql</artifactId>
            <version>9.3-1102-jdbc4</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.scala-tools</groupId>
                <artifactId>maven-scala-plugin</artifactId>
                <!-- This must be the plugin's own version; ${scala.version} (2.12) is the
                     Scala binary suffix, not a valid release of maven-scala-plugin. -->
                <version>2.15.2</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

3. Packaging and Deployment

3.1 Build the package

mvn clean package
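
A successful build produces the test jar at target/gsc-scala-test-1.0-SNAPSHOT.jar, the standard Maven output path for the artifactId and version declared in the pom above.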

3.2 Upload the jars

Upload both greenplum-spark_2.12-2.1.0.jar and gsc-scala-test-1.0-SNAPSHOT.jar to the bin directory of the Spark installation; the spark-submit command below references them by bare filename, so it is run from that directory.

3.3 Submit the Spark job

spark-submit \
--class com.greenplum.spark.gsc.GreenplumSparkTest \
--master spark://localhost:7078 \
--jars greenplum-spark_2.12-2.1.0.jar \
gsc-scala-test-1.0-SNAPSHOT.jar
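
Before scripting the submit, a quick interactive sanity check of the connector jar can save a round trip. A sketch, using the same masked connection details as above:

spark-shell --master spark://localhost:7078 --jars greenplum-spark_2.12-2.1.0.jar

Then, inside the shell:

val df = spark.read.format("greenplum")
  .option("url", "jdbc:postgresql://10.***.**.3:54432/pgbenchdb")
  .option("user", "gpadmin")
  .option("password", "******")
  .option("dbschema", "public")
  .option("dbtable", "test_datax_gp_spark")
  .load()
df.show()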

Pitfalls Encountered

1. Version compatibility between the Greenplum-Spark Connector and Spark

At present, greenplum-spark_2.12-2.1.2.jar only supports Spark 2.x environments; running it against Spark 3.x fails with an incompatibility error. Whether a future release of the connector will support Spark 3.x remains to be seen.
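
One cheap mitigation, not from the original post: fail fast at runtime if the cluster is not Spark 2.x, so the job dies with a clear message instead of an obscure incompatibility error. A sketch, placed right after the SparkSession is created in the test class above:

// Hedged sketch: greenplum-spark_2.12-2.1.x only supports Spark 2.x,
// so abort early with an explicit message on any other major version.
val sparkMajorVersion = spark.version.split("\\.").head.toInt
require(sparkMajorVersion == 2,
  s"greenplum-spark_2.12-2.1.x requires Spark 2.x, but this cluster runs Spark ${spark.version}")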
