AE86 пре 1 година
родитељ
комит
37269828f2

+ 37 - 0
dbsyncer-common/src/main/java/org/dbsyncer/common/event/DDLChangedEvent.java

@@ -0,0 +1,37 @@
+package org.dbsyncer.common.event;
+
+public class DDLChangedEvent {
+
+    /**
+     * 变更数据库
+      */
+    private String database;
+
+    /**
+     * 变更表名称
+     */
+    private String tableName;
+
+    /**
+     * 变更SQL
+     */
+    private String sql;
+
+    public DDLChangedEvent(String database, String tableName, String sql) {
+        this.database = database;
+        this.tableName = tableName;
+        this.sql = sql;
+    }
+
+    public String getDatabase() {
+        return database;
+    }
+
+    public String getTableName() {
+        return tableName;
+    }
+
+    public String getSql() {
+        return sql;
+    }
+}

+ 8 - 0
dbsyncer-common/src/main/java/org/dbsyncer/common/event/Watcher.java

@@ -18,6 +18,14 @@ public interface Watcher {
      */
     void changeEvent(ChangedEvent event);
 
+    /**
+     * DDL变更事件
+     *
+     * @param event
+     */
+    default void changeEvent(DDLChangedEvent event) {
+    }
+
     /**
      * 持久化增量点事件
      *

+ 6 - 0
dbsyncer-listener/src/main/java/org/dbsyncer/listener/AbstractExtractor.java

@@ -2,6 +2,7 @@ package org.dbsyncer.listener;
 
 import org.dbsyncer.common.event.ChangedEvent;
 import org.dbsyncer.common.event.ChangedOffset;
+import org.dbsyncer.common.event.DDLChangedEvent;
 import org.dbsyncer.common.event.Watcher;
 import org.dbsyncer.common.model.AbstractConnectorConfig;
 import org.dbsyncer.common.scheduled.ScheduledTaskService;
@@ -66,6 +67,11 @@ public abstract class AbstractExtractor implements Extractor {
         }
     }
 
+    @Override
+    public void changeEvent(DDLChangedEvent event) {
+        watcher.changeEvent(event);
+    }
+
     @Override
     public void refreshEvent(ChangedOffset offset) {
         // nothing to do

+ 8 - 0
dbsyncer-listener/src/main/java/org/dbsyncer/listener/Extractor.java

@@ -2,6 +2,7 @@ package org.dbsyncer.listener;
 
 import org.dbsyncer.common.event.ChangedEvent;
 import org.dbsyncer.common.event.ChangedOffset;
+import org.dbsyncer.common.event.DDLChangedEvent;
 import org.dbsyncer.common.event.Watcher;
 
 public interface Extractor {
@@ -30,6 +31,13 @@ public interface Extractor {
      */
     void changeEvent(ChangedEvent event);
 
+    /**
+     * DDL变更事件
+     *
+     * @param event
+     */
+    void changeEvent(DDLChangedEvent event);
+
     /**
      * 更新增量点
      *

+ 2 - 0
dbsyncer-listener/src/main/java/org/dbsyncer/listener/mysql/MySQLExtractor.java

@@ -12,6 +12,7 @@ import com.github.shyiko.mysql.binlog.event.UpdateRowsEventData;
 import com.github.shyiko.mysql.binlog.event.WriteRowsEventData;
 import com.github.shyiko.mysql.binlog.network.ServerException;
 import org.dbsyncer.common.event.ChangedOffset;
+import org.dbsyncer.common.event.DDLChangedEvent;
 import org.dbsyncer.common.event.RowChangedEvent;
 import org.dbsyncer.common.util.StringUtil;
 import org.dbsyncer.connector.config.DatabaseConfig;
@@ -293,6 +294,7 @@ public class MySQLExtractor extends AbstractDatabaseExtractor {
             if (client.isEnableDDL() && EventType.QUERY == header.getEventType()) {
                 refresh(header);
                 QueryEventData data = event.getData();
+                changeEvent(new DDLChangedEvent(data.getDatabase(), "", data.getSql()));
                 logger.info("database:{}, sql:{}", data.getDatabase(), data.getSql());
             }
 

+ 9 - 0
dbsyncer-manager/src/main/java/org/dbsyncer/manager/puller/IncrementPuller.java

@@ -2,6 +2,7 @@ package org.dbsyncer.manager.puller;
 
 import org.dbsyncer.common.event.ChangedEvent;
 import org.dbsyncer.common.event.ChangedOffset;
+import org.dbsyncer.common.event.DDLChangedEvent;
 import org.dbsyncer.common.event.RefreshOffsetEvent;
 import org.dbsyncer.common.event.RowChangedEvent;
 import org.dbsyncer.common.event.ScanChangedEvent;
@@ -11,6 +12,7 @@ import org.dbsyncer.common.scheduled.ScheduledTaskJob;
 import org.dbsyncer.common.scheduled.ScheduledTaskService;
 import org.dbsyncer.common.util.CollectionUtils;
 import org.dbsyncer.connector.ConnectorFactory;
+import org.dbsyncer.connector.model.MetaInfo;
 import org.dbsyncer.connector.model.Table;
 import org.dbsyncer.listener.AbstractExtractor;
 import org.dbsyncer.listener.Extractor;
@@ -251,10 +253,12 @@ public final class IncrementPuller extends AbstractPuller implements Application
     }
 
     final class LogConsumer extends AbstractConsumer<RowChangedEvent> {
+        private Mapping mapping;
         private Map<String, List<FieldPicker>> tablePicker = new LinkedHashMap<>();
 
         public LogConsumer(Meta meta, Mapping mapping, List<TableGroup> tableGroups) {
             this.meta = meta;
+            this.mapping = mapping;
             tableGroups.forEach(t -> {
                 final Table table = t.getSourceTable();
                 final String tableName = table.getName();
@@ -281,6 +285,11 @@ public final class IncrementPuller extends AbstractPuller implements Application
                 });
             }
         }
+
+        @Override
+        public void changeEvent(DDLChangedEvent event) {
+            // TODO 解析ddl
+        }
     }
 
 }