|
@@ -27,6 +27,7 @@ import org.dbsyncer.sdk.config.DatabaseConfig;
|
|
|
import org.dbsyncer.sdk.constant.ConnectorConstant;
|
|
|
import org.dbsyncer.sdk.constant.DatabaseConstant;
|
|
|
import org.dbsyncer.sdk.listener.AbstractDatabaseListener;
|
|
|
+import org.dbsyncer.sdk.listener.ChangedEvent;
|
|
|
import org.dbsyncer.sdk.listener.event.DDLChangedEvent;
|
|
|
import org.dbsyncer.sdk.listener.event.RowChangedEvent;
|
|
|
import org.dbsyncer.sdk.model.ChangedOffset;
|
|
@@ -170,7 +171,7 @@ public class MySQLListener extends AbstractDatabaseListener {
|
|
|
snapshot.put(BINLOG_POSITION, String.valueOf(nextPosition));
|
|
|
}
|
|
|
|
|
|
- private void trySendEvent(RowChangedEvent event){
|
|
|
+ private void trySendEvent(ChangedEvent event){
|
|
|
try {
|
|
|
// 如果消费事件失败,重试
|
|
|
long now = Instant.now().toEpochMilli();
|
|
@@ -330,7 +331,7 @@ public class MySQLListener extends AbstractDatabaseListener {
|
|
|
String tableName = StringUtil.replace(alter.getTable().getName(), "`", "");
|
|
|
if (isFilterTable(data.getDatabase(), tableName)) {
|
|
|
logger.info("sql:{}", data.getSql());
|
|
|
- changeEvent(new DDLChangedEvent(data.getDatabase(), tableName, ConnectorConstant.OPERTION_ALTER, data.getSql(), client.getBinlogFilename(), client.getBinlogPosition()));
|
|
|
+ trySendEvent(new DDLChangedEvent(data.getDatabase(), tableName, ConnectorConstant.OPERTION_ALTER, data.getSql(), client.getBinlogFilename(), client.getBinlogPosition()));
|
|
|
}
|
|
|
} catch (JSQLParserException e) {
|
|
|
logger.error("不支持ddl sql,支持标准的sql格式,请查看文档https://gitee.com/ghi/dbsyncer/wikis/%E5%BF%AB%E9%80%9F%E4%BA%86%E8%A7%A3/%E8%A1%A8%E7%BB%93%E6%9E%84%E5%90%8C%E6%AD%A5");
|