Преглед на файлове

!175 merge
Merge pull request !175 from AE86/v_1.2.5_dev_life

AE86 преди 1 година
родител
ревизия
238ff3c8e7
променени са 29 файла, в които са добавени 809 реда и са изтрити 78 реда
  1. 5 0
      dbsyncer-common/pom.xml
  2. 3 0
      dbsyncer-common/src/main/java/org/dbsyncer/common/event/CommonChangedEvent.java
  3. 13 12
      dbsyncer-common/src/main/java/org/dbsyncer/common/event/DDLChangedEvent.java
  4. 0 8
      dbsyncer-common/src/main/java/org/dbsyncer/common/event/Watcher.java
  5. 12 0
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/Connector.java
  6. 10 3
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/ConnectorFactory.java
  7. 74 0
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/config/DDLConfig.java
  8. 5 0
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/constant/ConnectorConstant.java
  9. 15 0
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/database/AbstractDatabaseConnector.java
  10. 16 0
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/enums/DDLOperationEnum.java
  11. 5 1
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/file/FileConnector.java
  12. 1 0
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/kafka/KafkaConnector.java
  13. 4 6
      dbsyncer-listener/src/main/java/org/dbsyncer/listener/AbstractExtractor.java
  14. 0 8
      dbsyncer-listener/src/main/java/org/dbsyncer/listener/Extractor.java
  15. 34 10
      dbsyncer-listener/src/main/java/org/dbsyncer/listener/mysql/MySQLExtractor.java
  16. 1 2
      dbsyncer-listener/src/main/java/org/dbsyncer/listener/postgresql/decoder/TestDecodingMessageDecoder.java
  17. 27 14
      dbsyncer-manager/src/main/java/org/dbsyncer/manager/puller/IncrementPuller.java
  18. 26 0
      dbsyncer-parser/src/main/java/org/dbsyncer/parser/ddl/AlterStrategy.java
  19. 31 0
      dbsyncer-parser/src/main/java/org/dbsyncer/parser/ddl/DDLParser.java
  20. 92 0
      dbsyncer-parser/src/main/java/org/dbsyncer/parser/ddl/alter/AddStrategy.java
  21. 33 0
      dbsyncer-parser/src/main/java/org/dbsyncer/parser/ddl/alter/ChangeStrategy.java
  22. 71 0
      dbsyncer-parser/src/main/java/org/dbsyncer/parser/ddl/alter/DropStrategy.java
  23. 38 0
      dbsyncer-parser/src/main/java/org/dbsyncer/parser/ddl/alter/ModifyStrategy.java
  24. 137 0
      dbsyncer-parser/src/main/java/org/dbsyncer/parser/ddl/impl/DDLParserImpl.java
  25. 123 7
      dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/impl/GeneralBufferActuator.java
  26. 10 0
      dbsyncer-parser/src/main/java/org/dbsyncer/parser/model/WriterRequest.java
  27. 10 0
      dbsyncer-parser/src/main/java/org/dbsyncer/parser/model/WriterResponse.java
  28. 6 7
      dbsyncer-storage/src/main/java/org/dbsyncer/storage/support/MysqlStorageServiceImpl.java
  29. 7 0
      pom.xml

+ 5 - 0
dbsyncer-common/pom.xml

@@ -57,6 +57,11 @@
             <artifactId>fastjson2</artifactId>
         </dependency>
 
+        <dependency>
+            <groupId>com.github.jsqlparser</groupId>
+            <artifactId>jsqlparser</artifactId>
+        </dependency>
+
     </dependencies>
 
 </project>

+ 3 - 0
dbsyncer-common/src/main/java/org/dbsyncer/common/event/CommonChangedEvent.java

@@ -31,6 +31,7 @@ public class CommonChangedEvent implements ChangedEvent {
      */
     private ChangedOffset changedOffset = new ChangedOffset();
 
+    @Override
     public String getSourceTableName() {
         return sourceTableName;
     }
@@ -39,6 +40,7 @@ public class CommonChangedEvent implements ChangedEvent {
         this.sourceTableName = sourceTableName;
     }
 
+    @Override
     public String getEvent() {
         return event;
     }
@@ -47,6 +49,7 @@ public class CommonChangedEvent implements ChangedEvent {
         this.event = event;
     }
 
+    @Override
     public Map<String, Object> getChangedRow() {
         return changedRow;
     }

+ 13 - 12
dbsyncer-common/src/main/java/org/dbsyncer/common/event/DDLChangedEvent.java

@@ -1,25 +1,30 @@
 package org.dbsyncer.common.event;
 
-public class DDLChangedEvent {
+/**
+ * DDL变更事件
+ *
+ * @version 1.0.0
+ * @Author AE86
+ * @Date 2023-09-18 23:00
+ */
+public class DDLChangedEvent extends CommonChangedEvent {
 
     /**
      * 变更数据库
       */
     private String database;
 
-    /**
-     * 变更表名称
-     */
-    private String tableName;
-
     /**
      * 变更SQL
      */
     private String sql;
 
-    public DDLChangedEvent(String database, String tableName, String sql) {
+    public DDLChangedEvent(String database, String sourceTableName, String event, String sql, String nextFileName, Object position) {
+        setSourceTableName(sourceTableName);
+        setEvent(event);
+        setNextFileName(nextFileName);
+        setPosition(position);
         this.database = database;
-        this.tableName = tableName;
         this.sql = sql;
     }
 
@@ -27,10 +32,6 @@ public class DDLChangedEvent {
         return database;
     }
 
-    public String getTableName() {
-        return tableName;
-    }
-
     public String getSql() {
         return sql;
     }

+ 0 - 8
dbsyncer-common/src/main/java/org/dbsyncer/common/event/Watcher.java

@@ -18,14 +18,6 @@ public interface Watcher {
      */
     void changeEvent(ChangedEvent event);
 
-    /**
-     * DDL变更事件
-     *
-     * @param event
-     */
-    default void changeEvent(DDLChangedEvent event) {
-    }
-
     /**
      * 持久化增量点事件
      *

+ 12 - 0
dbsyncer-connector/src/main/java/org/dbsyncer/connector/Connector.java

@@ -3,6 +3,7 @@ package org.dbsyncer.connector;
 import org.dbsyncer.common.model.Result;
 import org.dbsyncer.common.spi.ConnectorMapper;
 import org.dbsyncer.connector.config.CommandConfig;
+import org.dbsyncer.connector.config.DDLConfig;
 import org.dbsyncer.connector.config.ReaderConfig;
 import org.dbsyncer.connector.config.WriterBatchConfig;
 import org.dbsyncer.connector.model.MetaInfo;
@@ -95,6 +96,17 @@ public interface Connector<M, C> {
      */
     Result writer(M connectorMapper, WriterBatchConfig config);
 
+    /**
+     * 执行DDL命令
+     *
+     * @param connectorMapper
+     * @param ddlConfig
+     * @return
+     */
+    default Result writerDDL(M connectorMapper, DDLConfig ddlConfig){
+        throw new ConnectorException("Unsupported method.");
+    }
+
     /**
      * 获取数据源同步参数
      *

+ 10 - 3
dbsyncer-connector/src/main/java/org/dbsyncer/connector/ConnectorFactory.java

@@ -5,6 +5,7 @@ import org.dbsyncer.common.model.Result;
 import org.dbsyncer.common.spi.ConnectorMapper;
 import org.dbsyncer.common.util.CollectionUtils;
 import org.dbsyncer.connector.config.CommandConfig;
+import org.dbsyncer.connector.config.DDLConfig;
 import org.dbsyncer.connector.config.ReaderConfig;
 import org.dbsyncer.connector.config.WriterBatchConfig;
 import org.dbsyncer.connector.enums.ConnectorEnum;
@@ -171,6 +172,13 @@ public class ConnectorFactory implements DisposableBean {
         return result;
     }
 
+    public Result writerDDL(ConnectorMapper connectorMapper, DDLConfig ddlConfig) {
+        Connector connector = getConnector(connectorMapper);
+        Result result = connector.writerDDL(connectorMapper, ddlConfig);
+        Assert.notNull(result, "Connector writer batch result can not null");
+        return result;
+    }
+
     public Connector getConnector(ConnectorMapper connectorMapper) {
         return getConnector(connectorMapper.getConnectorType());
     }
@@ -181,7 +189,7 @@ public class ConnectorFactory implements DisposableBean {
      * @param connectorType
      * @return
      */
-    private Connector getConnector(String connectorType) {
+    public Connector getConnector(String connectorType) {
         return ConnectorEnum.getConnectorEnum(connectorType).getConnector();
     }
 
@@ -194,5 +202,4 @@ public class ConnectorFactory implements DisposableBean {
         Assert.notNull(connectorMapper, "ConnectorMapper can not be null.");
         getConnector(connectorMapper).disconnect(connectorMapper);
     }
-
-}
+}

+ 74 - 0
dbsyncer-connector/src/main/java/org/dbsyncer/connector/config/DDLConfig.java

@@ -0,0 +1,74 @@
+package org.dbsyncer.connector.config;
+
+import org.dbsyncer.connector.enums.DDLOperationEnum;
+import org.dbsyncer.connector.model.Field;
+
+import java.util.LinkedList;
+import java.util.List;
+
+public class DDLConfig {
+    /**
+     * 执行命令
+     */
+    private String sql;
+
+    private DDLOperationEnum ddlOperationEnum;
+
+    private List<Field> addFields = new LinkedList<>();
+
+    private List<Field> removeFields = new LinkedList<>();
+
+    //记录源表的源字段名称
+    private String sourceColumnName;
+
+    //记录改变后的字段名称
+    private String changedColumnName;
+
+    public String getSql() {
+        return sql;
+    }
+
+    public void setSql(String sql) {
+        this.sql = sql;
+    }
+
+    public List<Field> getAddFields() {
+        return addFields;
+    }
+
+    public void setAddFields(List<Field> addFields) {
+        this.addFields = addFields;
+    }
+
+    public List<Field> getRemoveFields() {
+        return removeFields;
+    }
+
+    public void setRemoveFields(List<Field> removeFields) {
+        this.removeFields = removeFields;
+    }
+
+    public String getSourceColumnName() {
+        return sourceColumnName;
+    }
+
+    public void setSourceColumnName(String sourceColumnName) {
+        this.sourceColumnName = sourceColumnName;
+    }
+
+    public String getChangedColumnName() {
+        return changedColumnName;
+    }
+
+    public void setChangedColumnName(String changedColumnName) {
+        this.changedColumnName = changedColumnName;
+    }
+
+    public DDLOperationEnum getDdlOperationEnum() {
+        return ddlOperationEnum;
+    }
+
+    public void setDdlOperationEnum(DDLOperationEnum ddlOperationEnum) {
+        this.ddlOperationEnum = ddlOperationEnum;
+    }
+}

+ 5 - 0
dbsyncer-connector/src/main/java/org/dbsyncer/connector/constant/ConnectorConstant.java

@@ -22,6 +22,11 @@ public class ConnectorConstant {
      */
     public static final String OPERTION_DELETE = "DELETE";
 
+    /**
+     * 表结构更改
+     */
+    public static final String OPERTION_ALTER = "ALTER";
+
     /**
      * 查询
      */

+ 15 - 0
dbsyncer-connector/src/main/java/org/dbsyncer/connector/database/AbstractDatabaseConnector.java

@@ -8,6 +8,7 @@ import org.dbsyncer.connector.AbstractConnector;
 import org.dbsyncer.connector.Connector;
 import org.dbsyncer.connector.ConnectorException;
 import org.dbsyncer.connector.config.CommandConfig;
+import org.dbsyncer.connector.config.DDLConfig;
 import org.dbsyncer.connector.config.DatabaseConfig;
 import org.dbsyncer.connector.config.ReaderConfig;
 import org.dbsyncer.connector.config.SqlBuilderConfig;
@@ -639,4 +640,18 @@ public abstract class AbstractDatabaseConnector extends AbstractConnector implem
         });
     }
 
+    @Override
+    public Result writerDDL(DatabaseConnectorMapper connectorMapper, DDLConfig config) {
+        Result result = new Result();
+        try {
+            Assert.hasText(config.getSql(), "执行SQL语句不能为空.");
+            connectorMapper.execute(databaseTemplate -> {
+                databaseTemplate.execute(config.getSql());
+                return true;
+            });
+        } catch (Exception e) {
+            result.getError().append(String.format("执行ddl: %s, 异常:%s", config.getSql(), e.getMessage()));
+        }
+        return result;
+    }
 }

+ 16 - 0
dbsyncer-connector/src/main/java/org/dbsyncer/connector/enums/DDLOperationEnum.java

@@ -0,0 +1,16 @@
+package org.dbsyncer.connector.enums;
+
+
+/**
+ * 支持同步的DDL命令
+ *
+ * @version 1.0.0
+ * @Author life
+ * @Date 2023-09-24 14:24
+ */
+public enum DDLOperationEnum {
+    ALTER_MODIFY,
+    ALTER_ADD,
+    ALTER_DROP,
+    ALTER_CHANGE;
+}

+ 5 - 1
dbsyncer-connector/src/main/java/org/dbsyncer/connector/file/FileConnector.java

@@ -21,7 +21,11 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.util.Assert;
 
-import java.io.*;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.FileReader;
+import java.io.IOException;
+import java.io.OutputStream;
 import java.net.InetAddress;
 import java.net.UnknownHostException;
 import java.util.ArrayList;

+ 1 - 0
dbsyncer-connector/src/main/java/org/dbsyncer/connector/kafka/KafkaConnector.java

@@ -110,4 +110,5 @@ public class KafkaConnector extends AbstractConnector implements Connector<Kafka
     public Map<String, String> getTargetCommand(CommandConfig commandConfig) {
         return Collections.EMPTY_MAP;
     }
+
 }

+ 4 - 6
dbsyncer-listener/src/main/java/org/dbsyncer/listener/AbstractExtractor.java

@@ -2,7 +2,6 @@ package org.dbsyncer.listener;
 
 import org.dbsyncer.common.event.ChangedEvent;
 import org.dbsyncer.common.event.ChangedOffset;
-import org.dbsyncer.common.event.DDLChangedEvent;
 import org.dbsyncer.common.event.Watcher;
 import org.dbsyncer.common.model.AbstractConnectorConfig;
 import org.dbsyncer.common.scheduled.ScheduledTaskService;
@@ -61,17 +60,16 @@ public abstract class AbstractExtractor implements Extractor {
                     // 是否支持监听删除事件
                     processEvent(!listenerConfig.isBanDelete(), event);
                     break;
+                case ConnectorConstant.OPERTION_ALTER:
+                    // 表结构变更事件
+                    watcher.changeEvent(event);
+                    break;
                 default:
                     break;
             }
         }
     }
 
-    @Override
-    public void changeEvent(DDLChangedEvent event) {
-        watcher.changeEvent(event);
-    }
-
     @Override
     public void refreshEvent(ChangedOffset offset) {
         // nothing to do

+ 0 - 8
dbsyncer-listener/src/main/java/org/dbsyncer/listener/Extractor.java

@@ -2,7 +2,6 @@ package org.dbsyncer.listener;
 
 import org.dbsyncer.common.event.ChangedEvent;
 import org.dbsyncer.common.event.ChangedOffset;
-import org.dbsyncer.common.event.DDLChangedEvent;
 import org.dbsyncer.common.event.Watcher;
 
 public interface Extractor {
@@ -31,13 +30,6 @@ public interface Extractor {
      */
     void changeEvent(ChangedEvent event);
 
-    /**
-     * DDL变更事件
-     *
-     * @param event
-     */
-    void changeEvent(DDLChangedEvent event);
-
     /**
      * 更新增量点
      *

+ 34 - 10
dbsyncer-listener/src/main/java/org/dbsyncer/listener/mysql/MySQLExtractor.java

@@ -11,6 +11,10 @@ import com.github.shyiko.mysql.binlog.event.TableMapEventData;
 import com.github.shyiko.mysql.binlog.event.UpdateRowsEventData;
 import com.github.shyiko.mysql.binlog.event.WriteRowsEventData;
 import com.github.shyiko.mysql.binlog.network.ServerException;
+import net.sf.jsqlparser.JSQLParserException;
+import net.sf.jsqlparser.parser.CCJSqlParserUtil;
+import net.sf.jsqlparser.statement.alter.Alter;
+import org.dbsyncer.common.column.Lexer;
 import org.dbsyncer.common.event.ChangedOffset;
 import org.dbsyncer.common.event.DDLChangedEvent;
 import org.dbsyncer.common.event.RowChangedEvent;
@@ -47,10 +51,10 @@ public class MySQLExtractor extends AbstractDatabaseExtractor {
 
     private final Logger logger = LoggerFactory.getLogger(getClass());
 
-    private static final String BINLOG_FILENAME = "fileName";
-    private static final String BINLOG_POSITION = "position";
-    private static final int RETRY_TIMES = 10;
-    private static final int MASTER = 0;
+    private final String BINLOG_FILENAME = "fileName";
+    private final String BINLOG_POSITION = "position";
+    private final int RETRY_TIMES = 10;
+    private final int MASTER = 0;
     private Map<Long, TableMapEventData> tables = new HashMap<>();
     private BinaryLogClient client;
     private List<Host> cluster;
@@ -64,7 +68,7 @@ public class MySQLExtractor extends AbstractDatabaseExtractor {
         try {
             connectLock.lock();
             if (connected) {
-                logger.error("MysqlExtractor is already started");
+                logger.error("MySQLExtractor is already started");
                 return;
             }
             run();
@@ -104,13 +108,14 @@ public class MySQLExtractor extends AbstractDatabaseExtractor {
         }
         database = DatabaseUtil.getDatabaseName(config.getUrl());
         cluster = readNodes(config.getUrl());
-        Assert.notEmpty(cluster, "Mysql连接地址有误.");
+        Assert.notEmpty(cluster, "MySQL连接地址有误.");
 
         final Host host = cluster.get(MASTER);
         final String username = config.getUsername();
         final String password = config.getPassword();
         boolean containsPos = snapshot.containsKey(BINLOG_POSITION);
         client = new BinaryLogRemoteClient(host.getIp(), host.getPort(), username, password);
+        client.setEnableDDL(true);
         client.setBinlogFilename(snapshot.get(BINLOG_FILENAME));
         client.setBinlogPosition(containsPos ? Long.parseLong(snapshot.get(BINLOG_POSITION)) : 0);
         client.setTableMapEventByTableId(tables);
@@ -293,9 +298,8 @@ public class MySQLExtractor extends AbstractDatabaseExtractor {
 
             if (client.isEnableDDL() && EventType.QUERY == header.getEventType()) {
                 refresh(header);
-                QueryEventData data = event.getData();
-                changeEvent(new DDLChangedEvent(data.getDatabase(), "", data.getSql()));
-                logger.info("database:{}, sql:{}", data.getDatabase(), data.getSql());
+                parseDDL(event.getData());
+                return;
             }
 
             // 切换binlog
@@ -305,13 +309,33 @@ public class MySQLExtractor extends AbstractDatabaseExtractor {
             }
         }
 
+        private void parseDDL(QueryEventData data) {
+            if (StringUtil.startsWith(data.getSql(), ConnectorConstant.OPERTION_ALTER)) {
+                try {
+                    // ALTER TABLE `test`.`my_user` MODIFY COLUMN `name` varchar(128) CHARACTER SET utf8 COLLATE utf8_bin NULL DEFAULT NULL AFTER `id`
+                    Alter alter = (Alter) CCJSqlParserUtil.parse(data.getSql());
+                    String tableName = StringUtil.replace(alter.getTable().getName(),"`","");
+                    if (isFilterTable(data.getDatabase(), tableName)) {
+                        logger.info("sql:{}", data.getSql());
+                        changeEvent(new DDLChangedEvent(data.getDatabase(), tableName, ConnectorConstant.OPERTION_ALTER, data.getSql(), client.getBinlogFilename(), client.getBinlogPosition()));
+                    }
+                } catch (JSQLParserException e) {
+                    logger.error(e.getMessage(), e);
+                }
+            }
+        }
+
         private String getTableName(long tableId) {
             return tables.get(tableId).getTable();
         }
 
         private boolean isFilterTable(long tableId) {
             final TableMapEventData tableMap = tables.get(tableId);
-            return StringUtil.equalsIgnoreCase(database, tableMap.getDatabase()) && filterTable.contains(tableMap.getTable());
+            return isFilterTable(tableMap.getDatabase(), tableMap.getTable());
+        }
+
+        private boolean isFilterTable(String dbName, String tableName) {
+            return StringUtil.equalsIgnoreCase(database, dbName) && filterTable.contains(tableName);
         }
 
     }

+ 1 - 2
dbsyncer-listener/src/main/java/org/dbsyncer/listener/postgresql/decoder/TestDecodingMessageDecoder.java

@@ -1,9 +1,9 @@
 package org.dbsyncer.listener.postgresql.decoder;
 
+import org.dbsyncer.common.column.Lexer;
 import org.dbsyncer.common.event.RowChangedEvent;
 import org.dbsyncer.connector.constant.ConnectorConstant;
 import org.dbsyncer.listener.postgresql.AbstractMessageDecoder;
-import org.dbsyncer.common.column.Lexer;
 import org.dbsyncer.listener.postgresql.enums.MessageDecoderEnum;
 import org.dbsyncer.listener.postgresql.enums.MessageTypeEnum;
 import org.postgresql.replication.fluent.logical.ChainedLogicalStreamBuilder;
@@ -12,7 +12,6 @@ import org.slf4j.LoggerFactory;
 
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.List;
 
 /**

+ 27 - 14
dbsyncer-manager/src/main/java/org/dbsyncer/manager/puller/IncrementPuller.java

@@ -2,6 +2,7 @@ package org.dbsyncer.manager.puller;
 
 import org.dbsyncer.common.event.ChangedEvent;
 import org.dbsyncer.common.event.ChangedOffset;
+import org.dbsyncer.common.event.CommonChangedEvent;
 import org.dbsyncer.common.event.DDLChangedEvent;
 import org.dbsyncer.common.event.RefreshOffsetEvent;
 import org.dbsyncer.common.event.RowChangedEvent;
@@ -12,7 +13,6 @@ import org.dbsyncer.common.scheduled.ScheduledTaskJob;
 import org.dbsyncer.common.scheduled.ScheduledTaskService;
 import org.dbsyncer.common.util.CollectionUtils;
 import org.dbsyncer.connector.ConnectorFactory;
-import org.dbsyncer.connector.model.MetaInfo;
 import org.dbsyncer.connector.model.Table;
 import org.dbsyncer.listener.AbstractExtractor;
 import org.dbsyncer.listener.Extractor;
@@ -48,6 +48,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Consumer;
 import java.util.stream.Collectors;
 
 /**
@@ -199,9 +200,16 @@ public final class IncrementPuller extends AbstractPuller implements Application
 
         public abstract void onChange(E e);
 
+        public void onDDLChanged(DDLChangedEvent event) {
+        }
+
         @Override
         public void changeEvent(ChangedEvent event) {
             event.getChangedOffset().setMetaId(meta.getId());
+            if (event instanceof DDLChangedEvent) {
+                onDDLChanged((DDLChangedEvent) event);
+                return;
+            }
             onChange((E) event);
         }
 
@@ -221,11 +229,11 @@ public final class IncrementPuller extends AbstractPuller implements Application
             return meta.getUpdateTime();
         }
 
-        protected void bind(String tableGroupId){
+        protected void bind(String tableGroupId) {
             bufferActuatorRouter.bind(meta.getId(), tableGroupId);
         }
 
-        protected void execute(String tableGroupId, E event){
+        protected void execute(String tableGroupId, ChangedEvent event) {
             bufferActuatorRouter.execute(meta.getId(), tableGroupId, event);
         }
     }
@@ -271,25 +279,30 @@ public final class IncrementPuller extends AbstractPuller implements Application
 
         @Override
         public void onChange(RowChangedEvent event) {
+            process(event, picker -> {
+                final Map<String, Object> changedRow = picker.getColumns(event.getDataList());
+                if (picker.filter(changedRow)) {
+                    event.setChangedRow(changedRow);
+                    execute(picker.getTableGroup().getId(), event);
+                }
+            });
+        }
+
+        @Override
+        public void onDDLChanged(DDLChangedEvent event) {
+            process(event, picker -> execute(picker.getTableGroup().getId(), event));
+        }
+
+        private void process(CommonChangedEvent event, Consumer<FieldPicker> consumer) {
             // 处理过程有异常向上抛
             List<FieldPicker> pickers = tablePicker.get(event.getSourceTableName());
             if (!CollectionUtils.isEmpty(pickers)) {
                 // 触发刷新增量点事件
                 event.getChangedOffset().setRefreshOffset(true);
-                pickers.forEach(picker -> {
-                    final Map<String, Object> changedRow = picker.getColumns(event.getDataList());
-                    if (picker.filter(changedRow)) {
-                        event.setChangedRow(changedRow);
-                        execute(picker.getTableGroup().getId(), event);
-                    }
-                });
+                pickers.forEach(picker -> consumer.accept(picker));
             }
         }
 
-        @Override
-        public void changeEvent(DDLChangedEvent event) {
-            // TODO 解析ddl
-        }
     }
 
 }

+ 26 - 0
dbsyncer-parser/src/main/java/org/dbsyncer/parser/ddl/AlterStrategy.java

@@ -0,0 +1,26 @@
+package org.dbsyncer.parser.ddl;
+
+import net.sf.jsqlparser.statement.alter.AlterExpression;
+import org.dbsyncer.connector.config.DDLConfig;
+import org.dbsyncer.parser.model.FieldMapping;
+
+import java.util.List;
+
+/**
+ * Alter策略
+ *
+ * @version 1.0.0
+ * @Author life
+ * @Date 2023-09-24 14:24
+ */
+public interface AlterStrategy {
+
+    /**
+     * 解析DDLConfig
+     *
+     * @param expression
+     * @param ddlConfig
+     * @param fieldMappingList
+     */
+    void parse(AlterExpression expression, DDLConfig ddlConfig, List<FieldMapping> fieldMappingList);
+}

+ 31 - 0
dbsyncer-parser/src/main/java/org/dbsyncer/parser/ddl/DDLParser.java

@@ -0,0 +1,31 @@
+package org.dbsyncer.parser.ddl;
+
+import org.dbsyncer.connector.config.DDLConfig;
+import org.dbsyncer.connector.model.MetaInfo;
+import org.dbsyncer.parser.model.FieldMapping;
+
+import java.util.List;
+
+public interface DDLParser {
+
+    /**
+     * 解析DDL配置
+     *
+     * @param sql                   源表ALTER语句
+     * @param targetConnectorType   目标连接器类型
+     * @param targetTableName       目标表
+     * @param originalFieldMappings 字段映射关系
+     * @return
+     */
+    DDLConfig parseDDlConfig(String sql, String targetConnectorType, String targetTableName, List<FieldMapping> originalFieldMappings);
+
+    /**
+     * 刷新字段映射关系(根据原来的映射关系和更改的字段进行新关系的映射组合)
+     *
+     * @param originalFieldMappings
+     * @param originMetaInfo
+     * @param targetMetaInfo
+     * @param targetDDLConfig
+     */
+    List<FieldMapping> refreshFiledMappings(List<FieldMapping> originalFieldMappings, MetaInfo originMetaInfo, MetaInfo targetMetaInfo, DDLConfig targetDDLConfig);
+}

+ 92 - 0
dbsyncer-parser/src/main/java/org/dbsyncer/parser/ddl/alter/AddStrategy.java

@@ -0,0 +1,92 @@
+package org.dbsyncer.parser.ddl.alter;
+
+import java.util.LinkedList;
+import java.util.List;
+import net.sf.jsqlparser.statement.alter.AlterExpression;
+import net.sf.jsqlparser.statement.create.table.Index;
+import net.sf.jsqlparser.statement.create.table.Index.ColumnParams;
+import org.dbsyncer.common.util.StringUtil;
+import org.dbsyncer.connector.config.DDLConfig;
+import org.dbsyncer.connector.enums.DDLOperationEnum;
+import org.dbsyncer.connector.model.Field;
+import org.dbsyncer.parser.ddl.AlterStrategy;
+import org.dbsyncer.parser.model.FieldMapping;
+
+/**
+ * 解析add的属性 exampleSql: ALTER TABLE cost ADD duan INT after(before) `tmp`;
+ *
+ * @author life
+ */
+public class AddStrategy implements AlterStrategy {
+
+    @Override
+    public void parse(AlterExpression expression, DDLConfig ddlConfig,
+            List<FieldMapping> originFiledMapping) {
+        if (expression.getColDataTypeList() != null) {
+            parseAddColumn(expression, ddlConfig, originFiledMapping);
+        }
+        if (expression.getIndex() != null) {
+            parseAddIndex(expression, originFiledMapping);
+        }
+        ddlConfig.setDdlOperationEnum(DDLOperationEnum.ALTER_ADD);
+    }
+
+    //解析增加列
+    //exampleSql: ALTER TABLE cost ADD duan INT after(before) `tmp`;
+    private void parseAddColumn(AlterExpression expression, DDLConfig ddlConfig,
+            List<FieldMapping> originFiledMapping) {
+        //如果是增加列
+        for (AlterExpression.ColumnDataType columnDataType : expression.getColDataTypeList()) {
+            boolean findColumn = false;
+            List<String> columnSpecs = new LinkedList<>();
+            for (String spe : columnDataType.getColumnSpecs()) {//对一before,after进行处理
+                spe = StringUtil.replace(spe, "`", "");
+                if (findColumn) {
+                    //对before(after)字段进行映射
+                    String finalSpe = spe;
+                    FieldMapping fieldMapping = originFiledMapping.stream()
+                            .filter(x -> StringUtil.equals(x.getSource().getName(), finalSpe))
+                            .findFirst().get();
+                    columnSpecs.add(fieldMapping.getTarget().getName());
+                    findColumn = false;
+                    continue;
+                }
+
+                if (StringUtil.equalsIgnoreCase(spe, "before") || StringUtil.equalsIgnoreCase(spe,
+                        "after")) {
+                    findColumn = true;
+                }
+                columnSpecs.add(spe);
+            }
+            columnDataType.setColumnSpecs(columnSpecs);
+            String columName = columnDataType.getColumnName();
+            columName = StringUtil.replace(columName, "`", "");
+            Field field = new Field(columName, columnDataType.getColDataType().getDataType(),
+                    0);//感觉不需要都行,只需要名称,后续可以自己刷新
+            ddlConfig.getAddFields().add(field);
+        }
+
+    }
+
+    /**
+     * 新增索引 exampleSql: ALTER TABLE test_table add index name (tmp);
+     *
+     * @param expression
+     * @param originFiledMapping
+     */
+    private void parseAddIndex(AlterExpression expression,
+            List<FieldMapping> originFiledMapping) {
+        Index index = expression.getIndex();
+        List<ColumnParams> columnNames = index.getColumns();
+        List<ColumnParams> targetNames = new LinkedList<>();
+        for (ColumnParams columnParams : columnNames) {
+            FieldMapping fieldMapping = originFiledMapping.stream()
+                    .filter(x -> StringUtil.equals(x.getSource().getName(),
+                            columnParams.getColumnName())).findFirst().get();
+            ColumnParams target = new ColumnParams(fieldMapping.getTarget().getName(),
+                    columnParams.getParams());
+            targetNames.add(target);
+        }
+        index.setColumns(targetNames);
+    }
+}

+ 33 - 0
dbsyncer-parser/src/main/java/org/dbsyncer/parser/ddl/alter/ChangeStrategy.java

@@ -0,0 +1,33 @@
+package org.dbsyncer.parser.ddl.alter;
+
+import net.sf.jsqlparser.statement.alter.AlterExpression;
+import org.dbsyncer.common.util.StringUtil;
+import org.dbsyncer.connector.config.DDLConfig;
+import org.dbsyncer.connector.enums.DDLOperationEnum;
+import org.dbsyncer.parser.ddl.AlterStrategy;
+import org.dbsyncer.parser.model.FieldMapping;
+
+import java.util.List;
+
+/**
+ * 解析change属性
+ * exampleSql: ALTER TABLE test_table CHANGE duan1  duan2 INT(10)
+ *
+ * @author life
+ */
+public class ChangeStrategy implements AlterStrategy {
+
+    @Override
+    public void parse(AlterExpression expression, DDLConfig ddlConfig, List<FieldMapping> originalFieldMappings) {
+        String oldColumnName = StringUtil.replace(expression.getColumnOldName(), "`", "");
+        ddlConfig.setSourceColumnName(oldColumnName);
+        FieldMapping fieldMapping = originalFieldMappings.stream().filter(x -> StringUtil.equals(x.getSource().getName(), oldColumnName)).findFirst().orElse(null);
+        if (fieldMapping != null) {
+            expression.setColumnOldName(fieldMapping.getTarget().getName());
+            for (AlterExpression.ColumnDataType columnDataType : expression.getColDataTypeList()) {
+                ddlConfig.setChangedColumnName(columnDataType.getColumnName());
+            }
+        }
+        ddlConfig.setDdlOperationEnum(DDLOperationEnum.ALTER_CHANGE);
+    }
+}

+ 71 - 0
dbsyncer-parser/src/main/java/org/dbsyncer/parser/ddl/alter/DropStrategy.java

@@ -0,0 +1,71 @@
+package org.dbsyncer.parser.ddl.alter;
+
+import java.util.LinkedList;
+import net.sf.jsqlparser.statement.alter.AlterExpression;
+import net.sf.jsqlparser.statement.create.table.Index;
+import net.sf.jsqlparser.statement.create.table.Index.ColumnParams;
+import org.dbsyncer.common.util.StringUtil;
+import org.dbsyncer.connector.config.DDLConfig;
+import org.dbsyncer.connector.enums.DDLOperationEnum;
+import org.dbsyncer.connector.model.Field;
+import org.dbsyncer.parser.ddl.AlterStrategy;
+import org.dbsyncer.parser.model.FieldMapping;
+
+import java.util.List;
+
+/**
+ * 解析drop
+
+ *
+ * @author life
+ */
+public class DropStrategy implements AlterStrategy {
+
+    @Override
+    public void parse(AlterExpression expression, DDLConfig ddlConfig, List<FieldMapping> originalFieldMappings) {
+        if (expression.getColumnName() !=null){
+           dropColumn(expression,ddlConfig,originalFieldMappings);
+        }
+        if (expression.getIndex() !=null){
+            dropIndex(expression,originalFieldMappings);
+        }
+        ddlConfig.setDdlOperationEnum(DDLOperationEnum.ALTER_DROP);
+    }
+
+    /**
+     *  example: ALTER TABLE test_table DROP dis;
+     * @param expression
+     * @param ddlConfig
+     * @param originalFieldMappings
+     */
+    private void dropColumn(AlterExpression expression, DDLConfig ddlConfig, List<FieldMapping> originalFieldMappings){
+        String columnName = StringUtil.replace(expression.getColumnName(), "`", "");
+        Field field = new Field(columnName, null, 0);
+        //需要把列替换成目标的列名
+        originalFieldMappings.stream()
+                .filter(x -> StringUtil.equals(x.getSource().getName(), columnName)).findFirst()
+                .ifPresent(fieldMapping -> expression.setColumnName(fieldMapping.getTarget().getName()));
+        //加入还是原名
+        ddlConfig.getRemoveFields().add(field);
+    }
+
+    /**
+     * 貌似不需要做什么,我们目前没有字段分索引,再考虑
+     *  * example: ALTER TABLE test_table drop index name;
+     * @param expression
+     * @param originalFieldMappings
+     */
+    private void dropIndex(AlterExpression expression, List<FieldMapping> originalFieldMappings){
+//        Index index = expression.getIndex();
+//        String names= index.getName();
+//        String[] nameList = StringUtil.split(names,".");
+//        List<String> targetNameList = new LinkedList<>();
+//        for (String name:nameList) {
+//            FieldMapping fieldMapping = originalFieldMappings.stream()
+//                    .filter(x -> StringUtil.equals(x.getSource().getName(),
+//                            name)).findFirst().get();
+//            targetNameList.add(fieldMapping.getTarget().getName());
+//        }
+//        index.setName(targetNameList);
+    }
+}

+ 38 - 0
dbsyncer-parser/src/main/java/org/dbsyncer/parser/ddl/alter/ModifyStrategy.java

@@ -0,0 +1,38 @@
+package org.dbsyncer.parser.ddl.alter;
+
+import net.sf.jsqlparser.statement.alter.AlterExpression;
+import org.dbsyncer.common.util.StringUtil;
+import org.dbsyncer.connector.config.DDLConfig;
+import org.dbsyncer.connector.enums.DDLOperationEnum;
+import org.dbsyncer.parser.ddl.AlterStrategy;
+import org.dbsyncer.parser.model.FieldMapping;
+
+import java.util.List;
+
+/**
+ * 解析modify的属性
+ * exampleSql: ALTER TABLE `test`.`test_table` MODIFY COLUMN `test` varchar(251) NULL DEFAULT NULL
+ * alter modify parser
+ *
+ * @author life
+ */
+public class ModifyStrategy implements AlterStrategy {
+
+    @Override
+    public void parse(AlterExpression expression, DDLConfig ddlConfig, List<FieldMapping> originalFieldMappings) {
+        //先查找到当前的表和目标的表对应的字段
+        for (AlterExpression.ColumnDataType columnDataType : expression.getColDataTypeList()) {
+            String columnName = StringUtil.replace(columnDataType.getColumnName(), "`", "");
+            for (FieldMapping fieldMapping : originalFieldMappings) {
+                if (StringUtil.equals(fieldMapping.getSource().getName(), columnName)) {
+                    //TODO life 找到目标的表名,先是alter进行属性替换,然后config记录新的
+                    columnDataType.setColumnName(fieldMapping.getTarget().getName());
+                    //因为只是修改属性,所以表名称没有变化
+                    ddlConfig.setSourceColumnName(fieldMapping.getSource().getName());
+                    ddlConfig.setChangedColumnName(fieldMapping.getSource().getName());
+                }
+            }
+        }
+        ddlConfig.setDdlOperationEnum(DDLOperationEnum.ALTER_MODIFY);
+    }
+}

+ 137 - 0
dbsyncer-parser/src/main/java/org/dbsyncer/parser/ddl/impl/DDLParserImpl.java

@@ -0,0 +1,137 @@
+package org.dbsyncer.parser.ddl.impl;
+
+import net.sf.jsqlparser.JSQLParserException;
+import net.sf.jsqlparser.parser.CCJSqlParserUtil;
+import net.sf.jsqlparser.statement.Statement;
+import net.sf.jsqlparser.statement.alter.Alter;
+import net.sf.jsqlparser.statement.alter.AlterExpression;
+import net.sf.jsqlparser.statement.alter.AlterOperation;
+import org.dbsyncer.common.util.StringUtil;
+import org.dbsyncer.connector.Connector;
+import org.dbsyncer.connector.ConnectorFactory;
+import org.dbsyncer.connector.config.DDLConfig;
+import org.dbsyncer.connector.database.Database;
+import org.dbsyncer.connector.enums.DDLOperationEnum;
+import org.dbsyncer.connector.model.Field;
+import org.dbsyncer.connector.model.MetaInfo;
+import org.dbsyncer.parser.ddl.AlterStrategy;
+import org.dbsyncer.parser.ddl.DDLParser;
+import org.dbsyncer.parser.ddl.alter.AddStrategy;
+import org.dbsyncer.parser.ddl.alter.ChangeStrategy;
+import org.dbsyncer.parser.ddl.alter.DropStrategy;
+import org.dbsyncer.parser.ddl.alter.ModifyStrategy;
+import org.dbsyncer.parser.model.FieldMapping;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Component;
+
+import javax.annotation.PostConstruct;
+import javax.annotation.Resource;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * alter情况
+ * <ol>
+ *     <li>只是修改字段的属性值</li>
+ *     <li>修改字段的名称</li>
+ * </ol>
+ *
+ * @author life
+ * @version 1.0.0
+ * @date 2023/9/19 22:38
+ */
+@Component
+public class DDLParserImpl implements DDLParser {
+
+    private final Logger logger = LoggerFactory.getLogger(getClass());
+
+    @Resource
+    private ConnectorFactory connectorFactory;
+
+    private final Map<AlterOperation, AlterStrategy> STRATEGIES = new ConcurrentHashMap();
+
+    @PostConstruct
+    private void init() {
+        STRATEGIES.putIfAbsent(AlterOperation.MODIFY, new ModifyStrategy());
+        STRATEGIES.putIfAbsent(AlterOperation.ADD, new AddStrategy());
+        STRATEGIES.putIfAbsent(AlterOperation.CHANGE, new ChangeStrategy());
+        STRATEGIES.putIfAbsent(AlterOperation.DROP, new DropStrategy());
+    }
+
+    @Override
+    public DDLConfig parseDDlConfig(String sql, String targetConnectorType, String targetTableName, List<FieldMapping> originalFieldMappings) {
+        Connector connector = connectorFactory.getConnector(targetConnectorType);
+        // 替换为目标库执行SQL
+        DDLConfig ddlConfig = new DDLConfig();
+        try {
+            Statement statement = CCJSqlParserUtil.parse(sql);
+            if (statement instanceof Alter) {
+                Alter alter = (Alter) statement;
+                Database database = (Database) connector;
+                String quotation = database.buildSqlWithQuotation();
+                // 替换成目标表名
+                alter.getTable().setName(new StringBuilder(quotation).append(targetTableName).append(quotation).toString());
+                ddlConfig.setSql(alter.toString());
+                for (AlterExpression expression : alter.getAlterExpressions()) {
+                    if (STRATEGIES.containsKey(expression.getOperation())) {
+                        STRATEGIES.get(expression.getOperation()).parse(expression, ddlConfig, originalFieldMappings);
+                    }
+                }
+            }
+        } catch (JSQLParserException e) {
+            logger.error(e.getMessage(), e);
+        }
+        return ddlConfig;
+    }
+
+    @Override
+    public List<FieldMapping> refreshFiledMappings(List<FieldMapping> originalFieldMappings, MetaInfo originMetaInfo, MetaInfo targetMetaInfo, DDLConfig targetDDLConfig) {
+        List<FieldMapping> newTargetMappingList = new LinkedList<>();
+        //处理映射关系
+        for (FieldMapping fieldMapping : originalFieldMappings) {
+            String fieldSourceName = fieldMapping.getSource().getName();
+            String filedTargetName = fieldMapping.getTarget().getName();
+            //找到更改的源表的名称,也就是找到了对应的映射关系,这样就可以从源表找到更改后的名称进行对应,
+            if (fieldSourceName.equals(targetDDLConfig.getSourceColumnName())) {
+                // 说明字段名没有改变,只是改变了属性
+                if (targetDDLConfig.getDdlOperationEnum() == DDLOperationEnum.ALTER_MODIFY) {
+                    Field source = originMetaInfo.getColumn().stream().filter(x -> StringUtil.equals(x.getName(), fieldSourceName)).findFirst().get();
+                    Field target = targetMetaInfo.getColumn().stream().filter(x -> StringUtil.equals(x.getName(), filedTargetName)).findFirst().get();
+                    //替换
+                    newTargetMappingList.add(new FieldMapping(source, target));
+                    continue;
+                } else if (targetDDLConfig.getDdlOperationEnum() == DDLOperationEnum.ALTER_CHANGE) {
+                    Field source = originMetaInfo.getColumn().stream().filter(x -> StringUtil.equals(x.getName(), targetDDLConfig.getChangedColumnName())).findFirst().get();
+                    Field target = targetMetaInfo.getColumn().stream().filter(x -> StringUtil.equals(x.getName(), targetDDLConfig.getChangedColumnName())).findFirst().get();
+                    //替换
+                    newTargetMappingList.add(new FieldMapping(source, target));
+                    continue;
+                }
+            }
+            newTargetMappingList.add(fieldMapping);
+        }
+
+        if (DDLOperationEnum.ALTER_ADD == targetDDLConfig.getDdlOperationEnum()) {
+            //处理新增的映射关系
+            List<Field> addFields = targetDDLConfig.getAddFields();
+            for (Field field : addFields) {
+                Field source = originMetaInfo.getColumn().stream().filter(x -> StringUtil.equals(x.getName(), field.getName())).findFirst().get();
+                Field target = targetMetaInfo.getColumn().stream().filter(x -> StringUtil.equals(x.getName(), field.getName())).findFirst().get();
+                newTargetMappingList.add(new FieldMapping(source, target));
+            }
+        }
+
+        if (DDLOperationEnum.ALTER_DROP == targetDDLConfig.getDdlOperationEnum()) {
+            //处理删除字段的映射关系
+            List<Field> removeFields = targetDDLConfig.getRemoveFields();
+            for (Field field : removeFields) {
+                newTargetMappingList.removeIf(x -> StringUtil.equals(x.getSource().getName(), field.getName()));
+            }
+        }
+        return newTargetMappingList;
+    }
+
+}

+ 123 - 7
dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/impl/GeneralBufferActuator.java

@@ -7,27 +7,41 @@ import org.dbsyncer.common.model.AbstractConnectorConfig;
 import org.dbsyncer.common.model.IncrementConvertContext;
 import org.dbsyncer.common.model.Result;
 import org.dbsyncer.common.spi.ConnectorMapper;
+import org.dbsyncer.common.util.CollectionUtils;
 import org.dbsyncer.common.util.StringUtil;
 import org.dbsyncer.connector.ConnectorFactory;
-import org.dbsyncer.parser.ParserFactory;
+import org.dbsyncer.connector.config.DDLConfig;
+import org.dbsyncer.connector.constant.ConnectorConstant;
+import org.dbsyncer.connector.enums.ConnectorEnum;
+import org.dbsyncer.connector.model.MetaInfo;
+import org.dbsyncer.parser.Parser;
+import org.dbsyncer.parser.ddl.DDLParser;
 import org.dbsyncer.parser.flush.AbstractBufferActuator;
 import org.dbsyncer.parser.model.BatchWriter;
+import org.dbsyncer.parser.model.ConfigModel;
 import org.dbsyncer.parser.model.Connector;
+import org.dbsyncer.parser.model.FieldMapping;
 import org.dbsyncer.parser.model.Mapping;
 import org.dbsyncer.parser.model.Picker;
 import org.dbsyncer.parser.model.TableGroup;
 import org.dbsyncer.parser.model.WriterRequest;
 import org.dbsyncer.parser.model.WriterResponse;
 import org.dbsyncer.parser.strategy.FlushStrategy;
+import org.dbsyncer.parser.util.ConfigModelUtil;
 import org.dbsyncer.parser.util.ConvertUtil;
 import org.dbsyncer.parser.util.PickerUtil;
 import org.dbsyncer.plugin.PluginFactory;
+import org.dbsyncer.storage.StorageService;
+import org.dbsyncer.storage.enums.StorageEnum;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.springframework.context.ApplicationContext;
 import org.springframework.stereotype.Component;
 import org.springframework.util.Assert;
 
 import javax.annotation.PostConstruct;
 import javax.annotation.Resource;
+import java.util.Date;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.Executor;
@@ -42,6 +56,8 @@ import java.util.concurrent.Executor;
 @Component
 public class GeneralBufferActuator extends AbstractBufferActuator<WriterRequest, WriterResponse> {
 
+    private final Logger logger = LoggerFactory.getLogger(getClass());
+
     @Resource
     private GeneralBufferConfig generalBufferConfig;
 
@@ -52,7 +68,7 @@ public class GeneralBufferActuator extends AbstractBufferActuator<WriterRequest,
     private ConnectorFactory connectorFactory;
 
     @Resource
-    private ParserFactory parserFactory;
+    private Parser parser;
 
     @Resource
     private PluginFactory pluginFactory;
@@ -63,9 +79,15 @@ public class GeneralBufferActuator extends AbstractBufferActuator<WriterRequest,
     @Resource
     private CacheService cacheService;
 
+    @Resource
+    private StorageService storageService;
+
     @Resource
     private ApplicationContext applicationContext;
 
+    @Resource
+    private DDLParser ddlParser;
+
     @PostConstruct
     public void init() {
         setConfig(generalBufferConfig);
@@ -79,11 +101,14 @@ public class GeneralBufferActuator extends AbstractBufferActuator<WriterRequest,
 
     @Override
     protected void partition(WriterRequest request, WriterResponse response) {
-        response.getDataList().add(request.getRow());
+        if (!CollectionUtils.isEmpty(request.getRow())) {
+            response.getDataList().add(request.getRow());
+        }
         response.getOffsetList().add(request.getChangedOffset());
         if (!response.isMerged()) {
             response.setTableGroupId(request.getTableGroupId());
             response.setEvent(request.getEvent());
+            response.setSql(request.getSql());
             response.setMerged(true);
         }
     }
@@ -91,21 +116,28 @@ public class GeneralBufferActuator extends AbstractBufferActuator<WriterRequest,
     @Override
     protected boolean skipPartition(WriterRequest nextRequest, WriterResponse response) {
         // 并发场景,同一条数据可能连续触发Insert > Delete > Insert,批处理任务中出现不同事件时,跳过分区处理
-        return !StringUtil.equals(nextRequest.getEvent(), response.getEvent());
+        // 跳过表结构修改事件(保证表结构修改原子性)
+        return !StringUtil.equals(nextRequest.getEvent(), response.getEvent()) || isDDLEvent(response.getEvent());
     }
 
     @Override
     protected void pull(WriterResponse response) {
-        // 1、获取配置信息
+        // 0、获取配置信息
         final TableGroup tableGroup = cacheService.get(response.getTableGroupId(), TableGroup.class);
         final Mapping mapping = cacheService.get(tableGroup.getMappingId(), Mapping.class);
         final TableGroup group = PickerUtil.mergeTableGroupConfig(mapping, tableGroup);
+
+        // 1、ddl解析
+        if (isDDLEvent(response.getEvent())) {
+            parseDDl(response, mapping, group);
+            return;
+        }
+
         final String sourceTableName = group.getSourceTable().getName();
         final String targetTableName = group.getTargetTable().getName();
         final String event = response.getEvent();
         final Picker picker = new Picker(group.getFieldMapping());
         final List<Map> sourceDataList = response.getDataList();
-
         // 2、映射字段
         List<Map> targetDataList = picker.pickData(sourceDataList);
 
@@ -120,7 +152,7 @@ public class GeneralBufferActuator extends AbstractBufferActuator<WriterRequest,
 
         // 5、批量执行同步
         BatchWriter batchWriter = new BatchWriter(tConnectorMapper, group.getCommand(), targetTableName, event, picker.getTargetFields(), targetDataList, generalBufferConfig.getBufferWriterCount());
-        Result result = parserFactory.writeBatch(context, batchWriter, generalExecutor);
+        Result result = parser.writeBatch(context, batchWriter, generalExecutor);
 
         // 6.发布刷新增量点事件
         applicationContext.publishEvent(new RefreshOffsetEvent(applicationContext, response.getOffsetList()));
@@ -139,6 +171,67 @@ public class GeneralBufferActuator extends AbstractBufferActuator<WriterRequest,
         return generalExecutor;
     }
 
+    private boolean isDDLEvent(String event) {
+        return StringUtil.equals(event, ConnectorConstant.OPERTION_ALTER);
+    }
+
+    /**
+     * 解析DDL
+     *
+     * @param response
+     * @param mapping
+     * @param tableGroup
+     */
+    private void parseDDl(WriterResponse response, Mapping mapping, TableGroup tableGroup) {
+        try {
+            AbstractConnectorConfig sConnConfig = getConnectorConfig(mapping.getSourceConnectorId());
+            AbstractConnectorConfig tConnConfig = getConnectorConfig(mapping.getTargetConnectorId());
+            String sConnType = sConnConfig.getConnectorType();
+            String tConnType = tConnConfig.getConnectorType();
+            // 0.生成目标表执行SQL(暂支持MySQL) fixme AE86 暂内测MySQL作为试运行版本
+            if (StringUtil.equals(sConnType, tConnType) && StringUtil.equals(ConnectorEnum.MYSQL.getType(), tConnType)) {
+                String targetTableName = tableGroup.getTargetTable().getName();
+                List<FieldMapping> originalFieldMappings = tableGroup.getFieldMapping();
+                DDLConfig targetDDLConfig = ddlParser.parseDDlConfig(response.getSql(), tConnType, targetTableName, originalFieldMappings);
+                final ConnectorMapper tConnectorMapper = connectorFactory.connect(tConnConfig);
+                Result result = connectorFactory.writerDDL(tConnectorMapper, targetDDLConfig);
+                result.setTableGroupId(tableGroup.getId());
+                result.setTargetTableGroupName(targetTableName);
+
+                // TODO life
+                // 1.获取目标表最新的属性字段
+                MetaInfo targetMetaInfo = parser.getMetaInfo(mapping.getTargetConnectorId(), targetTableName);
+                MetaInfo originMetaInfo = parser.getMetaInfo(mapping.getSourceConnectorId(), tableGroup.getSourceTable().getName());
+                // 1.1 参考 org.dbsyncer.biz.checker.impl.tablegroup.TableGroupChecker.refreshTableFields
+                //上面已经是刷新了
+
+                // 1.2 要注意,表支持自定义主键,要兼容处理
+                //主键问题还未涉及到这种情况,可能还没写,可能要是删除主键会需要考虑,其他的情况我应该不会动
+
+                // 2.更新TableGroup.targetTable
+                tableGroup.getSourceTable().setColumn(originMetaInfo.getColumn());
+                tableGroup.getTargetTable().setColumn(targetMetaInfo.getColumn());
+
+                // 3.更新表字段映射(根据保留的更改的属性,进行更改)
+                tableGroup.setFieldMapping(ddlParser.refreshFiledMappings(originalFieldMappings, originMetaInfo, targetMetaInfo, targetDDLConfig));
+                // 4.合并驱动配置 & 更新TableGroup.command 合并驱动应该不需要了,我只是把该替换的地方替换掉了,原来的还是保持一致,应该需要更新TableGroup.command
+                // 4.1 参考 org.dbsyncer.biz.checker.impl.tablegroup.TableGroupChecker.mergeConfig
+                Map<String, String> commands = parser.getCommand(mapping, tableGroup);
+                tableGroup.setCommand(commands);
+                // 5.持久化存储 & 更新缓存
+                // 5.1 参考 org.dbsyncer.manager.ManagerFactory.editConfigModel
+                // 将方法移动到parser模块,就可以复用实现
+                flushCache(tableGroup);
+                applicationContext.publishEvent(new RefreshOffsetEvent(applicationContext, response.getOffsetList()));
+                flushStrategy.flushIncrementData(mapping.getMetaId(), result, response.getEvent());
+                return;
+            }
+        } catch (Exception e) {
+            logger.error(e.getMessage(), e);
+        }
+        logger.warn("暂只支持MYSQL解析DDL");
+    }
+
     /**
      * 获取连接器配置
      *
@@ -152,7 +245,30 @@ public class GeneralBufferActuator extends AbstractBufferActuator<WriterRequest,
         return conn.getConfig();
     }
 
+    /**
+     * 持久化驱动配置
+     *
+     * @param tableGroup
+     */
+    private void flushCache(TableGroup tableGroup) {
+        // 1、解析配置
+        ConfigModel model = tableGroup;
+        model.setCreateTime(new Date().getTime());
+        model.setUpdateTime(new Date().getTime());
+        Assert.notNull(model, "ConfigModel can not be null.");
+
+        // 2、持久化
+        Map<String, Object> params = ConfigModelUtil.convertModelToMap(model);
+        logger.debug("params:{}", params);
+        storageService.edit(StorageEnum.CONFIG, params);
+
+        // 3、缓存
+        Assert.notNull(model, "ConfigModel can not be null.");
+        cacheService.put(model.getId(), model);
+    }
+
     public void setGeneralExecutor(Executor generalExecutor) {
         this.generalExecutor = generalExecutor;
     }
+
 }

+ 10 - 0
dbsyncer-parser/src/main/java/org/dbsyncer/parser/model/WriterRequest.java

@@ -2,6 +2,7 @@ package org.dbsyncer.parser.model;
 
 import org.dbsyncer.common.event.ChangedEvent;
 import org.dbsyncer.common.event.ChangedOffset;
+import org.dbsyncer.common.event.DDLChangedEvent;
 import org.dbsyncer.parser.flush.BufferRequest;
 
 import java.util.Map;
@@ -17,11 +18,16 @@ public class WriterRequest extends AbstractWriter implements BufferRequest {
 
     private ChangedOffset changedOffset;
 
+    private String sql;
+
     public WriterRequest(String tableGroupId, ChangedEvent event) {
         setTableGroupId(tableGroupId);
         setEvent(event.getEvent());
         this.row = event.getChangedRow();
         this.changedOffset = event.getChangedOffset();
+        if(event instanceof DDLChangedEvent){
+            sql = ((DDLChangedEvent) event).getSql();
+        }
     }
 
     @Override
@@ -36,4 +42,8 @@ public class WriterRequest extends AbstractWriter implements BufferRequest {
     public ChangedOffset getChangedOffset() {
         return changedOffset;
     }
+
+    public String getSql() {
+        return sql;
+    }
 }

+ 10 - 0
dbsyncer-parser/src/main/java/org/dbsyncer/parser/model/WriterResponse.java

@@ -19,6 +19,8 @@ public class WriterResponse extends AbstractWriter implements BufferResponse {
 
     private List<ChangedOffset> offsetList = new LinkedList<>();
 
+    private String sql;
+
     private boolean isMerged;
 
     @Override
@@ -39,6 +41,14 @@ public class WriterResponse extends AbstractWriter implements BufferResponse {
         return offsetList;
     }
 
+    public String getSql() {
+        return sql;
+    }
+
+    public void setSql(String sql) {
+        this.sql = sql;
+    }
+
     public boolean isMerged() {
         return isMerged;
     }

+ 6 - 7
dbsyncer-storage/src/main/java/org/dbsyncer/storage/support/MysqlStorageServiceImpl.java

@@ -24,7 +24,6 @@ import org.dbsyncer.storage.query.Query;
 import org.dbsyncer.storage.util.UnderlineToCamelUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
 import org.springframework.boot.context.properties.ConfigurationProperties;
 import org.springframework.dao.EmptyResultDataAccessException;
@@ -32,6 +31,7 @@ import org.springframework.stereotype.Component;
 import org.springframework.util.Assert;
 
 import javax.annotation.PostConstruct;
+import javax.annotation.Resource;
 import java.io.BufferedReader;
 import java.io.IOException;
 import java.io.InputStream;
@@ -60,13 +60,12 @@ public class MysqlStorageServiceImpl extends AbstractStorageService {
 
     private final Logger logger = LoggerFactory.getLogger(getClass());
 
-    private static final String PREFIX_TABLE = "dbsyncer_";
-    private static final String SHOW_TABLE = "show tables where Tables_in_%s = '%s'";
-    private static final String SHOW_DATA_TABLE = "show tables where Tables_in_%s like \"%s\"";
-    private static final String DROP_TABLE = "DROP TABLE %s";
-    private static final String TRUNCATE_TABLE = "TRUNCATE TABLE %s";
+    private final String PREFIX_TABLE = "dbsyncer_";
+    private final String SHOW_TABLE = "show tables where Tables_in_%s = '%s'";
+    private final String DROP_TABLE = "DROP TABLE %s";
+    private final String TRUNCATE_TABLE = "TRUNCATE TABLE %s";
 
-    @Autowired
+    @Resource
     private ConnectorFactory connectorFactory;
 
     private Map<String, Executor> tables = new ConcurrentHashMap<>();

+ 7 - 0
pom.xml

@@ -54,6 +54,7 @@
         <protobuf.version>3.21.1</protobuf.version>
         <log4j2.version>2.17.1</log4j2.version>
         <junit.version>4.12</junit.version>
+        <jsql-parse.version>4.7</jsql-parse.version>
     </properties>
 
     <!-- 镜像仓库地址 -->
@@ -228,6 +229,12 @@
                 <version>${junit.version}</version>
             </dependency>
 
+            <dependency>
+                <groupId>com.github.jsqlparser</groupId>
+                <artifactId>jsqlparser</artifactId>
+                <version>${jsql-parse.version}</version>
+            </dependency>
+
         </dependencies>
     </dependencyManagement>