Переглянути джерело

支持统计同步表信息 https://gitee.com/ghi/dbsyncer/issues/I5O2SZ

Signed-off-by: AE86 <836391306@qq.com>
AE86 2 роки тому
батько
коміт
d0acd0d63d

+ 26 - 0
dbsyncer-common/src/main/java/org/dbsyncer/common/model/Result.java

@@ -20,6 +20,16 @@ public class Result<T> {
      */
     private StringBuffer error = new StringBuffer();
 
+    /**
+     * 驱动表映射关系ID
+     */
+    private String tableGroupId;
+
+    /**
+     * 目标表名称
+     */
+    private String targetTableGroupName;
+
     private final Object LOCK = new Object();
 
     public Result() {
@@ -62,4 +72,20 @@ public class Result<T> {
             this.successData.addAll(successData);
         }
     }
+
+    public String getTableGroupId() {
+        return tableGroupId;
+    }
+
+    public void setTableGroupId(String tableGroupId) {
+        this.tableGroupId = tableGroupId;
+    }
+
+    public String getTargetTableGroupName() {
+        return targetTableGroupName;
+    }
+
+    public void setTargetTableGroupName(String targetTableGroupName) {
+        this.targetTableGroupName = targetTableGroupName;
+    }
 }

+ 7 - 5
dbsyncer-parser/src/main/java/org/dbsyncer/parser/ParserFactory.java

@@ -287,7 +287,7 @@ public class ParserFactory implements Parser {
 
             // 5、写入目标源
             BatchWriter batchWriter = new BatchWriter(tConnectorMapper, command, tTableName, event, picker.getTargetFields(), target, batchSize);
-            Result writer = writeBatch(context, batchWriter, executorService);
+            Result result = writeBatch(context, batchWriter, executorService);
 
             // 6、同步完成后通知插件做后置处理
             pluginFactory.postProcessAfter(group.getPlugin(), context);
@@ -295,7 +295,9 @@ public class ParserFactory implements Parser {
             // 7、更新结果
             task.setPageIndex(task.getPageIndex() + 1);
             task.setCursor(getLastCursor(data, pk));
-            flush(task, writer);
+            result.setTableGroupId(tableGroup.getId());
+            result.setTargetTableGroupName(tTableName);
+            flush(task, result);
 
             // 8、判断尾页
             if (data.size() < pageSize) {
@@ -394,10 +396,10 @@ public class ParserFactory implements Parser {
      * 更新缓存
      *
      * @param task
-     * @param writer
+     * @param result
      */
-    private void flush(Task task, Result writer) {
-        flushStrategy.flushFullData(task.getId(), writer, ConnectorConstant.OPERTION_INSERT);
+    private void flush(Task task, Result result) {
+        flushStrategy.flushFullData(task.getId(), result, ConnectorConstant.OPERTION_INSERT);
 
         // 发布刷新事件给FullExtractor
         task.setEndTime(Instant.now().toEpochMilli());

+ 2 - 2
dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/AbstractFlushStrategy.java

@@ -41,12 +41,12 @@ public abstract class AbstractFlushStrategy implements FlushStrategy {
 
         if (flushDataConfig.isWriterFail() && !CollectionUtils.isEmpty(result.getFailData())) {
             final String error = StringUtil.substring(result.getError().toString(), 0, flushDataConfig.getMaxErrorLength());
-            flushService.write(metaId, event, false, result.getFailData(), error);
+            flushService.write(metaId, result.getTableGroupId(), result.getTargetTableGroupName(), event, false, result.getFailData(), error);
         }
 
         // 是否写增量数据
         if (flushDataConfig.isWriterSuccess() && !CollectionUtils.isEmpty(result.getSuccessData())) {
-            flushService.write(metaId, event, true, result.getSuccessData(), "");
+            flushService.write(metaId, result.getTableGroupId(), result.getTargetTableGroupName(), event, true, result.getSuccessData(), "");
         }
     }
 

+ 3 - 1
dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/FlushService.java

@@ -20,9 +20,11 @@ public interface FlushService {
      * 记录数据
      *
      * @param metaId
+     * @param tableGroupId
+     * @param targetTableGroupName
      * @param event
      * @param success
      * @param data
      */
-    void write(String metaId, String event, boolean success, List<Map> data, String error);
+    void write(String metaId, String tableGroupId, String targetTableGroupName, String event, boolean success, List<Map> data, String error);
 }

+ 3 - 1
dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/impl/FlushServiceImpl.java

@@ -59,12 +59,14 @@ public class FlushServiceImpl implements FlushService {
     }
 
     @Override
-    public void write(String metaId, String event, boolean success, List<Map> data, String error) {
+    public void write(String metaId, String tableGroupId, String targetTableGroupName, String event, boolean success, List<Map> data, String error) {
         long now = Instant.now().toEpochMilli();
         data.forEach(r -> {
             Map<String, Object> row = new HashMap();
             row.put(ConfigConstant.CONFIG_MODEL_ID, String.valueOf(snowflakeIdWorker.nextId()));
             row.put(ConfigConstant.DATA_SUCCESS, success ? StorageDataStatusEnum.SUCCESS.getValue() : StorageDataStatusEnum.FAIL.getValue());
+            row.put(ConfigConstant.DATA_TABLE_GROUP_ID, tableGroupId);
+            row.put(ConfigConstant.DATA_TARGET_TABLE_NAME, targetTableGroupName);
             row.put(ConfigConstant.DATA_EVENT, event);
             row.put(ConfigConstant.DATA_ERROR, substring(error));
             try {

+ 2 - 0
dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/impl/WriterBufferActuator.java

@@ -105,6 +105,8 @@ public class WriterBufferActuator extends AbstractBufferActuator<WriterRequest,
                 picker.getTargetFields(), targetDataList, bufferActuatorConfig.getWriterBatchCount()));
 
         // 6、持久化同步结果
+        result.setTableGroupId(tableGroup.getId());
+        result.setTargetTableGroupName(targetTableName);
         flushStrategy.flushIncrementData(mapping.getMetaId(), result, event);
 
         // 7、执行批量处理后的

+ 1 - 1
dbsyncer-parser/src/main/java/org/dbsyncer/parser/strategy/impl/DisableFullFlushStrategy.java

@@ -31,7 +31,7 @@ public final class DisableFullFlushStrategy extends AbstractFlushStrategy {
         if (!CollectionUtils.isEmpty(result.getFailData())) {
             logger.error(result.getError().toString());
             LogType logType = LogType.TableGroupLog.FULL_FAILED;
-            logService.log(logType, "%s:%s", logType.getMessage(), result.getError().toString());
+            logService.log(logType, "%s:%s:%s", result.getTargetTableGroupName(), logType.getMessage(), result.getError().toString());
         }
     }
 

+ 3 - 1
dbsyncer-storage/src/main/java/org/dbsyncer/storage/constant/ConfigConstant.java

@@ -31,7 +31,9 @@ public class ConfigConstant {
      * 数据
      */
     public static final String DATA_SUCCESS = "success";
+    public static final String DATA_TABLE_GROUP_ID = "tableGroupId";
+    public static final String DATA_TARGET_TABLE_NAME = "targetTableName";
     public static final String DATA_EVENT = "event";
     public static final String DATA_ERROR = "error";
 
-}
+}

+ 120 - 77
dbsyncer-storage/src/main/java/org/dbsyncer/storage/support/MysqlStorageServiceImpl.java

@@ -1,14 +1,11 @@
 package org.dbsyncer.storage.support;
 
 import org.dbsyncer.common.model.Paging;
-import org.dbsyncer.common.spi.ConnectorMapper;
 import org.dbsyncer.common.util.CollectionUtils;
 import org.dbsyncer.common.util.StringUtil;
 import org.dbsyncer.connector.ConnectorFactory;
 import org.dbsyncer.connector.config.DatabaseConfig;
 import org.dbsyncer.connector.config.SqlBuilderConfig;
-import org.dbsyncer.connector.config.WriterBatchConfig;
-import org.dbsyncer.connector.constant.ConnectorConstant;
 import org.dbsyncer.connector.constant.DatabaseConstant;
 import org.dbsyncer.connector.database.Database;
 import org.dbsyncer.connector.database.DatabaseConnectorMapper;
@@ -22,6 +19,7 @@ import org.dbsyncer.storage.constant.ConfigConstant;
 import org.dbsyncer.storage.enums.StorageEnum;
 import org.dbsyncer.storage.query.Param;
 import org.dbsyncer.storage.query.Query;
+import org.dbsyncer.storage.util.UnderlineToCamelUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -32,10 +30,14 @@ import org.springframework.stereotype.Component;
 import org.springframework.util.Assert;
 
 import javax.annotation.PostConstruct;
-import java.io.*;
+import java.io.BufferedReader;
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.sql.SQLSyntaxErrorException;
 import java.sql.Types;
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
@@ -60,10 +62,10 @@ public class MysqlStorageServiceImpl extends AbstractStorageService {
 
     private static final String PREFIX_TABLE = "dbsyncer_";
     private static final String SHOW_TABLE = "show tables where Tables_in_%s = \"%s\"";
+    private static final String SHOW_DATA_TABLE = "show tables where Tables_in_%s like \"%s\"";
     private static final String DROP_TABLE = "DROP TABLE %s";
     private static final String TRUNCATE_TABLE = "TRUNCATE TABLE %s";
-    private static final String TABLE_CREATE_TIME = "create_time";
-    private static final String TABLE_UPDATE_TIME = "update_time";
+    private static final String UPGRADE_SQL = "upgrade";
 
     @Autowired
     private ConnectorFactory connectorFactory;
@@ -86,6 +88,9 @@ public class MysqlStorageServiceImpl extends AbstractStorageService {
         connector = (Database) connectorFactory.getConnector(connectorMapper);
         database = DatabaseUtil.getDatabaseName(config.getUrl());
 
+        // 升级脚本
+        initUpgradeSql();
+
         // 初始化表
         initTable();
     }
@@ -121,7 +126,6 @@ public class MysqlStorageServiceImpl extends AbstractStorageService {
         Executor executor = getExecutor(type, table);
         String sql = executor.getUpdate();
         List<Object> args = getParams(executor, params);
-        args.add(params.get(ConfigConstant.CONFIG_MODEL_ID));
         int update = connectorMapper.execute(databaseTemplate -> databaseTemplate.update(sql, args.toArray()));
         Assert.isTrue(update > 0, "update failed");
     }
@@ -159,12 +163,12 @@ public class MysqlStorageServiceImpl extends AbstractStorageService {
     public void insertData(StorageEnum type, String table, List<Map> list) {
         if (!CollectionUtils.isEmpty(list)) {
             Executor executor = getExecutor(type, table);
-            Map<String, String> command = new HashMap<>();
-            command.put(SqlBuilderEnum.INSERT.getName(), executor.getInsert());
-            ConnectorMapper connectorMapper = connectorFactory.connect(config);
-            connectorFactory.writer(connectorMapper, new WriterBatchConfig(table, ConnectorConstant.OPERTION_INSERT, command, executor.getFields(), list));
+            List<Object[]> args = list.stream().map(row -> {
+                List<Object> params = getParams(executor, row);
+                return params.toArray();
+            }).collect(Collectors.toList());
+            connectorMapper.execute(databaseTemplate -> databaseTemplate.batchUpdate(executor.getInsert(), args));
         }
-
     }
 
     @Override
@@ -190,7 +194,7 @@ public class MysqlStorageServiceImpl extends AbstractStorageService {
     }
 
     private List<Object> getParams(Executor executor, Map params) {
-        return executor.getFieldPairs().stream().map(p -> params.get(p.labelName)).collect(Collectors.toList());
+        return executor.getFields().stream().map(f -> params.get(f.getLabelName())).collect(Collectors.toList());
     }
 
     private Executor getExecutor(StorageEnum type, String table) {
@@ -207,7 +211,7 @@ public class MysqlStorageServiceImpl extends AbstractStorageService {
                 return tables.get(table);
             }
             // 不存在
-            Executor newExecutor = new Executor(executor.getGroup(), executor.getFieldPairs(), executor.getFields(), executor.isDynamicTableName(), executor.isSystemType(), executor.isOrderByUpdateTime());
+            Executor newExecutor = new Executor(executor.getGroup(), executor.getFields(), executor.isDynamicTableName(), executor.isSystemType(), executor.isOrderByUpdateTime());
             createTableIfNotExist(table, newExecutor);
 
             tables.putIfAbsent(table, newExecutor);
@@ -253,26 +257,57 @@ public class MysqlStorageServiceImpl extends AbstractStorageService {
         }
     }
 
+    private void initUpgradeSql() {
+        // show tables where Tables_in_dbsyncer like "dbsyncer_data%"
+        String sql = String.format(SHOW_DATA_TABLE, database, PREFIX_TABLE.concat(StorageEnum.DATA.getType()).concat("%"));
+        Map<String, String> tables = null;
+        try {
+            tables = connectorMapper.execute(databaseTemplate -> databaseTemplate.queryForMap(sql));
+        } catch (EmptyResultDataAccessException e) {
+            // 没有可更新的表
+        }
+        if (CollectionUtils.isEmpty(tables)) {
+            return;
+        }
+        tables.values().forEach(table -> {
+            String ddl = readSql(UPGRADE_SQL, true, table);
+            try {
+                executeSql(ddl);
+                logger.info(ddl);
+            } catch (Exception e) {
+                if (e.getCause() instanceof SQLSyntaxErrorException) {
+                    SQLSyntaxErrorException ex = (SQLSyntaxErrorException) e.getCause();
+                    if (ex.getSQLState().equals("42S21")) {
+                        // ignore
+                        return;
+                    }
+                }
+                logger.error(e.getMessage());
+            }
+        });
+    }
+
     private void initTable() throws InterruptedException {
         // 配置
         FieldBuilder builder = new FieldBuilder();
-        builder.build(ConfigConstant.CONFIG_MODEL_ID, ConfigConstant.CONFIG_MODEL_NAME, ConfigConstant.CONFIG_MODEL_TYPE, ConfigConstant.CONFIG_MODEL_CREATE_TIME, ConfigConstant.CONFIG_MODEL_UPDATE_TIME, ConfigConstant.CONFIG_MODEL_JSON);
-        List<FieldPair> configFields = builder.getFieldPairs();
-        List<Field> cfields = builder.getFields();
+        builder.build(ConfigConstant.CONFIG_MODEL_ID, ConfigConstant.CONFIG_MODEL_NAME, ConfigConstant.CONFIG_MODEL_TYPE,
+                ConfigConstant.CONFIG_MODEL_CREATE_TIME, ConfigConstant.CONFIG_MODEL_UPDATE_TIME, ConfigConstant.CONFIG_MODEL_JSON);
+        List<Field> configFields = builder.getFields();
 
         // 日志
-        builder.build(ConfigConstant.CONFIG_MODEL_ID, ConfigConstant.CONFIG_MODEL_TYPE, ConfigConstant.CONFIG_MODEL_CREATE_TIME, ConfigConstant.CONFIG_MODEL_JSON);
-        List<FieldPair> logFields = builder.getFieldPairs();
-        List<Field> lfields = builder.getFields();
+        builder.build(ConfigConstant.CONFIG_MODEL_ID, ConfigConstant.CONFIG_MODEL_TYPE, ConfigConstant.CONFIG_MODEL_CREATE_TIME,
+                ConfigConstant.CONFIG_MODEL_JSON);
+        List<Field> logFields = builder.getFields();
 
         // 数据
-        builder.build(ConfigConstant.CONFIG_MODEL_ID, ConfigConstant.DATA_SUCCESS, ConfigConstant.DATA_EVENT, ConfigConstant.DATA_ERROR, ConfigConstant.CONFIG_MODEL_CREATE_TIME, ConfigConstant.CONFIG_MODEL_JSON);
-        List<FieldPair> dataFields = builder.getFieldPairs();
-        List<Field> dfields = builder.getFields();
-
-        tables.putIfAbsent(StorageEnum.CONFIG.getType(), new Executor(StorageEnum.CONFIG, configFields, cfields, false, true, true));
-        tables.putIfAbsent(StorageEnum.LOG.getType(), new Executor(StorageEnum.LOG, logFields, lfields, false, true, false));
-        tables.putIfAbsent(StorageEnum.DATA.getType(), new Executor(StorageEnum.DATA, dataFields, dfields, true, false, false));
+        builder.build(ConfigConstant.CONFIG_MODEL_ID, ConfigConstant.DATA_SUCCESS, ConfigConstant.DATA_TABLE_GROUP_ID,
+                ConfigConstant.DATA_TARGET_TABLE_NAME, ConfigConstant.DATA_EVENT, ConfigConstant.DATA_ERROR,
+                ConfigConstant.CONFIG_MODEL_CREATE_TIME, ConfigConstant.CONFIG_MODEL_JSON);
+        List<Field> dataFields = builder.getFields();
+
+        tables.putIfAbsent(StorageEnum.CONFIG.getType(), new Executor(StorageEnum.CONFIG, configFields, false, true, true));
+        tables.putIfAbsent(StorageEnum.LOG.getType(), new Executor(StorageEnum.LOG, logFields, false, true, false));
+        tables.putIfAbsent(StorageEnum.DATA.getType(), new Executor(StorageEnum.DATA, dataFields, true, false, false));
         // 创建表
         tables.forEach((tableName, e) -> {
             if (!e.isDynamicTableName()) {
@@ -292,16 +327,12 @@ public class MysqlStorageServiceImpl extends AbstractStorageService {
             connectorMapper.execute(databaseTemplate -> databaseTemplate.queryForMap(sql));
         } catch (EmptyResultDataAccessException e) {
             // 不存在表
-            String type = executor.getGroup().getType();
-            String template = PREFIX_TABLE.concat(type);
-            String ddl = readSql("/".concat(template).concat(".sql"));
-            // 动态替换表名
-            ddl = executor.isDynamicTableName() ? StringUtil.replaceOnce(ddl, template, table) : ddl;
+            String ddl = readSql(executor.getGroup().getType(), executor.isDynamicTableName(), table);
             logger.info(ddl);
             executeSql(ddl);
         }
 
-        List<Field> fields = executor.getFieldPairs().stream().map(p -> new Field(p.columnName, p.labelName)).collect(Collectors.toList());
+        List<Field> fields = executor.getFields();
         final SqlBuilderConfig config = new SqlBuilderConfig(connector, "", table, ConfigConstant.CONFIG_MODEL_ID, fields, "", "");
 
         String query = SqlBuilderEnum.QUERY.getSqlBuilder().buildQuerySql(config);
@@ -311,7 +342,10 @@ public class MysqlStorageServiceImpl extends AbstractStorageService {
         executor.setTable(table).setQuery(query).setInsert(insert).setUpdate(update).setDelete(delete);
     }
 
-    private String readSql(String filePath) {
+    private String readSql(String type, boolean dynamicTableName, String table) {
+        String template = PREFIX_TABLE.concat(type);
+        String filePath = "/".concat(template).concat(".sql");
+
         StringBuilder res = new StringBuilder();
         InputStream in = null;
         InputStreamReader isr = null;
@@ -331,6 +365,11 @@ public class MysqlStorageServiceImpl extends AbstractStorageService {
             close(isr);
             close(in);
         }
+
+        // 动态替换表名
+        if (dynamicTableName) {
+            return StringUtil.replaceOnce(res.toString(), template, table);
+        }
         return res.toString();
     }
 
@@ -369,51 +408,59 @@ public class MysqlStorageServiceImpl extends AbstractStorageService {
         this.config = config;
     }
 
-    final class FieldPair {
+    final class FieldWrapper {
+        Field field;
         String columnName;
         String labelName;
 
-        public FieldPair(String columnName) {
-            this.columnName = columnName;
-            this.labelName = columnName;
+        public FieldWrapper(Field field) {
+            this(field, field.getName());
         }
 
-        public FieldPair(String columnName, String labelName) {
-            this.columnName = columnName;
+        public FieldWrapper(Field field, String labelName) {
+            this.field = field;
+            this.columnName = field.getName();
             this.labelName = labelName;
         }
+
+        public Field getField() {
+            return field;
+        }
+
+        public String getColumnName() {
+            return columnName;
+        }
+
+        public String getLabelName() {
+            return labelName;
+        }
     }
 
     final class FieldBuilder {
-        Map<String, FieldPair> fieldPairMap = new ConcurrentHashMap<>();
-        Map<String, Field> fieldMap = new ConcurrentHashMap<>();
-        List<FieldPair> fieldPairs;
+        Map<String, Field> fieldMap;
         List<Field> fields;
 
         public FieldBuilder() {
-            fieldPairMap.putIfAbsent(ConfigConstant.CONFIG_MODEL_ID, new FieldPair(ConfigConstant.CONFIG_MODEL_ID));
-            fieldPairMap.putIfAbsent(ConfigConstant.CONFIG_MODEL_NAME, new FieldPair(ConfigConstant.CONFIG_MODEL_NAME));
-            fieldPairMap.putIfAbsent(ConfigConstant.CONFIG_MODEL_TYPE, new FieldPair(ConfigConstant.CONFIG_MODEL_TYPE));
-            fieldPairMap.putIfAbsent(ConfigConstant.CONFIG_MODEL_CREATE_TIME, new FieldPair(TABLE_CREATE_TIME, ConfigConstant.CONFIG_MODEL_CREATE_TIME));
-            fieldPairMap.putIfAbsent(ConfigConstant.CONFIG_MODEL_UPDATE_TIME, new FieldPair(TABLE_UPDATE_TIME, ConfigConstant.CONFIG_MODEL_UPDATE_TIME));
-            fieldPairMap.putIfAbsent(ConfigConstant.CONFIG_MODEL_JSON, new FieldPair(ConfigConstant.CONFIG_MODEL_JSON));
-            fieldPairMap.putIfAbsent(ConfigConstant.DATA_SUCCESS, new FieldPair(ConfigConstant.DATA_SUCCESS));
-            fieldPairMap.putIfAbsent(ConfigConstant.DATA_EVENT, new FieldPair(ConfigConstant.DATA_EVENT));
-            fieldPairMap.putIfAbsent(ConfigConstant.DATA_ERROR, new FieldPair(ConfigConstant.DATA_ERROR));
-
-            fieldMap.putIfAbsent(ConfigConstant.CONFIG_MODEL_ID, new Field(ConfigConstant.CONFIG_MODEL_ID, "VARCHAR", Types.VARCHAR, true));
-            fieldMap.putIfAbsent(ConfigConstant.CONFIG_MODEL_NAME, new Field(ConfigConstant.CONFIG_MODEL_NAME, "VARCHAR", Types.VARCHAR));
-            fieldMap.putIfAbsent(ConfigConstant.CONFIG_MODEL_TYPE, new Field(ConfigConstant.CONFIG_MODEL_TYPE, "VARCHAR", Types.VARCHAR));
-            fieldMap.putIfAbsent(ConfigConstant.CONFIG_MODEL_CREATE_TIME, new Field(ConfigConstant.CONFIG_MODEL_CREATE_TIME, "BIGINT", Types.BIGINT));
-            fieldMap.putIfAbsent(ConfigConstant.CONFIG_MODEL_UPDATE_TIME, new Field(ConfigConstant.CONFIG_MODEL_UPDATE_TIME, "BIGINT", Types.BIGINT));
-            fieldMap.putIfAbsent(ConfigConstant.CONFIG_MODEL_JSON, new Field(ConfigConstant.CONFIG_MODEL_JSON, "LONGVARCHAR", Types.LONGVARCHAR));
-            fieldMap.putIfAbsent(ConfigConstant.DATA_SUCCESS, new Field(ConfigConstant.DATA_SUCCESS, "INTEGER", Types.INTEGER));
-            fieldMap.putIfAbsent(ConfigConstant.DATA_EVENT, new Field(ConfigConstant.DATA_EVENT, "VARCHAR", Types.VARCHAR));
-            fieldMap.putIfAbsent(ConfigConstant.DATA_ERROR, new Field(ConfigConstant.DATA_ERROR, "LONGVARCHAR", Types.LONGVARCHAR));
-        }
-
-        public List<FieldPair> getFieldPairs() {
-            return fieldPairs;
+            fieldMap = Stream.of(
+                    new Field(ConfigConstant.CONFIG_MODEL_ID, "VARCHAR", Types.VARCHAR, true),
+                    new Field(ConfigConstant.CONFIG_MODEL_NAME, "VARCHAR", Types.VARCHAR),
+                    new Field(ConfigConstant.CONFIG_MODEL_TYPE, "VARCHAR", Types.VARCHAR),
+                    new Field(ConfigConstant.CONFIG_MODEL_CREATE_TIME, "BIGINT", Types.BIGINT),
+                    new Field(ConfigConstant.CONFIG_MODEL_UPDATE_TIME, "BIGINT", Types.BIGINT),
+                    new Field(ConfigConstant.CONFIG_MODEL_JSON, "LONGVARCHAR", Types.LONGVARCHAR),
+                    new Field(ConfigConstant.DATA_SUCCESS, "INTEGER", Types.INTEGER),
+                    new Field(ConfigConstant.DATA_TABLE_GROUP_ID, "VARCHAR", Types.VARCHAR),
+                    new Field(ConfigConstant.DATA_TARGET_TABLE_NAME, "VARCHAR", Types.VARCHAR),
+                    new Field(ConfigConstant.DATA_EVENT, "VARCHAR", Types.VARCHAR),
+                    new Field(ConfigConstant.DATA_ERROR, "LONGVARCHAR", Types.LONGVARCHAR)
+            )
+            .map(field -> {
+                field.setLabelName(field.getName());
+                // 转换列下划线
+                String labelName = UnderlineToCamelUtils.camelToUnderline(field.getName());
+                field.setName(labelName);
+                return field;
+            }).collect(Collectors.toMap(Field::getLabelName, field -> field));
         }
 
         public List<Field> getFields() {
@@ -421,11 +468,12 @@ public class MysqlStorageServiceImpl extends AbstractStorageService {
         }
 
         public void build(String... fieldNames) {
-            fieldPairs = new ArrayList<>(fieldNames.length);
             fields = new ArrayList<>(fieldNames.length);
             Stream.of(fieldNames).parallel().forEach(k -> {
-                fieldPairs.add(fieldPairMap.get(k));
-                fields.add(fieldMap.get(k));
+                if(fieldMap.containsKey(k)){
+                    Field field = fieldMap.get(k);
+                    fields.add(field);
+                }
             });
         }
     }
@@ -437,15 +485,14 @@ public class MysqlStorageServiceImpl extends AbstractStorageService {
         private String update;
         private String delete;
         private StorageEnum group;
-        private List<FieldPair> fieldPairs;
         private List<Field> fields;
         private boolean dynamicTableName;
         private boolean systemType;
         private boolean orderByUpdateTime;
 
-        public Executor(StorageEnum group, List<FieldPair> fieldPairs, List<Field> fields, boolean dynamicTableName, boolean systemType, boolean orderByUpdateTime) {
+        public Executor(StorageEnum group, List<Field> fields, boolean dynamicTableName,
+                        boolean systemType, boolean orderByUpdateTime) {
             this.group = group;
-            this.fieldPairs = fieldPairs;
             this.fields = fields;
             this.dynamicTableName = dynamicTableName;
             this.systemType = systemType;
@@ -505,10 +552,6 @@ public class MysqlStorageServiceImpl extends AbstractStorageService {
             return fields;
         }
 
-        public List<FieldPair> getFieldPairs() {
-            return fieldPairs;
-        }
-
         public boolean isDynamicTableName() {
             return dynamicTableName;
         }

+ 56 - 0
dbsyncer-storage/src/main/java/org/dbsyncer/storage/util/UnderlineToCamelUtils.java

@@ -0,0 +1,56 @@
+package org.dbsyncer.storage.util;
+
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+public abstract class UnderlineToCamelUtils {
+
+    /**
+     * 下划线转驼峰法
+     *
+     * @param line       源字符串
+     * @param smallCamel 大小驼峰,是否为小驼峰
+     * @return 转换后的字符串
+     */
+    public static String underlineToCamel(String line, boolean smallCamel) {
+        if (line == null || "".equals(line)) {
+            return "";
+        }
+        StringBuffer sb = new StringBuffer();
+        Pattern pattern = Pattern.compile("([A-Za-z\\d]+)(_)?");
+        Matcher matcher = pattern.matcher(line);
+        while (matcher.find()) {
+            String word = matcher.group();
+            sb.append(smallCamel && matcher.start() == 0 ? Character.toLowerCase(word.charAt(0)) : Character.toUpperCase(word.charAt(0)));
+            int index = word.lastIndexOf('_');
+            if (index > 0) {
+                sb.append(word.substring(1, index).toLowerCase());
+            } else {
+                sb.append(word.substring(1).toLowerCase());
+            }
+        }
+        return sb.toString();
+    }
+
+    /**
+     * 驼峰法转下划线
+     *
+     * @param line 源字符串
+     * @return 转换后的字符串
+     */
+    public static String camelToUnderline(String line) {
+        if (line == null || "".equals(line)) {
+            return "";
+        }
+        line = String.valueOf(line.charAt(0)).toUpperCase().concat(line.substring(1));
+        StringBuffer sb = new StringBuffer();
+        Pattern pattern = Pattern.compile("[A-Z]([a-z\\d]+)?");
+        Matcher matcher = pattern.matcher(line);
+        while (matcher.find()) {
+            String word = matcher.group();
+            sb.append(word.toUpperCase());
+            sb.append(matcher.end() == line.length() ? "" : "_");
+        }
+        return sb.toString();
+    }
+}

+ 2 - 0
dbsyncer-storage/src/main/resources/dbsyncer_data.sql

@@ -1,6 +1,8 @@
 CREATE TABLE `dbsyncer_data` (
   `ID` varchar(64) CHARACTER SET utf8 COLLATE utf8_bin NOT NULL COMMENT '唯一ID',
   `SUCCESS` int(1) NOT NULL COMMENT '成功1/失败0',
+  `TABLE_GROUP_ID` varchar(64) CHARACTER SET utf8 COLLATE utf8_bin DEFAULT '' COMMENT '驱动表映射关系ID',
+  `TARGET_TABLE_NAME` varchar(100) CHARACTER SET utf8 COLLATE utf8_bin DEFAULT '' COMMENT '目标表名称',
   `EVENT` varchar(10) CHARACTER SET utf8 COLLATE utf8_bin NOT NULL COMMENT '事件',
   `ERROR` mediumtext CHARACTER SET utf8 COLLATE utf8_bin NULL COMMENT '异常信息',
   `CREATE_TIME` bigint(0) NOT NULL COMMENT '创建时间',

+ 1 - 0
dbsyncer-storage/src/main/resources/dbsyncer_upgrade.sql

@@ -0,0 +1 @@
+ALTER TABLE dbsyncer_upgrade ADD COLUMN TABLE_GROUP_ID VARCHAR(64) DEFAULT '' COMMENT '驱动表映射关系ID' AFTER SUCCESS, ADD COLUMN TARGET_TABLE_NAME VARCHAR(100) DEFAULT '' COMMENT '目标表名称' AFTER TABLE_GROUP_ID;