|
@@ -1,4 +1,7 @@
|
|
|
-package org.dbsyncer.connector.mysql;
|
|
|
+/**
|
|
|
+ * DBSyncer Copyright 2020-2023 All Rights Reserved.
|
|
|
+ */
|
|
|
+package org.dbsyncer.connector.mysql.cdc;
|
|
|
|
|
|
import com.github.shyiko.mysql.binlog.event.DeleteRowsEventData;
|
|
|
import com.github.shyiko.mysql.binlog.event.Event;
|
|
@@ -15,10 +18,12 @@ import net.sf.jsqlparser.JSQLParserException;
|
|
|
import net.sf.jsqlparser.parser.CCJSqlParserUtil;
|
|
|
import net.sf.jsqlparser.statement.alter.Alter;
|
|
|
import org.dbsyncer.common.util.StringUtil;
|
|
|
-import org.dbsyncer.sdk.listener.AbstractDatabaseListener;
|
|
|
-import org.dbsyncer.connector.ConnectorException;
|
|
|
+import org.dbsyncer.connector.mysql.binlog.BinaryLogClient;
|
|
|
+import org.dbsyncer.connector.mysql.binlog.BinaryLogRemoteClient;
|
|
|
+import org.dbsyncer.connector.mysql.MySQLException;
|
|
|
import org.dbsyncer.sdk.config.DatabaseConfig;
|
|
|
import org.dbsyncer.sdk.constant.ConnectorConstant;
|
|
|
+import org.dbsyncer.sdk.listener.AbstractDatabaseListener;
|
|
|
import org.dbsyncer.sdk.listener.event.DDLChangedEvent;
|
|
|
import org.dbsyncer.sdk.listener.event.RowChangedEvent;
|
|
|
import org.dbsyncer.sdk.model.ChangedOffset;
|
|
@@ -41,9 +46,9 @@ import java.util.stream.Stream;
|
|
|
import static java.util.regex.Pattern.compile;
|
|
|
|
|
|
/**
|
|
|
- * @version 1.0.0
|
|
|
* @Author AE86
|
|
|
- * @Date 2020-05-12 21:14
|
|
|
+ * @Version 1.0.0
|
|
|
+ * @Date 2022-05-28 22:02
|
|
|
*/
|
|
|
public class MySQLListener extends AbstractDatabaseListener {
|
|
|
|
|
@@ -73,7 +78,7 @@ public class MySQLListener extends AbstractDatabaseListener {
|
|
|
connected = true;
|
|
|
} catch (Exception e) {
|
|
|
logger.error("启动失败:{}", e.getMessage());
|
|
|
- throw new ConnectorException(e);
|
|
|
+ throw new MySQLException(e);
|
|
|
} finally {
|
|
|
connectLock.unlock();
|
|
|
}
|
|
@@ -102,7 +107,7 @@ public class MySQLListener extends AbstractDatabaseListener {
|
|
|
private void run() throws Exception {
|
|
|
final DatabaseConfig config = (DatabaseConfig) connectorConfig;
|
|
|
if (StringUtil.isBlank(config.getUrl())) {
|
|
|
- throw new ConnectorException("url is invalid");
|
|
|
+ throw new MySQLException("url is invalid");
|
|
|
}
|
|
|
database = DatabaseUtil.getDatabaseName(config.getUrl());
|
|
|
cluster = readNodes(config.getUrl());
|
|
@@ -166,7 +171,7 @@ public class MySQLListener extends AbstractDatabaseListener {
|
|
|
}
|
|
|
run();
|
|
|
|
|
|
- errorEvent(new ConnectorException(String.format("重启成功, %s", client.getWorkerThreadName())));
|
|
|
+ errorEvent(new MySQLException(String.format("重启成功, %s", client.getWorkerThreadName())));
|
|
|
logger.error("第{}次重启成功, ThreadName:{} ", i, client.getWorkerThreadName());
|
|
|
recovery = false;
|
|
|
break;
|
|
@@ -174,7 +179,7 @@ public class MySQLListener extends AbstractDatabaseListener {
|
|
|
logger.error("第{}次重启异常, ThreadName:{}, {}", i, client.getWorkerThreadName(), e.getMessage());
|
|
|
// 无法连接,关闭任务
|
|
|
if (i == RETRY_TIMES) {
|
|
|
- errorEvent(new ConnectorException(String.format("重启异常, %s, %s", client.getWorkerThreadName(), e.getMessage())));
|
|
|
+ errorEvent(new MySQLException(String.format("重启异常, %s, %s", client.getWorkerThreadName(), e.getMessage())));
|
|
|
}
|
|
|
}
|
|
|
try {
|
|
@@ -249,7 +254,7 @@ public class MySQLListener extends AbstractDatabaseListener {
|
|
|
String log = String.format("线程[%s]执行异常。由于MySQL配置了过期binlog文件自动删除机制,已无法找到原binlog文件%s。建议先保存驱动(加载最新的binlog文件),再启动驱动。",
|
|
|
client.getWorkerThreadName(),
|
|
|
client.getBinlogFilename());
|
|
|
- errorEvent(new ConnectorException(log));
|
|
|
+ errorEvent(new MySQLException(log));
|
|
|
return;
|
|
|
}
|
|
|
}
|