Browse Source

!280 fix #IAT2Z2 增量同步时部分 ddl 语句无法同步
Merge pull request !280 from 双双/fixIAT2Z2

AE86 7 months ago
parent
commit
bc820acd6f

+ 38 - 12
dbsyncer-connector/dbsyncer-connector-mysql/src/main/java/org/dbsyncer/connector/mysql/cdc/MySQLListener.java

@@ -17,6 +17,7 @@ import com.github.shyiko.mysql.binlog.event.WriteRowsEventData;
 import com.github.shyiko.mysql.binlog.network.ServerException;
 import net.sf.jsqlparser.JSQLParserException;
 import net.sf.jsqlparser.parser.CCJSqlParserUtil;
+import net.sf.jsqlparser.statement.Statement;
 import net.sf.jsqlparser.statement.alter.Alter;
 import org.dbsyncer.common.QueueOverflowException;
 import org.dbsyncer.common.util.StringUtil;
@@ -316,21 +317,47 @@ public class MySQLListener extends AbstractDatabaseListener {
         }
 
         private void parseDDL(QueryEventData data) {
-            if (isNotUniqueCodeEvent(data.getSql()) && StringUtil.startsWith(data.getSql(), ConnectorConstant.OPERTION_ALTER)) {
-                try {
-                    // ALTER TABLE `test`.`my_user` MODIFY COLUMN `name` varchar(128) CHARACTER SET utf8 COLLATE utf8_bin NULL DEFAULT NULL AFTER `id`
-                    Alter alter = (Alter) CCJSqlParserUtil.parse(data.getSql());
-                    String tableName = StringUtil.replace(alter.getTable().getName(), "`", "");
-                    if (isFilterTable(data.getDatabase(), tableName)) {
-                        logger.info("sql:{}", data.getSql());
-                        trySendEvent(new DDLChangedEvent(data.getDatabase(), tableName, ConnectorConstant.OPERTION_ALTER, data.getSql(), client.getBinlogFilename(), client.getBinlogPosition()));
-                    }
-                } catch (JSQLParserException e) {
-                    logger.error("不支持ddl sql,支持标准的sql格式,请查看文档https://gitee.com/ghi/dbsyncer/wikis/%E5%BF%AB%E9%80%9F%E4%BA%86%E8%A7%A3/%E8%A1%A8%E7%BB%93%E6%9E%84%E5%90%8C%E6%AD%A5");
+            if (isNotUniqueCodeEvent(data.getSql())) {
+                Statement statement = parseStatement(data.getSql());
+                if (statement == null) {
+                    return;
+                }
+
+                if (statement instanceof Alter) {
+                    parseAlter(data, (Alter) statement);
                 }
             }
         }
 
+        private Statement parseStatement(String sql) {
+            try {
+                return CCJSqlParserUtil.parse(sql);
+            } catch (JSQLParserException e) {
+                logger.error("不支持ddl sql,支持标准的sql格式,请查看文档https://gitee.com/ghi/dbsyncer/wikis" +
+                        "/%E6%93%8D%E4%BD%9C%E6%89%8B%E5%86%8C/%E8%A1%A8%E7%BB%93%E6%9E%84%E5%90%8C%E6%AD%A5");
+                return null;
+            }
+        }
+
+        private void parseAlter(QueryEventData data, Alter alter) {
+            // ALTER TABLE `test`.`my_user` MODIFY COLUMN `name` varchar(128) CHARACTER SET utf8 COLLATE utf8_bin NULL DEFAULT NULL AFTER `id`
+            String tableName = StringUtil.replace(alter.getTable().getName(), "`", "");
+            // databaseName 的取值不能为 QueryEventData#getDatabase, getDatabase 获取的是执行 SQL 语句时所在的数据库上下文,
+            // 不是 alter 语句修改的表所在的数据库
+            // 依次执行下面两条语句 use db1; alter table db2.my_user add column name varchar(128);
+            // data.getDatabase() 的值为 db1, 不是 db2, 必须从 alter 中获取 databaseName
+            String databaseName = alter.getTable().getSchemaName();
+            // alter 中没有 databaseName 说明是 alter table my_user xxx 这种格式, 直接在上下文中获取即可
+            if (StringUtil.isBlank(databaseName)) {
+                databaseName = data.getDatabase();
+            }
+            if (isFilterTable(databaseName, tableName)) {
+                logger.info("sql:{}", data.getSql());
+                trySendEvent(new DDLChangedEvent(databaseName, tableName, ConnectorConstant.OPERTION_ALTER,
+                        data.getSql(), client.getBinlogFilename(), client.getBinlogPosition()));
+            }
+        }
+
         private String getTableName(long tableId) {
             return tables.get(tableId).getTable();
         }
@@ -349,5 +376,4 @@ public class MySQLListener extends AbstractDatabaseListener {
         }
 
     }
-
 }