فهرست منبع

!347 支持查看ddl数据
Merge pull request !347 from AE86/dev

AE86 2 ماه پیش
والد
کامیت
9bcd8e8e90
23فایلهای تغییر یافته به همراه318 افزوده شده و 342 حذف شده
  1. 13 5
      dbsyncer-biz/src/main/java/org/dbsyncer/biz/impl/DataSyncServiceImpl.java
  2. 1 1
      dbsyncer-common/src/main/java/org/dbsyncer/common/model/Result.java
  3. 1 1
      dbsyncer-connector/dbsyncer-connector-mysql/src/main/java/org/dbsyncer/connector/mysql/cdc/MySQLListener.java
  4. 4 2
      dbsyncer-connector/dbsyncer-connector-oracle/src/main/java/org/dbsyncer/connector/oracle/logminer/LogMiner.java
  5. 2 2
      dbsyncer-connector/dbsyncer-connector-oracle/src/main/java/org/dbsyncer/connector/oracle/logminer/LogMinerHelper.java
  6. 2 0
      dbsyncer-parser/src/main/java/org/dbsyncer/parser/TableGroupContext.java
  7. 4 5
      dbsyncer-parser/src/main/java/org/dbsyncer/parser/ddl/AlterStrategy.java
  8. 5 23
      dbsyncer-parser/src/main/java/org/dbsyncer/parser/ddl/DDLParser.java
  9. 17 76
      dbsyncer-parser/src/main/java/org/dbsyncer/parser/ddl/alter/AddStrategy.java
  10. 17 15
      dbsyncer-parser/src/main/java/org/dbsyncer/parser/ddl/alter/ChangeStrategy.java
  11. 12 50
      dbsyncer-parser/src/main/java/org/dbsyncer/parser/ddl/alter/DropStrategy.java
  12. 16 20
      dbsyncer-parser/src/main/java/org/dbsyncer/parser/ddl/alter/ModifyStrategy.java
  13. 119 68
      dbsyncer-parser/src/main/java/org/dbsyncer/parser/ddl/impl/DDLParserImpl.java
  14. 2 1
      dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/AbstractBufferActuator.java
  15. 5 2
      dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/impl/BufferActuatorRouter.java
  16. 35 35
      dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/impl/GeneralBufferActuator.java
  17. 17 0
      dbsyncer-parser/src/main/java/org/dbsyncer/parser/impl/TableGroupContextImpl.java
  18. 27 30
      dbsyncer-sdk/src/main/java/org/dbsyncer/sdk/config/DDLConfig.java
  19. 1 1
      dbsyncer-sdk/src/main/java/org/dbsyncer/sdk/config/ListenerConfig.java
  20. 2 1
      dbsyncer-sdk/src/main/java/org/dbsyncer/sdk/connector/database/AbstractDatabaseConnector.java
  21. 14 2
      dbsyncer-sdk/src/main/java/org/dbsyncer/sdk/enums/DDLOperationEnum.java
  22. 1 1
      dbsyncer-sdk/src/main/java/org/dbsyncer/sdk/listener/AbstractQuartzListener.java
  23. 1 1
      dbsyncer-sdk/src/main/java/org/dbsyncer/sdk/storage/AbstractStorageService.java

+ 13 - 5
dbsyncer-biz/src/main/java/org/dbsyncer/biz/impl/DataSyncServiceImpl.java

@@ -19,14 +19,15 @@ import org.dbsyncer.parser.flush.impl.BufferActuatorRouter;
 import org.dbsyncer.parser.model.Meta;
 import org.dbsyncer.parser.model.Picker;
 import org.dbsyncer.parser.model.TableGroup;
+import org.dbsyncer.sdk.constant.ConfigConstant;
+import org.dbsyncer.sdk.constant.ConnectorConstant;
 import org.dbsyncer.sdk.enums.StorageEnum;
-import org.dbsyncer.sdk.listener.event.RowChangedEvent;
-import org.dbsyncer.sdk.model.Field;
 import org.dbsyncer.sdk.filter.FieldResolver;
 import org.dbsyncer.sdk.filter.Query;
+import org.dbsyncer.sdk.listener.event.RowChangedEvent;
+import org.dbsyncer.sdk.model.Field;
 import org.dbsyncer.sdk.storage.StorageService;
 import org.dbsyncer.storage.binlog.proto.BinlogMap;
-import org.dbsyncer.sdk.constant.ConfigConstant;
 import org.dbsyncer.storage.util.BinlogMessageUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -111,11 +112,18 @@ public class DataSyncServiceImpl implements DataSyncService {
             return Collections.EMPTY_MAP;
         }
 
-        // 3、反序列
+        // 3、获取DDL
         Map<String, Object> target = new HashMap<>();
+        BinlogMap message = BinlogMap.parseFrom(bytes);
+        String event = (String) row.get(ConfigConstant.DATA_EVENT);
+        if (StringUtil.equals(event, ConnectorConstant.OPERTION_ALTER)) {
+            message.getRowMap().forEach((k, v) -> target.put(k, v.toStringUtf8()));
+            return target;
+        }
+
+        // 4、反序列
         final Picker picker = new Picker(tableGroup);
         final Map<String, Field> fieldMap = picker.getTargetFieldMap();
-        BinlogMap message = BinlogMap.parseFrom(bytes);
         message.getRowMap().forEach((k, v) -> {
             if (fieldMap.containsKey(k)) {
                 try {

+ 1 - 1
dbsyncer-common/src/main/java/org/dbsyncer/common/model/Result.java

@@ -18,7 +18,7 @@ public class Result<T> {
     /**
      * 错误日志
      */
-    private StringBuffer error = new StringBuffer();
+    private final StringBuffer error = new StringBuffer();
 
     /**
      * 驱动表映射关系ID

+ 1 - 1
dbsyncer-connector/dbsyncer-connector-mysql/src/main/java/org/dbsyncer/connector/mysql/cdc/MySQLListener.java

@@ -347,8 +347,8 @@ public class MySQLListener extends AbstractDatabaseListener {
             if (StringUtil.isBlank(databaseName)) {
                 databaseName = data.getDatabase();
             }
+            databaseName = StringUtil.replace(databaseName, "`", "");
             if (isFilterTable(databaseName, tableName)) {
-                logger.info("sql:{}", data.getSql());
                 trySendEvent(new DDLChangedEvent(tableName, ConnectorConstant.OPERTION_ALTER,
                         data.getSql(), client.getBinlogFilename(), client.getBinlogPosition()));
             }

+ 4 - 2
dbsyncer-connector/dbsyncer-connector-oracle/src/main/java/org/dbsyncer/connector/oracle/logminer/LogMiner.java

@@ -61,7 +61,9 @@ public class LogMiner {
     }
 
     private void closeQuietly() {
-        LogMinerHelper.endLogMiner(connection);
+        if (isValid()) {
+            LogMinerHelper.endLogMiner(connection);
+        }
         if (null != worker && !worker.isInterrupted()) {
             worker.interrupt();
             worker = null;
@@ -140,7 +142,7 @@ public class LogMiner {
                 logger.info("Reconnect successfully");
                 break;
             } catch (Exception e) {
-                logger.error(url, e);
+                logger.error("Reconnect failed", e);
                 sleepSeconds(5);
             }
         }

+ 2 - 2
dbsyncer-connector/dbsyncer-connector-oracle/src/main/java/org/dbsyncer/connector/oracle/logminer/LogMinerHelper.java

@@ -101,9 +101,9 @@ public class LogMinerHelper {
         if (connection != null) {
             try {
                 executeCallableStatement(connection, LOG_MINER_SQL_END_LOG_MINER);
-            } catch (SQLException e) {
+            } catch (Exception e) {
                 if (e.getMessage().toUpperCase().contains("ORA-01307")) {
-                    logger.info("LogMiner session was already closed");
+                    logger.info("LogMiner session was already closed", e);
                 } else {
                     logger.warn("Cannot close log miner session gracefully", e);
                 }

+ 2 - 0
dbsyncer-parser/src/main/java/org/dbsyncer/parser/TableGroupContext.java

@@ -18,6 +18,8 @@ public interface TableGroupContext {
 
     void put(Mapping mapping, List<TableGroup> tableGroups);
 
+    void update(Mapping mapping, List<TableGroup> tableGroups);
+
     List<TableGroupPicker> getTableGroupPickers(String metaId, String tableName);
 
     void clear(String metaId);

+ 4 - 5
dbsyncer-parser/src/main/java/org/dbsyncer/parser/ddl/AlterStrategy.java

@@ -1,11 +1,11 @@
+/**
+ * DBSyncer Copyright 2020-2025 All Rights Reserved.
+ */
 package org.dbsyncer.parser.ddl;
 
 import net.sf.jsqlparser.statement.alter.AlterExpression;
-import org.dbsyncer.parser.model.FieldMapping;
 import org.dbsyncer.sdk.config.DDLConfig;
 
-import java.util.List;
-
 /**
  * Alter策略
  *
@@ -20,7 +20,6 @@ public interface AlterStrategy {
      *
      * @param expression
      * @param ddlConfig
-     * @param fieldMappingList
      */
-    void parse(AlterExpression expression, DDLConfig ddlConfig, List<FieldMapping> fieldMappingList);
+    void parse(AlterExpression expression, DDLConfig ddlConfig);
 }

+ 5 - 23
dbsyncer-parser/src/main/java/org/dbsyncer/parser/ddl/DDLParser.java

@@ -3,32 +3,14 @@
  */
 package org.dbsyncer.parser.ddl;
 
-import org.dbsyncer.parser.model.FieldMapping;
+import net.sf.jsqlparser.JSQLParserException;
+import org.dbsyncer.parser.model.TableGroup;
 import org.dbsyncer.sdk.config.DDLConfig;
-import org.dbsyncer.sdk.model.MetaInfo;
-
-import java.util.List;
+import org.dbsyncer.sdk.spi.ConnectorService;
 
 public interface DDLParser {
 
-    /**
-     * 解析DDL配置
-     *
-     * @param sql                   源表ALTER语句
-     * @param targetConnectorType   目标连接器类型
-     * @param targetTableName       目标表
-     * @param originalFieldMappings 字段映射关系
-     * @return
-     */
-    DDLConfig parseDDlConfig(String sql, String targetConnectorType, String targetTableName, List<FieldMapping> originalFieldMappings);
+    DDLConfig parse(ConnectorService connectorService, TableGroup tableGroup, String sql) throws JSQLParserException;
 
-    /**
-     * 刷新字段映射关系(根据原来的映射关系和更改的字段进行新关系的映射组合)
-     *
-     * @param originalFieldMappings
-     * @param originMetaInfo
-     * @param targetMetaInfo
-     * @param targetDDLConfig
-     */
-    List<FieldMapping> refreshFiledMappings(List<FieldMapping> originalFieldMappings, MetaInfo originMetaInfo, MetaInfo targetMetaInfo, DDLConfig targetDDLConfig);
+    void refreshFiledMappings(TableGroup tableGroup, DDLConfig targetDDLConfig);
 }

+ 17 - 76
dbsyncer-parser/src/main/java/org/dbsyncer/parser/ddl/alter/AddStrategy.java

@@ -1,95 +1,36 @@
+/**
+ * DBSyncer Copyright 2020-2025 All Rights Reserved.
+ */
 package org.dbsyncer.parser.ddl.alter;
 
 import net.sf.jsqlparser.statement.alter.AlterExpression;
-import net.sf.jsqlparser.statement.create.table.Index;
-import net.sf.jsqlparser.statement.create.table.Index.ColumnParams;
 import org.dbsyncer.common.util.StringUtil;
 import org.dbsyncer.parser.ddl.AlterStrategy;
-import org.dbsyncer.parser.model.FieldMapping;
 import org.dbsyncer.sdk.config.DDLConfig;
 import org.dbsyncer.sdk.enums.DDLOperationEnum;
-import org.dbsyncer.sdk.model.Field;
-
-import java.util.LinkedList;
-import java.util.List;
 
 /**
- * 解析add的属性 exampleSql: ALTER TABLE cost ADD duan INT after(before) `tmp`;
+ * 新增字段
+ * <code>
+ *     ALTER TABLE `test`.`test_user`
+ * ADD COLUMN `aaa` varchar(255) NULL AFTER `create_date`,
+ * ADD COLUMN `bbb` varchar(255) NULL AFTER `aaa`
+ * </code>
  *
  * @author life
  */
 public class AddStrategy implements AlterStrategy {
 
     @Override
-    public void parse(AlterExpression expression, DDLConfig ddlConfig,
-            List<FieldMapping> originFiledMapping) {
+    public void parse(AlterExpression expression, DDLConfig ddlConfig) {
         if (expression.getColDataTypeList() != null) {
-            parseAddColumn(expression, ddlConfig, originFiledMapping);
-        }
-        if (expression.getIndex() != null) {
-            parseAddIndex(expression, originFiledMapping);
-        }
-        ddlConfig.setDdlOperationEnum(DDLOperationEnum.ALTER_ADD);
-    }
-
-    //解析增加列
-    //exampleSql: ALTER TABLE cost ADD duan INT after(before) `tmp`;
-    private void parseAddColumn(AlterExpression expression, DDLConfig ddlConfig,
-            List<FieldMapping> originFiledMapping) {
-        //如果是增加列
-        for (AlterExpression.ColumnDataType columnDataType : expression.getColDataTypeList()) {
-            boolean findColumn = false;
-            List<String> columnSpecs = new LinkedList<>();
-            for (String spe : columnDataType.getColumnSpecs()) {//对一before,after进行处理
-                spe = StringUtil.replace(spe, StringUtil.BACK_QUOTE, StringUtil.EMPTY);
-                spe = StringUtil.replace(spe, StringUtil.DOUBLE_QUOTATION, StringUtil.EMPTY);
-                if (findColumn) {
-                    //对before(after)字段进行映射
-                    String finalSpe = spe;
-                    FieldMapping fieldMapping = originFiledMapping.stream()
-                            .filter(x -> StringUtil.equals(x.getSource().getName(), finalSpe))
-                            .findFirst().get();
-                    columnSpecs.add(fieldMapping.getTarget().getName());
-                    findColumn = false;
-                    continue;
-                }
-
-                if (StringUtil.equalsIgnoreCase(spe, "before") || StringUtil.equalsIgnoreCase(spe,
-                        "after")) {
-                    findColumn = true;
-                }
-                columnSpecs.add(spe);
+            for (AlterExpression.ColumnDataType columnDataType : expression.getColDataTypeList()) {
+                String columName = columnDataType.getColumnName();
+                columName = StringUtil.replace(columName, StringUtil.BACK_QUOTE, StringUtil.EMPTY);
+                columName = StringUtil.replace(columName, StringUtil.DOUBLE_QUOTATION, StringUtil.EMPTY);
+                ddlConfig.getAddedFieldNames().add(columName);
             }
-            columnDataType.setColumnSpecs(columnSpecs);
-            String columName = columnDataType.getColumnName();
-            columName = StringUtil.replace(columName, StringUtil.BACK_QUOTE, StringUtil.EMPTY);
-            columName = StringUtil.replace(columName, StringUtil.DOUBLE_QUOTATION, StringUtil.EMPTY);
-            Field field = new Field(columName, columnDataType.getColDataType().getDataType(),
-                    0);//感觉不需要都行,只需要名称,后续可以自己刷新
-            ddlConfig.getAddFields().add(field);
         }
-
-    }
-
-    /**
-     * 新增索引 exampleSql: ALTER TABLE test_table add index name (tmp);
-     *
-     * @param expression
-     * @param originFiledMapping
-     */
-    private void parseAddIndex(AlterExpression expression,
-            List<FieldMapping> originFiledMapping) {
-        Index index = expression.getIndex();
-        List<ColumnParams> columnNames = index.getColumns();
-        List<ColumnParams> targetNames = new LinkedList<>();
-        for (ColumnParams columnParams : columnNames) {
-            FieldMapping fieldMapping = originFiledMapping.stream()
-                    .filter(x -> StringUtil.equals(x.getSource().getName(),
-                            columnParams.getColumnName())).findFirst().get();
-            ColumnParams target = new ColumnParams(fieldMapping.getTarget().getName(),
-                    columnParams.getParams());
-            targetNames.add(target);
-        }
-        index.setColumns(targetNames);
+        ddlConfig.setDdlOperationEnum(DDLOperationEnum.ALTER_ADD);
     }
-}
+}

+ 17 - 15
dbsyncer-parser/src/main/java/org/dbsyncer/parser/ddl/alter/ChangeStrategy.java

@@ -1,36 +1,38 @@
+/**
+ * DBSyncer Copyright 2020-2025 All Rights Reserved.
+ */
 package org.dbsyncer.parser.ddl.alter;
 
 import net.sf.jsqlparser.statement.alter.AlterExpression;
 import org.dbsyncer.common.util.StringUtil;
 import org.dbsyncer.parser.ddl.AlterStrategy;
-import org.dbsyncer.parser.model.FieldMapping;
 import org.dbsyncer.sdk.config.DDLConfig;
 import org.dbsyncer.sdk.enums.DDLOperationEnum;
 
-import java.util.List;
-
 /**
  * 解析change属性
- * exampleSql: ALTER TABLE test_table CHANGE duan1  duan2 INT(10)
+ * <code>
+ *     ALTER TABLE `test`.`test_user`
+ * CHANGE COLUMN `name` `name2` varchar(200) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NULL DEFAULT NULL AFTER `id`,
+ * CHANGE COLUMN `remark` `remark2` varchar(200) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NULL DEFAULT NULL AFTER `name2`
+ * </code>
  *
  * @author life
  */
 public class ChangeStrategy implements AlterStrategy {
 
     @Override
-    public void parse(AlterExpression expression, DDLConfig ddlConfig, List<FieldMapping> originalFieldMappings) {
-        String oldColumnName = StringUtil.replace(expression.getColumnOldName(), StringUtil.BACK_QUOTE, StringUtil.EMPTY);
-        oldColumnName = StringUtil.replace(oldColumnName, StringUtil.DOUBLE_QUOTATION, StringUtil.EMPTY);
-        ddlConfig.setSourceColumnName(oldColumnName);
-        String finalOldColumnName = oldColumnName;
-        FieldMapping fieldMapping = originalFieldMappings.stream().filter(x -> StringUtil.equals(x.getSource().getName(),
-                finalOldColumnName)).findFirst().orElse(null);
-        if (fieldMapping != null) {
-            expression.setColumnOldName(fieldMapping.getTarget().getName());
+    public void parse(AlterExpression expression, DDLConfig ddlConfig) {
+        if (expression.getColDataTypeList() != null) {
             for (AlterExpression.ColumnDataType columnDataType : expression.getColDataTypeList()) {
-                ddlConfig.setChangedColumnName(columnDataType.getColumnName());
+                String oldColumnName = StringUtil.replace(expression.getColumnOldName(), StringUtil.BACK_QUOTE, StringUtil.EMPTY);
+                oldColumnName = StringUtil.replace(oldColumnName, StringUtil.DOUBLE_QUOTATION, StringUtil.EMPTY);
+
+                String changedColumnName = StringUtil.replace(columnDataType.getColumnName(), StringUtil.BACK_QUOTE, StringUtil.EMPTY);
+                changedColumnName = StringUtil.replace(changedColumnName, StringUtil.DOUBLE_QUOTATION, StringUtil.EMPTY);
+                ddlConfig.getChangedFieldNames().put(oldColumnName, changedColumnName);
             }
         }
         ddlConfig.setDdlOperationEnum(DDLOperationEnum.ALTER_CHANGE);
     }
-}
+}

+ 12 - 50
dbsyncer-parser/src/main/java/org/dbsyncer/parser/ddl/alter/DropStrategy.java

@@ -1,71 +1,33 @@
+/**
+ * DBSyncer Copyright 2020-2025 All Rights Reserved.
+ */
 package org.dbsyncer.parser.ddl.alter;
 
 import net.sf.jsqlparser.statement.alter.AlterExpression;
 import org.dbsyncer.common.util.StringUtil;
 import org.dbsyncer.parser.ddl.AlterStrategy;
-import org.dbsyncer.parser.model.FieldMapping;
 import org.dbsyncer.sdk.config.DDLConfig;
 import org.dbsyncer.sdk.enums.DDLOperationEnum;
-import org.dbsyncer.sdk.model.Field;
-
-import java.util.List;
 
 /**
  * 解析drop
+ * <code>
+ *     ALTER TABLE `test`.`test_user`
+ * DROP COLUMN `aaa`,
+ * DROP COLUMN `bbb`
+ * </code>
  *
  * @author life
  */
 public class DropStrategy implements AlterStrategy {
 
     @Override
-    public void parse(AlterExpression expression, DDLConfig ddlConfig, List<FieldMapping> originalFieldMappings) {
+    public void parse(AlterExpression expression, DDLConfig ddlConfig) {
         if (expression.getColumnName() != null) {
-            dropColumn(expression, ddlConfig, originalFieldMappings);
-        }
-        if (expression.getIndex() != null) {
-            dropIndex(expression, originalFieldMappings);
+            String columnName = StringUtil.replace(expression.getColumnName(), StringUtil.BACK_QUOTE, StringUtil.EMPTY);
+            columnName = StringUtil.replace(columnName, StringUtil.DOUBLE_QUOTATION, StringUtil.EMPTY);
+            ddlConfig.getDroppedFieldNames().add(columnName);
         }
         ddlConfig.setDdlOperationEnum(DDLOperationEnum.ALTER_DROP);
     }
-
-    /**
-     * example: ALTER TABLE test_table DROP dis;
-     *
-     * @param expression
-     * @param ddlConfig
-     * @param originalFieldMappings
-     */
-    private void dropColumn(AlterExpression expression, DDLConfig ddlConfig, List<FieldMapping> originalFieldMappings) {
-        String columnName = StringUtil.replace(expression.getColumnName(), StringUtil.BACK_QUOTE, StringUtil.EMPTY);
-        columnName = StringUtil.replace(columnName,StringUtil.DOUBLE_QUOTATION,StringUtil.EMPTY);
-        Field field = new Field(columnName, null, 0);
-        //需要把列替换成目标的列名
-        String finalColumnName = columnName;
-        originalFieldMappings.stream()
-                .filter(x -> StringUtil.equals(x.getSource().getName(), finalColumnName)).findFirst()
-                .ifPresent(fieldMapping -> expression.setColumnName(fieldMapping.getTarget().getName()));
-        //加入还是原名
-        ddlConfig.getRemoveFields().add(field);
-    }
-
-    /**
-     * 貌似不需要做什么,我们目前没有字段分索引,再考虑
-     * example: ALTER TABLE test_table drop index name;
-     *
-     * @param expression
-     * @param originalFieldMappings
-     */
-    private void dropIndex(AlterExpression expression, List<FieldMapping> originalFieldMappings) {
-//        Index index = expression.getIndex();
-//        String names= index.getName();
-//        String[] nameList = StringUtil.split(names,".");
-//        List<String> targetNameList = new LinkedList<>();
-//        for (String name:nameList) {
-//            FieldMapping fieldMapping = originalFieldMappings.stream()
-//                    .filter(x -> StringUtil.equals(x.getSource().getName(),
-//                            name)).findFirst().get();
-//            targetNameList.add(fieldMapping.getTarget().getName());
-//        }
-//        index.setName(targetNameList);
-    }
 }

+ 16 - 20
dbsyncer-parser/src/main/java/org/dbsyncer/parser/ddl/alter/ModifyStrategy.java

@@ -1,39 +1,35 @@
+/**
+ * DBSyncer Copyright 2020-2025 All Rights Reserved.
+ */
 package org.dbsyncer.parser.ddl.alter;
 
 import net.sf.jsqlparser.statement.alter.AlterExpression;
 import org.dbsyncer.common.util.StringUtil;
 import org.dbsyncer.parser.ddl.AlterStrategy;
-import org.dbsyncer.parser.model.FieldMapping;
 import org.dbsyncer.sdk.config.DDLConfig;
 import org.dbsyncer.sdk.enums.DDLOperationEnum;
 
-import java.util.List;
-
 /**
- * 解析modify的属性
- * exampleSql: ALTER TABLE `test`.`test_table` MODIFY COLUMN `test` varchar(251) NULL DEFAULT NULL
- * alter modify parser
+ * 字段属性变更
+ * <code>
+ * ALTER TABLE `test`.`test_user`
+ * MODIFY COLUMN `name` varchar(203) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NULL DEFAULT NULL AFTER `id`,
+ * MODIFY COLUMN `remark` varchar(204) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NULL DEFAULT NULL AFTER `name`
+ * </code>
  *
  * @author life
  */
 public class ModifyStrategy implements AlterStrategy {
 
     @Override
-    public void parse(AlterExpression expression, DDLConfig ddlConfig, List<FieldMapping> originalFieldMappings) {
-        //先查找到当前的表和目标的表对应的字段
-        for (AlterExpression.ColumnDataType columnDataType : expression.getColDataTypeList()) {
-            String columnName = StringUtil.replace(columnDataType.getColumnName(), StringUtil.BACK_QUOTE, StringUtil.EMPTY);
-            columnName =StringUtil.replace(columnName,StringUtil.DOUBLE_QUOTATION,StringUtil.EMPTY);
-            for (FieldMapping fieldMapping : originalFieldMappings) {
-                if (StringUtil.equals(fieldMapping.getSource().getName(), columnName)) {
-                    //TODO life 找到目标的表名,先是alter进行属性替换,然后config记录新的
-                    columnDataType.setColumnName(fieldMapping.getTarget().getName());
-                    //因为只是修改属性,所以表名称没有变化
-                    ddlConfig.setSourceColumnName(fieldMapping.getSource().getName());
-                    ddlConfig.setChangedColumnName(fieldMapping.getSource().getName());
-                }
+    public void parse(AlterExpression expression, DDLConfig ddlConfig) {
+        if (expression.getColDataTypeList() != null) {
+            for (AlterExpression.ColumnDataType columnDataType : expression.getColDataTypeList()) {
+                String columnName = StringUtil.replace(columnDataType.getColumnName(), StringUtil.BACK_QUOTE, StringUtil.EMPTY);
+                columnName = StringUtil.replace(columnName, StringUtil.DOUBLE_QUOTATION, StringUtil.EMPTY);
+                ddlConfig.getModifiedFieldNames().add(columnName);
             }
         }
         ddlConfig.setDdlOperationEnum(DDLOperationEnum.ALTER_MODIFY);
     }
-}
+}

+ 119 - 68
dbsyncer-parser/src/main/java/org/dbsyncer/parser/ddl/impl/DDLParserImpl.java

@@ -9,10 +9,9 @@ import net.sf.jsqlparser.statement.Statement;
 import net.sf.jsqlparser.statement.alter.Alter;
 import net.sf.jsqlparser.statement.alter.AlterExpression;
 import net.sf.jsqlparser.statement.alter.AlterOperation;
+import org.dbsyncer.common.util.CollectionUtils;
+import org.dbsyncer.common.util.JsonUtil;
 import org.dbsyncer.common.util.StringUtil;
-import org.dbsyncer.connector.base.ConnectorFactory;
-import org.dbsyncer.sdk.config.DDLConfig;
-import org.dbsyncer.sdk.connector.database.Database;
 import org.dbsyncer.parser.ddl.AlterStrategy;
 import org.dbsyncer.parser.ddl.DDLParser;
 import org.dbsyncer.parser.ddl.alter.AddStrategy;
@@ -20,27 +19,26 @@ import org.dbsyncer.parser.ddl.alter.ChangeStrategy;
 import org.dbsyncer.parser.ddl.alter.DropStrategy;
 import org.dbsyncer.parser.ddl.alter.ModifyStrategy;
 import org.dbsyncer.parser.model.FieldMapping;
+import org.dbsyncer.parser.model.TableGroup;
+import org.dbsyncer.sdk.config.DDLConfig;
+import org.dbsyncer.sdk.connector.database.Database;
 import org.dbsyncer.sdk.enums.DDLOperationEnum;
 import org.dbsyncer.sdk.model.Field;
-import org.dbsyncer.sdk.model.MetaInfo;
 import org.dbsyncer.sdk.spi.ConnectorService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.stereotype.Component;
 
 import javax.annotation.PostConstruct;
-import javax.annotation.Resource;
-import java.util.LinkedList;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
 
 /**
- * alter情况
- * <ol>
- *     <li>只是修改字段的属性值</li>
- *     <li>修改字段的名称</li>
- * </ol>
+ * ddl解析器, 支持类型参考:{@link DDLOperationEnum}
  *
  * @author life
  * @version 1.0.0
@@ -50,10 +48,6 @@ import java.util.concurrent.ConcurrentHashMap;
 public class DDLParserImpl implements DDLParser {
 
     private final Logger logger = LoggerFactory.getLogger(getClass());
-
-    @Resource
-    private ConnectorFactory connectorFactory;
-
     private final Map<AlterOperation, AlterStrategy> STRATEGIES = new ConcurrentHashMap();
 
     @PostConstruct
@@ -65,77 +59,134 @@ public class DDLParserImpl implements DDLParser {
     }
 
     @Override
-    public DDLConfig parseDDlConfig(String sql, String targetConnectorType, String targetTableName, List<FieldMapping> originalFieldMappings) {
-        ConnectorService connectorService = connectorFactory.getConnectorService(targetConnectorType);
-        // 替换为目标库执行SQL
+    public DDLConfig parse(ConnectorService connectorService, TableGroup tableGroup, String sql) throws JSQLParserException {
         DDLConfig ddlConfig = new DDLConfig();
-        try {
-            Statement statement = CCJSqlParserUtil.parse(sql);
-            if (statement instanceof Alter) {
-                Alter alter = (Alter) statement;
-                Database database = (Database) connectorService;
-                String quotation = database.buildSqlWithQuotation();
-                // 替换成目标表名
-                alter.getTable().setName(new StringBuilder(quotation).append(targetTableName).append(quotation).toString());
-                ddlConfig.setSql(alter.toString());
-                for (AlterExpression expression : alter.getAlterExpressions()) {
-                    if (STRATEGIES.containsKey(expression.getOperation())) {
-                        STRATEGIES.get(expression.getOperation()).parse(expression, ddlConfig, originalFieldMappings);
-                    }
-                }
-                ddlConfig.setSql(alter.toString());
+        logger.info("ddl:{}", sql);
+        Statement statement = CCJSqlParserUtil.parse(sql);
+        if (statement instanceof Alter && connectorService instanceof Database) {
+            Alter alter = (Alter) statement;
+            Database database = (Database) connectorService;
+            String quotation = database.buildSqlWithQuotation();
+            // 替换成目标表名
+            alter.getTable().setName(quotation + tableGroup.getTargetTable().getName() + quotation);
+            ddlConfig.setSql(alter.toString());
+            for (AlterExpression expression : alter.getAlterExpressions()) {
+                STRATEGIES.computeIfPresent(expression.getOperation(), (k, strategy) -> {
+                    strategy.parse(expression, ddlConfig);
+                    return strategy;
+                });
             }
-        } catch (JSQLParserException e) {
-            logger.error(e.getMessage(), e);
         }
         return ddlConfig;
     }
 
     @Override
-    public List<FieldMapping> refreshFiledMappings(List<FieldMapping> originalFieldMappings, MetaInfo originMetaInfo, MetaInfo targetMetaInfo, DDLConfig targetDDLConfig) {
-        List<FieldMapping> newTargetMappingList = new LinkedList<>();
-        //处理映射关系
-        for (FieldMapping fieldMapping : originalFieldMappings) {
-            String fieldSourceName = fieldMapping.getSource().getName();
-            String filedTargetName = fieldMapping.getTarget().getName();
-            //找到更改的源表的名称,也就是找到了对应的映射关系,这样就可以从源表找到更改后的名称进行对应,
-            if (fieldSourceName.equals(targetDDLConfig.getSourceColumnName())) {
-                // 说明字段名没有改变,只是改变了属性
-                if (targetDDLConfig.getDdlOperationEnum() == DDLOperationEnum.ALTER_MODIFY) {
-                    Field source = originMetaInfo.getColumn().stream().filter(x -> StringUtil.equals(x.getName(), fieldSourceName)).findFirst().get();
-                    Field target = targetMetaInfo.getColumn().stream().filter(x -> StringUtil.equals(x.getName(), filedTargetName)).findFirst().get();
-                    //替换
-                    newTargetMappingList.add(new FieldMapping(source, target));
+    public void refreshFiledMappings(TableGroup tableGroup, DDLConfig targetDDLConfig) {
+        switch (targetDDLConfig.getDdlOperationEnum()) {
+            case ALTER_MODIFY:
+                updateFieldMapping(tableGroup, targetDDLConfig.getModifiedFieldNames());
+                break;
+            case ALTER_ADD:
+                appendFieldMappings(tableGroup, targetDDLConfig.getAddedFieldNames());
+                break;
+            case ALTER_CHANGE:
+                renameFieldMapping(tableGroup, targetDDLConfig.getChangedFieldNames());
+                break;
+            case ALTER_DROP:
+                removeFieldMappings(tableGroup, targetDDLConfig.getDroppedFieldNames());
+                break;
+            default:
+                break;
+        }
+    }
+
+    private void updateFieldMapping(TableGroup tableGroup, List<String> modifiedFieldNames) {
+        Map<String, Field> sourceFiledMap = tableGroup.getSourceTable().getColumn().stream().collect(Collectors.toMap(Field::getName, filed -> filed));
+        Map<String, Field> targetFiledMap = tableGroup.getTargetTable().getColumn().stream().collect(Collectors.toMap(Field::getName, filed -> filed));
+        for (FieldMapping fieldMapping : tableGroup.getFieldMapping()) {
+            Field source = fieldMapping.getSource();
+            Field target = fieldMapping.getTarget();
+            // 支持1对多场景
+            if (source != null) {
+                String modifiedName = source.getName();
+                if (!modifiedFieldNames.contains(modifiedName)) {
                     continue;
-                } else if (targetDDLConfig.getDdlOperationEnum() == DDLOperationEnum.ALTER_CHANGE) {
-                    Field source = originMetaInfo.getColumn().stream().filter(x -> StringUtil.equals(x.getName(), targetDDLConfig.getChangedColumnName())).findFirst().get();
-                    Field target = targetMetaInfo.getColumn().stream().filter(x -> StringUtil.equals(x.getName(), targetDDLConfig.getChangedColumnName())).findFirst().get();
-                    //替换
-                    newTargetMappingList.add(new FieldMapping(source, target));
+                }
+                sourceFiledMap.computeIfPresent(modifiedName, (k, field) -> {
+                    fieldMapping.setSource(field);
+                    return field;
+                });
+                if (target != null && StringUtil.equals(modifiedName, target.getName())) {
+                    targetFiledMap.computeIfPresent(modifiedName, (k, field) -> {
+                        fieldMapping.setTarget(field);
+                        return field;
+                    });
+                }
+            }
+        }
+    }
+
+    private void renameFieldMapping(TableGroup tableGroup, Map<String, String> changedFieldNames) {
+        Set<String> oldNames = changedFieldNames.keySet();
+        for (FieldMapping fieldMapping : tableGroup.getFieldMapping()) {
+            Field source = fieldMapping.getSource();
+            Field target = fieldMapping.getTarget();
+            // 支持1对多场景
+            if (source != null) {
+                String oldFieldName = source.getName();
+                if (!oldNames.contains(oldFieldName)) {
                     continue;
                 }
+                changedFieldNames.computeIfPresent(oldFieldName, (k, newName) -> {
+                    source.setName(newName);
+                    if (target != null && StringUtil.equals(oldFieldName, target.getName())) {
+                        target.setName(newName);
+                    }
+                    return newName;
+                });
             }
-            newTargetMappingList.add(fieldMapping);
         }
+    }
 
-        if (DDLOperationEnum.ALTER_ADD == targetDDLConfig.getDdlOperationEnum()) {
-            //处理新增的映射关系
-            List<Field> addFields = targetDDLConfig.getAddFields();
-            for (Field field : addFields) {
-                Field source = originMetaInfo.getColumn().stream().filter(x -> StringUtil.equals(x.getName(), field.getName())).findFirst().get();
-                Field target = targetMetaInfo.getColumn().stream().filter(x -> StringUtil.equals(x.getName(), field.getName())).findFirst().get();
-                newTargetMappingList.add(new FieldMapping(source, target));
+    private void removeFieldMappings(TableGroup tableGroup, List<String> removeFieldNames) {
+        Iterator<FieldMapping> iterator = tableGroup.getFieldMapping().iterator();
+        while (iterator.hasNext()) {
+            FieldMapping fieldMapping = iterator.next();
+            Field source = fieldMapping.getSource();
+            if (source != null && removeFieldNames.contains(source.getName())) {
+                iterator.remove();
             }
         }
+    }
 
-        if (DDLOperationEnum.ALTER_DROP == targetDDLConfig.getDdlOperationEnum()) {
-            //处理删除字段的映射关系
-            List<Field> removeFields = targetDDLConfig.getRemoveFields();
-            for (Field field : removeFields) {
-                newTargetMappingList.removeIf(x -> StringUtil.equals(x.getSource().getName(), field.getName()));
+    private void appendFieldMappings(TableGroup tableGroup, List<String> addedFieldNames) {
+        List<FieldMapping> fieldMappings = tableGroup.getFieldMapping();
+        Iterator<String> iterator = addedFieldNames.iterator();
+        while (iterator.hasNext()) {
+            String name = iterator.next();
+            for (FieldMapping fieldMapping : fieldMappings) {
+                Field source = fieldMapping.getSource();
+                Field target = fieldMapping.getTarget();
+                // 检查重复字段
+                if (source != null && target != null && StringUtil.equals(source.getName(), name) && StringUtil.equals(target.getName(), name)) {
+                    iterator.remove();
+                }
             }
         }
-        return newTargetMappingList;
+        if (CollectionUtils.isEmpty(addedFieldNames)) {
+            return;
+        }
+
+        Map<String, Field> sourceFiledMap = tableGroup.getSourceTable().getColumn().stream().collect(Collectors.toMap(Field::getName, filed -> filed));
+        Map<String, Field> targetFiledMap = tableGroup.getTargetTable().getColumn().stream().collect(Collectors.toMap(Field::getName, filed -> filed));
+        if (CollectionUtils.isEmpty(sourceFiledMap) || CollectionUtils.isEmpty(targetFiledMap)) {
+            return;
+        }
+        addedFieldNames.forEach(newFieldName -> {
+            if (sourceFiledMap.containsKey(newFieldName) && targetFiledMap.containsKey(newFieldName)) {
+                fieldMappings.add(new FieldMapping(sourceFiledMap.get(newFieldName), targetFiledMap.get(newFieldName)));
+            }
+        });
     }
 
 }

+ 2 - 1
dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/AbstractBufferActuator.java

@@ -23,6 +23,7 @@ import java.util.Queue;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
@@ -179,7 +180,7 @@ public abstract class AbstractBufferActuator<Request extends BufferRequest, Resp
     public void run() {
         boolean locked = false;
         try {
-            locked = taskLock.tryLock();
+            locked = taskLock.tryLock(3, TimeUnit.SECONDS);
             if (locked) {
                 submit();
             }

+ 5 - 2
dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/impl/BufferActuatorRouter.java

@@ -98,8 +98,11 @@ public final class BufferActuatorRouter implements DisposableBean {
         if (ChangedEventTypeEnum.isDDL(event.getType())) {
             WriterRequest request = new WriterRequest(event);
             // DDL事件,阻塞等待队列消费完成
-            while (actuator.getQueue().isEmpty() && actuator.isRunning(request)){
-                actuator.offer(request);
+            while (actuator.isRunning(request)) {
+                if (actuator.getQueue().isEmpty()) {
+                    actuator.offer(request);
+                    return;
+                }
                 try {
                     TimeUnit.MILLISECONDS.sleep(10);
                 } catch (InterruptedException ex) {

+ 35 - 35
dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/impl/GeneralBufferActuator.java

@@ -50,6 +50,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.Executor;
+import java.util.stream.Collectors;
 
 /**
  * 通用执行器(单线程消费,多线程批量写,按序执行)
@@ -139,15 +140,16 @@ public class GeneralBufferActuator extends AbstractBufferActuator<WriterRequest,
 
         switch (response.getTypeEnum()) {
             case DDL:
-                // ddl解析
-                pickers.forEach(picker -> parseDDl(response, mapping, picker.getTableGroup()));
+                tableGroupContext.update(mapping, pickers.stream().map(picker -> {
+                    TableGroup tableGroup = profileComponent.getTableGroup(picker.getTableGroup().getId());
+                    parseDDl(response, mapping, tableGroup);
+                    return tableGroup;
+                }).collect(Collectors.toList()));
                 break;
             case SCAN:
-                // dml解析
                 pickers.forEach(picker -> distributeTableGroup(response, mapping, picker, picker.getSourceFields(), false));
                 break;
             case ROW:
-                // dml解析
                 pickers.forEach(picker -> distributeTableGroup(response, mapping, picker, picker.getTableGroup().getSourceTable().getColumn(), true));
                 // 发布刷新增量点事件
                 applicationContext.publishEvent(new RefreshOffsetEvent(applicationContext, response.getChangedOffset()));
@@ -229,42 +231,40 @@ public class GeneralBufferActuator extends AbstractBufferActuator<WriterRequest,
             String sConnType = sConnConfig.getConnectorType();
             String tConnType = tConnConfig.getConnectorType();
             // 0.生成目标表执行SQL(暂支持同源)
-            if (StringUtil.equals(sConnType, tConnType)) {
-                // 1.转换为目标SQL,执行到目标库
-                String targetTableName = tableGroup.getTargetTable().getName();
-                List<FieldMapping> originalFieldMappings = tableGroup.getFieldMapping();
-                DDLConfig targetDDLConfig = ddlParser.parseDDlConfig(response.getSql(), tConnType, targetTableName, originalFieldMappings);
-                ConnectorInstance tConnectorInstance = connectorFactory.connect(tConnConfig);
-                Result result = connectorFactory.writerDDL(tConnectorInstance, targetDDLConfig);
-                result.setTableGroupId(tableGroup.getId());
-                result.setTargetTableGroupName(targetTableName);
-
-                // 2.获取目标表最新的属性字段
-                MetaInfo targetMetaInfo = parserComponent.getMetaInfo(mapping.getTargetConnectorId(), targetTableName);
-                MetaInfo originMetaInfo = parserComponent.getMetaInfo(mapping.getSourceConnectorId(), tableGroup.getSourceTable().getName());
-
-                // 3.更新表字段映射(根据保留的更改的属性,进行更改)
-                tableGroup.getSourceTable().setColumn(originMetaInfo.getColumn());
-                tableGroup.getTargetTable().setColumn(targetMetaInfo.getColumn());
-                tableGroup.setFieldMapping(ddlParser.refreshFiledMappings(originalFieldMappings, originMetaInfo, targetMetaInfo, targetDDLConfig));
-
-                // 4.更新执行命令
-                Map<String, String> commands = parserComponent.getCommand(mapping, tableGroup);
-                tableGroup.setCommand(commands);
-
-                // 5.持久化存储 & 更新缓存配置
-                profileComponent.editTableGroup(tableGroup);
-
-                // 6.发布更新事件,持久化增量数据
-                applicationContext.publishEvent(new RefreshOffsetEvent(applicationContext, response.getChangedOffset()));
-                flushStrategy.flushIncrementData(mapping.getMetaId(), result, response.getEvent());
+            if (!StringUtil.equals(sConnType, tConnType)) {
+                logger.warn("暂只支持数据库同源并且是关系性解析DDL");
                 return;
             }
+            // 1.转换为目标SQL,执行到目标库
+            String targetTableName = tableGroup.getTargetTable().getName();
+            ConnectorService connectorService = connectorFactory.getConnectorService(tConnType);
+            DDLConfig targetDDLConfig = ddlParser.parse(connectorService, tableGroup, response.getSql());
+            ConnectorInstance tConnectorInstance = connectorFactory.connect(tConnConfig);
+            Result result = connectorFactory.writerDDL(tConnectorInstance, targetDDLConfig);
+            result.setTableGroupId(tableGroup.getId());
+            result.setTargetTableGroupName(targetTableName);
+
+            // 2.获取目标表最新的属性字段
+            MetaInfo sourceMetaInfo = parserComponent.getMetaInfo(mapping.getSourceConnectorId(), tableGroup.getSourceTable().getName());
+            MetaInfo targetMetaInfo = parserComponent.getMetaInfo(mapping.getTargetConnectorId(), targetTableName);
+
+            // 3.更新表字段映射(根据保留的更改的属性,进行更改)
+            tableGroup.getSourceTable().setColumn(sourceMetaInfo.getColumn());
+            tableGroup.getTargetTable().setColumn(targetMetaInfo.getColumn());
+            ddlParser.refreshFiledMappings(tableGroup, targetDDLConfig);
+
+            // 4.更新执行命令
+            tableGroup.setCommand(parserComponent.getCommand(mapping, tableGroup));
+
+            // 5.持久化存储 & 更新缓存配置
+            profileComponent.editTableGroup(tableGroup);
+
+            // 6.发布更新事件,持久化增量数据
+            applicationContext.publishEvent(new RefreshOffsetEvent(applicationContext, response.getChangedOffset()));
+            flushStrategy.flushIncrementData(mapping.getMetaId(), result, response.getEvent());
         } catch (Exception e) {
             logger.error(e.getMessage(), e);
-            return;
         }
-        logger.warn("暂只支持数据库同源并且是关系性解析DDL");
     }
 
     /**

+ 17 - 0
dbsyncer-parser/src/main/java/org/dbsyncer/parser/impl/TableGroupContextImpl.java

@@ -40,6 +40,19 @@ public final class TableGroupContextImpl implements TableGroupContext {
         });
     }
 
+    @Override
+    public void update(Mapping mapping, List<TableGroup> tableGroups) {
+        tableGroupMap.computeIfPresent(mapping.getMetaId(), (k, innerMap) -> {
+            // 先清空表映射关系,再更新表映射关系
+            tableGroups.stream().findFirst().ifPresent(tableGroup -> innerMap.remove(tableGroup.getSourceTable().getName()));
+            tableGroups.forEach(tableGroup -> {
+                String sourceTableName = tableGroup.getSourceTable().getName();
+                innerMap.add(sourceTableName, PickerUtil.mergeTableGroupConfig(mapping, tableGroup));
+            });
+            return innerMap;
+        });
+    }
+
     @Override
     public List<TableGroupPicker> getTableGroupPickers(String metaId, String tableName) {
         List<TableGroupPicker> list = new ArrayList<>();
@@ -64,5 +77,9 @@ public final class TableGroupContextImpl implements TableGroupContext {
         public void add(String tableName, TableGroup tableGroup) {
             pickerMap.computeIfAbsent(tableName, k -> new ArrayList<>()).add(new TableGroupPicker(tableGroup));
         }
+
+        public void remove(String tableName) {
+            pickerMap.remove(tableName);
+        }
     }
 }

+ 27 - 30
dbsyncer-sdk/src/main/java/org/dbsyncer/sdk/config/DDLConfig.java

@@ -1,28 +1,25 @@
 package org.dbsyncer.sdk.config;
 
 import org.dbsyncer.sdk.enums.DDLOperationEnum;
-import org.dbsyncer.sdk.model.Field;
 
+import java.util.LinkedHashMap;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Map;
 
 public class DDLConfig {
-    /**
-     * 执行命令
-     */
+
     private String sql;
 
     private DDLOperationEnum ddlOperationEnum;
 
-    private List<Field> addFields = new LinkedList<>();
+    private List<String> addedFieldNames = new LinkedList<>();
 
-    private List<Field> removeFields = new LinkedList<>();
+    private List<String> modifiedFieldNames = new LinkedList<>();
 
-    //记录源表的源字段名称
-    private String sourceColumnName;
+    private List<String> droppedFieldNames = new LinkedList<>();
 
-    //记录改变后的字段名称
-    private String changedColumnName;
+    private Map<String, String> changedFieldNames = new LinkedHashMap<>();
 
     public String getSql() {
         return sql;
@@ -32,43 +29,43 @@ public class DDLConfig {
         this.sql = sql;
     }
 
-    public List<Field> getAddFields() {
-        return addFields;
+    public DDLOperationEnum getDdlOperationEnum() {
+        return ddlOperationEnum;
     }
 
-    public void setAddFields(List<Field> addFields) {
-        this.addFields = addFields;
+    public void setDdlOperationEnum(DDLOperationEnum ddlOperationEnum) {
+        this.ddlOperationEnum = ddlOperationEnum;
     }
 
-    public List<Field> getRemoveFields() {
-        return removeFields;
+    public List<String> getAddedFieldNames() {
+        return addedFieldNames;
     }
 
-    public void setRemoveFields(List<Field> removeFields) {
-        this.removeFields = removeFields;
+    public void setAddedFieldNames(List<String> addedFieldNames) {
+        this.addedFieldNames = addedFieldNames;
     }
 
-    public String getSourceColumnName() {
-        return sourceColumnName;
+    public List<String> getModifiedFieldNames() {
+        return modifiedFieldNames;
     }
 
-    public void setSourceColumnName(String sourceColumnName) {
-        this.sourceColumnName = sourceColumnName;
+    public void setModifiedFieldNames(List<String> modifiedFieldNames) {
+        this.modifiedFieldNames = modifiedFieldNames;
     }
 
-    public String getChangedColumnName() {
-        return changedColumnName;
+    public List<String> getDroppedFieldNames() {
+        return droppedFieldNames;
     }
 
-    public void setChangedColumnName(String changedColumnName) {
-        this.changedColumnName = changedColumnName;
+    public void setDroppedFieldNames(List<String> droppedFieldNames) {
+        this.droppedFieldNames = droppedFieldNames;
     }
 
-    public DDLOperationEnum getDdlOperationEnum() {
-        return ddlOperationEnum;
+    public Map<String, String> getChangedFieldNames() {
+        return changedFieldNames;
     }
 
-    public void setDdlOperationEnum(DDLOperationEnum ddlOperationEnum) {
-        this.ddlOperationEnum = ddlOperationEnum;
+    public void setChangedFieldNames(Map<String, String> changedFieldNames) {
+        this.changedFieldNames = changedFieldNames;
     }
 }

+ 1 - 1
dbsyncer-sdk/src/main/java/org/dbsyncer/sdk/config/ListenerConfig.java

@@ -58,7 +58,7 @@ public class ListenerConfig {
     /**
      * 禁用ddl事件
      */
-    private boolean enableDDL = true;
+    private boolean enableDDL;
 
     public ListenerConfig() {
     }

+ 2 - 1
dbsyncer-sdk/src/main/java/org/dbsyncer/sdk/connector/database/AbstractDatabaseConnector.java

@@ -11,6 +11,7 @@ import org.dbsyncer.sdk.config.*;
 import org.dbsyncer.sdk.connector.AbstractConnector;
 import org.dbsyncer.sdk.connector.ConnectorInstance;
 import org.dbsyncer.sdk.connector.database.ds.SimpleConnection;
+import org.dbsyncer.sdk.constant.ConfigConstant;
 import org.dbsyncer.sdk.constant.ConnectorConstant;
 import org.dbsyncer.sdk.constant.DatabaseConstant;
 import org.dbsyncer.sdk.enums.OperationEnum;
@@ -661,7 +662,7 @@ public abstract class AbstractDatabaseConnector extends AbstractConnector implem
                 return true;
             });
             Map<String, String> successMap = new HashMap<>();
-            successMap.put("sql", config.getSql());
+            successMap.put(ConfigConstant.BINLOG_DATA, config.getSql());
             result.addSuccessData(Collections.singletonList(successMap));
         } catch (Exception e) {
             result.getError().append(String.format("执行ddl: %s, 异常:%s", config.getSql(), e.getMessage()));

+ 14 - 2
dbsyncer-sdk/src/main/java/org/dbsyncer/sdk/enums/DDLOperationEnum.java

@@ -8,8 +8,20 @@ package org.dbsyncer.sdk.enums;
  * @Date 2023-09-24 14:24
  */
 public enum DDLOperationEnum {
+    /**
+     * 变更字段属性
+     */
     ALTER_MODIFY,
+    /**
+     * 新增字段
+     */
     ALTER_ADD,
-    ALTER_DROP,
-    ALTER_CHANGE;
+    /**
+     * 变更字段名称
+     */
+    ALTER_CHANGE,
+    /**
+     * 删除字段
+     */
+    ALTER_DROP;
 }

+ 1 - 1
dbsyncer-sdk/src/main/java/org/dbsyncer/sdk/listener/AbstractQuartzListener.java

@@ -87,7 +87,7 @@ public abstract class AbstractQuartzListener extends AbstractListener implements
         final Lock taskLock = lock;
         boolean locked = false;
         try {
-            locked = taskLock.tryLock();
+            locked = taskLock.tryLock(3, TimeUnit.SECONDS);
             if (locked) {
                 for (int i = 0; i < commands.size(); i++) {
                     execute(commands.get(i), i);

+ 1 - 1
dbsyncer-sdk/src/main/java/org/dbsyncer/sdk/storage/AbstractStorageService.java

@@ -66,7 +66,7 @@ public abstract class AbstractStorageService implements StorageService, Disposab
                 return select(sharding, query);
             }
         } catch (InterruptedException e) {
-            logger.warn("tryLock error", e.getLocalizedMessage());
+            logger.warn("tryLock error:{}", e.getLocalizedMessage());
         } catch (NullExecutorException e) {
             // 存储表不存在或已删除,请重试
         } finally {