flyway从入门到放弃,你学废了吗

前言
flyway快速入门
  • 相关配置说明
  •  依赖
  • 脚本

flyway原理
flyway最佳实践
问题


前言

    

    前不久上线因为flyway脚本导致上线失败,对于flyway,它的执行过程对于不熟悉的人感觉像个黑盒子,所以一直想找个时间看看它的原理。

Flyway是一个开源数据库迁移工具。它强烈支持简单性和约定性,而不是配置。在程序启动的过程中可以自动将脚本同步执行更新到数据库,如果脚本没问题,可以解放dba.
    迁移可以用SQL(支持特定于数据库的语法(例如PL / SQL,T-SQL,…)或Java(用于高级数据转换或处理LOB)编写。官方文档地址:https://flywaydb.org/documentation/



1.flyway 快速入门


1.1 相关配置说明

   在讲原理之前,我们先看看flyway的相关配置,Spring整合后对应的是FlywayProperties,必须配置的用红色已标出,其他经常配置的用绿色标出。具体配置示实际情况。


@ConfigurationProperties(prefix = "spring.flyway")
public class FlywayProperties {

    /**
     * Whether to enable flyway.
     */
    private boolean enabled = true;

    /**
     * Whether to check that migration scripts location exists. Should be set to false
     * when using a wildcard location or a remote-hosted location such as S3 or GCS.
     */
    @Deprecated
    private boolean checkLocation = true;

    /**
     * Locations of migrations scripts. Can contain the special "{vendor}" placeholder to
     * use vendor-specific locations.
     *    迁移脚本的默认地址    classpath:db/migration
     */
    private List<String> locations = new ArrayList<>(Collections.singletonList("classpath:db/migration"));

    /**
     * Encoding of SQL migrations.
     * 默认编码 UTF_8
     */
    private Charset encoding = StandardCharsets.UTF_8;


    /**
     * Name of the schema history table that will be used by Flyway.
     * 默认table名:flyway_schema_history
     */
    private String table = "flyway_schema_history";


    /**
     * Description to tag an existing schema with when applying a baseline.
     * 基准线描述语
     */
    private String baselineDescription = "<< Flyway Baseline >>";

    /**
     * Version to tag an existing schema with when executing baseline.
     * 基准线版本号
     */
    private String baselineVersion = "1";


    /**
     * File name prefix for SQL migrations.
     * 默认sql迁移文件前缀:V
     */
    private String sqlMigrationPrefix = "V";

    /**
     * File name suffix for SQL migrations.
     * 默认sql迁移文件后缀: .sql
     */
    private List<String> sqlMigrationSuffixes = new ArrayList<>(Collections.singleton(".sql"));

    /**
     * File name separator for SQL migrations.
     * 文件名分隔符
     */
    private String sqlMigrationSeparator = "__";

    /**
     * Login user of the database to migrate.
     * 数据库用户名
     */
    private String user;

    /**
     * Login password of the database to migrate.
     * 数据库密码
     */
    private String password;

    /**
     * Fully qualified name of the JDBC driver. Auto-detected based on the URL by default.
     * 驱动名
     */
    private String driverClassName;

    /**
     * JDBC url of the database to migrate. If not set, the primary configured data source is used.
     * 数据库地址
     */
    private String url;

    /**
     * SQL statements to execute to initialize a connection immediately after obtainin it
     * 初始sql
     */
    private List<String> initSqls = new ArrayList<>();

    /**
     * Whether to automatically call baseline when migrating a non-empty schema.
     * 当迁移时发现目标schema非空,而且带有没有元数据的表时,是否自动执行基准迁移,默认false
     */
    private boolean baselineOnMigrate;

    /**
     * Whether to automatically call clean when a validation error occurs.
     * 当发现校验错误时是否自动调用clean,默认false.
     */
    private boolean cleanOnValidationError;

    /**
     * Whether to group all pending migrations together in the same transaction when applying them.
     * 是否把所有迁移脚本放到一个事务
     */
    private boolean group;

    /**
     * Whether to ignore missing migrations when reading the schema history table.
     * 是否忽略classpath下相对于history table表缺失的部分,默认false
     */
    private boolean ignoreMissingMigrations;

    /**
     * Whether to ignore ignored migrations when reading the schema history table.
     * 是否忽略忽略的迁移脚本
     */
    private boolean ignoreIgnoredMigrations;

    /**
     * Whether to ignore future migrations when reading the schema history table.
     * 是否忽略db版本高于classpath版本,默认忽略,会有警告日志打印。
     */
    private boolean ignoreFutureMigrations = true;


    /**
     * Whether to automatically call validate when performing a migration.
     * 迁移时是否验证
     */
    private boolean validateOnMigrate = true;



    /**
     * File to which the SQL statements of a migration dry run should be output. Requires Flyway Teams
     * 是否试运行脚本
     */
    private File dryRunOutput;

   


flyway从入门到放弃,你学废了吗

1.2 引入依赖

 <dependency>
            <groupId>org.flywaydb</groupId>
            <artifactId>flyway-core</artifactId>
            <version>xxx</version>
 </dependency>


1.3 添加脚本

     flyway 配置中默认的脚本路径是在 classpath:db/migration,当然也可以通过配置是改变的。

如:

flyway从入门到放弃,你学废了吗


       注意文件名格式:‘V’ + 版本号 + ‘_’ + des + ‘后缀’ ,这些都可以根据上面的配置自己去配置

flyway 在启动的时候会自动创建一张名称为 flyway_schema_history 的元数据表,目前在我的项目中存在多个项目连接同一个数据库,导致冲突影响,启动报错。这种情况,建议每个项目都有一个自己的元数据表。指定 spring.flyway.table 的值即可。如flyway_schema_history_{项目名称},这样基本可以做到不会发生冲突了。

    项目启动成功后,会在数据库中生成flyway_schema_history 表。并插入迁移记录。每次迁移都会根据表中的数据来做数据对比验证。

flyway从入门到放弃,你学废了吗


接下来开始讲解原理。知其然也要知其所以然。


2.flyway 原理

2.1 flyway 相关核心对象创建

    大家如果有看过spring.boot.autoconfigure,你会发现市面上比较流行的框架,spring都做了自动化配置。通过看spring的封装,可以更好的帮助我们对其他框架自身的理解。

flyway从入门到放弃,你学废了吗


直接跳到FlywayAutoConfiguration

@Configuration(proxyBeanMethods = false)
@ConditionalOnClass(Flyway.class)
@Conditional(FlywayDataSourceCondition.class)
@ConditionalOnProperty(prefix = "spring.flyway", name = "enabled", matchIfMissing = true)
@AutoConfigureAfter({ DataSourceAutoConfiguration.class, JdbcTemplateAutoConfiguration.class,
        HibernateJpaAutoConfiguration.class })
@Import(DatabaseInitializationDependencyConfigurer.class)
public class FlywayAutoConfiguration {


从配置中也可以看出 flyway的配置是在DataSourceAutoConfiguration之后,而DataSourceAutoConfiguration默认引入的DataSource是HikariDataSource,这也是spring-boot-starter-data-jpa默认使用的DataSource

 

flyway从入门到放弃,你学废了吗


之所以是它,是因为其他的Datasource 是按ConditionalOnClass 来条件装配的,而spring-boot-starter-data-jpa 默认引入了hikari


flyway从入门到放弃,你学废了吗

2.1.1 flyway 创建

在FlywayAutoConfiguration的类中有一个静态内部类FlywayConfiguration


    @Configuration(proxyBeanMethods = false)
    @ConditionalOnClass(JdbcUtils.class)
    @ConditionalOnMissingBean(Flyway.class)
    @EnableConfigurationProperties(FlywayProperties.class)
    public static class FlywayConfiguration {

        @Bean
        public Flyway flyway(FlywayProperties properties, ResourceLoader resourceLoader,
                ObjectProvider<DataSource> dataSource, @FlywayDataSource ObjectProvider<DataSource> flywayDataSource,
                ObjectProvider<FlywayConfigurationCustomizer> fluentConfigurationCustomizers,
                ObjectProvider<JavaMigration> javaMigrations, ObjectProvider<Callback> callbacks) {
            FluentConfiguration configuration = new FluentConfiguration(resourceLoader.getClassLoader());
            configureDataSource(configuration, properties, flywayDataSource.getIfAvailable(), dataSource.getIfUnique());
            checkLocationExists(configuration.getDataSource(), properties, resourceLoader);
            configureProperties(configuration, properties);
            List<Callback> orderedCallbacks = callbacks.orderedStream().collect(Collectors.toList());
            configureCallbacks(configuration, orderedCallbacks);
            fluentConfigurationCustomizers.orderedStream().forEach((customizer) -> customizer.customize(configuration));
            configureFlywayCallbacks(configuration, orderedCallbacks);
            List<JavaMigration> migrations = javaMigrations.stream().collect(Collectors.toList());
            configureJavaMigrations(configuration, migrations);
            return configuration.load();
        }

   

 通过看spring封装的源码,我们可以学到很多封装技巧,比如属性注入ObjectProvider,(Spring 4.3版本中引入的)
如果待注入参数的Bean为空或有多个时,便是ObjectProvider发挥作用的时候了。如果注入实例为空时,使用ObjectProvider则避免了强依赖导致的依赖对象不存在异常;

如果有多个实例,ObjectProvider的方法会根据Bean实现的Ordered接口或@Order注解指定的先后顺序获取一个Bean。从而了提供了一个更加宽松的依赖注入方式。

还可以看到FlywayConfigurationCustomizer,基本上都提供了类似命名的自定义扩展点,如kafka的DefaultKafkaProducerFactoryCustomizer。


回到正文,该bean配置主要是将spring的FlywayProperties配置转化成flyway原生的FluentConfiguration,然后checkLocationExists检查配置文件是否存在,再做一些自定义的配置,最后创建flyway实例。

说到底flyway也是一个配置对象,获取配置的一个入口。

flyway从入门到放弃,你学废了吗


2.1.2 FlywayMigrationInitializer 创建

FlywayConfiguration中还有一个Bean,这个bean就是核心了。flyway启动报错堆栈基本都和这个类有关


    @Bean
    @ConditionalOnMissingBean
    public FlywayMigrationInitializer flywayInitializer(Flyway flyway,
            ObjectProvider<FlywayMigrationStrategy> migrationStrategy) {
        return new FlywayMigrationInitializer(flyway, migrationStrategy.getIfAvailable());
    }


注意看FlywayMigrationInitializer


public class FlywayMigrationInitializer implements InitializingBean, Ordered {


实现了InitializingBean ,在spring对象的生命周期里,当初始化对象后,会调用InitializingBean的afterPropertiesSet方法。


    @Override
    public void afterPropertiesSet() throws Exception {
        if (this.migrationStrategy != null) {
            this.migrationStrategy.migrate(this.flyway);
        }
        else {
            try {
                this.flyway.migrate();
            }
            catch (NoSuchMethodError ex) {
                // Flyway < 7.0
                this.flyway.getClass().getMethod("migrate").invoke(this.flyway);
            }
        }
    }


这样核心对象Flyway 和 FlywayMigrationInitializer 就创建好了。

2.2 FlywayMigrationInitializer 的初始化过程。

        接着上面的将,调用afterPropertiesSet方法,其中migrationStrategy没有配置的话默认为空,则走flyway默认的迁移逻辑flyway.migrate()

继续跟进


public class Flyway implements Configuration {

 public int migrate() throws FlywayException {
        return execute(new Command<Integer>() {
            public Integer execute(MigrationResolver migrationResolver,
                                   SchemaHistory schemaHistory, Database database, Schema[] schemas, CallbackExecutor callbackExecutor



            ) {
                if (configuration.isValidateOnMigrate()) {
                    doValidate(database, migrationResolver, schemaHistory, schemas, callbackExecutor,
                            true // Always ignore pending migrations when validating before migrating
                    );
                }

                new DbSchemas(database, schemas, schemaHistory).create();

                if (!schemaHistory.exists()) {
                    List<Schema> nonEmptySchemas = new ArrayList<>();
                    for (Schema schema : schemas) {
                        if (!schema.empty()) {
                            nonEmptySchemas.add(schema);
                        }
                    }

                    if (!nonEmptySchemas.isEmpty()) {
                        if (configuration.isBaselineOnMigrate()) {
                            doBaseline(schemaHistory, database, schemas, callbackExecutor);
                        } else {
                            // Second check for MySQL which is sometimes flaky otherwise
                            if (!schemaHistory.exists()) {
                                throw new FlywayException("Found non-empty schema(s) "
                                        + StringUtils.collectionToCommaDelimitedString(nonEmptySchemas)
                                        + " without schema history table! Use baseline()"
                                        + " or set baselineOnMigrate to true to initialize the schema history table.");
                            }
                        }
                    }
                }

                return new DbMigrate(database, schemaHistory, schemas[0], migrationResolver, configuration,
                        callbackExecutor).migrate();
            }
        }, true
);
    }




}



代码看着有点复杂,很明显蓝色部分是回调逻辑,该源码里有很多这种写法。先看excute方法,等会再来看回调逻辑。


 <T> T execute(Command<T> command, boolean scannerRequired) {
        T result;
        if (configuration.getDataSource() == null) {
            throw new FlywayException("Unable to connect to the database. Configure the url, user and password!");
        }
        Database database = null;
        try {
            //根据数据库类型创建dataBase
            database = DatabaseFactory.createDatabase(configuration, !dbConnectionInfoPrinted);
            dbConnectionInfoPrinted = true;
            LOG.debug("DDL Transactions Supported: " + database.supportsDdlTransactions());
            //获取schema信息
            Schema[] schemas = prepareSchemas(database);
            ResourceProvider resourceProvider;
            ClassProvider classProvider;
            if (!scannerRequired && configuration.isSkipDefaultResolvers() && configuration.isSkipDefaultCallbacks()) {
                resourceProvider = NoopResourceProvider.INSTANCE;
                classProvider = NoopClassProvider.INSTANCE;
            } else {
                //扫描配置
                Scanner scanner = new Scanner(
                        Arrays.asList(configuration.getLocations()),
                        configuration.getClassLoader(),
                        configuration.getEncoding());
                resourceProvider = scanner;
                classProvider = scanner;
            }
            //创建语句工厂
            SqlStatementBuilderFactory sqlStatementBuilderFactory = database.createSqlStatementBuilderFactory();
            
            CallbackExecutor callbackExecutor = new DefaultCallbackExecutor(configuration, database, schemas[0],
                    prepareCallbacks(database, resourceProvider, sqlStatementBuilderFactory));

            //回调      
            result = command.execute(
                    //创建解析器
                    createMigrationResolver(database, resourceProvider, classProvider, sqlStatementBuilderFactory),
                    //从db中获取历史迁移数据
                    SchemaHistoryFactory.getSchemaHistory(configuration, database, schemas[0]),
                    database,
                    schemas,
                    callbackExecutor
            );
        } finally {
            if (database != null) {
                database.close();
            }
            showMemoryUsage();
        }
        return result;
    }



   flyway因为适配了很多数据库,所以源码做了很多抽象设计。
获取dataBase.


 public static Database createDatabase(Configuration configuration, boolean printInfo) {
        Connection connection = JdbcUtils.openConnection(configuration.getDataSource(), configuration.getConnectRetries());
        boolean originalAutoCommit;
        try {
            originalAutoCommit = connection.getAutoCommit();
            if (!originalAutoCommit) {
                connection.setAutoCommit(true);
            }
        } catch (SQLException e) {
            throw new FlywaySqlException("Unable to turn on auto-commit for the connection", e);
        }
        
        DatabaseMetaData databaseMetaData = JdbcUtils.getDatabaseMetaData(connection);
        String databaseProductName = JdbcUtils.getDatabaseProductName(databaseMetaData);

        DatabaseType databaseType = DatabaseType.fromJdbcConnection(connection);

        Database database = createDatabase(databaseType, configuration, connection, originalAutoCommit);
        database.ensureSupported();
        if (!database.supportsChangingCurrentSchema() && configuration.getSchemas().length > 0) {
            LOG.warn(databaseProductName + " does not support setting the schema for the current session. " +
                    "Default schema will NOT be changed to " + configuration.getSchemas()[0] + " !");
        }
        return database;
    }


首先是获取连接对象,这个就是jdbc的原理,第一步加载驱动,第二步创建连接对象。跟到SimpleDriverDataSource,这个是spring-jdbc下的类。

 public class SimpleDriverDataSource extends AbstractDriverBasedDataSource {

    @Override
    protected Connection getConnectionFromDriver(Properties props) throws SQLException {
        Driver driver = getDriver();
        String url = getUrl();
        Assert.notNull(driver, "Driver must not be null");
        if (logger.isDebugEnabled()) {
            logger.debug("Creating new JDBC Driver Connection to [" + url + "]");
        }
        return driver.connect(url, props);
    }

}

到这里就很明显了。


接着createDatabase 创建Database,


private static Database createDatabase(DatabaseType databaseType, Configuration configuration,
                                           Connection connection, boolean originalAutoCommit) {
        switch (databaseType) {
            case COCKROACHDB:
                return new CockroachDBDatabase(configuration, connection, originalAutoCommit);
            case DB2:
                return new DB2Database(configuration, connection, originalAutoCommit);
            case DERBY:
                return new DerbyDatabase(configuration, connection, originalAutoCommit);
            case H2:
                return new H2Database(configuration, connection, originalAutoCommit);
            case HSQLDB:
                return new HSQLDBDatabase(configuration, connection, originalAutoCommit);
            case INFORMIX:
                return new InformixDatabase(configuration, connection, originalAutoCommit);
            case MYSQL:
                return new MySQLDatabase(configuration, connection, originalAutoCommit);
            case ORACLE:
                return new OracleDatabase(configuration, connection, originalAutoCommit);
            case POSTGRESQL:
                return new PostgreSQLDatabase(configuration, connection, originalAutoCommit);
            case REDSHIFT:
                return new RedshiftDatabase(configuration, connection, originalAutoCommit);
            case SQLITE:
                return new SQLiteDatabase(configuration, connection, originalAutoCommit);
            case SAPHANA:
                return new SAPHANADatabase(configuration, connection, originalAutoCommit);
            case SQLSERVER:
                return new SQLServerDatabase(configuration, connection, originalAutoCommit);
            case SYBASEASE_JCONNECT:
            case SYBASEASE_JTDS:
                return new SybaseASEDatabase(configuration, connection, originalAutoCommit);
            default:
                throw new FlywayException("Unsupported Database: " + databaseType.name());
        }
    }


这是目前flyway所支持的所有数据库类型


❷prepareSchemas,获取schemas信息

    private Schema[] prepareSchemas(Database database) {
        String[] schemaNames = configuration.getSchemas();
        if (schemaNames.length == 0) {
            Schema currentSchema = database.getMainConnection().getCurrentSchema();
            if (currentSchema == null) {
                throw new FlywayException("Unable to determine schema for the schema history table." +
                        " Set a default schema for the connection or specify one using the schemas property!");
            }
            schemaNames = new String[]{currentSchema.getName()};
        }

        if (schemaNames.length == 1) {
            LOG.debug("Schema: " + schemaNames[0]);
        } else {
            LOG.debug("Schemas: " + StringUtils.arrayToCommaDelimitedString(schemaNames));
        }

        Schema[] schemas = new Schema[schemaNames.length];
        for (int i = 0; i < schemaNames.length; i++) {
            schemas[i] = database.getMainConnection().getSchema(schemaNames[i]);
        }
        return schemas;
    }


如果没有配置schema,比如mysql,则会去数据库中获取schema,


public abstract class Connection<D extends Database> implements Closeable {

    protected Schema doGetCurrentSchema() throws SQLException {
        return getSchema(getCurrentSchemaNameOrSearchPath());
    }
    
    public abstract Schema getSchema(String name);
    

    protected abstract String getCurrentSchemaNameOrSearchPath() throws SQLException;
    
}    


其中最主要的方法都是抽象方法,不同数据库自己去实现。以mysql为例


public class MySQLConnection extends Connection<MySQLDatabase> {

    @Override
    protected String getCurrentSchemaNameOrSearchPath() throws SQLException {
        return jdbcTemplate.queryForString("SELECT DATABASE()");
    }

    @Override
    public Schema getSchema(String name) {
        return new MySQLSchema(jdbcTemplate, database, name);
    }

}


最终底层都是调用封装jdbc的jdbcTemplate的查询方法


❸new Scanner 创建scanner,并扫描配置


  public Scanner(Collection<Location> locations, ClassLoader classLoader, Charset encoding) {
        FileSystemScanner fileSystemScanner = new FileSystemScanner(encoding);
        boolean android = new FeatureDetector(classLoader).isAndroidAvailable();
        for (Location location : locations) {
            if (location.isFileSystem()) {
                resources.addAll(fileSystemScanner.scanForResources(location));
            } else {
                ResourceAndClassScanner resourceAndClassScanner = android
                        ? new AndroidScanner(classLoader, encoding, location)
                        : new ClassPathScanner(classLoader, encoding, location);
                resources.addAll(resourceAndClassScanner.scanForResources());
                classes.addAll(resourceAndClassScanner.scanForClasses());
            }
        }
    }


从代码可以看出,flyway脚本既支持脚本的配置,也支持通过java代码来做迁移。一般都是脚本的方式。最后会将所有扫描到的脚本文件暂存起来。


flyway从入门到放弃,你学废了吗


❺createMigrationResolver,创建迁移解析器。

  private MigrationResolver createMigrationResolver(Database database,
                                                      ResourceProvider resourceProvider,
                                                      ClassProvider classProvider,
                                                      SqlStatementBuilderFactory sqlStatementBuilderFactory) {
        //从配置中获取解析器,默认没有配置                                              
        for (MigrationResolver resolver : configuration.getResolvers()) {
            ConfigUtils.injectFlywayConfiguration(resolver, configuration);
        }
        //组合模式
        return new CompositeMigrationResolver(database,
                resourceProvider, classProvider, configuration,
                sqlStatementBuilderFactory , configuration.getResolvers());
    }


这里有用到组合模式来创建解析器

    public CompositeMigrationResolver(Database database,
                                      ResourceProvider resourceProvider,
                                      ClassProvider classProvider,
                                      Configuration configuration,
                                      SqlStatementBuilderFactory sqlStatementBuilderFactory,
                                      MigrationResolver... customMigrationResolvers
    ) {
        if (!configuration.isSkipDefaultResolvers()) {
            migrationResolvers.add(new SqlMigrationResolver(database, resourceProvider, sqlStatementBuilderFactory, configuration));
            migrationResolvers.add(new JavaMigrationResolver(classProvider, configuration));
            migrationResolvers.add(new JdbcMigrationResolver(classProvider, configuration));
            if (new FeatureDetector(configuration.getClassLoader()).isSpringJdbcAvailable()) {
                migrationResolvers.add(new SpringJdbcMigrationResolver(classProvider, configuration));
            }
        }
        migrationResolvers.addAll(Arrays.asList(customMigrationResolvers));
    }

可以看到这里面有四个解析器,但是一般情况,我们都是用的sql脚本,所以只有SqlMigrationResolver最终才能拿到数据。这里也是一个很好的组合模式模板

flyway从入门到放弃,你学废了吗


如何判断哪些脚本需要做迁移呢,因为迁移的记录在数据库里,这里先创建好对象为后面做准备


❻getSchemaHistory


    public static SchemaHistory getSchemaHistory(Configuration configuration, Database database, Schema schema) {
        //获取操作用户
        String installedBy = configuration.getInstalledBy() == null
                ? database.getCurrentUser()
                : configuration.getInstalledBy();
        //获取表        
        Table table = schema.getTable(configuration.getTable());
        JdbcTableSchemaHistory jdbcTableSchemaHistory = new JdbcTableSchemaHistory(database, table, installedBy);
        return jdbcTableSchemaHistory;
    }


这里的table和schema也是抽象类,不同的数据库有不同的实现,如判断table是否存在,加锁等,最终也是通过jdbc执行对应的sql.其中table名称是默认配置中的名称 flyway_schema_history


table类


/**
 * Represents a database table within a schema.
 */
public abstract class Table extends SchemaObject {
    private static final Log LOG = LogFactory.getLog(Table.class);

    /**
     * Creates a new table.
     */
    public Table(JdbcTemplate jdbcTemplate, Database database, Schema schema, String name) {
        super(jdbcTemplate, database, schema, name);
    }

    /**
     * Checks whether this table exists.
     *
     * @return {@code true} if it does, {@code false} if not.
     */
    public boolean exists() {
        try {
            return doExists();
        } catch (SQLException e) {
            throw new FlywaySqlException("Unable to check whether table " + this + " exists", e);
        }
    }

    /**
     * Checks whether this table exists.
     */
    protected abstract boolean doExists() throws SQLException;

    /**
     * Checks whether the database contains a table matching these criteria.
     */
    protected boolean exists(Schema catalog, Schema schema, String table, String... tableTypes) throws SQLException {
        String[] types = tableTypes;
        if (types.length == 0) {
            types = null;
        }

        ResultSet resultSet = null;
        boolean found;
        try {
            resultSet = database.jdbcMetaData.getTables(
                    catalog == null ? null : catalog.getName(),
                    schema == null ? null : schema.getName(),
                    table,
                    types);
            found = resultSet.next();
        } finally {
            JdbcUtils.closeResultSet(resultSet);
        }

        return found;
    }

    /**
     * Locks this table in this schema using a read/write pessimistic lock until the end of the current transaction.
     */
    public void lock() {
        try {
            LOG.debug("Locking table " + this + "...");
            doLock();
            LOG.debug("Lock acquired for table " + this);
        } catch (SQLException e) {
            throw new FlywaySqlException("Unable to lock table " + this, e);
        }
    }

    /**
     * Locks this table in this schema using a read/write pessimistic lock until the end of the current transaction.
     *
     */
    protected abstract void doLock() throws SQLException;
}


以mysql为例,其中加锁操作就是我们所说的悲观锁。

    @Override
    public Table getTable(String tableName) {
        return new MySQLTable(jdbcTemplate, database, this, tableName);
    }

    #MySQLTable.class
   @Override
    protected void doLock() throws SQLException {
        jdbcTemplate.execute("SELECT * FROM " + this + " FOR UPDATE");
    }

前面这些步骤都是为后面校验迁移做准备的,开始回调真正的操作


flyway从入门到放弃,你学废了吗



❼command.execute

蓝色标记的部分即为回调


public class Flyway implements Configuration {

 public int migrate() throws FlywayException {
        return execute(new Command<Integer>() {
            public Integer execute(MigrationResolver migrationResolver,
                                   SchemaHistory schemaHistory, Database database, Schema[] schemas, CallbackExecutor callbackExecutor) {
            
                // validateOnMigrate默认为true,即迁移前总是校验
                if (configuration.isValidateOnMigrate()) {
                    //
验证流程
                    doValidate(database, migrationResolver, schemaHistory, schemas, callbackExecutor,
                            true // Always ignore pending migrations when validating before migrating
                    );
                }
              
 //如果指定的schemas不存在,则创建
                new DbSchemas(database, schemas, schemaHistory).create();

                if (!schemaHistory.exists()) {
                    List<Schema> nonEmptySchemas = new ArrayList<>();
                    for (Schema schema : schemas) {
                        if (!schema.empty()) {
                            nonEmptySchemas.add(schema);
                        }
                    }

                    if (!nonEmptySchemas.isEmpty()) {
                        if (configuration.isBaselineOnMigrate()) {
                            //baseline用途
                            doBaseline(schemaHistory, database, schemas, callbackExecutor);
                        } else {
                            // Second check for MySQL which is sometimes flaky otherwise
                            if (!schemaHistory.exists()) {
                                throw new FlywayException("Found non-empty schema(s) "
                                        + StringUtils.collectionToCommaDelimitedString(nonEmptySchemas)
                                        + " without schema history table! Use baseline()"
                                        + " or set baselineOnMigrate to true to initialize the schema history table.");
                            }
                        }
                    }
                }
                //
选中了第一个schemas进行迁移
                return new DbMigrate(database, schemaHistory, schemas[0], migrationResolver, configuration,
                        callbackExecutor).migrate();
            }
        }, true)
;
    }
}

2.2 .1 验证流程

❶doValidate,默认配置validateOnMigrate是true


   //flyway.class
   private void doValidate(Database database, MigrationResolver migrationResolver, SchemaHistory schemaHistory,
                            Schema[] schemas, CallbackExecutor callbackExecutor, boolean ignorePending) {
        String validationError =
                new DbValidate(database, schemaHistory, schemas[0], migrationResolver,
                        configuration, ignorePending, callbackExecutor).validate()
;

        if (validationError != null) {
            //一般生产环境严禁配置这里生效,会造成生产数据丢失
            if (configuration.isCleanOnValidationError()) {
                doClean(database, schemaHistory, schemas, callbackExecutor);
            } else {
                throw new FlywayException("Validate failed: " + validationError);
            }
        }
    }


可以看到创建对象DbValidate来做验证。


   //flyway.class
   private void doValidate(Database database, MigrationResolver migrationResolver, SchemaHistory schemaHistory,
                            Schema[] schemas, CallbackExecutor callbackExecutor, boolean ignorePending) {
        String validationError =
                new DbValidate(database, schemaHistory, schemas[0], migrationResolver,
                        configuration, ignorePending, callbackExecutor).validate()
;

        if (validationError != null) {
            //一般生产环境严禁配置这里生效,会造成生产数据丢失
            if (configuration.isCleanOnValidationError()) {
                doClean(database, schemaHistory, schemas, callbackExecutor);
            } else {
                throw new FlywayException("Validate failed: " + validationError);
            }
        }
    }


继续往下跟进


public String validate() {
        //如果schema不存在,将不会进行具体的验证逻辑,会抛错
        if (!schema.exists()) {
            if (!migrationResolver.resolveMigrations(new Context() {
                @Override
                public Configuration getConfiguration() {
                    return configuration;
                }
            }).isEmpty() && !pending) {
                return "Schema " + schema + " doesn't exist yet";
            }
            return null;
        }

        callbackExecutor.onEvent(Event.BEFORE_VALIDATE);

        LOG.debug("Validating migrations ...");
        StopWatch stopWatch = new StopWatch();
        stopWatch.start();

        Pair<Integer, String> result = new TransactionTemplate(connection.getJdbcConnection()).execute(new Callable<Pair<Integer, String>>() {
            @Override
            public Pair<Integer, String> call() {
                MigrationInfoServiceImpl migrationInfoService =
                        new MigrationInfoServiceImpl(migrationResolver, schemaHistory, configuration,
                                configuration.getTarget(),
                                configuration.isOutOfOrder(),
                                pending,
                                configuration.isIgnoreMissingMigrations(),
                                configuration.isIgnoreIgnoredMigrations(),
                                configuration.isIgnoreFutureMigrations());

                // refresh方法刷新了(重新加载了)db和classpath下的迁移信息。
                //db中对应的是已经应用的迁移,classpath下是全量的,
                //由双方对比,得出已应用的最新版本,记录在(context.lastApplied)                
               
migrationInfoService.refresh();

                int count = migrationInfoService.all().length;
                // 内部循环验证各个脚本情况,当出现错误时立即返回,
                //否则一直遍历到最后,返回null。
                String validationError =
migrationInfoService.validate();
                return Pair.of(count, validationError);
            }
        });

        stopWatch.stop();

        String error = result.getRight();
        if (error == null) {
            int count = result.getLeft();
            if (count == 1) {
                LOG.info(String.format("Successfully validated 1 migration (execution time %s)",
                        TimeFormat.format(stopWatch.getTotalTimeMillis())));
            } else {
                LOG.info(String.format("Successfully validated %d migrations (execution time %s)",
                        count, TimeFormat.format(stopWatch.getTotalTimeMillis())));
            }
            callbackExecutor.onEvent(Event.AFTER_VALIDATE);
        } else {
            callbackExecutor.onEvent(Event.AFTER_VALIDATE_ERROR);
        }


        return error;
    }


其中TransactionTemplate,从名字上可以看出是开启事务。

 //TransactionTemplate.class
 public <T> T execute(Callable<T> transactionCallback) {
        boolean oldAutocommit = true;
        try {
            oldAutocommit = connection.getAutoCommit();
            connection.setAutoCommit(false);
            T result = transactionCallback.call();
            connection.commit();
            return result;

        } catch (Exception e) {
            RuntimeException rethrow;
            if (e instanceof SQLException) {
                rethrow = new FlywaySqlException("Unable to commit transaction", (SQLException) e);
            } else if (e instanceof RuntimeException) {
                rethrow = (RuntimeException) e;
            } else {
                rethrow = new FlywayException(e);
            }

            if (rollbackOnException) {
                try {
                    LOG.debug("Rolling back transaction...");
                    connection.rollback();
                    LOG.debug("Transaction rolled back");
                } catch (SQLException se) {
                    LOG.error("Unable to rollback transaction", se);
                }
            } else {
                try {
                    connection.commit();
                } catch (SQLException se) {
                    LOG.error("Unable to commit transaction", se);
                }
            }
            throw rethrow;
        } finally {
            try {
                connection.setAutoCommit(oldAutocommit);
            } catch (SQLException e) {
                LOG.error("Unable to restore autocommit to original value for connection", e);
            }
        }
    }
}


这就是很典型的事务操作回滚机制,call的事务方法就migrationInfoService的refresh()方法。其中创建migrationInfoService时,传入了几个参数,也是比较容易搞混淆的参数。


    /**
     * Whether to ignore missing migrations when reading the schema history table.
     * 是否忽略classpath下相对于history table表缺失的部分,默认false
     */
    private boolean ignoreMissingMigrations;

    /**
     * Whether to ignore ignored migrations when reading the schema history table.
     * 是否忽略忽略的迁移脚本
     */
    private boolean ignoreIgnoredMigrations;

    /**
     * Whether to ignore future migrations when reading the schema history table.
     * 是否忽略db版本高于classpath版本,默认忽略,会有警告日志打印。
     */
    private boolean ignoreFutureMigrations = true;


其中配置的比较多的就是ignoreMissingMigrations,在开发阶段多版本并行的时候,可能就存在每个分支对应的脚本不一样。所以会把它设置为true


接下来看refresh操作

    

public void refresh() {
    //从classpath解析获得的迁移信息。包含版本号、描述、校验和等信息,并根据版本排了序。V排在前,R排在后
    Collection<ResolvedMigration> resolvedMigrations = migrationResolver.resolveMigrations(context);
    // db中已经应用的迁移信息,按installed_rank升序排序。AppliedMigration可以认为就是表实体
    List<AppliedMigration> appliedMigrations = schemaHistory.allAppliedMigrations();

    
    //将配置中的内容,放到了上下文中。
    MigrationInfoContext context = new MigrationInfoContext();
    context.outOfOrder = outOfOrder;
    context.pending = pending;
    context.missing = missing;
    context.ignored = ignored;
    context.future = future;
    context.target = target;

    // resolvedVersioned存储了所有的类路径下的V类型迁移,并按版本号排序
    Map<Pair<MigrationVersion, Boolean>, ResolvedMigration> resolvedVersioned =new TreeMap<>();
    
    for (ResolvedMigration resolvedMigration : resolvedMigrations) {
        MigrationVersion version = resolvedMigration.getVersion();
        // 当version为null说明是R类型。否则是V类型,这里维护了类路径下最新的版本。
        if (version != null) {
            if (version.compareTo(context.lastResolved) > 0) {
                // 维护类路径下最新的版本
                context.lastResolved = version;
            }
            // Pair的right代表了是否是undo,后面resolvedVersioned调用get时会用到
            resolvedVersioned.put(Pair.of(version,false), resolvedMigration);
        } else {
            // 存储R类型迁移
            resolvedRepeatable.put(resolvedMigration.getDescription(), resolvedMigration);
        }
    }

    // appliedVersioned存放了所有DB应用的V类型的迁移。pair的left是db中的迁移信息,right是outOfOrder属性值,默认是false,
    //但是如果低于了DB中应用的最新版本,就变成了true

    List<Pair<AppliedMigration, AppliedMigrationAttributes>> appliedVersioned = new ArrayList<>();
    
    // 由于appliedMigrations是按照installed_rank升序排序,所以遍历顺序可以认为是已经迁移的顺序
    for (AppliedMigration appliedMigration : appliedMigrations) {
        MigrationVersion version = appliedMigration.getVersion();
        if (version == null) {
            appliedRepeatable.add(appliedMigration);
            continue;
        }
        if (appliedMigration.getType() == MigrationType.SCHEMA) {
            context.schema = version;
        }
        if (appliedMigration.getType() == MigrationType.BASELINE) {
            context.baseline = version;
        }

        // DB中的信息,默认outOfOrder赋值为false
        appliedVersioned.add(Pair.of(appliedMigration, new AppliedMigrationAttributes()));
    }

    // 经过循环得到db最新应用的版本context.lastApplied
    for (Pair<AppliedMigration, AppliedMigrationAttributes> av : appliedVersioned) {
        MigrationVersion version = av.getLeft().getVersion();
        if (version != null) {
            // 维护DB中最新版本
            if (version.compareTo(context.lastApplied) > 0) {
                context.lastApplied = version;
            } else {
                // 如果低于最新版本,置为true。说明是个插队的
                av.getRight().outOfOrder = true;
            }
        }
    }

    // 如果没有指定target,那么就用最新已迁移的
    if (MigrationVersion.CURRENT == target) {
        context.target = context.lastApplied;
    }

    // 这里可以认为是所有需要迁移信息,包括了undo类型的。
    List<MigrationInfoImpl> migrationInfos1 = new ArrayList<>();
    // pendingResolvedVersioned可以认为是后续将要进行迁移的内容。是由resolvedVersioned和appliedVersioned的差集得来,去除了undo类型的迁移。
    // resolvedVersioned.values()中都是非undo类型的迁移
    Set<ResolvedMigration> pendingResolvedVersioned = new HashSet<>(resolvedVersioned.values());
    for (Pair<AppliedMigration, AppliedMigrationAttributes> av : appliedVersioned) {
        // 找到指定版本号的迁移,要求是非undo类型
        ResolvedMigration resolvedMigration = resolvedVersioned.get(Pair.of(av.getLeft().getVersion(), av.getLeft().getType().isUndo()));
        if (resolvedMigration != null) {
            // 这里认为是在做差集。resolvedVersioned - appliedVersioned,即是需要追加的迁移
            pendingResolvedVersioned.remove(resolvedMigration);
        }
        // 将db中的迁移加入迁移信息中
        // resolvedMigration可能为null,可能是因为是undo类型,也可能db中有,但类路径下没有
        migrationInfos1.add(new MigrationInfoImpl(resolvedMigration, av.getLeft(), context, av.getRight().outOfOrder));
    }

    // 将db中没有的,需要额外迁移的内容加进来
    // outOfOrder指定为false,因为这些是后续要追加进来的,按照顺序去迁移就好了
    for (ResolvedMigration prv : pendingResolvedVersioned) {
        migrationInfos1.add(new MigrationInfoImpl(prv, null, context, false));
    }


    // pendingResolvedRepeatable存放需要应用的R类型迁移,存在于类路径中
    //实际上pending的都是类路径上的,只是和db作对比之后,类路径上的过滤掉了一批
    Set<ResolvedMigration> pendingResolvedRepeatable = new HashSet<>(resolvedRepeatable.values());

    for (ResolvedMigration prr : pendingResolvedRepeatable) {
        // 增加pending的
        migrationInfos1.add(new MigrationInfoImpl(prr, null, context, false));
    }

    Collections.sort(migrationInfos1);
    migrationInfos = migrationInfos1;
}


这步比较复杂,我们只考虑V操作,不考虑P(可以重复的脚本)操作,代码已过滤。

先看migrationResolver.resolveMigrations(context),解析类路径下的脚本。前面说了这里的SqlMigrationResolver是组合方式,包含了4种解析方式。只看SqlMigrationResolverV版本的,默认用的是这两种。


 //SqlMigrationResolver.class
 public List<ResolvedMigration> resolveMigrations(Context context) {
        List<ResolvedMigration> migrations = new ArrayList<>();
        //分割符 __
        String separator = configuration.getSqlMigrationSeparator();
        //后缀 .sql
        String[] suffixes = configuration.getSqlMigrationSuffixes();
        //前缀V
        addMigrations(migrations, configuration.getSqlMigrationPrefix(), separator, suffixes,false);
        Collections.sort(migrations, new ResolvedMigrationComparator());
        return migrations;
    }


继续跟进addMigrations方法


   private void addMigrations(List<ResolvedMigration> migrations, String prefix,
                               String separator, String[] suffixes, boolean repeatable ) {
        for (LoadableResource resource : resourceProvider.getResources(prefix, suffixes)) {
            String filename = resource.getFilename();
            if (isSqlCallback(filename, separator, suffixes)) {
                continue;
            }
            //解析版本信息
            Pair<MigrationVersion, String> info =
                    MigrationInfoHelper.extractVersionAndDescription(filename, prefix, separator, suffixes, repeatable);
            ResolvedMigrationImpl migration = new ResolvedMigrationImpl();
            migration.setVersion(info.getLeft());
            migration.setDescription(info.getRight());
            migration.setScript(resource.getRelativePath());
            //解析脚本
            SqlScript sqlScript = new SqlScript(sqlStatementBuilderFactory, resource, configuration.isMixed());
            //生成文件校验和
            int checksum = resource.checksum();
            migration.setChecksum(checksum);
            migration.setType(MigrationType.SQL);
            migration.setPhysicalLocation(resource.getAbsolutePathOnDisk());
            migration.setExecutor(new SqlMigrationExecutor(database, sqlScript));
            migrations.add(migration);
        }
    }


先看new SqlScript,会将文件解析成sql脚本


    public SqlScript(SqlStatementBuilderFactory sqlStatementBuilderFactory, LoadableResource resource, boolean mixed) {
        this.resource = resource;
        this.sqlStatementBuilderFactory = sqlStatementBuilderFactory;
        this.mixed = mixed;

        LOG.debug("Parsing " + resource.getFilename() + " ...");
        LineReader reader = null;
        try {
            reader = resource.loadAsString();
            this.sqlStatements = extractStatements(reader);
        } finally {
            IOUtils.close(reader);
        }
    }


最终根据不同数据库的分隔符解析每个文件,按行读取sql。


flyway从入门到放弃,你学废了吗


如何判断脚本文件是否重复,这里checksum就起到作用了,该方法经常用来判断文件是否做了修改


 @Override
    public final int checksum() {
        if (checksum == null) {
            final CRC32 crc32 = new CRC32();

            LineReader lineReader = null;
            try {
                lineReader = loadAsString();
                Line line;
                while ((line = lineReader.readLine()) != null) {
                    //noinspection Since15
                    crc32.update(StringUtils.trimLineBreak(line.getLine()).getBytes("UTF-8"));
                }

            } catch (IOException e) {
                throw new FlywayException("Unable to calculate checksum for " + getAbsolutePath()
                        + " (" + getAbsolutePathOnDisk() + "): " + e.getMessage(), e);
            } finally {
                IOUtils.close(lineReader);
            }

            checksum = (int) crc32.getValue();
        }
        return checksum;
    }


原理就是读取文件的每行内容,生成一个数值,并不断的更新,最后得到一个唯一的数字。该数字可能是正数也可能是负数。

flyway从入门到放弃,你学废了吗


最终返回的迁移脚本,包含了整个类路径下的脚本。

flyway从入门到放弃,你学废了吗


回到refresh方法那里,解析了类路径下的脚本,接下来就是从数据库中查询已迁移的历史脚本好做对比。


schemaHistory.allAppliedMigrations()


@Override
    public List<AppliedMigration> allAppliedMigrations() {
        if (!exists()) {
            return new ArrayList<>();
        }

        refreshCache();
        return cache;
    }
    
    
    private void refreshCache() {
        int maxCachedInstalledRank = cache.isEmpty() ? -1 : cache.getLast().getInstalledRank();

        String query = database.getSelectStatement(table, maxCachedInstalledRank);

        try {
            cache.addAll(jdbcTemplate.query(query, new RowMapper<AppliedMigration>() {
                public AppliedMigration mapRow(final ResultSet rs) throws SQLException {
                    Integer checksum = rs.getInt("checksum");
                    if (rs.wasNull()) {
                        checksum = null;
                    }

                    return new AppliedMigration(
                            rs.getInt("installed_rank"),
                            rs.getString("version") != null ? MigrationVersion.fromVersion(rs.getString("version")) : null,
                            rs.getString("description"),
                            MigrationType.valueOf(rs.getString("type")),
                            rs.getString("script"),
                            checksum,
                            rs.getTimestamp("installed_on"),
                            rs.getString("installed_by"),
                            rs.getInt("execution_time"),
                            rs.getBoolean("success")
                    );
                }
            }));
        } catch (SQLException e) {
            throw new FlywaySqlException("Error while retrieving the list of applied migrations from Schema History table "
                    + table, e);
        }
}

这里就是使用jdbc连接数据库查询了。并将结果缓存起来。


最终通过对比校验,将数据汇总到一起。


flyway从入门到放弃,你学废了吗

其中resolvedMigration代表classpath下校验通过的文件

appliedMigration是对应DB中已经迁移的数据。

很明显可以看出,当对应appliedMigration是null,说明是要迁移的数据。

接下来就是核心验证流程 migrationInfoService.validate()


再次描述几个参数的含义。

future,含义是db中的版本高于了类路径下的版本

missing,含义是db中有但是类路径中没有的

ignored,含义是db中没有,但是类路径中有,而且版本非最高的。

pengding,含义是db中没有,类路径中有且版本高于db中的。
public String validate() {
    MigrationState state = getState();

    // 超过了target,直接通过
    if (MigrationState.ABOVE_TARGET.equals(state)) {
        return null;
    }

    // failed状态只是来源于db中的记录。
    // 如果当前迁移在db中标记失败,且(不允许未来的迁移或状态不是FUTURE_FAILED),那么将抛出异常。
    // 也就是说,如果DB中最新的一条迁移记录是失败的,那么只要是执行了验证逻辑,总是要报错的。
    if (state.isFailed() && (!context.future || MigrationState.FUTURE_FAILED != state)) {
        if (getVersion() == null) {
            return "Detected failed repeatable migration: " + getDescription();
        }
        return "Detected failed migration to version " + getVersion() + " (" + getDescription() + ")";
    }

    // 类路径中信息没有时,对于V类型迁移来说,miss和future两种情况,依配置而定是否抛错。不允许miss和future会直接抛错。
    if ((resolvedMigration == null)
        && (appliedMigration.getType() != MigrationType.SCHEMA)
        && (appliedMigration.getType() != MigrationType.BASELINE)
        && (appliedMigration.getVersion() != null)
        && (!context.missing || (MigrationState.MISSING_SUCCESS != state && MigrationState.MISSING_FAILED != state))
        && (!context.future || (MigrationState.FUTURE_SUCCESS != state && MigrationState.FUTURE_FAILED != state))) {
        return "Detected applied migration not resolved locally: " + getVersion();
    }

   
    // 这个校验可能就分纯校验和校验后立即迁移的情况了。
    // pending本身是需要迁移的脚本状态,如果校验时配置了不允许pending,那么这里一旦出现pending,就会抛错。
    // ignored的逻辑也一样。
    if (!context.pending && MigrationState.PENDING == state || (!context.ignored && MigrationState.IGNORED == state)) {
        if (getVersion() != null) {
            return "Detected resolved migration not applied to database: " + getVersion();
        }
        return "Detected resolved repeatable migration not applied to database: " + getDescription();
    }

    // 如果不允许pending(实际上这种配置,目的是要求不要有任何新的迁移),但是存在需要迁移的R类型脚本,仍提示错误。
    if (!context.pending && MigrationState.OUTDATED == state) {
        return "Detected outdated resolved repeatable migration that should be re-applied to database: " + getDescription();
    }

    // db、类路径都存在
    if (resolvedMigration != null && appliedMigration != null) {
        // V与R的标识不同。
        String migrationIdentifier = appliedMigration.getVersion() == null ?
            // Repeatable migrations
            appliedMigration.getScript() :
        // Versioned migrations
        "version " + appliedMigration.getVersion();
        
        // 如果R类型脚本,或者版本高于baseline
        // 需要注意 migrationInfo的getVersion()方法是先去拿了db中的版本,没有的话,再去拿类路径下的版本。依然可以说明为null时,就是R类型
        if (getVersion() == null || getVersion().compareTo(context.baseline) > 0) {
            
            // 纯粹的脚本类型判断。db与类路径需要保持一致
            if (resolvedMigration.getType() != appliedMigration.getType()) {
                return createMismatchMessage("type", migrationIdentifier,
                                             appliedMigration.getType(), resolvedMigration.getType());
            }


            // V类型,或者(允许pending时R类型不需要pending且不是已经被应用过的状态)时,检查校验和是否一样。
            // 关于SUPERSEDED的含义,需要结合getState和context.latestRepeatableRuns维护逻辑去判断。
            if (resolvedMigration.getVersion() != null
                || (context.pending && MigrationState.OUTDATED != state && MigrationState.SUPERSEDED != state)) {
                // 检查校验和
                // 校验和的实现org.flywaydb.core.internal.resource.AbstractLoadableResource,一次读一行,忽略换行符。尾行的换行符不会影响校验和。但可能影响git
                if (!ObjectUtils.nullSafeEquals(resolvedMigration.getChecksum(), appliedMigration.getChecksum())) {
                    return createMismatchMessage("checksum", migrationIdentifier,
                                                 appliedMigration.getChecksum(), resolvedMigration.getChecksum());
                }
            }

            // 校验描述
            if (!AbbreviationUtils.abbreviateDescription(resolvedMigration.getDescription())
                .equals(appliedMigration.getDescription())) {
                return createMismatchMessage("description", migrationIdentifier,
                                             appliedMigration.getDescription(), resolvedMigration.getDescription());
            }
        }
    }
    return null;
}




flyway从入门到放弃,你学废了吗


2.2 .2 迁移

  public int migrate() throws FlywayException {
        callbackExecutor.onMigrateOrUndoEvent(Event.BEFORE_MIGRATE);

        int count;
        try {
            StopWatch stopWatch = new StopWatch();
            stopWatch.start();

            schemaHistory.create();

            //如果开启组事务,就尝试走组事务。
            count = configuration.isGroup() ?
                    // When group is active, start the transaction boundary early to
                    // ensure that all changes to the schema history table are either committed or rolled back atomically.
                    // lock将会开启事务,并通过get_lock函数上一把关于history表的排他锁
                    // 但是mysql存在ddl隐式提交(不支持ddl事务),所以这里并不保证组内迁移失败时可以正常回滚。
                    schemaHistory.lock(new Callable<Integer>() {
                        @Override
                        public Integer call() {
                            return migrateAll();
                        }
                    }) :
                    // For all regular cases, proceed with the migration as usual.
                    // 内部为每个迁移脚本开启事务。
                    migrateAll();

            stopWatch.stop();

            logSummary(count, stopWatch.getTotalTimeMillis());
        } catch (FlywayException e) {
            callbackExecutor.onMigrateOrUndoEvent(Event.AFTER_MIGRATE_ERROR);
            throw e;
        }

        callbackExecutor.onMigrateOrUndoEvent(Event.AFTER_MIGRATE);
        return count;
    }


schemaHistory.create()如果迁移历史表不存在,就创建。底层就是调用jdbc创建表

根据configuration.isGroup()属性,将迁移打包到一个事务内进行迁移。由于mysql不支持ddl事务,一般不使用这种方式。

❷schemaHistory.lock上锁逻辑就是数据库的排他锁。如mysql

flyway从入门到放弃,你学废了吗


❸migrateAll() ,迁移,内部会为每个文件创建一个事务。


 private int migrateAll() {
        int total = 0;
        while (true) {
            final boolean firstRun = total == 0;
            int count = configuration.isGroup()
                    // With group active a lock on the schema history table has already been acquired.
                    ? migrateGroup(firstRun)
                    // Otherwise acquire the lock now. The lock will be released at the end of each migration.
                    : schemaHistory.lock(new Callable<Integer>() {
                @Override
                public Integer call() {
                    return migrateGroup(firstRun);
                }
            });
            total += count;
            if (count == 0) {
                // No further migrations available
                break;
            }
        }
        return total;
    }


最终都是调用migrateGroup


private int migrateAll() {
        int total = 0;
        while (true) {
            final boolean firstRun = total == 0;
            int count = configuration.isGroup()
                    // With group active a lock on the schema history table has already been acquired.
                    ? migrateGroup(firstRun)
                    // Otherwise acquire the lock now. The lock will be released at the end of each migration.
                    : schemaHistory.lock(new Callable<Integer>() {
                @Override
                public Integer call() {
                    return migrateGroup(firstRun);
                }
            });
            total += count;
            if (count == 0) {
                // No further migrations available
                break;
            }
        }
        return total;
    }


继续跟进


private Integer migrateGroup(boolean firstRun) {
    MigrationInfoServiceImpl infoService =
        new MigrationInfoServiceImpl(migrationResolver, schemaHistory, configuration,
                                     configuration.getTarget(), configuration.isOutOfOrder(),
                                     true, true, true, true);
    // 迁移前,调用了刷新方法,保证拿到了最新的迁移信息。

    infoService.refresh();

    // 拿到当前已迁移的最新脚本,
    MigrationInfo current = infoService.current();
    
    MigrationVersion currentSchemaVersion = current == null ? MigrationVersion.EMPTY : current.getVersion();
    if (firstRun) {
        LOG.info("Current version of schema " + schema + ": " + currentSchemaVersion);

        if (configuration.isOutOfOrder()) {
            LOG.warn("outOfOrder mode is active. Migration of schema " + schema + " may not be reproducible.");
        }
    }

    // 拿到future类别的,包含失败的和成功的
    MigrationInfo[] future = infoService.future();
    if (future.length > 0) {
        List<MigrationInfo> resolved = Arrays.asList(infoService.resolved());
        Collections.reverse(resolved);
        if (resolved.isEmpty()) {
            LOG.warn("Schema " + schema + " has version " + currentSchemaVersion
                     + ", but no migration could be resolved in the configured locations !");
        } else {
            for (MigrationInfo migrationInfo : resolved) {
                // Only consider versioned migrations
                if (migrationInfo.getVersion() != null) {
                    LOG.warn("Schema " + schema + " has a version (" + currentSchemaVersion
                             + ") that is newer than the latest available migration ("
                             + migrationInfo.getVersion() + ") !");
                    break;
                }
            }
        }
    }

    // 拿到标注为失败的
    MigrationInfo[] failed = infoService.failed();
    if (failed.length > 0) {
        // 如果只有一个,且还是future类型的,如果配置了忽略isIgnoreFutureMigrations,
        //那么仅仅打印一下,否则抛异常提示。

        if ((failed.length == 1)
            && (failed[0].getState() == MigrationState.FUTURE_FAILED)
            && configuration.isIgnoreFutureMigrations()) {
            LOG.warn("Schema " + schema + " contains a failed future migration to version " + failed[0].getVersion() + " !");
        } else {
            if (failed[0].getVersion() == null) {
                throw new FlywayException("Schema " + schema + " contains a failed repeatable migration (" + failed[0].getDescription() + ") !");
            }
            throw new FlywayException("Schema " + schema + " contains a failed migration to version " + failed[0].getVersion() + " !");
        }
    }

    
    LinkedHashMap<MigrationInfoImpl, Boolean> group = new LinkedHashMap<>();
    // 拿到PENDING状态的,注意,这里是个遍历。内部由configuration.isGroup()去控制是否遍历全部,还是单个就停止
    // 根据是否插队,使用group的value进行标记。
    // 如果V类型的版本低于current的版本(db中已应用的最新版本),说明是个插队的(新加的,还处于某个版本间的,非最新的)
    for (MigrationInfoImpl pendingMigration : infoService.pending()) {
        boolean isOutOfOrder = pendingMigration.getVersion() != null
            && pendingMigration.getVersion().compareTo(currentSchemaVersion) < 0;
        group.put(pendingMigration, isOutOfOrder);

        // 如果没有开启组,那么一次处理一个迁移
        if (!configuration.isGroup()) {
            // Only include one pending migration if group is disabled
            break;
        }
    }

    if (!group.isEmpty()) {
        // 真正的迁移入口
        applyMigrations(group);
    }
    return group.size();
}


直接看真正的迁移入口applyMigrations,最后会调用doMigrateGroup执行迁移逻辑。

private void doMigrateGroup(LinkedHashMap<MigrationInfoImpl, Boolean> group, StopWatch stopWatch) {
    Context context = new Context() {
        @Override
        public Configuration getConfiguration() {
            return configuration;
        }

        @Override
        public java.sql.Connection getConnection() {
            return connectionUserObjects.getJdbcConnection();
        }
    };

    // 迁移
    // 对于configuration.isGroup()==false 来说,实际上只有一个迁移在group内
    // entry的value是outOfOrder,值为true表明是个插队迁移
    for (Map.Entry<MigrationInfoImpl, Boolean> entry : group.entrySet()) {
        final MigrationInfoImpl migration = entry.getKey();
        boolean isOutOfOrder = entry.getValue();
        //迁移信息
        final String migrationText = toMigrationText(migration, isOutOfOrder);

        stopWatch.start();

        LOG.info("Migrating " + migrationText);

        connectionUserObjects.restoreOriginalState();
        connectionUserObjects.changeCurrentSchemaTo(schema);

        try {
            callbackExecutor.setMigrationInfo(migration);
            callbackExecutor.onEachMigrateOrUndoEvent(Event.BEFORE_EACH_MIGRATE);
            try {
                //迁移脚本
                migration.getResolvedMigration().getExecutor().execute(context);
            } catch (FlywayException e) {
                callbackExecutor.onEachMigrateOrUndoEvent(Event.AFTER_EACH_MIGRATE_ERROR);
                throw new FlywayMigrateException(migration, isOutOfOrder, e);
            } catch (SQLException e) {
                callbackExecutor.onEachMigrateOrUndoEvent(Event.AFTER_EACH_MIGRATE_ERROR);
                throw new FlywayMigrateException(migration, isOutOfOrder, e);
            }

            LOG.debug("Successfully completed migration of " + migrationText);
            callbackExecutor.onEachMigrateOrUndoEvent(Event.AFTER_EACH_MIGRATE);
        } finally {
            callbackExecutor.setMigrationInfo(null);
        }

        stopWatch.stop();
        int executionTime = (int) stopWatch.getTotalTimeMillis();
        //更新schemaHistory表
        schemaHistory.addAppliedMigration(migration.getVersion(), migration.getDescription(), migration.getType(),
                                          migration.getScript(), migration.getResolvedMigration().getChecksum(), executionTime, true);

    }
}


❶toMigrationText,获取迁移信息,当校验成功后,我们往往会看到

 Migrating schema `test` to version 1.0.2 – insert

这样的日志,以前以为到这里就代表成功了,其实这里才开始。


flyway从入门到放弃,你学废了吗


真正的迁移脚本


 @Override
    public void execute(SqlScript sqlScript) {

        List<SqlStatement> sqlStatements = sqlScript.getSqlStatements();
        for (int i = 0; i < sqlStatements.size(); i++) {
            SqlStatement sqlStatement = sqlStatements.get(i);
            String sql = sqlStatement.getSql();
            if (LOG.isDebugEnabled()) {
                LOG.debug("Executing "+ "SQL: " + sql);
            }
        executeStatement(jdbcTemplate, sqlScript, sqlStatement);

        }
    }

下面就是jdbc的脚本执行逻辑了。


更新schemaHistory表

 @Override
    protected void doAddAppliedMigration(int installedRank, MigrationVersion version, String description,
                                         MigrationType type, String script, Integer checksum,
                                         int executionTime, boolean success) {
        connection.restoreOriginalState();

        // Lock again for databases with no DDL transactions to prevent implicit commits from triggering deadlocks
        // in highly concurrent environments
        if (!database.supportsDdlTransactions()) {
            table.lock();
        }

        try {
            String versionStr = version == null ? null : version.toString();

            jdbcTemplate.update(database.getInsertStatement(table),
                    installedRank, versionStr, description, type.name(), script, checksum, installedBy,
                    executionTime, success);


            LOG.debug("Schema History table " + table + " successfully updated to reflect changes");
        } catch (SQLException e) {
            throw new FlywaySqlException("Unable to insert row for version '" + version + "' in Schema History table " + table, e);
        }
    }


到这里如果执行顺利,程序就正常启动了,如果执行出错,迁移脚本会回滚。

但是迁移历史数据会入库,只要你不修改脚本就一直会报错


flyway从入门到放弃,你学废了吗

flyway从入门到放弃,你学废了吗


3.flyway 最佳实践


1. SQL 的文件名


开发环境和生产环境的 migration SQL 不共用. 开发过程往往是多人协作开发, DB migration 也相对比较频繁, 所以 SQL 脚本会很多.

(1). 开发环境 SQL 文件建议采用时间戳作为版本号.

开发环境 SQL 文件建议采用时间戳作为版本号, 多人一起开发不会导致版本号争用, 同时再加上生产环境的版本号, 这样的话, 将来手工 merge 成生产环境 V1.2d migration 脚本也比较方便, SQL 文件示例:
V20180317.10.59__V1.2_Unique_User_Names.sql
V20180317.14.59__V1.2_Add_SomeTables.sql

2.多个系统公用 DB schema


很多时候多个系统公用一个 DB schema , 这时候使用 spring.flyway.table 为不同的系统设置不同的 metadata 表, 缺省为 flyway_schema_history


3.DDL脚本


mysql 不支持ddl事务,如果回滚可能回滚不彻底。尽量将ddl和dml操作分开,如果数据库比较大,需要评估ddl时长。


4.脚本group


如果脚本有关联,可以将配置configuration.isGroup()设置为true,开启组合事务。


4.问题


如果脚本执行成功,程序因为其他原因启动失败,是否应该回滚?怎么控制事务?





flyway从入门到放弃,你学废了吗
关注我的你,是最香哒!














原文始发于微信公众号(小李的源码图):flyway从入门到放弃,你学废了吗

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/145462.html

(0)
青莲明月的头像青莲明月

相关推荐

发表回复

登录后才能评论
极客之音——专业性很强的中文编程技术网站,欢迎收藏到浏览器,订阅我们!