|
@@ -65,7 +65,6 @@ public class MysqlStorageServiceImpl extends AbstractStorageService {
|
|
|
private static final String SHOW_DATA_TABLE = "show tables where Tables_in_%s like \"%s\"";
|
|
|
private static final String DROP_TABLE = "DROP TABLE %s";
|
|
|
private static final String TRUNCATE_TABLE = "TRUNCATE TABLE %s";
|
|
|
- private static final String UPGRADE_SQL = "upgrade";
|
|
|
|
|
|
@Autowired
|
|
|
private ConnectorFactory connectorFactory;
|
|
@@ -333,38 +332,18 @@ public class MysqlStorageServiceImpl extends AbstractStorageService {
|
|
|
}
|
|
|
|
|
|
private void initUpgradeSql() {
|
|
|
- // show tables where Tables_in_dbsyncer like "dbsyncer_data%"
|
|
|
- String sql = String.format(SHOW_DATA_TABLE, database, PREFIX_TABLE.concat(StorageEnum.DATA.getType()).concat("%"));
|
|
|
- List<String> tables = null;
|
|
|
try {
|
|
|
- tables = connectorMapper.execute(databaseTemplate -> databaseTemplate.queryForList(sql, String.class));
|
|
|
- } catch (EmptyResultDataAccessException e) {
|
|
|
- // 没有可更新的表
|
|
|
- }
|
|
|
- if (CollectionUtils.isEmpty(tables)) {
|
|
|
- return;
|
|
|
- }
|
|
|
- final String queryColumnCount = "SELECT count(*) FROM information_schema.columns WHERE table_name = '%s' and column_name = 'DATA'";
|
|
|
- tables.forEach(table -> {
|
|
|
- try {
|
|
|
- String query = String.format(queryColumnCount, table);
|
|
|
- // 是否已升级
|
|
|
- int count = connectorMapper.execute(databaseTemplate -> databaseTemplate.queryForObject(query, Integer.class));
|
|
|
- if (count == 0) {
|
|
|
- String ddlSql = readSql(UPGRADE_SQL, false, table);
|
|
|
- Stream.of(StringUtil.split(ddlSql, ";")).forEach(ddl -> executeSql(ddl));
|
|
|
+ executeSql("drop table if exists `dbsyncer_binlog`;");
|
|
|
+ } catch (Exception e) {
|
|
|
+ if (e.getCause() instanceof SQLSyntaxErrorException) {
|
|
|
+ SQLSyntaxErrorException ex = (SQLSyntaxErrorException) e.getCause();
|
|
|
+ if (ex.getSQLState().equals("42S21")) {
|
|
|
+ // ignore
|
|
|
+ return;
|
|
|
}
|
|
|
- } catch (Exception e) {
|
|
|
- if (e.getCause() instanceof SQLSyntaxErrorException) {
|
|
|
- SQLSyntaxErrorException ex = (SQLSyntaxErrorException) e.getCause();
|
|
|
- if (ex.getSQLState().equals("42S21")) {
|
|
|
- // ignore
|
|
|
- return;
|
|
|
- }
|
|
|
- }
|
|
|
- logger.error(e.getMessage());
|
|
|
}
|
|
|
- });
|
|
|
+ logger.error(e.getMessage());
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
private void initTable() throws InterruptedException {
|
|
@@ -377,17 +356,12 @@ public class MysqlStorageServiceImpl extends AbstractStorageService {
|
|
|
builder.build(ConfigConstant.CONFIG_MODEL_ID, ConfigConstant.CONFIG_MODEL_TYPE, ConfigConstant.CONFIG_MODEL_CREATE_TIME, ConfigConstant.CONFIG_MODEL_JSON);
|
|
|
List<Field> logFields = builder.getFields();
|
|
|
|
|
|
- // 缓存任务
|
|
|
- builder.build(ConfigConstant.CONFIG_MODEL_ID, ConfigConstant.BINLOG_STATUS, ConfigConstant.CONFIG_MODEL_CREATE_TIME, ConfigConstant.BINLOG_DATA);
|
|
|
- List<Field> binlogFields = builder.getFields();
|
|
|
-
|
|
|
// 数据
|
|
|
builder.build(ConfigConstant.CONFIG_MODEL_ID, ConfigConstant.DATA_SUCCESS, ConfigConstant.DATA_TABLE_GROUP_ID, ConfigConstant.DATA_TARGET_TABLE_NAME, ConfigConstant.DATA_EVENT, ConfigConstant.DATA_ERROR, ConfigConstant.CONFIG_MODEL_CREATE_TIME, ConfigConstant.BINLOG_DATA);
|
|
|
List<Field> dataFields = builder.getFields();
|
|
|
|
|
|
tables.computeIfAbsent(StorageEnum.CONFIG.getType(), k -> new Executor(k, configFields, true, true));
|
|
|
tables.computeIfAbsent(StorageEnum.LOG.getType(), k -> new Executor(k, logFields, true, false));
|
|
|
- tables.computeIfAbsent(StorageEnum.BINLOG.getType(), k -> new Executor(k, binlogFields, true, false));
|
|
|
tables.computeIfAbsent(StorageEnum.DATA.getType(), k -> new Executor(k, dataFields, false, false));
|
|
|
// 创建表
|
|
|
tables.forEach((tableName, e) -> {
|