ソースを参照

!159 merge
Merge pull request !159 from AE86/design_consumer

AE86 1 年間 前
コミット
b6bec7bce0

+ 5 - 8
dbsyncer-listener/src/main/java/org/dbsyncer/listener/ListenerFactory.java

@@ -8,11 +8,12 @@ import org.springframework.stereotype.Component;
 import javax.annotation.PostConstruct;
 import java.util.LinkedHashMap;
 import java.util.Map;
+import java.util.function.Function;
 
 @Component
 public class ListenerFactory implements Listener {
 
-    private Map<ListenerTypeEnum, ExtractorMapper> map = new LinkedHashMap<>();
+    private Map<ListenerTypeEnum, Function<String, Class>> map = new LinkedHashMap<>();
 
     @PostConstruct
     private void init() {
@@ -22,17 +23,13 @@ public class ListenerFactory implements Listener {
 
     @Override
     public <T> T getExtractor(ListenerTypeEnum listenerTypeEnum, String connectorType, Class<T> valueType) throws IllegalAccessException, InstantiationException {
-        ExtractorMapper mapper = map.get(listenerTypeEnum);
-        if (null == mapper) {
+        Function function = map.get(listenerTypeEnum);
+        if (null == function) {
             throw new ListenerException(String.format("Unsupported type \"%s\" for extractor \"%s\".", listenerTypeEnum, connectorType));
         }
 
-        Class<T> clazz = (Class<T>) mapper.getExtractor(connectorType);
+        Class<T> clazz = (Class<T>) function.apply(connectorType);
         return clazz.newInstance();
     }
 
-    interface ExtractorMapper {
-        Class getExtractor(String connectorType);
-    }
-
 }

+ 17 - 1
dbsyncer-listener/src/main/java/org/dbsyncer/listener/enums/TimingExtractorEnum.java

@@ -34,7 +34,23 @@ public enum TimingExtractorEnum {
     /**
      * Elasticsearch
      */
-    ELASTIC_SEARCH(ConnectorEnum.ELASTIC_SEARCH.getType(), ESQuartzExtractor.class);
+    ELASTIC_SEARCH(ConnectorEnum.ELASTIC_SEARCH.getType(), ESQuartzExtractor.class),
+    /**
+     * DqlMysql
+     */
+    DQL_MYSQL(ConnectorEnum.DQL_MYSQL.getType(), DatabaseQuartzExtractor.class),
+    /**
+     * DqlOracle
+     */
+    DQL_ORACLE(ConnectorEnum.DQL_ORACLE.getType(), DatabaseQuartzExtractor.class),
+    /**
+     * DqlSqlServer
+     */
+    DQL_SQL_SERVER(ConnectorEnum.DQL_SQL_SERVER.getType(), DatabaseQuartzExtractor.class),
+    /**
+     * DqlPostgreSQL
+     */
+    DQL_POSTGRE_SQL(ConnectorEnum.DQL_POSTGRE_SQL.getType(), DatabaseQuartzExtractor.class);
 
     private String type;
     private Class clazz;

+ 4 - 4
dbsyncer-listener/src/main/java/org/dbsyncer/listener/mysql/BinaryLogClient.java

@@ -93,19 +93,19 @@ public interface BinaryLogClient {
     void setTableMapEventByTableId(Map<Long, TableMapEventData> tableMapEventByTableId);
 
     /**
-     * SimpleEventModel
+     * 是否支持ddl
      *
      * @return
      */
-    boolean isSimpleEventModel();
+    boolean isEnableDDL();
 
     /**
      * <p>true: ROTATE > FORMAT_DESCRIPTION > TABLE_MAP > WRITE_ROWS > UPDATE_ROWS > DELETE_ROWS > XID
      * <p>false: Support all events
      *
-     * @param simpleEventModel
+     * @param enableDDL
      */
-    void setSimpleEventModel(boolean simpleEventModel);
+    void setEnableDDL(boolean enableDDL);
 
     /**
      * binlog-parser-127.0.0.1_3306_1

+ 6 - 8
dbsyncer-listener/src/main/java/org/dbsyncer/listener/mysql/BinaryLogRemoteClient.java

@@ -74,7 +74,7 @@ public class BinaryLogRemoteClient implements BinaryLogClient {
     private EventDeserializer eventDeserializer;
     private Map<Long, TableMapEventData> tableMapEventByTableId;
     private boolean blocking = true;
-    private boolean simpleEventModel = false;
+    private boolean enableDDL = false;
     private long serverId = 65535;
     private volatile String binlogFilename;
     private volatile long binlogPosition = 4;
@@ -583,7 +583,7 @@ public class BinaryLogRemoteClient implements BinaryLogClient {
         eventDataDeserializers.put(EventType.EXT_DELETE_ROWS, (new DeleteDeserializer(tableMapEventByTableId)).setMayContainExtraInformation(true));
         eventDataDeserializers.put(EventType.XID, new XidEventDataDeserializer());
 
-        if (simpleEventModel) {
+        if (enableDDL) {
             eventDataDeserializers.put(EventType.INTVAR, new IntVarEventDataDeserializer());
             eventDataDeserializers.put(EventType.QUERY, new QueryEventDataDeserializer());
             eventDataDeserializers.put(EventType.ROWS_QUERY, new RowsQueryEventDataDeserializer());
@@ -710,14 +710,12 @@ public class BinaryLogRemoteClient implements BinaryLogClient {
         this.tableMapEventByTableId = tableMapEventByTableId;
     }
 
-    @Override
-    public boolean isSimpleEventModel() {
-        return simpleEventModel;
+    public boolean isEnableDDL() {
+        return enableDDL;
     }
 
-    @Override
-    public void setSimpleEventModel(boolean simpleEventModel) {
-        this.simpleEventModel = simpleEventModel;
+    public void setEnableDDL(boolean enableDDL) {
+        this.enableDDL = enableDDL;
     }
 
     @Override

+ 7 - 0
dbsyncer-listener/src/main/java/org/dbsyncer/listener/mysql/MysqlExtractor.java

@@ -5,6 +5,7 @@ import com.github.shyiko.mysql.binlog.event.Event;
 import com.github.shyiko.mysql.binlog.event.EventHeader;
 import com.github.shyiko.mysql.binlog.event.EventHeaderV4;
 import com.github.shyiko.mysql.binlog.event.EventType;
+import com.github.shyiko.mysql.binlog.event.QueryEventData;
 import com.github.shyiko.mysql.binlog.event.RotateEventData;
 import com.github.shyiko.mysql.binlog.event.TableMapEventData;
 import com.github.shyiko.mysql.binlog.event.UpdateRowsEventData;
@@ -289,6 +290,12 @@ public class MysqlExtractor extends AbstractDatabaseExtractor {
                 return;
             }
 
+            if (client.isEnableDDL() && EventType.QUERY == header.getEventType()) {
+                refresh(header);
+                QueryEventData data = event.getData();
+                logger.info("database:{}, sql:{}", data.getDatabase(), data.getSql());
+            }
+
             // 切换binlog
             if (header.getEventType() == EventType.ROTATE) {
                 RotateEventData data = event.getData();