AE86 před 1 rokem
rodič
revize
5da0b14bda

+ 7 - 7
dbsyncer-listener/src/main/java/org/dbsyncer/listener/mysql/MySQLExtractor.java

@@ -66,7 +66,7 @@ public class MySQLExtractor extends AbstractDatabaseExtractor {
         try {
         try {
             connectLock.lock();
             connectLock.lock();
             if (connected) {
             if (connected) {
-                logger.error("MysqlExtractor is already started");
+                logger.error("MySQLExtractor is already started");
                 return;
                 return;
             }
             }
             run();
             run();
@@ -106,7 +106,7 @@ public class MySQLExtractor extends AbstractDatabaseExtractor {
         }
         }
         database = DatabaseUtil.getDatabaseName(config.getUrl());
         database = DatabaseUtil.getDatabaseName(config.getUrl());
         cluster = readNodes(config.getUrl());
         cluster = readNodes(config.getUrl());
-        Assert.notEmpty(cluster, "Mysql连接地址有误.");
+        Assert.notEmpty(cluster, "MySQL连接地址有误.");
 
 
         final Host host = cluster.get(MASTER);
         final Host host = cluster.get(MASTER);
         final String username = config.getUsername();
         final String username = config.getUsername();
@@ -296,6 +296,7 @@ public class MySQLExtractor extends AbstractDatabaseExtractor {
             if (client.isEnableDDL() && EventType.QUERY == header.getEventType()) {
             if (client.isEnableDDL() && EventType.QUERY == header.getEventType()) {
                 refresh(header);
                 refresh(header);
                 parseDDL(event.getData());
                 parseDDL(event.getData());
+                return;
             }
             }
 
 
             // 切换binlog
             // 切换binlog
@@ -311,11 +312,10 @@ public class MySQLExtractor extends AbstractDatabaseExtractor {
                 Lexer lexer = new Lexer(data.getSql());
                 Lexer lexer = new Lexer(data.getSql());
                 lexer.nextToken('.');
                 lexer.nextToken('.');
                 lexer.nextToken('`');
                 lexer.nextToken('`');
-                lexer.nextToken('`');
-                final DDLChangedEvent event = new DDLChangedEvent(data.getDatabase(), lexer.token(), data.getSql());
-                if (isFilterTable(event.getDatabase(), event.getSourceTableName())) {
-                    logger.info("database:{}, sourceTableName:{}, sql:{}", event.getDatabase(), event.getSourceTableName(), data.getSql());
-                    changeEvent(event);
+                String tableName = lexer.nextToken('`');
+                if (isFilterTable(data.getDatabase(), tableName)) {
+                    logger.info("sql:{}", data.getSql());
+                    changeEvent(new DDLChangedEvent(data.getDatabase(), tableName, data.getSql()));
                 }
                 }
             }
             }
         }
         }

+ 6 - 7
dbsyncer-storage/src/main/java/org/dbsyncer/storage/support/MysqlStorageServiceImpl.java

@@ -24,7 +24,6 @@ import org.dbsyncer.storage.query.Query;
 import org.dbsyncer.storage.util.UnderlineToCamelUtils;
 import org.dbsyncer.storage.util.UnderlineToCamelUtils;
 import org.slf4j.Logger;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
 import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
 import org.springframework.boot.context.properties.ConfigurationProperties;
 import org.springframework.boot.context.properties.ConfigurationProperties;
 import org.springframework.dao.EmptyResultDataAccessException;
 import org.springframework.dao.EmptyResultDataAccessException;
@@ -32,6 +31,7 @@ import org.springframework.stereotype.Component;
 import org.springframework.util.Assert;
 import org.springframework.util.Assert;
 
 
 import javax.annotation.PostConstruct;
 import javax.annotation.PostConstruct;
+import javax.annotation.Resource;
 import java.io.BufferedReader;
 import java.io.BufferedReader;
 import java.io.IOException;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.InputStream;
@@ -60,13 +60,12 @@ public class MysqlStorageServiceImpl extends AbstractStorageService {
 
 
     private final Logger logger = LoggerFactory.getLogger(getClass());
     private final Logger logger = LoggerFactory.getLogger(getClass());
 
 
-    private static final String PREFIX_TABLE = "dbsyncer_";
-    private static final String SHOW_TABLE = "show tables where Tables_in_%s = '%s'";
-    private static final String SHOW_DATA_TABLE = "show tables where Tables_in_%s like \"%s\"";
-    private static final String DROP_TABLE = "DROP TABLE %s";
-    private static final String TRUNCATE_TABLE = "TRUNCATE TABLE %s";
+    private final String PREFIX_TABLE = "dbsyncer_";
+    private final String SHOW_TABLE = "show tables where Tables_in_%s = '%s'";
+    private final String DROP_TABLE = "DROP TABLE %s";
+    private final String TRUNCATE_TABLE = "TRUNCATE TABLE %s";
 
 
-    @Autowired
+    @Resource
     private ConnectorFactory connectorFactory;
     private ConnectorFactory connectorFactory;
 
 
     private Map<String, Executor> tables = new ConcurrentHashMap<>();
     private Map<String, Executor> tables = new ConcurrentHashMap<>();