Explorar o código

!222 merge
Merge pull request !222 from AE86/v_2.0

AE86 hai 1 ano
pai
achega
9a6e3a4719
Modificáronse 25 ficheiros con 262 adicións e 198 borrados
  1. 6 0
      dbsyncer-connector/dbsyncer-connector-base/pom.xml
  2. 128 4
      dbsyncer-connector/dbsyncer-connector-base/src/test/java/ConnectionTest.java
  3. 2 1
      dbsyncer-connector/dbsyncer-connector-elasticsearch/src/main/java/org/dbsyncer/connector/elasticsearch/ElasticsearchConnector.java
  4. 0 8
      dbsyncer-connector/dbsyncer-connector-mysql/src/main/java/org/dbsyncer/connector/mysql/DQLMySQLConnector.java
  5. 2 6
      dbsyncer-connector/dbsyncer-connector-mysql/src/main/java/org/dbsyncer/connector/mysql/storage/MySQLStorageService.java
  6. 43 0
      dbsyncer-parser/src/main/java/org/dbsyncer/parser/ParserSupportConfiguration.java
  7. 0 7
      dbsyncer-parser/src/main/java/org/dbsyncer/parser/consumer/AbstractConsumer.java
  8. 0 6
      dbsyncer-parser/src/main/java/org/dbsyncer/parser/consumer/impl/LogConsumer.java
  9. 21 9
      dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/AbstractBufferActuator.java
  10. 1 57
      dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/impl/GeneralBufferActuator.java
  11. 2 3
      dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/impl/TableGroupBufferActuator.java
  12. 7 0
      dbsyncer-sdk/pom.xml
  13. 3 23
      dbsyncer-sdk/src/main/java/org/dbsyncer/sdk/connector/database/AbstractDQLConnector.java
  14. 0 7
      dbsyncer-sdk/src/main/java/org/dbsyncer/sdk/connector/database/DatabaseTemplate.java
  15. 1 9
      dbsyncer-sdk/src/main/java/org/dbsyncer/sdk/connector/database/sqlbuilder/SqlBuilderQueryCount.java
  16. 10 0
      dbsyncer-sdk/src/main/java/org/dbsyncer/sdk/connector/schema/IntegerValueMapper.java
  17. 6 0
      dbsyncer-sdk/src/main/java/org/dbsyncer/sdk/connector/schema/TimestampValueMapper.java
  18. 4 0
      dbsyncer-sdk/src/main/java/org/dbsyncer/sdk/connector/schema/VarcharValueMapper.java
  19. 0 7
      dbsyncer-sdk/src/main/java/org/dbsyncer/sdk/enums/ChangedEventTypeEnum.java
  20. 0 27
      dbsyncer-sdk/src/main/java/org/dbsyncer/sdk/listener/event/SqlChangedEvent.java
  21. 3 9
      dbsyncer-sdk/src/main/java/org/dbsyncer/sdk/model/SqlTable.java
  22. 12 0
      dbsyncer-sdk/src/main/java/org/dbsyncer/sdk/spi/TableGroupBufferActuatorService.java
  23. 1 11
      dbsyncer-sdk/src/main/java/org/dbsyncer/sdk/util/PrimaryKeyUtil.java
  24. 4 4
      dbsyncer-web/src/main/resources/application.properties
  25. 6 0
      install.cmd

+ 6 - 0
dbsyncer-connector/dbsyncer-connector-base/pom.xml

@@ -76,6 +76,12 @@
             <version>${project.parent.version}</version>
         </dependency>
 
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-starter-log4j2</artifactId>
+            <scope>provided</scope>
+        </dependency>
+
         <dependency>
             <groupId>junit</groupId>
             <artifactId>junit</artifactId>

+ 128 - 4
dbsyncer-connector/dbsyncer-connector-base/src/test/java/ConnectionTest.java

@@ -17,12 +17,23 @@ import org.slf4j.LoggerFactory;
 import org.springframework.jdbc.core.BatchPreparedStatementSetter;
 
 import java.nio.charset.Charset;
-import java.sql.*;
+import java.sql.Clob;
+import java.sql.Connection;
+import java.sql.DatabaseMetaData;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Timestamp;
 import java.time.Instant;
 import java.time.LocalDateTime;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.concurrent.*;
+import java.util.concurrent.BrokenBarrierException;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
 
 /**
  * @Author AE86
@@ -117,7 +128,7 @@ public class ConnectionTest {
         long begin = Instant.now().toEpochMilli();
         final int threadSize = 10;
         final ExecutorService pool = Executors.newFixedThreadPool(threadSize);
-        final String sql = "INSERT INTO `vote_records_copy` (`id`, `user_id`, `vote_num`, `group_id`, `status`, `create_time`) VALUES (?, ?, ?, ?, ?, ?)";
+        final String sql = "INSERT INTO `vote_records` (`id`, `user_id`, `vote_num`, `group_id`, `status`, `create_time`) VALUES (?, ?, ?, ?, ?, ?)";
 
         // 模拟1000w条数据
         List<Object[]> dataList = new ArrayList<>();
@@ -139,7 +150,7 @@ public class ConnectionTest {
             }
         }
 
-        if(!CollectionUtils.isEmpty(dataList)){
+        if (!CollectionUtils.isEmpty(dataList)) {
             System.out.println("-----------------正在处理剩余数据");
             batchUpdate(connectorInstance, pool, sql, dataList, 1000);
         }
@@ -218,6 +229,119 @@ public class ConnectionTest {
         logger.info("总共耗时:{}秒", (Instant.now().toEpochMilli() - begin) / 1000);
     }
 
+    @Test
+    public void testBatchIUD() {
+        final DatabaseConnectorInstance connectorInstance = new DatabaseConnectorInstance(createMysqlConfig());
+
+        long begin = Instant.now().toEpochMilli();
+        final int threadSize = 1000;
+        final int num = 100;
+        final ExecutorService pool = Executors.newFixedThreadPool(threadSize);
+        final CountDownLatch latch = new CountDownLatch(threadSize);
+        final String insert = "INSERT INTO `vote_records_test` (`id`, `user_id`, `vote_num`, `group_id`, `status`, `create_time`) VALUES (?, ?, ?, ?, ?, ?)";
+        final String update = "UPDATE `test`.`vote_records_test` SET `user_id` = ?, `create_time` = now() WHERE `id` = ?";
+        final String delete = "DELETE from `test`.`vote_records_test` WHERE `id` = ?";
+
+        // 模拟单表增删改事件,每个事件间隔2条数据
+        for (int i = 0; i < threadSize; i++) {
+            final int offset = i;
+            pool.submit(() -> {
+                try {
+                    logger.info("{}-开始任务", Thread.currentThread().getName());
+                    // 增删改事件密集型
+                    mockData(connectorInstance, num, offset, insert, update, delete);
+                    // 增改事件密集型
+//                    mockData2(connectorInstance, num, offset, insert, update);
+                    logger.info("{}-结束任务", Thread.currentThread().getName());
+                } catch (Exception e) {
+                    logger.error(e.getMessage());
+                } finally {
+                    latch.countDown();
+                }
+            });
+        }
+
+        try {
+            latch.await();
+        } catch (InterruptedException e) {
+            logger.error(e.getMessage());
+        }
+        pool.shutdown();
+//        logger.info("总数:{}, 耗时:{}秒", (threadSize * num * 3), (Instant.now().toEpochMilli() - begin) / 1000);
+        logger.info("总数:{}, 耗时:{}秒", (threadSize * num), (Instant.now().toEpochMilli() - begin) / 1000);
+    }
+
+    private void mockData(DatabaseConnectorInstance connectorInstance, int num, int offset, String insert, String update, String delete) {
+        int start = offset * num;
+        logger.info("{}-offset:{}, start:{}", Thread.currentThread().getName(), offset, start);
+        List<Object[]> insertData = new ArrayList<>();
+        List<Object[]> updateData = new ArrayList<>();
+        List<Object[]> deleteData = new ArrayList<>();
+        for (int i = 0; i < num; i++) {
+            // insert
+            Object[] insertArgs = new Object[6];
+            insertArgs[0] = i + start;
+            insertArgs[1] = randomUserId(20);
+            insertArgs[2] = RandomUtil.nextInt(1, 9999);
+            insertArgs[3] = RandomUtil.nextInt(0, 3);
+            insertArgs[4] = RandomUtil.nextInt(1, 3);
+            insertArgs[5] = Timestamp.valueOf(LocalDateTime.now());
+            insertData.add(insertArgs);
+
+            // update
+            Object[] updateArgs = new Object[2];
+            updateArgs[0] = randomUserId(20);
+            updateArgs[1] = i + start;
+            updateData.add(updateArgs);
+
+            // delete
+            Object[] deleteArgs = new Object[1];
+            deleteArgs[0] = i + start;
+            deleteData.add(deleteArgs);
+
+            connectorInstance.execute(databaseTemplate -> databaseTemplate.batchUpdate(insert, insertData));
+            connectorInstance.execute(databaseTemplate -> databaseTemplate.batchUpdate(update, updateData));
+            connectorInstance.execute(databaseTemplate -> databaseTemplate.batchUpdate(delete, deleteData));
+            insertData.clear();
+            updateData.clear();
+            deleteData.clear();
+            logger.info("{}, 数据行[{}, {}], 已处理:{}", Thread.currentThread().getName(), start, start + num, i + start);
+        }
+    }
+
+    private void mockData2(DatabaseConnectorInstance connectorInstance, int num, int offset, String insert, String update) {
+        List<Object[]> insertData = new ArrayList<>();
+        List<Object[]> updateData = new ArrayList<>();
+        final int batch = 100;
+        int start = offset * num;
+        logger.info("{}-offset:{}, start:{}", Thread.currentThread().getName(), offset, start);
+        for (int i = 1; i <= num; i++) {
+            // insert
+            Object[] insertArgs = new Object[6];
+            insertArgs[0] = i + start;
+            insertArgs[1] = randomUserId(20);
+            insertArgs[2] = RandomUtil.nextInt(1, 9999);
+            insertArgs[3] = RandomUtil.nextInt(0, 3);
+            insertArgs[4] = RandomUtil.nextInt(1, 3);
+            insertArgs[5] = Timestamp.valueOf(LocalDateTime.now());
+            insertData.add(insertArgs);
+
+            // update
+            Object[] updateArgs = new Object[2];
+            updateArgs[0] = randomUserId(20);
+            updateArgs[1] = i + start;
+            updateData.add(updateArgs);
+
+            if (i % batch == 0) {
+                connectorInstance.execute(databaseTemplate -> databaseTemplate.batchUpdate(insert, insertData));
+                connectorInstance.execute(databaseTemplate -> databaseTemplate.batchUpdate(update, updateData));
+                logger.info("{}, 数据行[{}, {}], 已处理:{}", Thread.currentThread().getName(), start, start + num, i + start);
+                insertData.clear();
+                updateData.clear();
+            }
+        }
+    }
+
     private final static String STR = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789";
 
     private String randomUserId(int i) {

+ 2 - 1
dbsyncer-connector/dbsyncer-connector-elasticsearch/src/main/java/org/dbsyncer/connector/elasticsearch/ElasticsearchConnector.java

@@ -146,12 +146,13 @@ public final class ElasticsearchConnector extends AbstractConnector implements C
 
     @Override
     public String getConnectorInstanceCacheKey(ESConfig config) {
-        return String.format("%s-%s-%s-%s", config.getConnectorType(), config.getUrl(), config.getIndex(), config.getUsername());
+        return String.format("%s-%s-%s", config.getConnectorType(), config.getUrl(), config.getUsername());
     }
 
     @Override
     public List<Table> getTable(ESConnectorInstance connectorInstance) {
         try {
+            // TODO 获取所有索引和type
             ESConfig config = connectorInstance.getConfig();
             GetIndexRequest request = new GetIndexRequest(config.getIndex());
             GetIndexResponse indexResponse = connectorInstance.getConnection().indices().get(request, RequestOptions.DEFAULT);

+ 0 - 8
dbsyncer-connector/dbsyncer-connector-mysql/src/main/java/org/dbsyncer/connector/mysql/DQLMySQLConnector.java

@@ -5,7 +5,6 @@ package org.dbsyncer.connector.mysql;
 
 import org.dbsyncer.connector.mysql.cdc.DqlMySQLListener;
 import org.dbsyncer.connector.mysql.validator.DqlMySQLConfigValidator;
-import org.dbsyncer.sdk.config.CommandConfig;
 import org.dbsyncer.sdk.config.ReaderConfig;
 import org.dbsyncer.sdk.connector.ConfigValidator;
 import org.dbsyncer.sdk.connector.database.AbstractDQLConnector;
@@ -15,8 +14,6 @@ import org.dbsyncer.sdk.listener.DatabaseQuartzListener;
 import org.dbsyncer.sdk.listener.Listener;
 import org.dbsyncer.sdk.model.PageSql;
 
-import java.util.Map;
-
 /**
  * DQLMySQL连接器实现
  *
@@ -51,11 +48,6 @@ public final class DQLMySQLConnector extends AbstractDQLConnector {
         return new Object[]{(pageIndex - 1) * pageSize, pageSize};
     }
 
-    @Override
-    public Map<String, String> getSourceCommand(CommandConfig commandConfig) {
-        return super.getDqlSourceCommand(commandConfig, true);
-    }
-
     @Override
     public Listener getListener(String listenerType) {
         if (ListenerTypeEnum.isTiming(listenerType)) {

+ 2 - 6
dbsyncer-connector/dbsyncer-connector-mysql/src/main/java/org/dbsyncer/connector/mysql/storage/MySQLStorageService.java

@@ -249,13 +249,9 @@ public class MySQLStorageService extends AbstractStorageService {
     }
 
     private String buildQueryCountSql(Query query, Executor executor, List<Object> args) {
-        StringBuilder queryCount = new StringBuilder();
-        queryCount.append("SELECT COUNT(1) FROM (");
-        StringBuilder sql = new StringBuilder("SELECT 1 AS `_ROW` FROM `").append(executor.getTable()).append("`");
+        StringBuilder sql = new StringBuilder("SELECT COUNT(1) FROM `").append(executor.getTable()).append("`");
         buildQuerySqlWithParams(query, args, sql, null);
-        queryCount.append(sql);
-        queryCount.append(" GROUP BY `ID`) DBSYNCER_T");
-        return queryCount.toString();
+        return sql.toString();
     }
 
     private void buildQuerySqlWithParams(Query query, List<Object> args, StringBuilder sql, List<AbstractFilter> highLightKeys) {

+ 43 - 0
dbsyncer-parser/src/main/java/org/dbsyncer/parser/ParserSupportConfiguration.java

@@ -0,0 +1,43 @@
+/**
+ * DBSyncer Copyright 2020-2024 All Rights Reserved.
+ */
+package org.dbsyncer.parser;
+
+import org.dbsyncer.parser.flush.impl.TableGroupBufferActuator;
+import org.dbsyncer.sdk.spi.TableGroupBufferActuatorService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+import java.util.ServiceLoader;
+
+/**
+ * @Author AE86
+ * @Version 1.0.0
+ * @Date 2024-01-25 23:43
+ */
+@Configuration
+public class ParserSupportConfiguration {
+
+    private final Logger logger = LoggerFactory.getLogger(getClass());
+
+    @Bean
+    @ConditionalOnMissingBean
+    public TableGroupBufferActuator tableGroupBufferActuator() {
+        ServiceLoader<TableGroupBufferActuatorService> services = ServiceLoader.load(TableGroupBufferActuatorService.class, Thread.currentThread().getContextClassLoader());
+        for (TableGroupBufferActuatorService s : services) {
+            try {
+                TableGroupBufferActuatorService service = s.getClass().newInstance();
+                if (service instanceof TableGroupBufferActuator) {
+                    return (TableGroupBufferActuator) service;
+                }
+            } catch (Exception e) {
+                logger.error(e.getMessage(), e);
+                throw new ParserException("获取TableGroupBufferActuator异常.");
+            }
+        }
+        return new TableGroupBufferActuator();
+    }
+}

+ 0 - 7
dbsyncer-parser/src/main/java/org/dbsyncer/parser/consumer/AbstractConsumer.java

@@ -13,7 +13,6 @@ import org.dbsyncer.parser.model.TableGroup;
 import org.dbsyncer.sdk.listener.ChangedEvent;
 import org.dbsyncer.sdk.listener.Watcher;
 import org.dbsyncer.sdk.listener.event.DDLChangedEvent;
-import org.dbsyncer.sdk.listener.event.SqlChangedEvent;
 
 import java.util.List;
 import java.util.Map;
@@ -49,9 +48,6 @@ public abstract class AbstractConsumer<E extends ChangedEvent> implements Watche
     public void onDDLChanged(DDLChangedEvent event) {
     }
 
-    public void onSqlChanged(SqlChangedEvent event) {
-    }
-
     @Override
     public void changeEvent(ChangedEvent event) {
         event.getChangedOffset().setMetaId(metaId);
@@ -60,9 +56,6 @@ public abstract class AbstractConsumer<E extends ChangedEvent> implements Watche
             case SCAN:
                 onChange((E) event);
                 break;
-            case SQL:
-                onSqlChanged((SqlChangedEvent) event);
-                break;
             case DDL:
                 onDDLChanged((DDLChangedEvent) event);
                 break;

+ 0 - 6
dbsyncer-parser/src/main/java/org/dbsyncer/parser/consumer/impl/LogConsumer.java

@@ -11,7 +11,6 @@ import org.dbsyncer.parser.consumer.AbstractConsumer;
 import org.dbsyncer.parser.model.FieldPicker;
 import org.dbsyncer.parser.model.TableGroup;
 import org.dbsyncer.parser.util.PickerUtil;
-import org.dbsyncer.sdk.listener.event.SqlChangedEvent;
 import org.dbsyncer.sdk.model.Table;
 
 import java.util.ArrayList;
@@ -60,11 +59,6 @@ public final class LogConsumer extends AbstractConsumer<RowChangedEvent> {
         process(event, picker -> execute(picker.getTableGroup().getId(), event));
     }
 
-    @Override
-    public void onSqlChanged(SqlChangedEvent event) {
-        process(event, picker -> execute(picker.getTableGroup().getId(), event));
-    }
-
     private void process(CommonChangedEvent event, Consumer<FieldPicker> consumer) {
         // 处理过程有异常向上抛
         List<FieldPicker> pickers = tablePicker.get(event.getSourceTableName());

+ 21 - 9
dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/AbstractBufferActuator.java

@@ -1,3 +1,6 @@
+/**
+ * DBSyncer Copyright 2020-2024 All Rights Reserved.
+ */
 package org.dbsyncer.parser.flush;
 
 import org.dbsyncer.common.config.BufferActuatorConfig;
@@ -122,6 +125,23 @@ public abstract class AbstractBufferActuator<Request extends BufferRequest, Resp
      */
     protected abstract void pull(Response response);
 
+    /**
+     * 批量处理分区数据
+     *
+     * @param map
+     */
+    protected void process(Map<String, Response> map){
+        map.forEach((key, response) -> {
+            long now = Instant.now().toEpochMilli();
+            try {
+                pull(response);
+            } catch (Exception e) {
+                logger.error(e.getMessage(), e);
+            }
+            logger.info("[{}{}]{}, {}ms", key, response.getSuffixName(), response.getTaskSize(), (Instant.now().toEpochMilli() - now));
+        });
+    }
+
     /**
      * 提交任务失败
      *
@@ -190,15 +210,7 @@ public abstract class AbstractBufferActuator<Request extends BufferRequest, Resp
             }
         }
 
-        map.forEach((key, response) -> {
-            long now = Instant.now().toEpochMilli();
-            try {
-                pull(response);
-            } catch (Exception e) {
-                logger.error(e.getMessage(), e);
-            }
-            logger.info("[{}{}]{}, {}ms", key, response.getSuffixName(), response.getTaskSize(), (Instant.now().toEpochMilli() - now));
-        });
+        process(map);
         map.clear();
         map = null;
         batchCounter = null;

+ 1 - 57
dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/impl/GeneralBufferActuator.java

@@ -22,10 +22,6 @@ import org.dbsyncer.parser.model.Picker;
 import org.dbsyncer.parser.model.TableGroup;
 import org.dbsyncer.parser.model.WriterRequest;
 import org.dbsyncer.parser.model.WriterResponse;
-import org.dbsyncer.parser.sql.SqlParser;
-import org.dbsyncer.parser.sql.impl.DeleteSql;
-import org.dbsyncer.parser.sql.impl.InsertSql;
-import org.dbsyncer.parser.sql.impl.UpdateSql;
 import org.dbsyncer.parser.strategy.FlushStrategy;
 import org.dbsyncer.parser.util.ConvertUtil;
 import org.dbsyncer.parser.util.PickerUtil;
@@ -33,7 +29,6 @@ import org.dbsyncer.plugin.PluginFactory;
 import org.dbsyncer.plugin.impl.IncrementPluginContext;
 import org.dbsyncer.sdk.config.DDLConfig;
 import org.dbsyncer.sdk.connector.ConnectorInstance;
-import org.dbsyncer.sdk.constant.ConnectorConstant;
 import org.dbsyncer.sdk.enums.ChangedEventTypeEnum;
 import org.dbsyncer.sdk.model.ConnectorConfig;
 import org.dbsyncer.sdk.model.MetaInfo;
@@ -75,7 +70,7 @@ public class GeneralBufferActuator extends AbstractBufferActuator<WriterRequest,
     private ParserComponent parserComponent;
 
     @Resource
-    private ProfileComponent profileComponent;
+    protected ProfileComponent profileComponent;
 
     @Resource
     private PluginFactory pluginFactory;
@@ -136,10 +131,6 @@ public class GeneralBufferActuator extends AbstractBufferActuator<WriterRequest,
             parseDDl(response, mapping, group);
             return;
         }
-        if (ChangedEventTypeEnum.isSQL(response.getTypeEnum())) {
-            parseSql(response, mapping, group);
-            return;
-        }
 
         final String sourceTableName = group.getSourceTable().getName();
         final String targetTableName = group.getTargetTable().getName();
@@ -236,53 +227,6 @@ public class GeneralBufferActuator extends AbstractBufferActuator<WriterRequest,
         logger.warn("暂只支持数据库同源并且是关系性解析DDL");
     }
 
-    /**
-     * 解析sql并替换到目标sql
-     *
-     * @param response
-     * @param mapping
-     * @param group
-     */
-    private void parseSql(WriterResponse response, Mapping mapping, TableGroup group) {
-        String sql = response.getSql();
-        final String sourceTableName = group.getSourceTable().getName();
-        final String targetTableName = group.getTargetTable().getName();
-        final String event = response.getEvent();
-        // 根据不同的语句进行sql替换拼接
-        switch (event) {
-            case ConnectorConstant.OPERTION_INSERT:
-                SqlParser insertSql = new InsertSql(sql, sourceTableName, targetTableName, group.getFieldMapping());
-                sql = insertSql.parse();
-                break;
-            case ConnectorConstant.OPERTION_UPDATE:
-                SqlParser updateSql = new UpdateSql(sql, sourceTableName, targetTableName, group.getFieldMapping());
-                sql = updateSql.parse();
-                break;
-            case ConnectorConstant.OPERTION_DELETE:
-                SqlParser deleteSql = new DeleteSql(sql, sourceTableName, targetTableName, group.getFieldMapping());
-                sql = deleteSql.parse();
-                break;
-            default:
-                break;
-        }
-        logger.info("execute sql:{}", sql);
-
-        ConnectorConfig tConnConfig = getConnectorConfig(mapping.getTargetConnectorId());
-        final ConnectorInstance tConnectorInstance = connectorFactory.connect(tConnConfig);
-        // TODO life 这里需要重新设计实现方案,不支持异构同步场景
-        DDLConfig ddlConfig = new DDLConfig();
-        ddlConfig.setSql(sql);
-        Result result = connectorFactory.writerDDL(tConnectorInstance, ddlConfig);
-
-        // 6.发布刷新增量点事件
-        applicationContext.publishEvent(new RefreshOffsetEvent(applicationContext, response.getOffsetList()));
-
-        // 7、持久化同步结果
-        result.setTableGroupId(group.getId());
-        result.setTargetTableGroupName(targetTableName);
-        flushStrategy.flushIncrementData(mapping.getMetaId(), result, event);
-    }
-
     /**
      * 获取连接器配置
      *

+ 2 - 3
dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/impl/TableGroupBufferActuator.java

@@ -9,8 +9,8 @@ import org.dbsyncer.common.util.StringUtil;
 import org.dbsyncer.common.util.ThreadPoolUtil;
 import org.dbsyncer.common.util.UUIDUtil;
 import org.dbsyncer.parser.flush.BufferRequest;
+import org.dbsyncer.sdk.spi.TableGroupBufferActuatorService;
 import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
-import org.springframework.stereotype.Component;
 
 import javax.annotation.Resource;
 import java.util.concurrent.Executor;
@@ -22,8 +22,7 @@ import java.util.concurrent.Executor;
  * @Author AE86
  * @Date 2023-03-27 16:50
  */
-@Component
-public final class TableGroupBufferActuator extends GeneralBufferActuator implements Cloneable {
+public class TableGroupBufferActuator extends GeneralBufferActuator implements Cloneable, TableGroupBufferActuatorService {
 
     @Resource
     private TableGroupBufferConfig tableGroupBufferConfig;

+ 7 - 0
dbsyncer-sdk/pom.xml

@@ -38,5 +38,12 @@
             <scope>provided</scope>
         </dependency>
 
+        <!-- sqlserver -->
+        <dependency>
+            <groupId>com.microsoft.sqlserver</groupId>
+            <artifactId>mssql-jdbc</artifactId>
+            <scope>provided</scope>
+        </dependency>
+
     </dependencies>
 </project>

+ 3 - 23
dbsyncer-sdk/src/main/java/org/dbsyncer/sdk/connector/database/AbstractDQLConnector.java

@@ -41,11 +41,6 @@ public abstract class AbstractDQLConnector extends AbstractDatabaseConnector {
         return tables;
     }
 
-    @Override
-    public Map<String, String> getSourceCommand(CommandConfig commandConfig) {
-        return getDqlSourceCommand(commandConfig, false);
-    }
-
     @Override
     public MetaInfo getMetaInfo(DatabaseConnectorInstance connectorInstance, String sqlName) {
         DatabaseConfig cfg = connectorInstance.getConfig();
@@ -63,14 +58,8 @@ public abstract class AbstractDQLConnector extends AbstractDatabaseConnector {
         return null;
     }
 
-    /**
-     * 获取DQL源配置
-     *
-     * @param commandConfig
-     * @param groupByPK
-     * @return
-     */
-    protected Map<String, String> getDqlSourceCommand(CommandConfig commandConfig, boolean groupByPK) {
+    @Override
+    public Map<String, String> getSourceCommand(CommandConfig commandConfig) {
         // 获取过滤SQL
         String queryFilterSql = getQueryFilterSql(commandConfig);
         Table table = commandConfig.getTable();
@@ -92,16 +81,7 @@ public abstract class AbstractDQLConnector extends AbstractDatabaseConnector {
 
         // 获取查询总数SQL
         StringBuilder queryCount = new StringBuilder();
-        queryCount.append("SELECT COUNT(1) FROM (").append(querySql);
-
-        // Mysql
-        if (groupByPK) {
-            queryCount.append(" GROUP BY ");
-            // id,id2
-            String quotation = buildSqlWithQuotation();
-            PrimaryKeyUtil.buildSql(queryCount, primaryKeys, quotation, ",", "", true);
-        }
-        queryCount.append(") DBSYNCER_T");
+        queryCount.append("SELECT COUNT(1) FROM (").append(querySql).append(") DBS_T");
         map.put(SqlBuilderEnum.QUERY_COUNT.getName(), queryCount.toString());
         return map;
     }

+ 0 - 7
dbsyncer-sdk/src/main/java/org/dbsyncer/sdk/connector/database/DatabaseTemplate.java

@@ -510,9 +510,6 @@ public class DatabaseTemplate implements JdbcOperations {
     @Override
     public int[] batchUpdate(final String... sql) throws DataAccessException {
         Assert.notEmpty(sql, "SQL array must not be empty");
-        if (logger.isDebugEnabled()) {
-            logger.debug("Executing SQL batch update of " + sql.length + " statements");
-        }
 
         /**
          * Callback to execute the batch update.
@@ -584,10 +581,6 @@ public class DatabaseTemplate implements JdbcOperations {
 
         Assert.notNull(psc, "PreparedStatementCreator must not be null");
         Assert.notNull(action, "Callback object must not be null");
-        if (logger.isDebugEnabled()) {
-            String sql = getSql(psc);
-            logger.debug("Executing prepared SQL statement" + (sql != null ? " [" + sql + "]" : ""));
-        }
 
         PreparedStatement ps = null;
         try {

+ 1 - 9
dbsyncer-sdk/src/main/java/org/dbsyncer/sdk/connector/database/sqlbuilder/SqlBuilderQueryCount.java

@@ -6,9 +6,6 @@ package org.dbsyncer.sdk.connector.database.sqlbuilder;
 import org.dbsyncer.common.util.StringUtil;
 import org.dbsyncer.sdk.config.SqlBuilderConfig;
 import org.dbsyncer.sdk.connector.database.Database;
-import org.dbsyncer.sdk.util.PrimaryKeyUtil;
-
-import java.util.List;
 
 /**
  * @author AE86
@@ -22,12 +19,11 @@ public class SqlBuilderQueryCount extends SqlBuilderQuery {
         Database database = config.getDatabase();
         String quotation = database.buildSqlWithQuotation();
         String tableName = config.getTableName();
-        List<String> primaryKeys = database.buildPrimaryKeys(config.getPrimaryKeys());
         String schema = config.getSchema();
         String queryFilter = config.getQueryFilter();
 
         StringBuilder sql = new StringBuilder();
-        sql.append("SELECT COUNT(1) FROM (SELECT 1 AS ").append(quotation).append("_ROW").append(quotation).append(" FROM ");
+        sql.append("SELECT COUNT(1) FROM ");
         sql.append(schema);
         sql.append(quotation);
         sql.append(database.buildTableName(tableName));
@@ -35,10 +31,6 @@ public class SqlBuilderQueryCount extends SqlBuilderQuery {
         if (StringUtil.isNotBlank(queryFilter)) {
             sql.append(queryFilter);
         }
-        sql.append(" GROUP BY ");
-        // id,uid
-        PrimaryKeyUtil.buildSql(sql, primaryKeys, quotation, ",", "", true);
-        sql.append(") DBSYNCER_T");
         return sql.toString();
     }
 

+ 10 - 0
dbsyncer-sdk/src/main/java/org/dbsyncer/sdk/connector/schema/IntegerValueMapper.java

@@ -1,5 +1,6 @@
 package org.dbsyncer.sdk.connector.schema;
 
+import org.dbsyncer.common.util.NumberUtil;
 import org.dbsyncer.sdk.SdkException;
 import org.dbsyncer.sdk.connector.AbstractValueMapper;
 import org.dbsyncer.sdk.connector.ConnectorInstance;
@@ -35,6 +36,15 @@ public class IntegerValueMapper extends AbstractValueMapper<Integer> {
             return new Integer(b ? 1 : 0);
         }
 
+        if (val instanceof Short) {
+            Short s = (Short) val;
+            return s.intValue();
+        }
+
+        if (val instanceof String) {
+            return NumberUtil.toInt((String) val);
+        }
+
         throw new SdkException(String.format("%s can not find type [%s], val [%s]", getClass().getSimpleName(), val.getClass(), val));
     }
 }

+ 6 - 0
dbsyncer-sdk/src/main/java/org/dbsyncer/sdk/connector/schema/TimestampValueMapper.java

@@ -1,5 +1,6 @@
 package org.dbsyncer.sdk.connector.schema;
 
+import microsoft.sql.DateTimeOffset;
 import org.dbsyncer.common.util.DateFormatUtil;
 import org.dbsyncer.sdk.SdkException;
 import org.dbsyncer.sdk.connector.AbstractValueMapper;
@@ -62,6 +63,11 @@ public class TimestampValueMapper extends AbstractValueMapper<Timestamp> {
             return Timestamp.from(date.toInstant());
         }
 
+        if (val instanceof microsoft.sql.DateTimeOffset) {
+            LocalDateTime dateTime = ((DateTimeOffset) val).getOffsetDateTime().toLocalDateTime();
+            return Timestamp.valueOf(dateTime);
+        }
+
         throw new SdkException(String.format("%s can not find type [%s], val [%s]", getClass().getSimpleName(), val.getClass(), val));
     }
 }

+ 4 - 0
dbsyncer-sdk/src/main/java/org/dbsyncer/sdk/connector/schema/VarcharValueMapper.java

@@ -28,6 +28,10 @@ public class VarcharValueMapper extends AbstractValueMapper<String> {
             return Integer.toString((Integer) val);
         }
 
+        if (val instanceof Long) {
+            return Long.toString((Long) val);
+        }
+
         if (val instanceof LocalDateTime) {
             return ((LocalDateTime) val).format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
         }

+ 0 - 7
dbsyncer-sdk/src/main/java/org/dbsyncer/sdk/enums/ChangedEventTypeEnum.java

@@ -12,10 +12,6 @@ public enum ChangedEventTypeEnum {
      * ddl变更
      */
     DDL,
-    /**
-     * 表列变更,比如oracle, 会获取变动列的sql
-     */
-    SQL,
     /**
      * 定时变更
      */
@@ -29,7 +25,4 @@ public enum ChangedEventTypeEnum {
         return event != null && DDL == event;
     }
 
-    public static boolean isSQL(ChangedEventTypeEnum event) {
-        return event != null && SQL == event;
-    }
 }

+ 0 - 27
dbsyncer-sdk/src/main/java/org/dbsyncer/sdk/listener/event/SqlChangedEvent.java

@@ -1,27 +0,0 @@
-/**
- * DBSyncer Copyright 2020-2023 All Rights Reserved.
- */
-package org.dbsyncer.sdk.listener.event;
-
-import org.dbsyncer.sdk.enums.ChangedEventTypeEnum;
-
-/**
- * @Author AE86
- * @Version 1.0.0
- * @Date 2023-12-09 20:34
- */
-public final class SqlChangedEvent extends CommonChangedEvent {
-
-    public SqlChangedEvent(String sourceTableName, String event, String sql, String nextFileName, Object position) {
-        setSourceTableName(sourceTableName);
-        setEvent(event);
-        setNextFileName(nextFileName);
-        setPosition(position);
-        setSql(sql);
-    }
-
-    @Override
-    public ChangedEventTypeEnum getType() {
-        return ChangedEventTypeEnum.SQL;
-    }
-}

+ 3 - 9
dbsyncer-sdk/src/main/java/org/dbsyncer/sdk/model/SqlTable.java

@@ -1,3 +1,6 @@
+/**
+ * DBSyncer Copyright 2020-2024 All Rights Reserved.
+ */
 package org.dbsyncer.sdk.model;
 
 public class SqlTable {
@@ -8,15 +11,6 @@ public class SqlTable {
 
     private String table;
 
-    public SqlTable() {
-    }
-
-    public SqlTable(String sqlName, String sql, String table) {
-        this.sqlName = sqlName;
-        this.sql = sql;
-        this.table = table;
-    }
-
     public String getSqlName() {
         return sqlName;
     }

+ 12 - 0
dbsyncer-sdk/src/main/java/org/dbsyncer/sdk/spi/TableGroupBufferActuatorService.java

@@ -0,0 +1,12 @@
+/**
+ * DBSyncer Copyright 2020-2024 All Rights Reserved.
+ */
+package org.dbsyncer.sdk.spi;
+
+/**
+ * @Author AE86
+ * @Version 1.0.0
+ * @Date 2024-01-26 00:55
+ */
+public interface TableGroupBufferActuatorService {
+}

+ 1 - 11
dbsyncer-sdk/src/main/java/org/dbsyncer/sdk/util/PrimaryKeyUtil.java

@@ -37,7 +37,7 @@ public abstract class PrimaryKeyUtil {
         }
 
         // 获取表同步的主键字段
-        List<String> primaryKeys = findPrimaryKeys(table.getColumn());
+        List<String> primaryKeys = findPrimaryKeyFields(table.getColumn()).stream().map(f -> f.getName()).collect(Collectors.toList());
 
         // 如果存在表字段映射关系,没有配置主键则抛出异常提示
         if (!CollectionUtils.isEmpty(table.getColumn()) && CollectionUtils.isEmpty(primaryKeys)) {
@@ -46,16 +46,6 @@ public abstract class PrimaryKeyUtil {
         return primaryKeys;
     }
 
-    /**
-     * 返回主键名称
-     *
-     * @param fields
-     * @return
-     */
-    public static List<String> findPrimaryKeys(List<Field> fields) {
-        return findPrimaryKeyFields(fields).stream().map(f -> f.getName()).collect(Collectors.toList());
-    }
-
     /**
      * 返回主键属性字段集合
      *

+ 4 - 4
dbsyncer-web/src/main/resources/application.properties

@@ -33,17 +33,17 @@ dbsyncer.parser.general.buffer-period-millisecond=300
 # 每个驱动最多可分配的表执行器个数
 dbsyncer.parser.table.group.max-buffer-actuator-size=20
 # [TableGroupBufferActuator]线程数
-dbsyncer.parser.table.group.thread-core-size=2
+dbsyncer.parser.table.group.thread-core-size=1
 # [TableGroupBufferActuator]最大线程数
-dbsyncer.parser.table.group.max-thread-size=10
+dbsyncer.parser.table.group.max-thread-size=1
 # [TableGroupBufferActuator]线程池队列
 dbsyncer.parser.table.group.thread-queue-capacity=16
 # [TableGroupBufferActuator]单次执行任务数
 dbsyncer.parser.table.group.buffer-writer-count=1000
 # [TableGroupBufferActuator]每次消费缓存队列的任务数
-dbsyncer.parser.table.group.buffer-pull-count=20000
+dbsyncer.parser.table.group.buffer-pull-count=1000
 # [TableGroupBufferActuator]缓存队列容量
-dbsyncer.parser.table.group.buffer-queue-capacity=40000
+dbsyncer.parser.table.group.buffer-queue-capacity=10000
 # [TableGroupBufferActuator]定时消费缓存队列间隔(毫秒)
 dbsyncer.parser.table.group.buffer-period-millisecond=300
 

+ 6 - 0
install.cmd

@@ -0,0 +1,6 @@
+@echo off
+
+echo "Install ..."
+call mvn install -Dmaven.test.skip=true
+
+:exit