浏览代码

Merge remote-tracking branch 'origin/v_2.0' into v_2.0

bble 1 年之前
父节点
当前提交
97032f4971

+ 5 - 0
dbsyncer-connector/dbsyncer-connector-mysql/src/main/java/org/dbsyncer/connector/mysql/DQLMySQLConnector.java

@@ -67,4 +67,9 @@ public final class DQLMySQLConnector extends AbstractDQLConnector {
         }
         return null;
     }
+
+    @Override
+    public String generateUniqueCode() {
+        return DatabaseConstant.DBS_UNIQUE_CODE;
+    }
 }

+ 5 - 0
dbsyncer-connector/dbsyncer-connector-mysql/src/main/java/org/dbsyncer/connector/mysql/MySQLConnector.java

@@ -56,6 +56,11 @@ public final class MySQLConnector extends AbstractDatabaseConnector {
         return null;
     }
 
+    @Override
+    public String generateUniqueCode() {
+        return DatabaseConstant.DBS_UNIQUE_CODE;
+    }
+
     @Override
     public String buildSqlWithQuotation() {
         return "`";

+ 0 - 15
dbsyncer-connector/dbsyncer-connector-mysql/src/main/java/org/dbsyncer/connector/mysql/binlog/BinaryLogClient.java

@@ -95,21 +95,6 @@ public interface BinaryLogClient {
      */
     void setTableMapEventByTableId(Map<Long, TableMapEventData> tableMapEventByTableId);
 
-    /**
-     * 是否支持ddl
-     *
-     * @return
-     */
-    boolean isEnableDDL();
-
-    /**
-     * <p>true: ROTATE > FORMAT_DESCRIPTION > TABLE_MAP > WRITE_ROWS > UPDATE_ROWS > DELETE_ROWS > XID
-     * <p>false: Support all events
-     *
-     * @param enableDDL
-     */
-    void setEnableDDL(boolean enableDDL);
-
     /**
      * binlog-parser-127.0.0.1_3306_1
      *

+ 7 - 20
dbsyncer-connector/dbsyncer-connector-mysql/src/main/java/org/dbsyncer/connector/mysql/binlog/BinaryLogRemoteClient.java

@@ -77,8 +77,7 @@ public class BinaryLogRemoteClient implements BinaryLogClient {
     private EventDeserializer eventDeserializer;
     private Map<Long, TableMapEventData> tableMapEventByTableId;
     private boolean blocking = true;
-    private boolean enableDDL = false;
-    private long serverId = 65535;
+    private long serverId = 65535L;
     private volatile String binlogFilename;
     private volatile long binlogPosition = 4;
     private volatile long connectionId;
@@ -585,16 +584,12 @@ public class BinaryLogRemoteClient implements BinaryLogClient {
         eventDataDeserializers.put(EventType.EXT_UPDATE_ROWS, (new UpdateDeserializer(tableMapEventByTableId)).setMayContainExtraInformation(true));
         eventDataDeserializers.put(EventType.EXT_DELETE_ROWS, (new DeleteDeserializer(tableMapEventByTableId)).setMayContainExtraInformation(true));
         eventDataDeserializers.put(EventType.XID, new XidEventDataDeserializer());
-
-        if (enableDDL) {
-            eventDataDeserializers.put(EventType.INTVAR, new IntVarEventDataDeserializer());
-            eventDataDeserializers.put(EventType.QUERY, new QueryEventDataDeserializer());
-            eventDataDeserializers.put(EventType.ROWS_QUERY, new RowsQueryEventDataDeserializer());
-            eventDataDeserializers.put(EventType.GTID, new GtidEventDataDeserializer());
-            eventDataDeserializers.put(EventType.PREVIOUS_GTIDS, new PreviousGtidSetDeserializer());
-            eventDataDeserializers.put(EventType.XA_PREPARE, new XAPrepareEventDataDeserializer());
-        }
-
+        eventDataDeserializers.put(EventType.INTVAR, new IntVarEventDataDeserializer());
+        eventDataDeserializers.put(EventType.QUERY, new QueryEventDataDeserializer());
+        eventDataDeserializers.put(EventType.ROWS_QUERY, new RowsQueryEventDataDeserializer());
+        eventDataDeserializers.put(EventType.GTID, new GtidEventDataDeserializer());
+        eventDataDeserializers.put(EventType.PREVIOUS_GTIDS, new PreviousGtidSetDeserializer());
+        eventDataDeserializers.put(EventType.XA_PREPARE, new XAPrepareEventDataDeserializer());
     }
 
     private void notifyEventListeners(Event event) {
@@ -713,14 +708,6 @@ public class BinaryLogRemoteClient implements BinaryLogClient {
         this.tableMapEventByTableId = tableMapEventByTableId;
     }
 
-    public boolean isEnableDDL() {
-        return enableDDL;
-    }
-
-    public void setEnableDDL(boolean enableDDL) {
-        this.enableDDL = enableDDL;
-    }
-
     @Override
     public String getWorkerThreadName() {
         return workerThreadName;

+ 27 - 11
dbsyncer-connector/dbsyncer-connector-mysql/src/main/java/org/dbsyncer/connector/mysql/cdc/MySQLListener.java

@@ -10,6 +10,7 @@ import com.github.shyiko.mysql.binlog.event.EventHeaderV4;
 import com.github.shyiko.mysql.binlog.event.EventType;
 import com.github.shyiko.mysql.binlog.event.QueryEventData;
 import com.github.shyiko.mysql.binlog.event.RotateEventData;
+import com.github.shyiko.mysql.binlog.event.RowsQueryEventData;
 import com.github.shyiko.mysql.binlog.event.TableMapEventData;
 import com.github.shyiko.mysql.binlog.event.UpdateRowsEventData;
 import com.github.shyiko.mysql.binlog.event.WriteRowsEventData;
@@ -18,11 +19,12 @@ import net.sf.jsqlparser.JSQLParserException;
 import net.sf.jsqlparser.parser.CCJSqlParserUtil;
 import net.sf.jsqlparser.statement.alter.Alter;
 import org.dbsyncer.common.util.StringUtil;
+import org.dbsyncer.connector.mysql.MySQLException;
 import org.dbsyncer.connector.mysql.binlog.BinaryLogClient;
 import org.dbsyncer.connector.mysql.binlog.BinaryLogRemoteClient;
-import org.dbsyncer.connector.mysql.MySQLException;
 import org.dbsyncer.sdk.config.DatabaseConfig;
 import org.dbsyncer.sdk.constant.ConnectorConstant;
+import org.dbsyncer.sdk.constant.DatabaseConstant;
 import org.dbsyncer.sdk.listener.AbstractDatabaseListener;
 import org.dbsyncer.sdk.listener.event.DDLChangedEvent;
 import org.dbsyncer.sdk.listener.event.RowChangedEvent;
@@ -118,12 +120,11 @@ public class MySQLListener extends AbstractDatabaseListener {
         final String password = config.getPassword();
         boolean containsPos = snapshot.containsKey(BINLOG_POSITION);
         client = new BinaryLogRemoteClient(host.getIp(), host.getPort(), username, password);
-        client.setEnableDDL(true);
         client.setBinlogFilename(snapshot.get(BINLOG_FILENAME));
         client.setBinlogPosition(containsPos ? Long.parseLong(snapshot.get(BINLOG_POSITION)) : 0);
         client.setTableMapEventByTableId(tables);
-        client.registerEventListener(new MysqlEventListener());
-        client.registerLifecycleListener(new MysqlLifecycleListener());
+        client.registerEventListener(new InnerEventListener());
+        client.registerLifecycleListener(new InnerLifecycleListener());
 
         client.connect();
 
@@ -227,7 +228,7 @@ public class MySQLListener extends AbstractDatabaseListener {
         }
     }
 
-    final class MysqlLifecycleListener implements BinaryLogRemoteClient.LifecycleListener {
+    final class InnerLifecycleListener implements BinaryLogRemoteClient.LifecycleListener {
 
         @Override
         public void onConnect(BinaryLogRemoteClient client) {
@@ -272,7 +273,12 @@ public class MySQLListener extends AbstractDatabaseListener {
 
     }
 
-    final class MysqlEventListener implements BinaryLogRemoteClient.EventListener {
+    final class InnerEventListener implements BinaryLogRemoteClient.EventListener {
+
+        /**
+         * 只处理非dbs写入事件(单线程消费,不存在并发竞争)
+         */
+        private boolean notUniqueCodeEvent = true;
 
         @Override
         public void onEvent(Event event) {
@@ -283,7 +289,13 @@ public class MySQLListener extends AbstractDatabaseListener {
                 return;
             }
 
-            if (EventType.isUpdate(header.getEventType())) {
+            if (header.getEventType() == EventType.ROWS_QUERY) {
+                RowsQueryEventData data = event.getData();
+                notUniqueCodeEvent = isNotUniqueCodeEvent(data.getQuery());
+                return;
+            }
+
+            if (notUniqueCodeEvent && EventType.isUpdate(header.getEventType())) {
                 refresh(header);
                 UpdateRowsEventData data = event.getData();
                 if (isFilterTable(data.getTableId())) {
@@ -294,7 +306,7 @@ public class MySQLListener extends AbstractDatabaseListener {
                 }
                 return;
             }
-            if (EventType.isWrite(header.getEventType())) {
+            if (notUniqueCodeEvent && EventType.isWrite(header.getEventType())) {
                 refresh(header);
                 WriteRowsEventData data = event.getData();
                 if (isFilterTable(data.getTableId())) {
@@ -305,7 +317,7 @@ public class MySQLListener extends AbstractDatabaseListener {
                 }
                 return;
             }
-            if (EventType.isDelete(header.getEventType())) {
+            if (notUniqueCodeEvent && EventType.isDelete(header.getEventType())) {
                 refresh(header);
                 DeleteRowsEventData data = event.getData();
                 if (isFilterTable(data.getTableId())) {
@@ -317,7 +329,7 @@ public class MySQLListener extends AbstractDatabaseListener {
                 return;
             }
 
-            if (client.isEnableDDL() && EventType.QUERY == header.getEventType()) {
+            if (EventType.QUERY == header.getEventType()) {
                 refresh(header);
                 parseDDL(event.getData());
                 return;
@@ -331,7 +343,7 @@ public class MySQLListener extends AbstractDatabaseListener {
         }
 
         private void parseDDL(QueryEventData data) {
-            if (StringUtil.startsWith(data.getSql(), ConnectorConstant.OPERTION_ALTER)) {
+            if (isNotUniqueCodeEvent(data.getSql()) && StringUtil.startsWith(data.getSql(), ConnectorConstant.OPERTION_ALTER)) {
                 try {
                     // ALTER TABLE `test`.`my_user` MODIFY COLUMN `name` varchar(128) CHARACTER SET utf8 COLLATE utf8_bin NULL DEFAULT NULL AFTER `id`
                     Alter alter = (Alter) CCJSqlParserUtil.parse(data.getSql());
@@ -359,6 +371,10 @@ public class MySQLListener extends AbstractDatabaseListener {
             return StringUtil.equalsIgnoreCase(database, dbName) && filterTable.contains(tableName);
         }
 
+        private boolean isNotUniqueCodeEvent(String sql){
+            return !StringUtil.startsWith(sql, DatabaseConstant.DBS_UNIQUE_CODE);
+        }
+
     }
 
 }

+ 3 - 1
dbsyncer-sdk/src/main/java/org/dbsyncer/sdk/connector/database/AbstractDatabaseConnector.java

@@ -17,6 +17,7 @@ import org.dbsyncer.sdk.connector.AbstractConnector;
 import org.dbsyncer.sdk.connector.ConnectorInstance;
 import org.dbsyncer.sdk.connector.database.ds.SimpleConnection;
 import org.dbsyncer.sdk.constant.ConnectorConstant;
+import org.dbsyncer.sdk.constant.DatabaseConstant;
 import org.dbsyncer.sdk.enums.OperationEnum;
 import org.dbsyncer.sdk.enums.SqlBuilderEnum;
 import org.dbsyncer.sdk.enums.TableTypeEnum;
@@ -666,7 +667,8 @@ public abstract class AbstractDatabaseConnector extends AbstractConnector implem
         try {
             Assert.hasText(config.getSql(), "执行SQL语句不能为空.");
             connectorInstance.execute(databaseTemplate -> {
-                databaseTemplate.execute(config.getSql());
+                // 执行ddl时, 带上dbs唯一标识码,防止双向同步导致死循环
+                databaseTemplate.execute(DatabaseConstant.DBS_UNIQUE_CODE.concat(config.getSql()));
                 return true;
             });
         } catch (Exception e) {

+ 9 - 0
dbsyncer-sdk/src/main/java/org/dbsyncer/sdk/connector/database/Database.java

@@ -10,6 +10,15 @@ import java.util.List;
 
 public interface Database {
 
+    /**
+     * 获取dbs唯一标识码
+     *
+     * @return
+     */
+    default String generateUniqueCode() {
+        return StringUtil.EMPTY;
+    }
+
     /**
      * 查询语句表名和字段带上引号(默认不加)
      *

+ 2 - 1
dbsyncer-sdk/src/main/java/org/dbsyncer/sdk/connector/database/sqlbuilder/SqlBuilderDelete.java

@@ -21,7 +21,8 @@ public class SqlBuilderDelete extends AbstractSqlBuilder {
         String tableName = config.getTableName();
         List<String> primaryKeys = database.buildPrimaryKeys(config.getPrimaryKeys());
         // DELETE FROM "USER" WHERE "ID"=? AND "UID" = ?
-        StringBuilder sql = new StringBuilder().append("DELETE FROM ").append(config.getSchema());
+        StringBuilder sql = new StringBuilder(database.generateUniqueCode());
+        sql.append("DELETE FROM ").append(config.getSchema());
         sql.append(quotation);
         sql.append(database.buildTableName(tableName));
         sql.append(quotation);

+ 2 - 1
dbsyncer-sdk/src/main/java/org/dbsyncer/sdk/connector/database/sqlbuilder/SqlBuilderInsert.java

@@ -37,7 +37,8 @@ public class SqlBuilderInsert extends AbstractSqlBuilder {
             }
         }
         // INSERT INTO "USER"("USERNAME","AGE") VALUES (?,?)
-        StringBuilder sql = new StringBuilder("INSERT INTO ");
+        StringBuilder sql = new StringBuilder(database.generateUniqueCode());
+        sql.append("INSERT INTO ");
         sql.append(config.getSchema());
         sql.append(quotation);
         sql.append(database.buildTableName(config.getTableName()));

+ 1 - 1
dbsyncer-sdk/src/main/java/org/dbsyncer/sdk/connector/database/sqlbuilder/SqlBuilderUpdate.java

@@ -22,7 +22,7 @@ public class SqlBuilderUpdate extends AbstractSqlBuilder {
         String quotation = database.buildSqlWithQuotation();
         List<Field> fields = config.getFields();
 
-        StringBuilder sql = new StringBuilder();
+        StringBuilder sql = new StringBuilder(database.generateUniqueCode());
         sql.append("UPDATE ").append(config.getSchema());
         sql.append(quotation);
         sql.append(database.buildTableName(config.getTableName()));

+ 5 - 0
dbsyncer-sdk/src/main/java/org/dbsyncer/sdk/constant/DatabaseConstant.java

@@ -2,6 +2,11 @@ package org.dbsyncer.sdk.constant;
 
 public class DatabaseConstant {
 
+    /**
+     * dbs唯一标识码
+     */
+    public static final String DBS_UNIQUE_CODE = "/*dbs*/";
+
     //*********************************** Mysql **************************************//
     /**
      * Mysql分页语句

+ 1 - 1
dbsyncer-storage/src/main/test/LuceneFactoryTest.java

@@ -34,12 +34,12 @@ import org.apache.lucene.search.highlight.QueryScorer;
 import org.apache.lucene.search.highlight.SimpleHTMLFormatter;
 import org.apache.lucene.util.BytesRef;
 import org.dbsyncer.common.model.Paging;
-import org.dbsyncer.common.snowflake.SnowflakeIdWorker;
 import org.dbsyncer.common.util.CollectionUtils;
 import org.dbsyncer.common.util.RandomUtil;
 import org.dbsyncer.sdk.constant.ConnectorConstant;
 import org.dbsyncer.storage.constant.ConfigConstant;
 import org.dbsyncer.storage.enums.StorageDataStatusEnum;
+import org.dbsyncer.storage.impl.SnowflakeIdWorker;
 import org.dbsyncer.storage.lucene.Option;
 import org.dbsyncer.storage.lucene.Shard;
 import org.dbsyncer.storage.util.BinlogMessageUtil;