Переглянути джерело

支持DQL连接器增量监听

AE86 3 роки тому
батько
коміт
6908912c37

+ 8 - 0
dbsyncer-common/src/main/java/org/dbsyncer/common/event/RowChangedEvent.java

@@ -68,10 +68,18 @@ public class RowChangedEvent {
         return beforeData;
     }
 
+    public void setBeforeData(List<Object> beforeData) {
+        this.beforeData = beforeData;
+    }
+
     public List<Object> getAfterData() {
         return afterData;
     }
 
+    public void setAfterData(List<Object> afterData) {
+        this.afterData = afterData;
+    }
+
     public Map<String, Object> getBefore() {
         return before;
     }

+ 103 - 0
dbsyncer-listener/src/main/java/org/dbsyncer/listener/AbstractDatabaseExtractor.java

@@ -0,0 +1,103 @@
+package org.dbsyncer.listener;
+
+import org.dbsyncer.common.event.RowChangedEvent;
+import org.dbsyncer.common.util.CollectionUtils;
+import org.dbsyncer.common.util.StringUtil;
+import org.dbsyncer.connector.config.DatabaseConfig;
+import org.dbsyncer.connector.constant.ConnectorConstant;
+import org.dbsyncer.connector.database.DatabaseConnectorMapper;
+import org.dbsyncer.connector.model.Field;
+import org.dbsyncer.connector.model.MetaInfo;
+import org.springframework.util.Assert;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * @author AE86
+ * @version 1.0.0
+ * @date 2022/5/29 21:46
+ */
+public abstract class AbstractDatabaseExtractor extends AbstractExtractor {
+
+    private DqlMapper dqlMapper;
+
+    /**
+     * 发送增量事件
+     *
+     * @param event
+     */
+    protected abstract void sendChangedEvent(RowChangedEvent event);
+
+    /**
+     * 发送DQL增量事件
+     *
+     * @param event
+     */
+    protected void sendDqlChangedEvent(RowChangedEvent event) {
+        if (null != event && event.getSourceTableName().equals(dqlMapper.tableName)) {
+            if (StringUtil.equals(ConnectorConstant.OPERTION_DELETE, event.getEvent())) {
+                event.setBeforeData(queryData(event.getBeforeData()));
+            } else {
+                event.setAfterData(queryData(event.getAfterData()));
+            }
+            changedEvent(event);
+        }
+    }
+
+    /**
+     * 初始化Dql连接配置
+     */
+    protected void postProcessDqlBeforeInitialization() {
+        DatabaseConnectorMapper mapper = (DatabaseConnectorMapper) connectorFactory.connect(connectorConfig);
+        DatabaseConfig cfg = mapper.getConfig();
+        final String tableName = cfg.getTable();
+        final String primaryKey = cfg.getPrimaryKey();
+        Assert.hasText(tableName, String.format("The table name '%s' is null.", tableName));
+        MetaInfo metaInfo = connectorFactory.getMetaInfo(mapper, tableName);
+        final List<Field> column = metaInfo.getColumn();
+        Assert.notEmpty(column, String.format("The column of table name '%s' is empty.", tableName));
+
+        int pkIndex = 0;
+        boolean findPkIndex = false;
+        for (Field f : column) {
+            if (f.isPk() && f.getName().equals(primaryKey)) {
+                pkIndex = column.indexOf(f);
+                findPkIndex = true;
+                break;
+            }
+        }
+        Assert.isTrue(findPkIndex, "The primaryKey is invalid.");
+        String sql = new StringBuilder(cfg.getSql()).append(" AND ").append(cfg.getPrimaryKey()).append("=?").toString();
+
+        dqlMapper = new DqlMapper(mapper, tableName, column, pkIndex, sql);
+    }
+
+    private List<Object> queryData(List<Object> data) {
+        if (data.size() >= dqlMapper.pkIndex) {
+            Map<String, Object> row = dqlMapper.mapper.execute(databaseTemplate -> databaseTemplate.queryForMap(dqlMapper.sql, data.get(dqlMapper.pkIndex)));
+            if (!CollectionUtils.isEmpty(row)) {
+                data.clear();
+                dqlMapper.column.forEach(field -> data.add(row.get(field.getName())));
+            }
+        }
+        return data;
+    }
+
+    final class DqlMapper {
+        DatabaseConnectorMapper mapper;
+        String tableName;
+        List<Field> column;
+        int pkIndex;
+        String sql;
+
+        public DqlMapper(DatabaseConnectorMapper mapper, String tableName, List<Field> column, int pkIndex, String sql) {
+            this.mapper = mapper;
+            this.tableName = tableName;
+            this.column = column;
+            this.pkIndex = pkIndex;
+            this.sql = sql;
+        }
+    }
+
+}

+ 16 - 1
dbsyncer-listener/src/main/java/org/dbsyncer/listener/enums/LogExtractorEnum.java

@@ -5,8 +5,11 @@ import org.dbsyncer.connector.enums.ConnectorEnum;
 import org.dbsyncer.listener.ListenerException;
 import org.dbsyncer.listener.file.FileExtractor;
 import org.dbsyncer.listener.kafka.KafkaExtractor;
+import org.dbsyncer.listener.mysql.DqlMysqlExtractor;
 import org.dbsyncer.listener.mysql.MysqlExtractor;
+import org.dbsyncer.listener.oracle.DqlOracleExtractor;
 import org.dbsyncer.listener.oracle.OracleExtractor;
+import org.dbsyncer.listener.postgresql.DqlPostgreSQLExtractor;
 import org.dbsyncer.listener.postgresql.PostgreSQLExtractor;
 import org.dbsyncer.listener.sqlserver.DqlSqlServerExtractor;
 import org.dbsyncer.listener.sqlserver.SqlServerExtractor;
@@ -44,10 +47,22 @@ public enum LogExtractorEnum {
      * File
      */
     FILE(ConnectorEnum.FILE.getType(), FileExtractor.class),
+    /**
+     * DqlMysql
+     */
+    DQL_MYSQL(ConnectorEnum.DQL_MYSQL.getType(), DqlMysqlExtractor.class),
+    /**
+     * DqlOracle
+     */
+    DQL_ORACLE(ConnectorEnum.DQL_ORACLE.getType(), DqlOracleExtractor.class),
     /**
      * DqlSqlServer
      */
-    DQL_SQL_SERVER(ConnectorEnum.DQL_SQL_SERVER.getType(), DqlSqlServerExtractor.class);
+    DQL_SQL_SERVER(ConnectorEnum.DQL_SQL_SERVER.getType(), DqlSqlServerExtractor.class),
+    /**
+     * DqlPostgreSQL
+     */
+    DQL_POSTGRE_SQL(ConnectorEnum.DQL_POSTGRE_SQL.getType(), DqlPostgreSQLExtractor.class);
 
     private String type;
     private Class clazz;

+ 22 - 0
dbsyncer-listener/src/main/java/org/dbsyncer/listener/mysql/DqlMysqlExtractor.java

@@ -0,0 +1,22 @@
+package org.dbsyncer.listener.mysql;
+
+import org.dbsyncer.common.event.RowChangedEvent;
+
+/**
+ * @author AE86
+ * @version 1.0.0
+ * @date 2022/5/28 22:02
+ */
+public class DqlMysqlExtractor extends MysqlExtractor {
+
+    @Override
+    public void start() {
+        super.postProcessDqlBeforeInitialization();
+        super.start();
+    }
+
+    @Override
+    public void sendChangedEvent(RowChangedEvent event) {
+        super.sendDqlChangedEvent(event);
+    }
+}

+ 10 - 5
dbsyncer-listener/src/main/java/org/dbsyncer/listener/mysql/MysqlExtractor.java

@@ -6,7 +6,7 @@ import org.dbsyncer.common.event.RowChangedEvent;
 import org.dbsyncer.common.util.StringUtil;
 import org.dbsyncer.connector.config.DatabaseConfig;
 import org.dbsyncer.connector.constant.ConnectorConstant;
-import org.dbsyncer.listener.AbstractExtractor;
+import org.dbsyncer.listener.AbstractDatabaseExtractor;
 import org.dbsyncer.listener.ListenerException;
 import org.dbsyncer.listener.config.Host;
 import org.slf4j.Logger;
@@ -28,7 +28,7 @@ import static java.util.regex.Pattern.compile;
  * @Author AE86
  * @Date 2020-05-12 21:14
  */
-public class MysqlExtractor extends AbstractExtractor {
+public class MysqlExtractor extends AbstractDatabaseExtractor {
 
     private final Logger logger = LoggerFactory.getLogger(getClass());
 
@@ -76,6 +76,11 @@ public class MysqlExtractor extends AbstractExtractor {
         }
     }
 
+    @Override
+    protected void sendChangedEvent(RowChangedEvent event) {
+        changedEvent(event);
+    }
+
     private void run() throws Exception {
         final DatabaseConfig config = (DatabaseConfig) connectorConfig;
         if (StringUtil.isBlank(config.getUrl())) {
@@ -239,7 +244,7 @@ public class MysqlExtractor extends AbstractExtractor {
                     data.getRows().forEach(m -> {
                         List<Object> before = Stream.of(m.getKey()).collect(Collectors.toList());
                         List<Object> after = Stream.of(m.getValue()).collect(Collectors.toList());
-                        changedEvent(new RowChangedEvent(getTableName(data.getTableId()), ConnectorConstant.OPERTION_UPDATE, before, after));
+                        sendChangedEvent(new RowChangedEvent(getTableName(data.getTableId()), ConnectorConstant.OPERTION_UPDATE, before, after));
                     });
                 }
                 return;
@@ -250,7 +255,7 @@ public class MysqlExtractor extends AbstractExtractor {
                 if (isFilterTable(data.getTableId())) {
                     data.getRows().forEach(m -> {
                         List<Object> after = Stream.of(m).collect(Collectors.toList());
-                        changedEvent(new RowChangedEvent(getTableName(data.getTableId()), ConnectorConstant.OPERTION_INSERT, Collections.EMPTY_LIST, after));
+                        sendChangedEvent(new RowChangedEvent(getTableName(data.getTableId()), ConnectorConstant.OPERTION_INSERT, Collections.EMPTY_LIST, after));
                     });
                 }
                 return;
@@ -261,7 +266,7 @@ public class MysqlExtractor extends AbstractExtractor {
                 if (isFilterTable(data.getTableId())) {
                     data.getRows().forEach(m -> {
                         List<Object> before = Stream.of(m).collect(Collectors.toList());
-                        changedEvent(new RowChangedEvent(getTableName(data.getTableId()), ConnectorConstant.OPERTION_DELETE, before, Collections.EMPTY_LIST));
+                        sendChangedEvent(new RowChangedEvent(getTableName(data.getTableId()), ConnectorConstant.OPERTION_DELETE, before, Collections.EMPTY_LIST));
                     });
                 }
                 return;

+ 22 - 0
dbsyncer-listener/src/main/java/org/dbsyncer/listener/oracle/DqlOracleExtractor.java

@@ -0,0 +1,22 @@
+package org.dbsyncer.listener.oracle;
+
+import org.dbsyncer.common.event.RowChangedEvent;
+
+/**
+ * @author AE86
+ * @version 1.0.0
+ * @date 2022/5/29 22:44
+ */
+public class DqlOracleExtractor extends OracleExtractor {
+
+    @Override
+    public void start() {
+        super.postProcessDqlBeforeInitialization();
+        super.start();
+    }
+
+    @Override
+    public void changedEvent(RowChangedEvent event) {
+        super.sendDqlChangedEvent(event);
+    }
+}

+ 9 - 3
dbsyncer-listener/src/main/java/org/dbsyncer/listener/oracle/OracleExtractor.java

@@ -1,7 +1,8 @@
 package org.dbsyncer.listener.oracle;
 
+import org.dbsyncer.common.event.RowChangedEvent;
 import org.dbsyncer.connector.config.DatabaseConfig;
-import org.dbsyncer.listener.AbstractExtractor;
+import org.dbsyncer.listener.AbstractDatabaseExtractor;
 import org.dbsyncer.listener.ListenerException;
 import org.dbsyncer.listener.oracle.dcn.DBChangeNotification;
 import org.slf4j.Logger;
@@ -12,7 +13,7 @@ import org.slf4j.LoggerFactory;
  * @Author AE86
  * @Date 2020-05-12 21:14
  */
-public class OracleExtractor extends AbstractExtractor {
+public class OracleExtractor extends AbstractDatabaseExtractor {
 
     private final Logger logger = LoggerFactory.getLogger(getClass());
 
@@ -27,7 +28,7 @@ public class OracleExtractor extends AbstractExtractor {
             String url = config.getUrl();
             client = new DBChangeNotification(username, password, url);
             client.setFilterTable(filterTable);
-            client.addRowEventListener((e) -> changedEvent(e));
+            client.addRowEventListener((e) -> sendChangedEvent(e));
             client.start();
         } catch (Exception e) {
             logger.error("启动失败:{}", e.getMessage());
@@ -42,4 +43,9 @@ public class OracleExtractor extends AbstractExtractor {
         }
     }
 
+    @Override
+    protected void sendChangedEvent(RowChangedEvent event) {
+        changedEvent(event);
+    }
+
 }

+ 22 - 0
dbsyncer-listener/src/main/java/org/dbsyncer/listener/postgresql/DqlPostgreSQLExtractor.java

@@ -0,0 +1,22 @@
+package org.dbsyncer.listener.postgresql;
+
+import org.dbsyncer.common.event.RowChangedEvent;
+
+/**
+ * @author AE86
+ * @version 1.0.0
+ * @date 2022/5/29 22:44
+ */
+public class DqlPostgreSQLExtractor extends PostgreSQLExtractor {
+
+    @Override
+    public void start() {
+        super.postProcessDqlBeforeInitialization();
+        super.start();
+    }
+
+    @Override
+    public void changedEvent(RowChangedEvent event) {
+        super.sendDqlChangedEvent(event);
+    }
+}

+ 9 - 3
dbsyncer-listener/src/main/java/org/dbsyncer/listener/postgresql/PostgreSQLExtractor.java

@@ -1,11 +1,12 @@
 package org.dbsyncer.listener.postgresql;
 
+import org.dbsyncer.common.event.RowChangedEvent;
 import org.dbsyncer.common.util.BooleanUtil;
 import org.dbsyncer.common.util.RandomUtil;
 import org.dbsyncer.connector.config.DatabaseConfig;
 import org.dbsyncer.connector.database.DatabaseConnectorMapper;
 import org.dbsyncer.connector.util.DatabaseUtil;
-import org.dbsyncer.listener.AbstractExtractor;
+import org.dbsyncer.listener.AbstractDatabaseExtractor;
 import org.dbsyncer.listener.ListenerException;
 import org.dbsyncer.listener.postgresql.enums.MessageDecoderEnum;
 import org.postgresql.PGConnection;
@@ -35,7 +36,7 @@ import java.util.concurrent.locks.ReentrantLock;
  * @version 1.0.0
  * @date 2022/4/10 22:36
  */
-public class PostgreSQLExtractor extends AbstractExtractor {
+public class PostgreSQLExtractor extends AbstractDatabaseExtractor {
 
     private final Logger logger = LoggerFactory.getLogger(getClass());
 
@@ -129,6 +130,11 @@ public class PostgreSQLExtractor extends AbstractExtractor {
         }
     }
 
+    @Override
+    protected void sendChangedEvent(RowChangedEvent event) {
+        changedEvent(event);
+    }
+
     private void connect() throws SQLException {
         Properties props = new Properties();
         PGProperty.USER.set(props, config.getUsername());
@@ -278,7 +284,7 @@ public class PostgreSQLExtractor extends AbstractExtractor {
 
                     flushLsn(lsn);
                     // process decoder
-                    changedEvent(messageDecoder.processMessage(msg));
+                    sendChangedEvent(messageDecoder.processMessage(msg));
 
                     // feedback
                     stream.setAppliedLSN(lsn);

+ 7 - 2
dbsyncer-listener/src/main/java/org/dbsyncer/listener/sqlserver/DqlSqlServerExtractor.java

@@ -10,8 +10,13 @@ import org.dbsyncer.common.event.RowChangedEvent;
 public class DqlSqlServerExtractor extends SqlServerExtractor {
 
     @Override
-    protected void sendChangedEvent(RowChangedEvent event) {
-        super.sendChangedEvent(event);
+    public void start() {
+        super.postProcessDqlBeforeInitialization();
+        super.start();
+    }
 
+    @Override
+    public void changedEvent(RowChangedEvent event) {
+        super.sendDqlChangedEvent(event);
     }
 }

+ 3 - 2
dbsyncer-listener/src/main/java/org/dbsyncer/listener/sqlserver/SqlServerExtractor.java

@@ -7,7 +7,7 @@ import org.dbsyncer.common.util.RandomUtil;
 import org.dbsyncer.connector.config.DatabaseConfig;
 import org.dbsyncer.connector.constant.ConnectorConstant;
 import org.dbsyncer.connector.database.DatabaseConnectorMapper;
-import org.dbsyncer.listener.AbstractExtractor;
+import org.dbsyncer.listener.AbstractDatabaseExtractor;
 import org.dbsyncer.listener.ListenerException;
 import org.dbsyncer.listener.enums.TableOperationEnum;
 import org.slf4j.Logger;
@@ -27,7 +27,7 @@ import java.util.concurrent.locks.ReentrantLock;
  * @Author AE86
  * @Date 2021-06-18 01:20
  */
-public class SqlServerExtractor extends AbstractExtractor {
+public class SqlServerExtractor extends AbstractDatabaseExtractor {
 
     private final Logger logger = LoggerFactory.getLogger(getClass());
 
@@ -102,6 +102,7 @@ public class SqlServerExtractor extends AbstractExtractor {
         }
     }
 
+    @Override
     protected void sendChangedEvent(RowChangedEvent event) {
         changedEvent(event);
     }