瀏覽代碼

修复mysql增量同步会监听非当前库表数据

AE86 3 年之前
父節點
當前提交
622d7b81bf
共有 1 個文件被更改,包括 37 次插入22 次删除
  1. 37 22
      dbsyncer-listener/src/main/java/org/dbsyncer/listener/mysql/MysqlExtractor.java

+ 37 - 22
dbsyncer-listener/src/main/java/org/dbsyncer/listener/mysql/MysqlExtractor.java

@@ -32,13 +32,14 @@ public class MysqlExtractor extends AbstractExtractor {
 
     private final Logger logger = LoggerFactory.getLogger(getClass());
 
-    private static final String          BINLOG_FILENAME = "fileName";
-    private static final String          BINLOG_POSITION = "position";
-    private static final int             RETRY_TIMES     = 10;
-    private static final int             MASTER          = 0;
-    private Map<Long, TableMapEventData> tables          = new HashMap<>();
-    private BinaryLogClient              client;
-    private List<Host>                   cluster;
+    private static final String BINLOG_FILENAME = "fileName";
+    private static final String BINLOG_POSITION = "position";
+    private static final int RETRY_TIMES = 10;
+    private static final int MASTER = 0;
+    private Map<Long, TableMapEventData> tables = new HashMap<>();
+    private BinaryLogClient client;
+    private List<Host> cluster;
+    private String database;
     private final Lock connectLock = new ReentrantLock();
     private volatile boolean connected;
 
@@ -77,6 +78,10 @@ public class MysqlExtractor extends AbstractExtractor {
 
     private void run() throws Exception {
         final DatabaseConfig config = (DatabaseConfig) connectorConfig;
+        if (StringUtil.isBlank(config.getUrl())) {
+            throw new ListenerException("url is invalid");
+        }
+        database = readDatabaseName(config.getUrl());
         cluster = readNodes(config.getUrl());
         Assert.notEmpty(cluster, "Mysql连接地址有误.");
 
@@ -94,10 +99,22 @@ public class MysqlExtractor extends AbstractExtractor {
         client.connect();
     }
 
-    private List<Host> readNodes(String url) {
-        if (StringUtil.isBlank(url)) {
-            return Collections.EMPTY_LIST;
+    private String readDatabaseName(String url) {
+        Matcher matcher = compile("(//)(?!(\\?)).+?(\\?)").matcher(url);
+        while (matcher.find()) {
+            url = matcher.group(0);
+            break;
         }
+        int s = url.lastIndexOf("/");
+        int e = url.lastIndexOf("?");
+        if (s > 0 && e > 0) {
+            return StringUtil.substring(url, s + 1, e);
+        }
+
+        throw new ListenerException("database is invalid");
+    }
+
+    private List<Host> readNodes(String url) {
         Matcher matcher = compile("(//)(?!(/)).+?(/)").matcher(url);
         while (matcher.find()) {
             url = matcher.group(0);
@@ -217,12 +234,11 @@ public class MysqlExtractor extends AbstractExtractor {
 
             if (EventType.isUpdate(header.getEventType())) {
                 UpdateRowsEventData data = event.getData();
-                String tableName = getTableName(data.getTableId());
-                if (isFilterTable(tableName, ConnectorConstant.OPERTION_UPDATE)) {
+                if (isFilterTable(data.getTableId(), ConnectorConstant.OPERTION_UPDATE)) {
                     data.getRows().forEach(m -> {
                         List<Object> before = Stream.of(m.getKey()).collect(Collectors.toList());
                         List<Object> after = Stream.of(m.getValue()).collect(Collectors.toList());
-                        asynSendRowChangedEvent(new RowChangedEvent(tableName, ConnectorConstant.OPERTION_UPDATE, before, after));
+                        asynSendRowChangedEvent(new RowChangedEvent(getTableName(data.getTableId()), ConnectorConstant.OPERTION_UPDATE, before, after));
                     });
                 }
                 refresh(header);
@@ -230,11 +246,10 @@ public class MysqlExtractor extends AbstractExtractor {
             }
             if (EventType.isWrite(header.getEventType())) {
                 WriteRowsEventData data = event.getData();
-                String tableName = getTableName(data.getTableId());
-                if (isFilterTable(tableName, ConnectorConstant.OPERTION_INSERT)) {
+                if (isFilterTable(data.getTableId(), ConnectorConstant.OPERTION_INSERT)) {
                     data.getRows().forEach(m -> {
                         List<Object> after = Stream.of(m).collect(Collectors.toList());
-                        asynSendRowChangedEvent(new RowChangedEvent(tableName, ConnectorConstant.OPERTION_INSERT, Collections.EMPTY_LIST, after));
+                        asynSendRowChangedEvent(new RowChangedEvent(getTableName(data.getTableId()), ConnectorConstant.OPERTION_INSERT, Collections.EMPTY_LIST, after));
                     });
                 }
                 refresh(header);
@@ -242,11 +257,10 @@ public class MysqlExtractor extends AbstractExtractor {
             }
             if (EventType.isDelete(header.getEventType())) {
                 DeleteRowsEventData data = event.getData();
-                String tableName = getTableName(data.getTableId());
-                if (isFilterTable(tableName, ConnectorConstant.OPERTION_DELETE)) {
+                if (isFilterTable(data.getTableId(), ConnectorConstant.OPERTION_DELETE)) {
                     data.getRows().forEach(m -> {
                         List<Object> before = Stream.of(m).collect(Collectors.toList());
-                        asynSendRowChangedEvent(new RowChangedEvent(tableName, ConnectorConstant.OPERTION_DELETE, before, Collections.EMPTY_LIST));
+                        asynSendRowChangedEvent(new RowChangedEvent(getTableName(data.getTableId()), ConnectorConstant.OPERTION_DELETE, before, Collections.EMPTY_LIST));
                     });
                 }
                 refresh(header);
@@ -266,9 +280,10 @@ public class MysqlExtractor extends AbstractExtractor {
             return tables.get(tableId).getTable();
         }
 
-        private boolean isFilterTable(String tableName, String event) {
-            if (!filterTable.contains(tableName)) {
-                logger.info("Table[{}] {}", tableName, event);
+        private boolean isFilterTable(long tableId, String event) {
+            final TableMapEventData tableMap = tables.get(tableId);
+            if (!StringUtil.equals(database, tableMap.getDatabase()) || !filterTable.contains(tableMap.getTable())) {
+                logger.info("Table[{}.{}] {}", tableMap.getDatabase(), tableMap.getTable(), event);
                 return false;
             }
             return true;