AE86 1 rok pred
rodič
commit
0653d763d8
24 zmenil súbory, kde vykonal 134 pridanie a 172 odobranie
  1. 1 1
      dbsyncer-biz/src/main/java/org/dbsyncer/biz/impl/DataSyncServiceImpl.java
  2. 1 1
      dbsyncer-biz/src/main/java/org/dbsyncer/biz/impl/MetricReporter.java
  3. 1 1
      dbsyncer-biz/src/main/java/org/dbsyncer/biz/impl/MonitorServiceImpl.java
  4. 1 1
      dbsyncer-connector/dbsyncer-connector-mysql/src/main/java/org/dbsyncer/connector/mysql/MySQLConnector.java
  5. 1 1
      dbsyncer-connector/dbsyncer-connector-mysql/src/main/java/org/dbsyncer/connector/mysql/cdc/MySQLListener.java
  6. 2 1
      dbsyncer-connector/dbsyncer-connector-oracle/src/main/java/org/dbsyncer/connector/oracle/cdc/OracleListener.java
  7. 6 3
      dbsyncer-manager/src/main/java/org/dbsyncer/manager/impl/FullPuller.java
  8. 6 3
      dbsyncer-manager/src/main/java/org/dbsyncer/manager/impl/IncrementPuller.java
  9. 2 2
      dbsyncer-manager/src/main/java/org/dbsyncer/manager/impl/PreloadTemplate.java
  10. 4 3
      dbsyncer-parser/src/main/java/org/dbsyncer/parser/command/impl/PersistenceCommand.java
  11. 3 2
      dbsyncer-parser/src/main/java/org/dbsyncer/parser/command/impl/PreloadCommand.java
  12. 0 27
      dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/FlushService.java
  13. 0 95
      dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/impl/FlushServiceImpl.java
  14. 3 3
      dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/impl/GeneralBufferActuator.java
  15. 7 4
      dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/impl/StorageBufferActuator.java
  16. 6 3
      dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/impl/TableGroupBufferActuator.java
  17. 42 5
      dbsyncer-parser/src/main/java/org/dbsyncer/parser/impl/LogServiceImpl.java
  18. 2 2
      dbsyncer-parser/src/main/java/org/dbsyncer/parser/impl/OperationTemplate.java
  19. 41 9
      dbsyncer-parser/src/main/java/org/dbsyncer/parser/strategy/impl/FlushStrategyImpl.java
  20. 1 0
      dbsyncer-sdk/src/main/java/org/dbsyncer/sdk/spi/ConnectorService.java
  21. 0 1
      dbsyncer-sdk/src/main/java/org/dbsyncer/sdk/storage/AbstractStorageService.java
  22. 1 1
      dbsyncer-sdk/src/main/java/org/dbsyncer/sdk/storage/StorageService.java
  23. 1 1
      dbsyncer-storage/src/main/java/org/dbsyncer/storage/StorageSupportConfiguration.java
  24. 2 2
      dbsyncer-web/src/main/resources/application.properties

+ 1 - 1
dbsyncer-biz/src/main/java/org/dbsyncer/biz/impl/DataSyncServiceImpl.java

@@ -21,7 +21,7 @@ import org.dbsyncer.sdk.listener.event.RowChangedEvent;
 import org.dbsyncer.sdk.model.Field;
 import org.dbsyncer.sdk.filter.FieldResolver;
 import org.dbsyncer.sdk.filter.Query;
-import org.dbsyncer.sdk.spi.StorageService;
+import org.dbsyncer.sdk.storage.StorageService;
 import org.dbsyncer.storage.binlog.proto.BinlogMap;
 import org.dbsyncer.sdk.constant.ConfigConstant;
 import org.dbsyncer.storage.util.BinlogMessageUtil;

+ 1 - 1
dbsyncer-biz/src/main/java/org/dbsyncer/biz/impl/MetricReporter.java

@@ -26,7 +26,7 @@ import org.dbsyncer.parser.model.TableGroup;
 import org.dbsyncer.sdk.constant.ConnectorConstant;
 import org.dbsyncer.sdk.enums.StorageEnum;
 import org.dbsyncer.sdk.filter.Query;
-import org.dbsyncer.sdk.spi.StorageService;
+import org.dbsyncer.sdk.storage.StorageService;
 import org.dbsyncer.sdk.constant.ConfigConstant;
 import org.dbsyncer.storage.enums.StorageDataStatusEnum;
 import org.slf4j.Logger;

+ 1 - 1
dbsyncer-biz/src/main/java/org/dbsyncer/biz/impl/MonitorServiceImpl.java

@@ -44,7 +44,7 @@ import org.dbsyncer.sdk.filter.BooleanFilter;
 import org.dbsyncer.sdk.filter.FieldResolver;
 import org.dbsyncer.sdk.filter.Query;
 import org.dbsyncer.sdk.filter.impl.LongFilter;
-import org.dbsyncer.sdk.spi.StorageService;
+import org.dbsyncer.sdk.storage.StorageService;
 import org.dbsyncer.sdk.constant.ConfigConstant;
 import org.dbsyncer.storage.enums.StorageDataStatusEnum;
 import org.slf4j.Logger;

+ 1 - 1
dbsyncer-connector/dbsyncer-connector-mysql/src/main/java/org/dbsyncer/connector/mysql/MySQLConnector.java

@@ -15,7 +15,7 @@ import org.dbsyncer.sdk.constant.DatabaseConstant;
 import org.dbsyncer.sdk.enums.ListenerTypeEnum;
 import org.dbsyncer.sdk.listener.Listener;
 import org.dbsyncer.sdk.model.PageSql;
-import org.dbsyncer.sdk.spi.StorageService;
+import org.dbsyncer.sdk.storage.StorageService;
 import org.dbsyncer.sdk.util.PrimaryKeyUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

+ 1 - 1
dbsyncer-connector/dbsyncer-connector-mysql/src/main/java/org/dbsyncer/connector/mysql/cdc/MySQLListener.java

@@ -104,7 +104,7 @@ public class MySQLListener extends AbstractDatabaseListener {
     }
 
     private void run() throws Exception {
-        final DatabaseConfig config = (DatabaseConfig) connectorConfig;
+        final DatabaseConfig config = getConnectorInstance().getConfig();
         if (StringUtil.isBlank(config.getUrl())) {
             throw new MySQLException("url is invalid");
         }

+ 2 - 1
dbsyncer-connector/dbsyncer-connector-oracle/src/main/java/org/dbsyncer/connector/oracle/cdc/OracleListener.java

@@ -52,7 +52,8 @@ public class OracleListener extends AbstractDatabaseListener {
     @Override
     public void start() {
         try {
-            final DatabaseConfig config = (DatabaseConfig) connectorConfig;
+            // TODO [increment-worker-1184659161326161921] 这里应该单独启动一个线程,线程名要有一定意义,如:binlog-parser-127.0.0.1:3306_123,便于监控排查问题
+            final DatabaseConfig config = getConnectorInstance().getConfig();
             String driverClassName = config.getDriverClassName();
             String username = config.getUsername();
             String password = config.getPassword();

+ 6 - 3
dbsyncer-manager/src/main/java/org/dbsyncer/manager/impl/FullPuller.java

@@ -1,3 +1,6 @@
+/**
+ * DBSyncer Copyright 2020-2023 All Rights Reserved.
+ */
 package org.dbsyncer.manager.impl;
 
 import org.dbsyncer.common.util.NumberUtil;
@@ -32,9 +35,9 @@ import java.util.concurrent.Executors;
 /**
  * 全量同步
  *
- * @author AE86
- * @version 1.0.0
- * @date 2020/04/26 15:28
+ * @Version 1.0.0
+ * @Author AE86
+ * @Date 2020-04-26 15:28
  */
 @Component
 public final class FullPuller extends AbstractPuller implements ApplicationListener<FullRefreshEvent> {

+ 6 - 3
dbsyncer-manager/src/main/java/org/dbsyncer/manager/impl/IncrementPuller.java

@@ -1,3 +1,6 @@
+/**
+ * DBSyncer Copyright 2020-2023 All Rights Reserved.
+ */
 package org.dbsyncer.manager.impl;
 
 import org.dbsyncer.common.util.CollectionUtils;
@@ -46,9 +49,9 @@ import java.util.stream.Collectors;
 /**
  * 增量同步
  *
- * @author AE86
- * @version 1.0.0
- * @date 2020/04/26 15:28
+ * @Version 1.0.0
+ * @Author AE86
+ * @Date 2020-04-26 15:28
  */
 @Component
 public final class IncrementPuller extends AbstractPuller implements ApplicationListener<RefreshOffsetEvent>, ScheduledTaskJob {

+ 2 - 2
dbsyncer-manager/src/main/java/org/dbsyncer/manager/impl/PreloadTemplate.java

@@ -11,7 +11,7 @@ import org.dbsyncer.manager.ManagerFactory;
 import org.dbsyncer.parser.LogService;
 import org.dbsyncer.parser.LogType;
 import org.dbsyncer.parser.ProfileComponent;
-import org.dbsyncer.parser.command.PreloadCommand;
+import org.dbsyncer.parser.command.impl.PreloadCommand;
 import org.dbsyncer.parser.enums.CommandEnum;
 import org.dbsyncer.parser.enums.GroupStrategyEnum;
 import org.dbsyncer.parser.enums.MetaEnum;
@@ -26,7 +26,7 @@ import org.dbsyncer.sdk.connector.ConnectorInstance;
 import org.dbsyncer.sdk.constant.ConfigConstant;
 import org.dbsyncer.sdk.enums.StorageEnum;
 import org.dbsyncer.sdk.filter.Query;
-import org.dbsyncer.sdk.spi.StorageService;
+import org.dbsyncer.sdk.storage.StorageService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.context.ApplicationListener;

+ 4 - 3
dbsyncer-parser/src/main/java/org/dbsyncer/parser/command/PersistenceCommand.java → dbsyncer-parser/src/main/java/org/dbsyncer/parser/command/impl/PersistenceCommand.java

@@ -1,10 +1,11 @@
 /**
  * DBSyncer Copyright 2020-2023 All Rights Reserved.
  */
-package org.dbsyncer.parser.command;
+package org.dbsyncer.parser.command.impl;
 
+import org.dbsyncer.parser.command.Command;
 import org.dbsyncer.sdk.enums.StorageEnum;
-import org.dbsyncer.sdk.spi.StorageService;
+import org.dbsyncer.sdk.storage.StorageService;
 
 import java.util.Map;
 
@@ -15,7 +16,7 @@ import java.util.Map;
  * @Author AE86
  * @Date 2023-11-12 01:32
  */
-public class PersistenceCommand implements Command {
+public final class PersistenceCommand implements Command {
 
     private StorageService storageService;
 

+ 3 - 2
dbsyncer-parser/src/main/java/org/dbsyncer/parser/command/PreloadCommand.java → dbsyncer-parser/src/main/java/org/dbsyncer/parser/command/impl/PreloadCommand.java

@@ -1,9 +1,10 @@
 /**
  * DBSyncer Copyright 2020-2023 All Rights Reserved.
  */
-package org.dbsyncer.parser.command;
+package org.dbsyncer.parser.command.impl;
 
 import org.dbsyncer.parser.ProfileComponent;
+import org.dbsyncer.parser.command.Command;
 import org.dbsyncer.parser.model.Connector;
 import org.dbsyncer.parser.model.Mapping;
 import org.dbsyncer.parser.model.Meta;
@@ -19,7 +20,7 @@ import org.dbsyncer.parser.model.UserConfig;
  * @Author AE86
  * @Date 2023-11-12 01:32
  */
-public class PreloadCommand implements Command {
+public final class PreloadCommand implements Command {
 
     private ProfileComponent profileComponent;
 

+ 0 - 27
dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/FlushService.java

@@ -1,27 +0,0 @@
-package org.dbsyncer.parser.flush;
-
-import java.util.List;
-import java.util.Map;
-
-public interface FlushService {
-
-    /**
-     * 记录错误日志
-     *
-     * @param type
-     * @param error
-     */
-    void asyncWrite(String type, String error);
-
-    /**
-     * 记录数据
-     *
-     * @param metaId
-     * @param tableGroupId
-     * @param targetTableGroupName
-     * @param event
-     * @param success
-     * @param data
-     */
-    void asyncWrite(String metaId, String tableGroupId, String targetTableGroupName, String event, boolean success, List<Map> data, String error);
-}

+ 0 - 95
dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/impl/FlushServiceImpl.java

@@ -1,95 +0,0 @@
-/**
- * DBSyncer Copyright 2020-2023 All Rights Reserved.
- */
-package org.dbsyncer.parser.flush.impl;
-
-import org.dbsyncer.common.util.StringUtil;
-import org.dbsyncer.parser.ProfileComponent;
-import org.dbsyncer.parser.flush.BufferActuator;
-import org.dbsyncer.parser.flush.FlushService;
-import org.dbsyncer.parser.model.StorageRequest;
-import org.dbsyncer.parser.model.SystemConfig;
-import org.dbsyncer.sdk.constant.ConfigConstant;
-import org.dbsyncer.sdk.enums.StorageEnum;
-import org.dbsyncer.sdk.spi.StorageService;
-import org.dbsyncer.storage.enums.StorageDataStatusEnum;
-import org.dbsyncer.storage.impl.SnowflakeIdWorker;
-import org.dbsyncer.storage.util.BinlogMessageUtil;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.stereotype.Component;
-
-import javax.annotation.Resource;
-import java.time.Instant;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.Executor;
-
-/**
- * 持久化
- * <p>全量或增量数据</p>
- * <p>系统日志</p>
- *
- * @author AE86
- * @version 1.0.0
- * @date 2020/05/19 18:38
- */
-@Component
-public final class FlushServiceImpl implements FlushService {
-
-    private final Logger logger = LoggerFactory.getLogger(getClass());
-
-    @Resource
-    private StorageService storageService;
-
-    @Resource
-    private SnowflakeIdWorker snowflakeIdWorker;
-
-    @Resource
-    private BufferActuator storageBufferActuator;
-
-    @Resource
-    private Executor storageExecutor;
-
-    @Resource
-    private ProfileComponent profileComponent;
-
-    @Override
-    public void asyncWrite(String type, String error) {
-        storageExecutor.execute(() -> {
-            Map<String, Object> params = new HashMap();
-            params.put(ConfigConstant.CONFIG_MODEL_ID, String.valueOf(snowflakeIdWorker.nextId()));
-            params.put(ConfigConstant.CONFIG_MODEL_TYPE, type);
-            params.put(ConfigConstant.CONFIG_MODEL_JSON, StringUtil.substring(error, 0, getSystemConfig().getMaxStorageErrorLength()));
-            params.put(ConfigConstant.CONFIG_MODEL_CREATE_TIME, Instant.now().toEpochMilli());
-            storageService.add(StorageEnum.LOG, params);
-        });
-    }
-
-    @Override
-    public void asyncWrite(String metaId, String tableGroupId, String targetTableGroupName, String event, boolean success, List<Map> data, String error) {
-        long now = Instant.now().toEpochMilli();
-        data.forEach(r -> {
-            Map<String, Object> row = new HashMap();
-            row.put(ConfigConstant.CONFIG_MODEL_ID, String.valueOf(snowflakeIdWorker.nextId()));
-            row.put(ConfigConstant.DATA_SUCCESS, success ? StorageDataStatusEnum.SUCCESS.getValue() : StorageDataStatusEnum.FAIL.getValue());
-            row.put(ConfigConstant.DATA_TABLE_GROUP_ID, tableGroupId);
-            row.put(ConfigConstant.DATA_TARGET_TABLE_NAME, targetTableGroupName);
-            row.put(ConfigConstant.DATA_EVENT, event);
-            row.put(ConfigConstant.DATA_ERROR, error);
-            row.put(ConfigConstant.CONFIG_MODEL_CREATE_TIME, now);
-            try {
-                byte[] bytes = BinlogMessageUtil.toBinlogMap(r).toByteArray();
-                row.put(ConfigConstant.BINLOG_DATA, bytes);
-            } catch (Exception e) {
-                logger.warn("可能存在Blob或inputStream大文件类型, 无法序列化:{}", r);
-            }
-            storageBufferActuator.offer(new StorageRequest(metaId, row));
-        });
-    }
-
-    private SystemConfig getSystemConfig() {
-        return profileComponent.getSystemConfig();
-    }
-}

+ 3 - 3
dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/impl/GeneralBufferActuator.java

@@ -53,9 +53,9 @@ import java.util.concurrent.Executor;
 /**
  * 通用执行器(单线程消费,多线程批量写,按序执行)
  *
- * @author AE86
- * @version 1.0.0
- * @date 2022/3/27 16:50
+ * @Version 1.0.0
+ * @Author AE86
+ * @Date 2022-03-27 16:50
  */
 @Component
 public class GeneralBufferActuator extends AbstractBufferActuator<WriterRequest, WriterResponse> {

+ 7 - 4
dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/impl/StorageBufferActuator.java

@@ -1,3 +1,6 @@
+/**
+ * DBSyncer Copyright 2020-2023 All Rights Reserved.
+ */
 package org.dbsyncer.parser.flush.impl;
 
 import org.dbsyncer.common.config.StorageConfig;
@@ -8,7 +11,7 @@ import org.dbsyncer.parser.model.Meta;
 import org.dbsyncer.parser.model.StorageRequest;
 import org.dbsyncer.parser.model.StorageResponse;
 import org.dbsyncer.sdk.enums.StorageEnum;
-import org.dbsyncer.sdk.spi.StorageService;
+import org.dbsyncer.sdk.storage.StorageService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.stereotype.Component;
@@ -21,9 +24,9 @@ import java.util.concurrent.Executor;
 /**
  * 持久化执行器
  *
- * @author AE86
- * @version 1.0.0
- * @date 2022/3/27 16:50
+ * @Version 1.0.0
+ * @Author AE86
+ * @Date 2023-03-27 16:50
  */
 @Component
 public final class StorageBufferActuator extends AbstractBufferActuator<StorageRequest, StorageResponse> {

+ 6 - 3
dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/impl/TableGroupBufferActuator.java

@@ -1,3 +1,6 @@
+/**
+ * DBSyncer Copyright 2020-2023 All Rights Reserved.
+ */
 package org.dbsyncer.parser.flush.impl;
 
 import org.dbsyncer.common.config.TableGroupBufferConfig;
@@ -15,9 +18,9 @@ import java.util.concurrent.Executor;
 /**
  * 表执行器(根据表消费数据,多线程批量写,按序执行)
  *
- * @author AE86
- * @version 1.0.0
- * @date 2022/3/27 16:50
+ * @Version 1.0.0
+ * @Author AE86
+ * @Date 2023-03-27 16:50
  */
 @Component
 public final class TableGroupBufferActuator extends GeneralBufferActuator implements Cloneable {

+ 42 - 5
dbsyncer-parser/src/main/java/org/dbsyncer/parser/impl/LogServiceImpl.java

@@ -1,11 +1,24 @@
+/**
+ * DBSyncer Copyright 2020-2023 All Rights Reserved.
+ */
 package org.dbsyncer.parser.impl;
 
+import org.dbsyncer.common.util.StringUtil;
 import org.dbsyncer.parser.LogService;
 import org.dbsyncer.parser.LogType;
-import org.dbsyncer.parser.flush.FlushService;
+import org.dbsyncer.parser.ProfileComponent;
+import org.dbsyncer.parser.model.SystemConfig;
+import org.dbsyncer.sdk.constant.ConfigConstant;
+import org.dbsyncer.sdk.enums.StorageEnum;
+import org.dbsyncer.sdk.storage.StorageService;
+import org.dbsyncer.storage.impl.SnowflakeIdWorker;
 import org.springframework.stereotype.Component;
 
 import javax.annotation.Resource;
+import java.time.Instant;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.Executor;
 
 /**
  * @version 1.0.0
@@ -16,20 +29,44 @@ import javax.annotation.Resource;
 public class LogServiceImpl implements LogService {
 
     @Resource
-    private FlushService flushService;
+    private StorageService storageService;
+
+    @Resource
+    private SnowflakeIdWorker snowflakeIdWorker;
+
+    @Resource
+    private Executor storageExecutor;
+
+    @Resource
+    private ProfileComponent profileComponent;
 
     @Override
     public void log(LogType logType) {
-        flushService.asyncWrite(logType.getType(), String.format("%s%s", logType.getName(), logType.getMessage()));
+        asyncWrite(logType.getType(), String.format("%s%s", logType.getName(), logType.getMessage()));
     }
 
     @Override
     public void log(LogType logType, String msg) {
-        flushService.asyncWrite(logType.getType(), null == msg ? logType.getMessage() : msg);
+        asyncWrite(logType.getType(), null == msg ? logType.getMessage() : msg);
     }
 
     @Override
     public void log(LogType logType, String format, Object... args) {
-        flushService.asyncWrite(logType.getType(), String.format(format, args));
+        asyncWrite(logType.getType(), String.format(format, args));
+    }
+
+    private void asyncWrite(String type, String error) {
+        storageExecutor.execute(() -> {
+            Map<String, Object> params = new HashMap();
+            params.put(ConfigConstant.CONFIG_MODEL_ID, String.valueOf(snowflakeIdWorker.nextId()));
+            params.put(ConfigConstant.CONFIG_MODEL_TYPE, type);
+            params.put(ConfigConstant.CONFIG_MODEL_JSON, StringUtil.substring(error, 0, getSystemConfig().getMaxStorageErrorLength()));
+            params.put(ConfigConstant.CONFIG_MODEL_CREATE_TIME, Instant.now().toEpochMilli());
+            storageService.add(StorageEnum.LOG, params);
+        });
+    }
+
+    private SystemConfig getSystemConfig() {
+        return profileComponent.getSystemConfig();
     }
 }

+ 2 - 2
dbsyncer-parser/src/main/java/org/dbsyncer/parser/impl/OperationTemplate.java

@@ -7,7 +7,7 @@ import org.dbsyncer.common.util.CollectionUtils;
 import org.dbsyncer.common.util.StringUtil;
 import org.dbsyncer.parser.CacheService;
 import org.dbsyncer.parser.ParserException;
-import org.dbsyncer.parser.command.PersistenceCommand;
+import org.dbsyncer.parser.command.impl.PersistenceCommand;
 import org.dbsyncer.parser.enums.CommandEnum;
 import org.dbsyncer.parser.enums.GroupStrategyEnum;
 import org.dbsyncer.parser.model.ConfigModel;
@@ -16,7 +16,7 @@ import org.dbsyncer.parser.model.QueryConfig;
 import org.dbsyncer.parser.strategy.GroupStrategy;
 import org.dbsyncer.parser.util.ConfigModelUtil;
 import org.dbsyncer.sdk.enums.StorageEnum;
-import org.dbsyncer.sdk.spi.StorageService;
+import org.dbsyncer.sdk.storage.StorageService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.BeanUtils;

+ 41 - 9
dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/impl/FlushStrategyImpl.java → dbsyncer-parser/src/main/java/org/dbsyncer/parser/strategy/impl/FlushStrategyImpl.java

@@ -1,7 +1,7 @@
 /**
  * DBSyncer Copyright 2020-2023 All Rights Reserved.
  */
-package org.dbsyncer.parser.flush.impl;
+package org.dbsyncer.parser.strategy.impl;
 
 import org.dbsyncer.common.model.Result;
 import org.dbsyncer.common.util.CollectionUtils;
@@ -10,10 +10,15 @@ import org.dbsyncer.parser.CacheService;
 import org.dbsyncer.parser.LogService;
 import org.dbsyncer.parser.LogType;
 import org.dbsyncer.parser.ProfileComponent;
-import org.dbsyncer.parser.flush.FlushService;
+import org.dbsyncer.parser.flush.BufferActuator;
 import org.dbsyncer.parser.model.Meta;
+import org.dbsyncer.parser.model.StorageRequest;
 import org.dbsyncer.parser.model.SystemConfig;
 import org.dbsyncer.parser.strategy.FlushStrategy;
+import org.dbsyncer.sdk.constant.ConfigConstant;
+import org.dbsyncer.storage.enums.StorageDataStatusEnum;
+import org.dbsyncer.storage.impl.SnowflakeIdWorker;
+import org.dbsyncer.storage.util.BinlogMessageUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.stereotype.Component;
@@ -21,11 +26,14 @@ import org.springframework.util.Assert;
 
 import javax.annotation.Resource;
 import java.time.Instant;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 
 /**
- * @author AE86
- * @version 1.0.0
- * @date 2021/11/18 22:22
+ * @Version 1.0.0
+ * @Author AE86
+ * @Date 2021-11-18 22:22
  */
 @Component
 public final class FlushStrategyImpl implements FlushStrategy {
@@ -33,7 +41,7 @@ public final class FlushStrategyImpl implements FlushStrategy {
     private final Logger logger = LoggerFactory.getLogger(getClass());
 
     @Resource
-    private FlushService flushService;
+    private SnowflakeIdWorker snowflakeIdWorker;
 
     @Resource
     private CacheService cacheService;
@@ -44,6 +52,9 @@ public final class FlushStrategyImpl implements FlushStrategy {
     @Resource
     private LogService logService;
 
+    @Resource
+    private BufferActuator storageBufferActuator;
+
     @Override
     public void flushFullData(String metaId, Result result, String event) {
         // 不记录全量数据, 只记录增量同步数据, 将异常记录到系统日志中
@@ -67,7 +78,28 @@ public final class FlushStrategyImpl implements FlushStrategy {
         flush(metaId, result, event);
     }
 
-    protected void refreshTotal(String metaId, Result writer) {
+    private void asyncWrite(String metaId, String tableGroupId, String targetTableGroupName, String event, boolean success, List<Map> data, String error) {
+        long now = Instant.now().toEpochMilli();
+        data.forEach(r -> {
+            Map<String, Object> row = new HashMap();
+            row.put(ConfigConstant.CONFIG_MODEL_ID, String.valueOf(snowflakeIdWorker.nextId()));
+            row.put(ConfigConstant.DATA_SUCCESS, success ? StorageDataStatusEnum.SUCCESS.getValue() : StorageDataStatusEnum.FAIL.getValue());
+            row.put(ConfigConstant.DATA_TABLE_GROUP_ID, tableGroupId);
+            row.put(ConfigConstant.DATA_TARGET_TABLE_NAME, targetTableGroupName);
+            row.put(ConfigConstant.DATA_EVENT, event);
+            row.put(ConfigConstant.DATA_ERROR, error);
+            row.put(ConfigConstant.CONFIG_MODEL_CREATE_TIME, now);
+            try {
+                byte[] bytes = BinlogMessageUtil.toBinlogMap(r).toByteArray();
+                row.put(ConfigConstant.BINLOG_DATA, bytes);
+            } catch (Exception e) {
+                logger.warn("可能存在Blob或inputStream大文件类型, 无法序列化:{}", r);
+            }
+            storageBufferActuator.offer(new StorageRequest(metaId, row));
+        });
+    }
+
+    private void refreshTotal(String metaId, Result writer) {
         Assert.hasText(metaId, "Meta id can not be empty.");
         Meta meta = cacheService.get(metaId, Meta.class);
         if (meta != null) {
@@ -83,12 +115,12 @@ public final class FlushStrategyImpl implements FlushStrategy {
         // 是否写失败数据
         if (getSystemConfig().isEnableStorageWriteFail() && !CollectionUtils.isEmpty(result.getFailData())) {
             final String error = StringUtil.substring(result.getError().toString(), 0, getSystemConfig().getMaxStorageErrorLength());
-            flushService.asyncWrite(metaId, result.getTableGroupId(), result.getTargetTableGroupName(), event, false, result.getFailData(), error);
+            asyncWrite(metaId, result.getTableGroupId(), result.getTargetTableGroupName(), event, false, result.getFailData(), error);
         }
 
         // 是否写成功数据
         if (getSystemConfig().isEnableStorageWriteSuccess() && !CollectionUtils.isEmpty(result.getSuccessData())) {
-            flushService.asyncWrite(metaId, result.getTableGroupId(), result.getTargetTableGroupName(), event, true, result.getSuccessData(), "");
+            asyncWrite(metaId, result.getTableGroupId(), result.getTargetTableGroupName(), event, true, result.getSuccessData(), "");
         }
     }
 

+ 1 - 0
dbsyncer-sdk/src/main/java/org/dbsyncer/sdk/spi/ConnectorService.java

@@ -16,6 +16,7 @@ import org.dbsyncer.sdk.listener.Listener;
 import org.dbsyncer.sdk.model.ConnectorConfig;
 import org.dbsyncer.sdk.model.MetaInfo;
 import org.dbsyncer.sdk.model.Table;
+import org.dbsyncer.sdk.storage.StorageService;
 
 import java.util.List;
 import java.util.Map;

+ 0 - 1
dbsyncer-sdk/src/main/java/org/dbsyncer/sdk/storage/AbstractStorageService.java

@@ -11,7 +11,6 @@ import org.dbsyncer.sdk.enums.StorageEnum;
 import org.dbsyncer.sdk.enums.StorageStrategyEnum;
 import org.dbsyncer.sdk.filter.BooleanFilter;
 import org.dbsyncer.sdk.filter.Query;
-import org.dbsyncer.sdk.spi.StorageService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.DisposableBean;

+ 1 - 1
dbsyncer-sdk/src/main/java/org/dbsyncer/sdk/spi/StorageService.java → dbsyncer-sdk/src/main/java/org/dbsyncer/sdk/storage/StorageService.java

@@ -1,7 +1,7 @@
 /**
  * DBSyncer Copyright 2020-2023 All Rights Reserved.
  */
-package org.dbsyncer.sdk.spi;
+package org.dbsyncer.sdk.storage;
 
 import org.dbsyncer.common.model.Paging;
 import org.dbsyncer.sdk.enums.StorageEnum;

+ 1 - 1
dbsyncer-storage/src/main/java/org/dbsyncer/storage/StorageSupportConfiguration.java

@@ -5,7 +5,7 @@ package org.dbsyncer.storage;
 
 import org.dbsyncer.common.util.StringUtil;
 import org.dbsyncer.connector.ConnectorFactory;
-import org.dbsyncer.sdk.spi.StorageService;
+import org.dbsyncer.sdk.storage.StorageService;
 import org.dbsyncer.storage.impl.DiskStorageService;
 import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
 import org.springframework.boot.origin.OriginTrackedValue;

+ 2 - 2
dbsyncer-web/src/main/resources/application.properties

@@ -50,8 +50,8 @@ dbsyncer.parser.table.group.buffer-period-millisecond=300
 #storage
 # 数据存储类型:disk(默认)/mysql(推荐生产环境使用)
 # disk-磁盘:/data/config(驱动配置)|data(按驱动分别存储增量数据)|log(系统日志)
-dbsyncer.storage.type=disk
-dbsyncer.storage.mysql.url=jdbc:mysql://127.0.0.1:3306/dbsyncer?rewriteBatchedStatements=true&seUnicode=true&characterEncoding=UTF8&serverTimezone=Asia/Shanghai&useSSL=false&verifyServerCertificate=false&autoReconnect=true
+dbsyncer.storage.type=mysql
+dbsyncer.storage.mysql.url=jdbc:mysql://127.0.0.1:3305/dbsyncer?rewriteBatchedStatements=true&seUnicode=true&characterEncoding=UTF8&serverTimezone=Asia/Shanghai&useSSL=false&verifyServerCertificate=false&autoReconnect=true
 dbsyncer.storage.mysql.username=root
 dbsyncer.storage.mysql.password=123
 # [StorageBufferActuator]线程数