前言
工作中遇到了需要把MySQL换成Postgre的需求,数据表需要做迁移,手动修改每一张表工作量比较大(容易出错),恰好最近学了几天Python,因而写了一个简单的Python脚本来完成这件事情。
程序的原理是类型转换,字符串替换,没有完成的事情是自增ID会丢失,并没有创建Serial序列来实现自增ID,因为我们的框架中的雪花ID本身就实现了自增
MySQL和Postgre的数据类型映射关系:https://en.wikibooks.org/wiki/Converting_MySQL_to_PostgreSQL
另外,还有一种更加方便的方式,直接使用Navicat的表转为模型,然后进行模型转换导出的方式
传送门:https://blog.csdn.net/greatjoe/article/details/114366625
程序代码
main.py
# encoding=utf-8
# readme: 脚本工作前提,需要把原来的MySQL数据库的DDL放入到一个文件中(fileName),执行后会生成多个DDL文件到输出目录
# 生成的DDL模式修改请移步到 write_table_ddl.py 文件中修改,生成的DDL会有各数据表的单独DDL文件和汇总的 allDLL.sql脚本文件
# 请留意控制台输出,根据控制台排查问题
import os
from db.mysql_to_pg.templateObject import Template
fileName = "C:\MyFileData\PythonScript\db\mysql_to_pg\mysql\ddl.sql" # MySQL的DDL文件
outputPath = "C:\MyFileData\PythonScript\db\mysql_to_pg\pg" # 输出Postgre的DDL文件目录
template = Template(fileName, outputPath)
template.createDDL()
# 先删除原来的汇总DDL文件
files = os.listdir(outputPath)
allDDL = outputPath + "\\allDLL.sql"
if os.path.exists(allDDL): # 如果文件存在
os.remove(allDDL)
for file in files:
readStream = open(outputPath + "\\" + file, 'r', encoding='UTF-8')
with open(allDDL, "a", encoding="utf-8") as pgStreams:
pgStreams.write(readStream.read())
pgStreams.write("\n\n\n")
readStream.close()
pgStreams.close()
templateObject.py
from db.mysql_to_pg.colType import ColTypeClass
from db.write_table_ddl import OutPutDDL
class Template(object):
"""主要的功能处理类"""
def __init__(self, fileName, outputPath):
"""fileName: DDL文件目录
outputPath:输出文件路径
"""
self.tableName = ''
self.comments = ''
self.cols = ''
self.primeKey = "\n);"
self.keys = ''
self.tableComment = ''
self.fileName = fileName
self.outputPath = outputPath + "\#{tableName}.sql"
# --------------- postgre保留字
self.specialCol = ['name', 'version']
self.SPACE = ' '
def reInitial(self):
self.tableName = ''
self.comments = ''
self.cols = ''
self.primeKey = "\n);"
self.keys = ''
self.tableComment = ''
def createDDL(self):
colTypeClass = ColTypeClass()
outputDDL = OutPutDDL()
streams = open(self.fileName, 'r', encoding='UTF-8')
line = streams.readline()
# Debug 使用
# line = "`scale` decimal(10,0) DEFAULT NULL COMMENT '缩放比例',"
while line:
self.colName = ''
self.colType = ''
self.type = ''
self.comment = ''
# line = "`measure_ids` text COMMENT '测点IDS',"
line = line.lstrip(' ')
# TODO 主处理逻辑
if line.startswith("CREATE TABLE"):
if (self.tableName != ''):
outputDDL.writeDDL(self)
self.reInitial()
self.tableName = self.getMysqlTableName(line)
# 数据表字段
if line[0] == '`':
colName = self.getMysqlColName(line)
if 'AUTO_INCREMENT' in line:
print(self.tableName + " 数据表有自增字段,请手动处理, colName = " + colName)
if colName in self.specialCol:
colName = '"' + colName + '"'
colType = self.getMysqlType(line)
pgType = colTypeClass.getPgType(colType)
# 获取变长字段长度
if (colType == "varchar" or colType == 'char'):
begin = line.find("char(") + 4
length = line[begin:line.find(")", begin, -1) + 1]
pgType += length
elif colType == 'decimal':
begin = line.find("decimal(") + len("decimal")
length = line[begin:line.find(")", begin, -1) + 1]
pgType += length
if (line.find("COMMENT") > 0):
commentBegin = line.find("COMMENT '") + len("COMMENT '")
commentEnd = line.find("',")
comment = line[commentBegin:commentEnd]
nullStr = "NULL"
if (line.find("DEFAULT NULL") == -1):
if ("NOT NULL" in line):
if (colType == 'bigint' or colType == 'varchar'):
nullStr = "NOT NULL"
else:
print(self.tableName + " 数据表 字段不为空,需要注意处理:colName = " + colName)
try:
col = ' ' + colName + self.SPACE + pgType + self.SPACE + nullStr + ", -- " + comment + '\n'
except TypeError:
print('类型错误 tableName =' + self.tableName + ' colName = ' + colName)
self.cols += col
commentSql = outputDDL.commentTemplate.replace("#{tableName}", self.tableName) \
.replace("#{colName}", colName).replace("#{comment}", comment)
comment = ''
self.comments += commentSql
# 主键约束
if (line.startswith("PRIMARY KEY")):
self.primeKey = outputDDL.primeKeyTemplate.replace("#{indexName}", self.tableName + "_pk")
key = self.getIndexCols(line)
key = key.replace('`', '')
self.primeKey = self.primeKey.replace("#{colName}", key)
# 索引
# KEY `idx_project_code` (`code`) USING BTREE, 或者 UNIQUE INDEX
if (line.startswith("KEY") or line.startswith("UNIQUE KEY")):
indexName = self.getMysqlColName(line)
indexCols = self.getIndexCols(line)
indexCols = indexCols.replace("`", '')
key = outputDDL.keysTemplate.replace("#{indexName}", indexName)
key = key.replace("#{tableName}", self.tableName).replace("#{indexCols}", indexCols)
if (line.startswith("UNIQUE KEY")):
key = key.replace("INDEX", "UNIQUE INDEX")
self.keys += key
# 数据表comment
if line.find("InnoDB") > 0:
# ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci COMMENT='测试订单表';
tableCommentBegin = line.find("COMMENT='") + len("COMMENT='")
tableCommentEnd = line.find("';")
tableComentTemp = line[tableCommentBegin: tableCommentEnd]
self.tableComment = outputDDL.tableCommentTemplate.replace("#{tableName}", self.tableName) \
.replace("#{tableComment}", tableComentTemp)
line = streams.readline()
outputDDL.writeDDL(self)
streams.close()
def getMysqlTableName(self, line):
"""获取字段名称"""
begin = line.find('`')
end = line.find('` ')
if (begin >= 0 and end > 0):
colType = line[begin + 1:end]
return colType
else:
return ''
def getMysqlColName(self, line):
"""获取字段名称"""
begin = line.find('`')
end = line.find('` ')
if (begin >= 0 and end > 0):
colType = line[begin + 1:end]
return colType
else:
return ''
def getMysqlType(self, line):
"""获取字段类型"""
begin = line.rfind('` ')
end = line.find('(')
if (begin >= 0 and end > 0):
colType = line[begin + 2:end]
return colType
else:
if begin >= 0:
subStr = line[begin + 2: -1]
return subStr[0: subStr.find(" ")]
return ''
def getIndexCols(self, line):
"""获取索引的字段"""
begin = line.find("(")
end = line.find(")")
return line[begin: end + 1]
def getPrimeKeyCol(self, line):
"""获取主键索引"""
begin = line.find("(")
end = line.find(")")
return line[begin: end + 1]
colType.py
# MySQL 数据表迁移到Postgre数据库
# 迁移匹配规则:
# int -> int4 (int4)
# bigint -> int8
# varchar -> varchar
# tinyint -> int2 (smallint)
# decimal -> decimal
# longtext -> text
# json -> json
# double -> double precision
# float -> real
# AUTO_INCREMENT -> SEQUENCE
class ColTypeClass(object):
""""数据类型转换映射"""
def __init__(self):
self.typeMap = {}
# 数字类型,有符号
self.typeMap['tinyint'] = 'int2'
self.typeMap['smallint'] = 'int2'
self.typeMap['mediumint'] = 'int4'
self.typeMap['int'] = 'int4'
self.typeMap['bigint'] = 'int8'
# 数字类型,无符号
self.typeMap['tinyint unsigned'] = 'int2'
self.typeMap['smallint unsigned'] = 'int2'
self.typeMap['mediumint unsigned'] = 'int4'
self.typeMap['int unsigned'] = 'int4'
self.typeMap['bigint unsigned'] = 'number(20)'
# 浮点类型
self.typeMap['float'] = 'real'
self.typeMap['float unsigned'] = 'real'
self.typeMap['double'] = 'double precision'
self.typeMap['decimal'] = 'decimal'
# boolean
self.typeMap['boolean'] = 'boolean'
# char / varchar
self.typeMap['char'] = 'varchar'
self.typeMap['varchar'] = 'varchar'
# text
self.typeMap['tinytext'] = 'text'
self.typeMap['text'] = 'text'
self.typeMap['mediumtext'] = 'text'
self.typeMap['longtext'] = 'text'
# 二进制
self.typeMap['binary'] = 'bytea'
self.typeMap['varbinary'] = 'bytea'
self.typeMap['tinyblob'] = 'bytea'
self.typeMap['blob'] = 'bytea'
self.typeMap['mediumblog'] = 'bytea'
self.typeMap['longblog'] = 'bytea'
# 时间
self.typeMap['date'] = 'date'
self.typeMap['time'] = 'time'
self.typeMap['datetime'] = 'timestamp'
self.typeMap['timestamp'] = 'timestamp'
# json
self.typeMap['json'] = 'json'
# 获取pgType
def getPgType(self, type):
"""
获取MySQL对应的Postgre数据类型
"""
return self.typeMap.get(type)
write_table_ddl
class OutPutDDL(object):
def __init__(self):
self.tableTemplate = "-- Drop table\n\n-- DROP TABLE edge.#{tableName};\nCREATE TABLE edge.#{tableName} (\n"
self.primeKeyTemplate = " CONSTRAINT #{indexName} PRIMARY KEY #{colName}\n);\n"
self.keysTemplate = "CREATE INDEX #{indexName} ON edge.#{tableName} USING btree #{indexCols};\n"
self.commentTemplate = "COMMENT ON COLUMN edge.#{tableName}.#{colName} IS '#{comment}';\n"
self.tableCommentTemplate = "COMMENT ON TABLE edge.#{tableName} IS '#{tableComment}';\n"
self.permissionsTemplate = "\n-- Permissions\n\nALTER TABLE edge.#{tableName} OWNER TO dev_idp_core;\nGRANT ALL ON TABLE edge.#{tableName} TO dev_idp_core;\n"
def writeDDL(self, template):
ddl = self.tableTemplate.replace("#{tableName}", template.tableName)
ddl += template.cols
ddl += template.primeKey
ddl += template.keys
ddl += template.tableComment
ddl += '\n' + template.comments
permissions = self.permissionsTemplate.replace("#{tableName}", template.tableName)
ddl += permissions
with open(template.outputPath.replace("#{tableName}", template.tableName), "w", encoding="utf-8") as pgStreams:
pgStreams.write(ddl)
pgStreams.close()
测试
MySQL数据表DDL
CREATE TABLE `idp_central_view` (
`id` bigint(20) NOT NULL COMMENT '主键',
`monitor_unit_id` bigint(20) DEFAULT NULL COMMENT '监测单元ID',
`name` varchar(255) DEFAULT NULL COMMENT '中控图名称',
`measure_ids` text COMMENT '测点IDS',
`category_code` varchar(255) DEFAULT NULL COMMENT '中控图分类,具体分类参考IDP_ZKT_CATEGORY字典。',
`release_status` tinyint(1) NOT NULL COMMENT '发布状态,-1未发布,0编辑中,1已发布',
`release_json` longtext COMMENT '已发布的json对象',
`description` varchar(255) DEFAULT NULL COMMENT '说明',
`version` bigint(20) NOT NULL COMMENT '时间戳,版本号。用于同步对比。',
`create_time` bigint(20) unsigned DEFAULT NULL COMMENT '创建时间',
`deleted` tinyint(1) DEFAULT NULL COMMENT '是否删除',
PRIMARY KEY (`id`) USING BTREE,
KEY `idp_central_view_monitor_unit_id_idx` (`monitor_unit_id`) USING BTREE,
KEY `idp_central_view_name_idx` (`name`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='组件绘图-中控图表,2020930后中控绘制的新设计方案';
输出postgre数据表DDL
-- Drop table
-- DROP TABLE edge.idp_central_view;
CREATE TABLE edge.idp_central_view (
id int8 NOT NULL, -- 主键
monitor_unit_id int8 NULL, -- 监测单元ID
"name" varchar(255) NULL, -- 中控图名称
measure_ids text NULL, -- 测点IDS
category_code varchar(255) NULL, -- 中控图分类,具体分类参考IDP_ZKT_CATEGORY字典。
release_status int2 NULL, -- 发布状态,-1未发布,0编辑中,1已发布
release_json text NULL, -- 已发布的json对象
description varchar(255) NULL, -- 说明
"version" int8 NOT NULL, -- 时间戳,版本号。用于同步对比。
create_time int8 NULL, -- 创建时间
deleted int2 NULL, -- 是否删除
CONSTRAINT idp_central_view_pk PRIMARY KEY (id)
);
CREATE INDEX idp_central_view_monitor_unit_id_idx ON edge.idp_central_view USING btree (monitor_unit_id);
CREATE INDEX idp_central_view_name_idx ON edge.idp_central_view USING btree (name);
COMMENT ON TABLE edge.idp_central_view IS '组件绘图-中控图表,2020930后中控绘制的新设计方案';
COMMENT ON COLUMN edge.idp_central_view.id IS '主键';
COMMENT ON COLUMN edge.idp_central_view.monitor_unit_id IS '监测单元ID';
COMMENT ON COLUMN edge.idp_central_view."name" IS '中控图名称';
COMMENT ON COLUMN edge.idp_central_view.measure_ids IS '测点IDS';
COMMENT ON COLUMN edge.idp_central_view.category_code IS '中控图分类,具体分类参考IDP_ZKT_CATEGORY字典。';
COMMENT ON COLUMN edge.idp_central_view.release_status IS '发布状态,-1未发布,0编辑中,1已发布';
COMMENT ON COLUMN edge.idp_central_view.release_json IS '已发布的json对象';
COMMENT ON COLUMN edge.idp_central_view.description IS '说明';
COMMENT ON COLUMN edge.idp_central_view."version" IS '时间戳,版本号。用于同步对比。';
COMMENT ON COLUMN edge.idp_central_view.create_time IS '创建时间';
COMMENT ON COLUMN edge.idp_central_view.deleted IS '是否删除';
-- Permissions
ALTER TABLE edge.idp_central_view OWNER TO dev_idp_core;
GRANT ALL ON TABLE edge.idp_central_view TO dev_idp_core;
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
文章由半码博客整理,本文链接:https://www.bmabk.com/index.php/post/4588.html