AE86 преди 1 година
родител
ревизия
1a5033db79

+ 3 - 0
dbsyncer-common/src/main/java/org/dbsyncer/common/event/CommonChangedEvent.java

@@ -31,6 +31,7 @@ public class CommonChangedEvent implements ChangedEvent {
      */
     private ChangedOffset changedOffset = new ChangedOffset();
 
+    @Override
     public String getSourceTableName() {
         return sourceTableName;
     }
@@ -39,6 +40,7 @@ public class CommonChangedEvent implements ChangedEvent {
         this.sourceTableName = sourceTableName;
     }
 
+    @Override
     public String getEvent() {
         return event;
     }
@@ -47,6 +49,7 @@ public class CommonChangedEvent implements ChangedEvent {
         this.event = event;
     }
 
+    @Override
     public Map<String, Object> getChangedRow() {
         return changedRow;
     }

+ 13 - 12
dbsyncer-common/src/main/java/org/dbsyncer/common/event/DDLChangedEvent.java

@@ -1,25 +1,30 @@
 package org.dbsyncer.common.event;
 
-public class DDLChangedEvent {
+/**
+ * DDL变更事件
+ *
+ * @version 1.0.0
+ * @Author AE86
+ * @Date 2023-09-18 23:00
+ */
+public class DDLChangedEvent extends CommonChangedEvent {
 
     /**
      * 变更数据库
       */
     private String database;
 
-    /**
-     * 变更表名称
-     */
-    private String sourceTableName;
-
     /**
      * 变更SQL
      */
     private String sql;
 
-    public DDLChangedEvent(String database, String sourceTableName, String sql) {
+    public DDLChangedEvent(String database, String sourceTableName, String event, String sql, String nextFileName, Object position) {
+        setSourceTableName(sourceTableName);
+        setEvent(event);
+        setNextFileName(nextFileName);
+        setPosition(position);
         this.database = database;
-        this.sourceTableName = sourceTableName;
         this.sql = sql;
     }
 
@@ -27,10 +32,6 @@ public class DDLChangedEvent {
         return database;
     }
 
-    public String getSourceTableName() {
-        return sourceTableName;
-    }
-
     public String getSql() {
         return sql;
     }

+ 0 - 8
dbsyncer-common/src/main/java/org/dbsyncer/common/event/Watcher.java

@@ -18,14 +18,6 @@ public interface Watcher {
      */
     void changeEvent(ChangedEvent event);
 
-    /**
-     * DDL变更事件
-     *
-     * @param event
-     */
-    default void changeEvent(DDLChangedEvent event) {
-    }
-
     /**
      * 持久化增量点事件
      *

+ 5 - 0
dbsyncer-connector/src/main/java/org/dbsyncer/connector/constant/ConnectorConstant.java

@@ -22,6 +22,11 @@ public class ConnectorConstant {
      */
     public static final String OPERTION_DELETE = "DELETE";
 
+    /**
+     * 表结构更改
+     */
+    public static final String OPERTION_ALTER = "ALTER";
+
     /**
      * 查询
      */

+ 4 - 6
dbsyncer-listener/src/main/java/org/dbsyncer/listener/AbstractExtractor.java

@@ -2,7 +2,6 @@ package org.dbsyncer.listener;
 
 import org.dbsyncer.common.event.ChangedEvent;
 import org.dbsyncer.common.event.ChangedOffset;
-import org.dbsyncer.common.event.DDLChangedEvent;
 import org.dbsyncer.common.event.Watcher;
 import org.dbsyncer.common.model.AbstractConnectorConfig;
 import org.dbsyncer.common.scheduled.ScheduledTaskService;
@@ -61,17 +60,16 @@ public abstract class AbstractExtractor implements Extractor {
                     // 是否支持监听删除事件
                     processEvent(!listenerConfig.isBanDelete(), event);
                     break;
+                case ConnectorConstant.OPERTION_ALTER:
+                    // 表结构变更事件
+                    watcher.changeEvent(event);
+                    break;
                 default:
                     break;
             }
         }
     }
 
-    @Override
-    public void changeEvent(DDLChangedEvent event) {
-        watcher.changeEvent(event);
-    }
-
     @Override
     public void refreshEvent(ChangedOffset offset) {
         // nothing to do

+ 0 - 8
dbsyncer-listener/src/main/java/org/dbsyncer/listener/Extractor.java

@@ -2,7 +2,6 @@ package org.dbsyncer.listener;
 
 import org.dbsyncer.common.event.ChangedEvent;
 import org.dbsyncer.common.event.ChangedOffset;
-import org.dbsyncer.common.event.DDLChangedEvent;
 import org.dbsyncer.common.event.Watcher;
 
 public interface Extractor {
@@ -31,13 +30,6 @@ public interface Extractor {
      */
     void changeEvent(ChangedEvent event);
 
-    /**
-     * DDL变更事件
-     *
-     * @param event
-     */
-    void changeEvent(DDLChangedEvent event);
-
     /**
      * 更新增量点
      *

+ 3 - 3
dbsyncer-listener/src/main/java/org/dbsyncer/listener/mysql/MySQLExtractor.java

@@ -50,7 +50,6 @@ public class MySQLExtractor extends AbstractDatabaseExtractor {
 
     private final String BINLOG_FILENAME = "fileName";
     private final String BINLOG_POSITION = "position";
-    private final String DDL_PREFIX = "ALTER";
     private final int RETRY_TIMES = 10;
     private final int MASTER = 0;
     private Map<Long, TableMapEventData> tables = new HashMap<>();
@@ -113,6 +112,7 @@ public class MySQLExtractor extends AbstractDatabaseExtractor {
         final String password = config.getPassword();
         boolean containsPos = snapshot.containsKey(BINLOG_POSITION);
         client = new BinaryLogRemoteClient(host.getIp(), host.getPort(), username, password);
+        client.setEnableDDL(true);
         client.setBinlogFilename(snapshot.get(BINLOG_FILENAME));
         client.setBinlogPosition(containsPos ? Long.parseLong(snapshot.get(BINLOG_POSITION)) : 0);
         client.setTableMapEventByTableId(tables);
@@ -307,7 +307,7 @@ public class MySQLExtractor extends AbstractDatabaseExtractor {
         }
 
         private void parseDDL(QueryEventData data) {
-            if (StringUtil.startsWith(data.getSql(), DDL_PREFIX)) {
+            if (StringUtil.startsWith(data.getSql(), ConnectorConstant.OPERTION_ALTER)) {
                 // ALTER TABLE `test`.`my_user` MODIFY COLUMN `name` varchar(128) CHARACTER SET utf8 COLLATE utf8_bin NULL DEFAULT NULL AFTER `id`
                 Lexer lexer = new Lexer(data.getSql());
                 lexer.nextToken('.');
@@ -315,7 +315,7 @@ public class MySQLExtractor extends AbstractDatabaseExtractor {
                 String tableName = lexer.nextToken('`');
                 if (isFilterTable(data.getDatabase(), tableName)) {
                     logger.info("sql:{}", data.getSql());
-                    changeEvent(new DDLChangedEvent(data.getDatabase(), tableName, data.getSql()));
+                    changeEvent(new DDLChangedEvent(data.getDatabase(), tableName, ConnectorConstant.OPERTION_ALTER, data.getSql(), client.getBinlogFilename(), client.getBinlogPosition()));
                 }
             }
         }

+ 27 - 14
dbsyncer-manager/src/main/java/org/dbsyncer/manager/puller/IncrementPuller.java

@@ -2,6 +2,7 @@ package org.dbsyncer.manager.puller;
 
 import org.dbsyncer.common.event.ChangedEvent;
 import org.dbsyncer.common.event.ChangedOffset;
+import org.dbsyncer.common.event.CommonChangedEvent;
 import org.dbsyncer.common.event.DDLChangedEvent;
 import org.dbsyncer.common.event.RefreshOffsetEvent;
 import org.dbsyncer.common.event.RowChangedEvent;
@@ -12,7 +13,6 @@ import org.dbsyncer.common.scheduled.ScheduledTaskJob;
 import org.dbsyncer.common.scheduled.ScheduledTaskService;
 import org.dbsyncer.common.util.CollectionUtils;
 import org.dbsyncer.connector.ConnectorFactory;
-import org.dbsyncer.connector.model.MetaInfo;
 import org.dbsyncer.connector.model.Table;
 import org.dbsyncer.listener.AbstractExtractor;
 import org.dbsyncer.listener.Extractor;
@@ -48,6 +48,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Consumer;
 import java.util.stream.Collectors;
 
 /**
@@ -199,9 +200,16 @@ public final class IncrementPuller extends AbstractPuller implements Application
 
         public abstract void onChange(E e);
 
+        public void onDDLChanged(DDLChangedEvent event) {
+        }
+
         @Override
         public void changeEvent(ChangedEvent event) {
             event.getChangedOffset().setMetaId(meta.getId());
+            if (event instanceof DDLChangedEvent) {
+                onDDLChanged((DDLChangedEvent) event);
+                return;
+            }
             onChange((E) event);
         }
 
@@ -221,11 +229,11 @@ public final class IncrementPuller extends AbstractPuller implements Application
             return meta.getUpdateTime();
         }
 
-        protected void bind(String tableGroupId){
+        protected void bind(String tableGroupId) {
             bufferActuatorRouter.bind(meta.getId(), tableGroupId);
         }
 
-        protected void execute(String tableGroupId, E event){
+        protected void execute(String tableGroupId, ChangedEvent event) {
             bufferActuatorRouter.execute(meta.getId(), tableGroupId, event);
         }
     }
@@ -271,25 +279,30 @@ public final class IncrementPuller extends AbstractPuller implements Application
 
         @Override
         public void onChange(RowChangedEvent event) {
+            process(event, picker -> {
+                final Map<String, Object> changedRow = picker.getColumns(event.getDataList());
+                if (picker.filter(changedRow)) {
+                    event.setChangedRow(changedRow);
+                    execute(picker.getTableGroup().getId(), event);
+                }
+            });
+        }
+
+        @Override
+        public void onDDLChanged(DDLChangedEvent event) {
+            process(event, picker -> execute(picker.getTableGroup().getId(), event));
+        }
+
+        private void process(CommonChangedEvent event, Consumer<FieldPicker> consumer) {
             // 处理过程有异常向上抛
             List<FieldPicker> pickers = tablePicker.get(event.getSourceTableName());
             if (!CollectionUtils.isEmpty(pickers)) {
                 // 触发刷新增量点事件
                 event.getChangedOffset().setRefreshOffset(true);
-                pickers.forEach(picker -> {
-                    final Map<String, Object> changedRow = picker.getColumns(event.getDataList());
-                    if (picker.filter(changedRow)) {
-                        event.setChangedRow(changedRow);
-                        execute(picker.getTableGroup().getId(), event);
-                    }
-                });
+                pickers.forEach(picker -> consumer.accept(picker));
             }
         }
 
-        @Override
-        public void changeEvent(DDLChangedEvent event) {
-            // TODO 解析ddl
-        }
     }
 
 }

+ 19 - 2
dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/impl/GeneralBufferActuator.java

@@ -7,8 +7,10 @@ import org.dbsyncer.common.model.AbstractConnectorConfig;
 import org.dbsyncer.common.model.IncrementConvertContext;
 import org.dbsyncer.common.model.Result;
 import org.dbsyncer.common.spi.ConnectorMapper;
+import org.dbsyncer.common.util.CollectionUtils;
 import org.dbsyncer.common.util.StringUtil;
 import org.dbsyncer.connector.ConnectorFactory;
+import org.dbsyncer.connector.constant.ConnectorConstant;
 import org.dbsyncer.parser.ParserFactory;
 import org.dbsyncer.parser.flush.AbstractBufferActuator;
 import org.dbsyncer.parser.model.BatchWriter;
@@ -79,11 +81,14 @@ public class GeneralBufferActuator extends AbstractBufferActuator<WriterRequest,
 
     @Override
     protected void partition(WriterRequest request, WriterResponse response) {
-        response.getDataList().add(request.getRow());
+        if (!CollectionUtils.isEmpty(request.getRow())) {
+            response.getDataList().add(request.getRow());
+        }
         response.getOffsetList().add(request.getChangedOffset());
         if (!response.isMerged()) {
             response.setTableGroupId(request.getTableGroupId());
             response.setEvent(request.getEvent());
+            response.setSql(request.getSql());
             response.setMerged(true);
         }
     }
@@ -91,7 +96,8 @@ public class GeneralBufferActuator extends AbstractBufferActuator<WriterRequest,
     @Override
     protected boolean skipPartition(WriterRequest nextRequest, WriterResponse response) {
         // 并发场景,同一条数据可能连续触发Insert > Delete > Insert,批处理任务中出现不同事件时,跳过分区处理
-        return !StringUtil.equals(nextRequest.getEvent(), response.getEvent());
+        // 跳过表结构修改事件(保证表结构修改原子性)
+        return !StringUtil.equals(nextRequest.getEvent(), response.getEvent()) || isDDLEvent(response.getEvent());
     }
 
     @Override
@@ -106,6 +112,13 @@ public class GeneralBufferActuator extends AbstractBufferActuator<WriterRequest,
         final Picker picker = new Picker(group.getFieldMapping());
         final List<Map> sourceDataList = response.getDataList();
 
+        // TODO ddl解析
+        if (isDDLEvent(response.getEvent())) {
+            applicationContext.publishEvent(new RefreshOffsetEvent(applicationContext, response.getOffsetList()));
+            flushStrategy.flushIncrementData(mapping.getMetaId(), new Result(), event);
+            return;
+        }
+
         // 2、映射字段
         List<Map> targetDataList = picker.pickData(sourceDataList);
 
@@ -139,6 +152,10 @@ public class GeneralBufferActuator extends AbstractBufferActuator<WriterRequest,
         return generalExecutor;
     }
 
+    private boolean isDDLEvent(String event) {
+        return StringUtil.equals(event, ConnectorConstant.OPERTION_ALTER);
+    }
+
     /**
      * 获取连接器配置
      *

+ 10 - 0
dbsyncer-parser/src/main/java/org/dbsyncer/parser/model/WriterRequest.java

@@ -2,6 +2,7 @@ package org.dbsyncer.parser.model;
 
 import org.dbsyncer.common.event.ChangedEvent;
 import org.dbsyncer.common.event.ChangedOffset;
+import org.dbsyncer.common.event.DDLChangedEvent;
 import org.dbsyncer.parser.flush.BufferRequest;
 
 import java.util.Map;
@@ -17,11 +18,16 @@ public class WriterRequest extends AbstractWriter implements BufferRequest {
 
     private ChangedOffset changedOffset;
 
+    private String sql;
+
     public WriterRequest(String tableGroupId, ChangedEvent event) {
         setTableGroupId(tableGroupId);
         setEvent(event.getEvent());
         this.row = event.getChangedRow();
         this.changedOffset = event.getChangedOffset();
+        if(event instanceof DDLChangedEvent){
+            sql = ((DDLChangedEvent) event).getSql();
+        }
     }
 
     @Override
@@ -36,4 +42,8 @@ public class WriterRequest extends AbstractWriter implements BufferRequest {
     public ChangedOffset getChangedOffset() {
         return changedOffset;
     }
+
+    public String getSql() {
+        return sql;
+    }
 }

+ 10 - 0
dbsyncer-parser/src/main/java/org/dbsyncer/parser/model/WriterResponse.java

@@ -19,6 +19,8 @@ public class WriterResponse extends AbstractWriter implements BufferResponse {
 
     private List<ChangedOffset> offsetList = new LinkedList<>();
 
+    private String sql;
+
     private boolean isMerged;
 
     @Override
@@ -39,6 +41,14 @@ public class WriterResponse extends AbstractWriter implements BufferResponse {
         return offsetList;
     }
 
+    public String getSql() {
+        return sql;
+    }
+
+    public void setSql(String sql) {
+        this.sql = sql;
+    }
+
     public boolean isMerged() {
         return isMerged;
     }