|
@@ -170,10 +170,10 @@ public class MySQLListener extends AbstractDatabaseListener {
|
|
|
snapshot.put(BINLOG_POSITION, String.valueOf(nextPosition));
|
|
|
}
|
|
|
|
|
|
- private void trySendEvent(ChangedEvent event){
|
|
|
+ private void trySendEvent(ChangedEvent event) {
|
|
|
try {
|
|
|
// 如果消费事件失败,重试
|
|
|
- while (client.isConnected()){
|
|
|
+ while (client.isConnected()) {
|
|
|
try {
|
|
|
sendChangedEvent(event);
|
|
|
break;
|
|
@@ -319,10 +319,6 @@ public class MySQLListener extends AbstractDatabaseListener {
|
|
|
private void parseDDL(QueryEventData data) {
|
|
|
if (isNotUniqueCodeEvent(data.getSql())) {
|
|
|
Statement statement = parseStatement(data.getSql());
|
|
|
- if (statement == null) {
|
|
|
- return;
|
|
|
- }
|
|
|
-
|
|
|
if (statement instanceof Alter) {
|
|
|
parseAlter(data, (Alter) statement);
|
|
|
}
|
|
@@ -331,12 +327,14 @@ public class MySQLListener extends AbstractDatabaseListener {
|
|
|
|
|
|
private Statement parseStatement(String sql) {
|
|
|
try {
|
|
|
- return CCJSqlParserUtil.parse(sql);
|
|
|
+ // skip BEGIN
|
|
|
+ if (!StringUtil.equalsIgnoreCase("BEGIN", sql)) {
|
|
|
+ return CCJSqlParserUtil.parse(sql);
|
|
|
+ }
|
|
|
} catch (JSQLParserException e) {
|
|
|
- logger.error("不支持ddl sql,支持标准的sql格式,请查看文档https://gitee.com/ghi/dbsyncer/wikis" +
|
|
|
- "/%E6%93%8D%E4%BD%9C%E6%89%8B%E5%86%8C/%E8%A1%A8%E7%BB%93%E6%9E%84%E5%90%8C%E6%AD%A5");
|
|
|
- return null;
|
|
|
+ logger.warn("不支持的ddl:{},标准的ddl请查看文档https://gitee.com/ghi/dbsyncer/wikis/%E6%93%8D%E4%BD%9C%E6%89%8B%E5%86%8C/%E8%A1%A8%E7%BB%93%E6%9E%84%E5%90%8C%E6%AD%A5", sql);
|
|
|
}
|
|
|
+ return null;
|
|
|
}
|
|
|
|
|
|
private void parseAlter(QueryEventData data, Alter alter) {
|
|
@@ -371,7 +369,7 @@ public class MySQLListener extends AbstractDatabaseListener {
|
|
|
return StringUtil.equalsIgnoreCase(database, dbName) && filterTable.contains(tableName);
|
|
|
}
|
|
|
|
|
|
- private boolean isNotUniqueCodeEvent(String sql){
|
|
|
+ private boolean isNotUniqueCodeEvent(String sql) {
|
|
|
return !StringUtil.startsWith(sql, DatabaseConstant.DBS_UNIQUE_CODE);
|
|
|
}
|
|
|
|