如何将MySql数据导入Elasticsearch中

导读:本篇文章讲解 如何将MySql数据导入Elasticsearch中,希望对大家有帮助,欢迎收藏,转发!站点地址:www.bmabk.com

[版权申明] 非商业目的注明出处可自由转载
出自:shusheng007

概述

最近工作中有一个将MySql数据导入ES的需求,于是网上搜索了一下,发现不尽如人意。看了好几篇文章,还是不知道怎么做,在此记录一下自己成功的经验,没啥技术含量,但对于需要的人来说贼香…

方案

采用Logstash从MySql中抽取数据,然后导入ES中。原理如下图所示

在这里插入图片描述

环境准备

如果已经安装了mysql以及ES,可直接跳过以下部分。

安装Mysql/MariaDb

已经安装则跳过,此处采用docker安装。

  • 创建本地卷

我通过下面的操作创建了一个目录: ~/software/mariadb/data

cd ~
mkdir -p software/mariadb/data
  • 创建docker-compose文件

mariadb-dc.yaml

version: '3'

services:
  mariadb:
    image: mariadb:10.6.5
    ports:
      - 3306:3306
    volumes:
      - ~/software/mariadb/data:/var/lib/mysql
    environment:
      - MYSQL_ROOT_PASSWORD=root
      - MYSQL_USER=ss007
      - MYSQL_PASSWORD=ss007
      - MYSQL_DATABASE=ss007_bd

其中有几点需要说明:

- MYSQL_USER=ss007
- MYSQL_PASSWORD=ss007

创建了一个叫ss007的用户

  • 执行

mariadb-dc.yaml目录下执行

docker-compose -f mariadb-dc.yaml up -d
  • 准备数据

数据库已经安装好,使用数据库管理工具,例如Navicat连接数据库并建表导入数据。

当然如果你习惯命令行,进入docker使用mysql命令行客户端操作也是可以的

-- ----------------------------
-- Table structure for student
-- ----------------------------
DROP TABLE IF EXISTS `student`;
CREATE TABLE `student` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `stu_name` varchar(32) NOT NULL DEFAULT '' COMMENT '姓名',
  `age` int(11) NOT NULL DEFAULT 18 COMMENT '年龄',
  `create_time` datetime DEFAULT current_timestamp() ON UPDATE current_timestamp() COMMENT '创建时间',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=5 DEFAULT CHARSET=utf8mb4 COMMENT='学生表';

-- ----------------------------
-- Records of student
-- ----------------------------
INSERT INTO `student` VALUES (1, '王二狗', 30, '2022-02-19 10:57:16');
INSERT INTO `student` VALUES (2, '牛翠华', 25, '2022-02-19 10:57:16');
INSERT INTO `student` VALUES (3, '上官无雪', 18, '2022-02-19 10:57:16');
INSERT INTO `student` VALUES (4, '王婆', 50, '2022-02-19 10:57:16');

安装Elasticsearch与Kibana(可选)

已经安装则跳过,没安装请参考docker之如何使用docker来安装ELK(elasticsearch,logstash,kibana)

安装Logstash

下面将进入本文的关键部分。从官网下载了Logstash7.17.0的压缩包,解压即可。

写logstash配置文件

我们在config文件里新建一个名为my-demo-logstash.conf的文件,内容如下

input {
  jdbc {
    jdbc_driver_library => "./mysql-connector-java-5.1.49.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://localhost:3306/ss007_db?characterEncoding=UTF-8&useSSL=false"
    jdbc_user => root
    jdbc_password => root
    # jdbc_paging_enabled => "true" #是否进行分页
    # jdbc_page_size => "50000"
    tracking_column => "create_time"
    use_column_value => true
    # statement_filepath => "sql文件路径,与下面的执行语句二选1"
    statement => "SELECT id,stu_name AS name,age,create_time FROM ss007_db.student WHERE create_time > :sql_last_value;"
    # 设置监听间隔  各字段含义(由左至右)秒、分、时、天、月、年,全部为*默认含义为每分钟都更新
    schedule => " 10 * * * * *"
  }
}
output {
  elasticsearch {
    document_id => "%{id}"
    # document_type => ""
    index => "ss007_index"
    hosts => ["localhost:9200"]
  }
  stdout{
    codec => rubydebug
  }
}

文件分为input 和output两部分,其实还可以有个filter。input 部分从mysql读取数据,output部分向ES插入数据,关键字段解释如下

input:

  • jdbc_driver_library:mysql-connector-java.jar 的路径,mysql-connector-java.jar 需要我们去官网下载,用来连接mysql数据库的。我放到了与bin文件同一级别,所以使用./作为路径,你要根据你相对bin文件夹放置的位置来修改这个路径。
  • tracking_column: mysql中的某一列,logstash用它来跟踪数据的变化。此处我用的是create_time列,那么当新增加一条记录,或者修改一下这个时间的话,logstash就会知道,从而更新数据。
  • use_column_value :是否使用tracking_column的值作为:sql_last_value的值。设置为true时,:sql_last_value = tracking_column。设置为false,:sql_last_value= 上次查询时间
  • statement:sql查询语句,其中:sql_last_value 已经在上面介绍过了
  • schedule:执行计划,可以通过它设置查询的执行计划,我这里设置每隔10秒执行一次

output:

如果ES中没有index,会自动创建

  • document_id => “%{id}” : ES中document的唯一id,这里使用student的主键。
  • index => “ss007_index” : ES 的索引

7.0以后不用设置document_type,默认doc

运行

导航到logstash路径下运行以下命令,注意7.x不能进入bin文件执行

 sudo  bin/logstash -f config/my-demo-logstash.conf

输出如下类似结果说明成功了

...
[2022-02-20T16:22:10,869][INFO ][logstash.inputs.jdbc     ][main][4a56def37a98a3ccef919df855d4d915c4fa56a1212a5e6c99a82aa2a6e70df9] (0.009233s) SELECT id,stu_name AS name,age,create_time FROM ss007_db.student WHERE create_time > 0;
{
           "name" => "王婆",
     "@timestamp" => 2022-02-20T08:22:10.938Z,
    "create_time" => 2022-02-19T02:57:16.000Z,
             "id" => 4,
       "@version" => "1",
            "age" => 50
}
{
           "name" => "王二狗",
     "@timestamp" => 2022-02-20T08:22:10.927Z,
    "create_time" => 2022-02-19T02:57:16.000Z,
             "id" => 1,
       "@version" => "1",
            "age" => 30
}
...

查看结果

从Kibana上查看ss007_index是否创建并导入了数据

在这里插入图片描述

更新数据

因为我们设置了每隔10秒执行一次,所以当我们插入新数据,或者更新数据导致create_time列的日期增大,logstash就会感知到,随即更新ES中的数据

  • 插入一条新数据时的输出
{
           "name" => "张妈",
     "@timestamp" => 2022-02-20T08:27:10.160Z,
    "create_time" => 2022-02-19T21:57:16.000Z,
             "id" => 5,
       "@version" => "1",
            "age" => 50
}
  • 修改已有数据时的输出
{
           "name" => "牛翠华",
     "@timestamp" => 2022-02-20T08:22:10.937Z,
    "create_time" => 2022-02-19T02:57:16.000Z,
             "id" => 2,
       "@version" => "1",
            "age" => 25
}

总结

搞这个我也是花了一天时间的,理论都知道,一动手就各种问题,如果它对你有帮助就点个赞再走…

参考文章:https://dzone.com/articles/migrating-mysql-data-to-elasticsearch-using-logsta

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

文章由半码博客整理,本文链接:https://www.bmabk.com/index.php/post/14662.html

(0)
小半的头像小半

相关推荐

半码博客——专业性很强的中文编程技术网站,欢迎收藏到浏览器,订阅我们!