MySQL数据表结构迁移到Postgre的Python脚本

导读:本篇文章讲解 MySQL数据表结构迁移到Postgre的Python脚本,希望对大家有帮助,欢迎收藏,转发!站点地址:www.bmabk.com

前言

工作中遇到了需要把MySQL换成Postgre的需求,数据表需要做迁移,手动修改每一张表工作量比较大(容易出错),恰好最近学了几天Python,因而写了一个简单的Python脚本来完成这件事情。

程序的原理是类型转换,字符串替换,没有完成的事情是自增ID会丢失,并没有创建Serial序列来实现自增ID,因为我们的框架中的雪花ID本身就实现了自增

MySQL和Postgre的数据类型映射关系:https://en.wikibooks.org/wiki/Converting_MySQL_to_PostgreSQL

另外,还有一种更加方便的方式,直接使用Navicat的表转为模型,然后进行模型转换导出的方式
传送门:https://blog.csdn.net/greatjoe/article/details/114366625

程序代码

main.py

# encoding=utf-8
# readme: 脚本工作前提,需要把原来的MySQL数据库的DDL放入到一个文件中(fileName),执行后会生成多个DDL文件到输出目录
# 生成的DDL模式修改请移步到 write_table_ddl.py 文件中修改,生成的DDL会有各数据表的单独DDL文件和汇总的 allDLL.sql脚本文件
# 请留意控制台输出,根据控制台排查问题
import os

from db.mysql_to_pg.templateObject import Template

fileName = "C:\MyFileData\PythonScript\db\mysql_to_pg\mysql\ddl.sql" # MySQL的DDL文件
outputPath = "C:\MyFileData\PythonScript\db\mysql_to_pg\pg" # 输出Postgre的DDL文件目录
template = Template(fileName, outputPath)
template.createDDL()

# 先删除原来的汇总DDL文件
files = os.listdir(outputPath)
allDDL = outputPath + "\\allDLL.sql"
if os.path.exists(allDDL):  # 如果文件存在
    os.remove(allDDL)
for file in files:
    readStream = open(outputPath + "\\" + file, 'r', encoding='UTF-8')
    with open(allDDL, "a", encoding="utf-8") as pgStreams:
        pgStreams.write(readStream.read())
        pgStreams.write("\n\n\n")
    readStream.close()
pgStreams.close()

templateObject.py

from db.mysql_to_pg.colType import ColTypeClass
from db.write_table_ddl import OutPutDDL


class Template(object):
    """主要的功能处理类"""

    def __init__(self, fileName, outputPath):
        """fileName: DDL文件目录
        outputPath:输出文件路径
        """
        self.tableName = ''
        self.comments = ''
        self.cols = ''
        self.primeKey = "\n);"
        self.keys = ''
        self.tableComment = ''
        self.fileName = fileName
        self.outputPath = outputPath + "\#{tableName}.sql"

        # --------------- postgre保留字
        self.specialCol = ['name', 'version']
        self.SPACE = ' '

    def reInitial(self):
        self.tableName = ''
        self.comments = ''
        self.cols = ''
        self.primeKey = "\n);"
        self.keys = ''
        self.tableComment = ''
    def createDDL(self):
        colTypeClass = ColTypeClass()
        outputDDL = OutPutDDL()
        streams = open(self.fileName, 'r', encoding='UTF-8')
        line = streams.readline()
        # Debug 使用
        # line = "`scale` decimal(10,0) DEFAULT NULL COMMENT '缩放比例',"
        while line:
            self.colName = ''
            self.colType = ''
            self.type = ''
            self.comment = ''
            # line = "`measure_ids` text COMMENT '测点IDS',"
            line = line.lstrip(' ')
            # TODO 主处理逻辑
            if line.startswith("CREATE TABLE"):
                if (self.tableName != ''):
                    outputDDL.writeDDL(self)
                    self.reInitial()
                self.tableName = self.getMysqlTableName(line)

            # 数据表字段
            if line[0] == '`':
                colName = self.getMysqlColName(line)
                if 'AUTO_INCREMENT' in line:
                    print(self.tableName + " 数据表有自增字段,请手动处理, colName = " + colName)
                if colName in self.specialCol:
                    colName = '"' + colName + '"'
                colType = self.getMysqlType(line)
                pgType = colTypeClass.getPgType(colType)
                # 获取变长字段长度
                if (colType == "varchar" or colType == 'char'):
                    begin = line.find("char(") + 4
                    length = line[begin:line.find(")", begin, -1) + 1]
                    pgType += length
                elif colType == 'decimal':
                    begin = line.find("decimal(") + len("decimal")
                    length = line[begin:line.find(")", begin, -1) + 1]
                    pgType += length
                if (line.find("COMMENT") > 0):
                    commentBegin = line.find("COMMENT '") + len("COMMENT '")
                    commentEnd = line.find("',")
                    comment = line[commentBegin:commentEnd]
                nullStr = "NULL"
                if (line.find("DEFAULT NULL") == -1):
                    if ("NOT NULL" in line):
                        if (colType == 'bigint' or colType == 'varchar'):
                            nullStr = "NOT NULL"
                    else:
                        print(self.tableName + " 数据表 字段不为空,需要注意处理:colName = " + colName)
                try:
                    col = '	' + colName + self.SPACE + pgType + self.SPACE + nullStr + ", -- " + comment + '\n'
                except TypeError:
                    print('类型错误 tableName =' + self.tableName + ' colName = ' + colName)
                self.cols += col
                commentSql = outputDDL.commentTemplate.replace("#{tableName}", self.tableName) \
                    .replace("#{colName}", colName).replace("#{comment}", comment)
                comment = ''
                self.comments += commentSql

            # 主键约束
            if (line.startswith("PRIMARY KEY")):
                self.primeKey = outputDDL.primeKeyTemplate.replace("#{indexName}", self.tableName + "_pk")
                key = self.getIndexCols(line)
                key = key.replace('`', '')
                self.primeKey = self.primeKey.replace("#{colName}", key)

            # 索引
            #  KEY `idx_project_code` (`code`) USING BTREE,  或者 UNIQUE INDEX
            if (line.startswith("KEY") or line.startswith("UNIQUE KEY")):
                indexName = self.getMysqlColName(line)
                indexCols = self.getIndexCols(line)
                indexCols = indexCols.replace("`", '')
                key = outputDDL.keysTemplate.replace("#{indexName}", indexName)
                key = key.replace("#{tableName}", self.tableName).replace("#{indexCols}", indexCols)
                if (line.startswith("UNIQUE KEY")):
                    key = key.replace("INDEX", "UNIQUE INDEX")
                self.keys += key
            # 数据表comment
            if line.find("InnoDB") > 0:
                # ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci COMMENT='测试订单表';
                tableCommentBegin = line.find("COMMENT='") + len("COMMENT='")
                tableCommentEnd = line.find("';")
                tableComentTemp = line[tableCommentBegin: tableCommentEnd]
                self.tableComment = outputDDL.tableCommentTemplate.replace("#{tableName}", self.tableName) \
                    .replace("#{tableComment}", tableComentTemp)

            line = streams.readline()

        outputDDL.writeDDL(self)
        streams.close()

    def getMysqlTableName(self, line):
        """获取字段名称"""
        begin = line.find('`')
        end = line.find('` ')
        if (begin >= 0 and end > 0):
            colType = line[begin + 1:end]
            return colType
        else:
            return ''

    def getMysqlColName(self, line):
        """获取字段名称"""
        begin = line.find('`')
        end = line.find('` ')
        if (begin >= 0 and end > 0):
            colType = line[begin + 1:end]
            return colType
        else:
            return ''

    def getMysqlType(self, line):
        """获取字段类型"""
        begin = line.rfind('` ')
        end = line.find('(')
        if (begin >= 0 and end > 0):
            colType = line[begin + 2:end]
            return colType
        else:
            if begin >= 0:
                subStr = line[begin + 2: -1]
                return subStr[0: subStr.find(" ")]
            return ''
    def getIndexCols(self, line):
        """获取索引的字段"""
        begin = line.find("(")
        end = line.find(")")
        return line[begin: end + 1]

    def getPrimeKeyCol(self, line):
        """获取主键索引"""
        begin = line.find("(")
        end = line.find(")")
        return line[begin: end + 1]                

colType.py

# MySQL 数据表迁移到Postgre数据库

# 迁移匹配规则:
# int -> int4 (int4)
# bigint -> int8
# varchar -> varchar
# tinyint -> int2 (smallint)
# decimal -> decimal
# longtext -> text
# json -> json
# double -> double precision
# float -> real


# AUTO_INCREMENT -> SEQUENCE
class ColTypeClass(object):
""""数据类型转换映射"""
    def __init__(self):
        self.typeMap = {}
        # 数字类型,有符号
        self.typeMap['tinyint'] = 'int2'
        self.typeMap['smallint'] = 'int2'
        self.typeMap['mediumint'] = 'int4'
        self.typeMap['int'] = 'int4'
        self.typeMap['bigint'] = 'int8'
        # 数字类型,无符号
        self.typeMap['tinyint unsigned'] = 'int2'
        self.typeMap['smallint unsigned'] = 'int2'
        self.typeMap['mediumint unsigned'] = 'int4'
        self.typeMap['int unsigned'] = 'int4'
        self.typeMap['bigint unsigned'] = 'number(20)'
        # 浮点类型
        self.typeMap['float'] = 'real'
        self.typeMap['float unsigned'] = 'real'
        self.typeMap['double'] = 'double precision'
        self.typeMap['decimal'] = 'decimal'
        # boolean
        self.typeMap['boolean'] = 'boolean'
        # char / varchar
        self.typeMap['char'] = 'varchar'
        self.typeMap['varchar'] = 'varchar'
        # text
        self.typeMap['tinytext'] = 'text'
        self.typeMap['text'] = 'text'
        self.typeMap['mediumtext'] = 'text'
        self.typeMap['longtext'] = 'text'
        # 二进制
        self.typeMap['binary'] = 'bytea'
        self.typeMap['varbinary'] = 'bytea'
        self.typeMap['tinyblob'] = 'bytea'
        self.typeMap['blob'] = 'bytea'
        self.typeMap['mediumblog'] = 'bytea'
        self.typeMap['longblog'] = 'bytea'
        # 时间
        self.typeMap['date'] = 'date'
        self.typeMap['time'] = 'time'
        self.typeMap['datetime'] = 'timestamp'
        self.typeMap['timestamp'] = 'timestamp'
        # json
        self.typeMap['json'] = 'json'

    # 获取pgType
    def getPgType(self, type):
        """
        获取MySQL对应的Postgre数据类型
        """
        return self.typeMap.get(type)
        

write_table_ddl

class OutPutDDL(object):
    def __init__(self):
        self.tableTemplate = "-- Drop table\n\n-- DROP TABLE edge.#{tableName};\nCREATE TABLE edge.#{tableName} (\n"
        self.primeKeyTemplate = "	CONSTRAINT #{indexName} PRIMARY KEY #{colName}\n);\n"
        self.keysTemplate = "CREATE INDEX #{indexName} ON edge.#{tableName} USING btree #{indexCols};\n"
        self.commentTemplate = "COMMENT ON COLUMN edge.#{tableName}.#{colName} IS '#{comment}';\n"
        self.tableCommentTemplate = "COMMENT ON TABLE edge.#{tableName} IS '#{tableComment}';\n"
        self.permissionsTemplate = "\n-- Permissions\n\nALTER TABLE edge.#{tableName} OWNER TO dev_idp_core;\nGRANT ALL ON TABLE edge.#{tableName} TO dev_idp_core;\n"

    def writeDDL(self, template):
        ddl = self.tableTemplate.replace("#{tableName}", template.tableName)
        ddl += template.cols
        ddl += template.primeKey
        ddl += template.keys
        ddl += template.tableComment
        ddl += '\n' + template.comments
        permissions = self.permissionsTemplate.replace("#{tableName}", template.tableName)
        ddl += permissions
        with open(template.outputPath.replace("#{tableName}", template.tableName), "w", encoding="utf-8") as pgStreams:
            pgStreams.write(ddl)
            pgStreams.close()


测试

MySQL数据表DDL

CREATE TABLE `idp_central_view` (
  `id` bigint(20) NOT NULL COMMENT '主键',
  `monitor_unit_id` bigint(20) DEFAULT NULL COMMENT '监测单元ID',
  `name` varchar(255) DEFAULT NULL COMMENT '中控图名称',
  `measure_ids` text COMMENT '测点IDS',
  `category_code` varchar(255) DEFAULT NULL COMMENT '中控图分类,具体分类参考IDP_ZKT_CATEGORY字典。',
  `release_status` tinyint(1) NOT NULL COMMENT '发布状态,-1未发布,0编辑中,1已发布',
  `release_json` longtext COMMENT '已发布的json对象',
  `description` varchar(255) DEFAULT NULL COMMENT '说明',
  `version` bigint(20) NOT NULL COMMENT '时间戳,版本号。用于同步对比。',
  `create_time` bigint(20) unsigned DEFAULT NULL COMMENT '创建时间',
  `deleted` tinyint(1) DEFAULT NULL COMMENT '是否删除',
  PRIMARY KEY (`id`) USING BTREE,
  KEY `idp_central_view_monitor_unit_id_idx` (`monitor_unit_id`) USING BTREE,
  KEY `idp_central_view_name_idx` (`name`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='组件绘图-中控图表,2020930后中控绘制的新设计方案';

输出postgre数据表DDL

-- Drop table

-- DROP TABLE edge.idp_central_view;
CREATE TABLE edge.idp_central_view (
	id int8 NOT NULL, -- 主键
	monitor_unit_id int8 NULL, -- 监测单元ID
	"name" varchar(255) NULL, -- 中控图名称
	measure_ids text NULL, -- 测点IDS
	category_code varchar(255) NULL, -- 中控图分类,具体分类参考IDP_ZKT_CATEGORY字典。
	release_status int2 NULL, -- 发布状态,-1未发布,0编辑中,1已发布
	release_json text NULL, -- 已发布的json对象
	description varchar(255) NULL, -- 说明
	"version" int8 NOT NULL, -- 时间戳,版本号。用于同步对比。
	create_time int8 NULL, -- 创建时间
	deleted int2 NULL, -- 是否删除
	CONSTRAINT idp_central_view_pk PRIMARY KEY (id)
);
CREATE INDEX idp_central_view_monitor_unit_id_idx ON edge.idp_central_view USING btree (monitor_unit_id);
CREATE INDEX idp_central_view_name_idx ON edge.idp_central_view USING btree (name);
COMMENT ON TABLE edge.idp_central_view IS '组件绘图-中控图表,2020930后中控绘制的新设计方案';

COMMENT ON COLUMN edge.idp_central_view.id IS '主键';
COMMENT ON COLUMN edge.idp_central_view.monitor_unit_id IS '监测单元ID';
COMMENT ON COLUMN edge.idp_central_view."name" IS '中控图名称';
COMMENT ON COLUMN edge.idp_central_view.measure_ids IS '测点IDS';
COMMENT ON COLUMN edge.idp_central_view.category_code IS '中控图分类,具体分类参考IDP_ZKT_CATEGORY字典。';
COMMENT ON COLUMN edge.idp_central_view.release_status IS '发布状态,-1未发布,0编辑中,1已发布';
COMMENT ON COLUMN edge.idp_central_view.release_json IS '已发布的json对象';
COMMENT ON COLUMN edge.idp_central_view.description IS '说明';
COMMENT ON COLUMN edge.idp_central_view."version" IS '时间戳,版本号。用于同步对比。';
COMMENT ON COLUMN edge.idp_central_view.create_time IS '创建时间';
COMMENT ON COLUMN edge.idp_central_view.deleted IS '是否删除';

-- Permissions

ALTER TABLE edge.idp_central_view OWNER TO dev_idp_core;
GRANT ALL ON TABLE edge.idp_central_view TO dev_idp_core;

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

文章由半码博客整理,本文链接:https://www.bmabk.com/index.php/post/4588.html

(0)
小半的头像小半

相关推荐

半码博客——专业性很强的中文编程技术网站,欢迎收藏到浏览器,订阅我们!