Kaynağa Gözat

调整:边生产边消费,写入日志的异常数据太大调整

yjwang 3 yıl önce
ebeveyn
işleme
b9c23e32c3
17 değiştirilmiş dosya ile 224 ekleme ve 54 silme
  1. 43 0
      dbsyncer-common/src/main/java/org/dbsyncer/common/model/FailData.java
  2. 28 11
      dbsyncer-common/src/main/java/org/dbsyncer/common/model/Result.java
  3. 2 2
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/database/AbstractDatabaseConnector.java
  4. 8 3
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/es/ESConnector.java
  5. 8 3
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/kafka/KafkaConnector.java
  6. 3 2
      dbsyncer-listener/src/main/java/org/dbsyncer/listener/AbstractExtractor.java
  7. 16 3
      dbsyncer-parser/src/main/java/org/dbsyncer/parser/ParserFactory.java
  8. 6 5
      dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/AbstractBufferActuator.java
  9. 10 1
      dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/AbstractFlushStrategy.java
  10. 1 1
      dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/BufferActuator.java
  11. 2 3
      dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/FlushService.java
  12. 26 9
      dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/impl/FlushServiceImpl.java
  13. 2 2
      dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/impl/StorageBufferActuator.java
  14. 5 6
      dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/model/StorageRequest.java
  15. 2 1
      dbsyncer-storage/src/main/resources/dbsyncer_data.sql
  16. 50 0
      dbsyncer-web/src/main/java/org/dbsyncer/web/config/ScheduleConfig.java
  17. 12 2
      dbsyncer-web/src/main/java/org/dbsyncer/web/config/ThreadPoolConfig.java

+ 43 - 0
dbsyncer-common/src/main/java/org/dbsyncer/common/model/FailData.java

@@ -0,0 +1,43 @@
+package org.dbsyncer.common.model;
+
+import java.util.List;
+
+/**
+ * <p></p>
+ *
+ * @author yjwang
+ * @date 2022/5/10 19:01
+ */
+public class FailData<T> {
+
+    private List<T> failList;
+
+    private String error;
+
+    public FailData() {
+        super();
+    }
+
+    public FailData(List<T> failList, String error) {
+        super();
+        this.failList = failList;
+        this.error = error;
+    }
+
+    public void setFailList(List<T> failList) {
+        this.failList = failList;
+    }
+
+    public List<T> getFailList() {
+        return failList;
+    }
+
+    public void setError(String error) {
+        this.error = error;
+    }
+
+    public String getError() {
+        return error;
+    }
+}
+

+ 28 - 11
dbsyncer-common/src/main/java/org/dbsyncer/common/model/Result.java

@@ -5,14 +5,20 @@ import java.util.List;
 
 
 public class Result<T> {
 public class Result<T> {
 
 
-    // 成功数据
-    private List<T> successData = new LinkedList<>();
+    /**
+     * 成功数据
+     */
+    private final List<T> successData = new LinkedList<>();
 
 
-    // 错误数据
-    private List<T> failData = new LinkedList<>();
+    /**
+     * 错误数据
+     */
+    private final List<FailData<T>> failData = new LinkedList<>();
 
 
-    // 错误日志
-    private StringBuffer error = new StringBuffer();
+    /**
+     * 错误日志
+     */
+    private final StringBuffer error = new StringBuffer();
 
 
     private final Object LOCK = new Object();
     private final Object LOCK = new Object();
 
 
@@ -27,7 +33,7 @@ public class Result<T> {
         return successData;
         return successData;
     }
     }
 
 
-    public List<T> getFailData() {
+    public List<FailData<T>> getFailData() {
         return failData;
         return failData;
     }
     }
 
 
@@ -38,9 +44,9 @@ public class Result<T> {
     /**
     /**
      * 线程安全添加集合
      * 线程安全添加集合
      *
      *
-     * @param failData
+     * @param failData 失败数据
      */
      */
-    public void addFailData(List failData) {
+    public void addFailData(List<FailData<T>> failData) {
         synchronized (LOCK) {
         synchronized (LOCK) {
             this.failData.addAll(failData);
             this.failData.addAll(failData);
         }
         }
@@ -49,9 +55,20 @@ public class Result<T> {
     /**
     /**
      * 线程安全添加集合
      * 线程安全添加集合
      *
      *
-     * @param successData
+     * @param failData 失败数据
+     */
+    public void addFailData(FailData<T> failData) {
+        synchronized (LOCK) {
+            this.failData.add(failData);
+        }
+    }
+
+    /**
+     * 线程安全添加集合
+     *
+     * @param successData 成功数据
      */
      */
-    public void addSuccessData(List successData) {
+    public void addSuccessData(List<T> successData) {
         synchronized (LOCK) {
         synchronized (LOCK) {
             this.successData.addAll(successData);
             this.successData.addAll(successData);
         }
         }

+ 2 - 2
dbsyncer-connector/src/main/java/org/dbsyncer/connector/database/AbstractDatabaseConnector.java

@@ -1,5 +1,6 @@
 package org.dbsyncer.connector.database;
 package org.dbsyncer.connector.database;
 
 
+import org.dbsyncer.common.model.FailData;
 import org.dbsyncer.common.model.Result;
 import org.dbsyncer.common.model.Result;
 import org.dbsyncer.common.util.CollectionUtils;
 import org.dbsyncer.common.util.CollectionUtils;
 import org.dbsyncer.common.util.StringUtil;
 import org.dbsyncer.common.util.StringUtil;
@@ -143,8 +144,7 @@ public abstract class AbstractDatabaseConnector extends AbstractConnector
                     })
                     })
             );
             );
         } catch (Exception e) {
         } catch (Exception e) {
-            result.addFailData(data);
-            result.getError().append(e.getMessage());
+            result.addFailData(new FailData(data, e.getMessage()));
         }
         }
 
 
         if (null != execute) {
         if (null != execute) {

+ 8 - 3
dbsyncer-connector/src/main/java/org/dbsyncer/connector/es/ESConnector.java

@@ -1,5 +1,6 @@
 package org.dbsyncer.connector.es;
 package org.dbsyncer.connector.es;
 
 
+import org.dbsyncer.common.model.FailData;
 import org.dbsyncer.common.model.Result;
 import org.dbsyncer.common.model.Result;
 import org.dbsyncer.common.util.CollectionUtils;
 import org.dbsyncer.common.util.CollectionUtils;
 import org.dbsyncer.common.util.JsonUtil;
 import org.dbsyncer.common.util.JsonUtil;
@@ -8,7 +9,10 @@ import org.dbsyncer.connector.AbstractConnector;
 import org.dbsyncer.connector.Connector;
 import org.dbsyncer.connector.Connector;
 import org.dbsyncer.connector.ConnectorException;
 import org.dbsyncer.connector.ConnectorException;
 import org.dbsyncer.connector.ConnectorMapper;
 import org.dbsyncer.connector.ConnectorMapper;
-import org.dbsyncer.connector.config.*;
+import org.dbsyncer.connector.config.CommandConfig;
+import org.dbsyncer.connector.config.ESConfig;
+import org.dbsyncer.connector.config.ReaderConfig;
+import org.dbsyncer.connector.config.WriterBatchConfig;
 import org.dbsyncer.connector.constant.ConnectorConstant;
 import org.dbsyncer.connector.constant.ConnectorConstant;
 import org.dbsyncer.connector.enums.ESFieldTypeEnum;
 import org.dbsyncer.connector.enums.ESFieldTypeEnum;
 import org.dbsyncer.connector.enums.FilterEnum;
 import org.dbsyncer.connector.enums.FilterEnum;
@@ -194,8 +198,9 @@ public final class ESConnector extends AbstractConnector implements Connector<ES
             result.addSuccessData(data);
             result.addSuccessData(data);
         } catch (Exception e) {
         } catch (Exception e) {
             // 记录错误数据
             // 记录错误数据
-            result.addFailData(data);
-            result.getError().append(e.getMessage()).append(System.lineSeparator());
+//            result.addFailData(data);
+//            result.getError().append(e.getMessage()).append(System.lineSeparator());
+            result.addFailData(new FailData(data, e.getMessage()));
             logger.error(e.getMessage());
             logger.error(e.getMessage());
         }
         }
         return result;
         return result;

+ 8 - 3
dbsyncer-connector/src/main/java/org/dbsyncer/connector/kafka/KafkaConnector.java

@@ -1,5 +1,6 @@
 package org.dbsyncer.connector.kafka;
 package org.dbsyncer.connector.kafka;
 
 
+import org.dbsyncer.common.model.FailData;
 import org.dbsyncer.common.model.Result;
 import org.dbsyncer.common.model.Result;
 import org.dbsyncer.common.util.CollectionUtils;
 import org.dbsyncer.common.util.CollectionUtils;
 import org.dbsyncer.common.util.JsonUtil;
 import org.dbsyncer.common.util.JsonUtil;
@@ -7,7 +8,10 @@ import org.dbsyncer.connector.AbstractConnector;
 import org.dbsyncer.connector.Connector;
 import org.dbsyncer.connector.Connector;
 import org.dbsyncer.connector.ConnectorException;
 import org.dbsyncer.connector.ConnectorException;
 import org.dbsyncer.connector.ConnectorMapper;
 import org.dbsyncer.connector.ConnectorMapper;
-import org.dbsyncer.connector.config.*;
+import org.dbsyncer.connector.config.CommandConfig;
+import org.dbsyncer.connector.config.KafkaConfig;
+import org.dbsyncer.connector.config.ReaderConfig;
+import org.dbsyncer.connector.config.WriterBatchConfig;
 import org.dbsyncer.connector.model.Field;
 import org.dbsyncer.connector.model.Field;
 import org.dbsyncer.connector.model.MetaInfo;
 import org.dbsyncer.connector.model.MetaInfo;
 import org.dbsyncer.connector.model.Table;
 import org.dbsyncer.connector.model.Table;
@@ -89,8 +93,9 @@ public class KafkaConnector extends AbstractConnector implements Connector<Kafka
             result.addSuccessData(data);
             result.addSuccessData(data);
         } catch (Exception e) {
         } catch (Exception e) {
             // 记录错误数据
             // 记录错误数据
-            result.addFailData(data);
-            result.getError().append(e.getMessage()).append(System.lineSeparator());
+//            result.addFailData(data);
+//            result.getError().append(e.getMessage()).append(System.lineSeparator());
+            result.addFailData(new FailData(data, e.getMessage()));
             logger.error(e.getMessage());
             logger.error(e.getMessage());
         }
         }
         return result;
         return result;

+ 3 - 2
dbsyncer-listener/src/main/java/org/dbsyncer/listener/AbstractExtractor.java

@@ -31,7 +31,7 @@ public abstract class AbstractExtractor implements Extractor {
     protected ListenerConfig listenerConfig;
     protected ListenerConfig listenerConfig;
     protected Map<String, String> snapshot;
     protected Map<String, String> snapshot;
     protected Set<String> filterTable;
     protected Set<String> filterTable;
-    private List<Event> watcher = new CopyOnWriteArrayList<>();
+    private final List<Event> watcher = new CopyOnWriteArrayList<>();
 
 
     @Override
     @Override
     public void addListener(Event event) {
     public void addListener(Event event) {
@@ -48,7 +48,8 @@ public abstract class AbstractExtractor implements Extractor {
     @Override
     @Override
     public void changedEvent(RowChangedEvent event) {
     public void changedEvent(RowChangedEvent event) {
         if(null != event){
         if(null != event){
-            taskExecutor.execute(() -> watcher.forEach(w -> w.changedEvent(event)));
+//            taskExecutor.execute(() ->);
+            watcher.forEach(w -> w.changedEvent(event));
         }
         }
     }
     }
 
 

+ 16 - 3
dbsyncer-parser/src/main/java/org/dbsyncer/parser/ParserFactory.java

@@ -8,7 +8,10 @@ import org.dbsyncer.common.util.JsonUtil;
 import org.dbsyncer.common.util.StringUtil;
 import org.dbsyncer.common.util.StringUtil;
 import org.dbsyncer.connector.ConnectorFactory;
 import org.dbsyncer.connector.ConnectorFactory;
 import org.dbsyncer.connector.ConnectorMapper;
 import org.dbsyncer.connector.ConnectorMapper;
-import org.dbsyncer.connector.config.*;
+import org.dbsyncer.connector.config.CommandConfig;
+import org.dbsyncer.connector.config.ConnectorConfig;
+import org.dbsyncer.connector.config.ReaderConfig;
+import org.dbsyncer.connector.config.WriterBatchConfig;
 import org.dbsyncer.connector.constant.ConnectorConstant;
 import org.dbsyncer.connector.constant.ConnectorConstant;
 import org.dbsyncer.connector.enums.ConnectorEnum;
 import org.dbsyncer.connector.enums.ConnectorEnum;
 import org.dbsyncer.connector.enums.FilterEnum;
 import org.dbsyncer.connector.enums.FilterEnum;
@@ -314,7 +317,17 @@ public class ParserFactory implements Parser {
         pluginFactory.convert(tableGroup.getPlugin(), eventName, data, target);
         pluginFactory.convert(tableGroup.getPlugin(), eventName, data, target);
 
 
         // 4、写入缓冲执行器
         // 4、写入缓冲执行器
-        writerBufferActuator.offer(new WriterRequest(tableGroup.getId(), target, mapping.getMetaId(), mapping.getTargetConnectorId(), event.getSourceTableName(), event.getTargetTableName(), eventName, picker.getTargetFields(), tableGroup.getCommand()));
+        int size = writerBufferActuator.offer(new WriterRequest(tableGroup.getId(), target, mapping.getMetaId(),
+                mapping.getTargetConnectorId(), event.getSourceTableName(), event.getTargetTableName(),
+                eventName, picker.getTargetFields(), tableGroup.getCommand()));
+        if (size >= 400000) {
+            try {
+                Thread.sleep(30000);
+                logger.info("暂停30秒:{}", size);
+            } catch (InterruptedException e) {
+                e.printStackTrace();
+            }
+        }
     }
     }
 
 
     /**
     /**
@@ -362,7 +375,7 @@ public class ParserFactory implements Parser {
                     Result w = connectorFactory.writer(batchWriter.getConnectorMapper(), new WriterBatchConfig(tableName, event, command, fields, data, forceUpdate));
                     Result w = connectorFactory.writer(batchWriter.getConnectorMapper(), new WriterBatchConfig(tableName, event, command, fields, data, forceUpdate));
                     result.addSuccessData(w.getSuccessData());
                     result.addSuccessData(w.getSuccessData());
                     result.addFailData(w.getFailData());
                     result.addFailData(w.getFailData());
-                    result.getError().append(w.getError());
+//                    result.getError().append(w.getError());
                 } catch (Exception e) {
                 } catch (Exception e) {
                     logger.error(e.getMessage());
                     logger.error(e.getMessage());
                 } finally {
                 } finally {

+ 6 - 5
dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/AbstractBufferActuator.java

@@ -11,7 +11,7 @@ import java.time.Instant;
 import java.util.LinkedHashMap;
 import java.util.LinkedHashMap;
 import java.util.Map;
 import java.util.Map;
 import java.util.Queue;
 import java.util.Queue;
-import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.concurrent.locks.ReentrantLock;
@@ -28,9 +28,9 @@ public abstract class AbstractBufferActuator<Request, Response> implements Buffe
     @Autowired
     @Autowired
     private ScheduledTaskService scheduledTaskService;
     private ScheduledTaskService scheduledTaskService;
 
 
-    private Queue<Request> buffer = new ConcurrentLinkedQueue();
+    private Queue<Request> buffer = new LinkedBlockingQueue();
 
 
-    private Queue<Request> temp = new ConcurrentLinkedQueue();
+    private Queue<Request> temp = new LinkedBlockingQueue();
 
 
     private final Lock lock = new ReentrantLock(true);
     private final Lock lock = new ReentrantLock(true);
 
 
@@ -81,12 +81,13 @@ public abstract class AbstractBufferActuator<Request, Response> implements Buffe
     protected abstract void pull(Response response);
     protected abstract void pull(Response response);
 
 
     @Override
     @Override
-    public void offer(BufferRequest request) {
+    public int offer(BufferRequest request) {
         if (running) {
         if (running) {
             temp.offer((Request) request);
             temp.offer((Request) request);
-            return;
+            return temp.size();
         }
         }
         buffer.offer((Request) request);
         buffer.offer((Request) request);
+        return buffer.size();
     }
     }
 
 
     @Override
     @Override

+ 10 - 1
dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/AbstractFlushStrategy.java

@@ -1,6 +1,7 @@
 package org.dbsyncer.parser.flush;
 package org.dbsyncer.parser.flush;
 
 
 import org.dbsyncer.cache.CacheService;
 import org.dbsyncer.cache.CacheService;
+import org.dbsyncer.common.model.FailData;
 import org.dbsyncer.common.model.Result;
 import org.dbsyncer.common.model.Result;
 import org.dbsyncer.common.util.CollectionUtils;
 import org.dbsyncer.common.util.CollectionUtils;
 import org.dbsyncer.parser.model.Meta;
 import org.dbsyncer.parser.model.Meta;
@@ -43,7 +44,7 @@ public abstract class AbstractFlushStrategy implements FlushStrategy {
 
 
     protected void refreshTotal(String metaId, Result writer) {
     protected void refreshTotal(String metaId, Result writer) {
         Meta meta = getMeta(metaId);
         Meta meta = getMeta(metaId);
-        meta.getFail().getAndAdd(writer.getFailData().size());
+        meta.getFail().getAndAdd(getFailDataSize(writer));
         meta.getSuccess().getAndAdd(writer.getSuccessData().size());
         meta.getSuccess().getAndAdd(writer.getSuccessData().size());
     }
     }
 
 
@@ -54,4 +55,12 @@ public abstract class AbstractFlushStrategy implements FlushStrategy {
         return meta;
         return meta;
     }
     }
 
 
+    protected int getFailDataSize(Result<Object> writer) {
+        int failCount = 0;
+        for (FailData<Object> failData : writer.getFailData()) {
+            failCount = failCount + failData.getFailList().size();
+        }
+        return failCount;
+    }
+
 }
 }

+ 1 - 1
dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/BufferActuator.java

@@ -7,6 +7,6 @@ package org.dbsyncer.parser.flush;
  */
  */
 public interface BufferActuator {
 public interface BufferActuator {
 
 
-    void offer(BufferRequest request);
+    int offer(BufferRequest request);
 
 
 }
 }

+ 2 - 3
dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/FlushService.java

@@ -3,7 +3,6 @@ package org.dbsyncer.parser.flush;
 import org.springframework.scheduling.annotation.Async;
 import org.springframework.scheduling.annotation.Async;
 
 
 import java.util.List;
 import java.util.List;
-import java.util.Map;
 
 
 public interface FlushService {
 public interface FlushService {
 
 
@@ -24,6 +23,6 @@ public interface FlushService {
      * @param success
      * @param success
      * @param data
      * @param data
      */
      */
-    @Async("taskExecutor")
-    void asyncWrite(String metaId, String event, boolean success, List<Map> data, String error);
+//    @Async("taskExecutor")
+    void asyncWrite(String metaId, String event, boolean success, List<Object> data, String error);
 }
 }

+ 26 - 9
dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/impl/FlushServiceImpl.java

@@ -1,6 +1,7 @@
 package org.dbsyncer.parser.flush.impl;
 package org.dbsyncer.parser.flush.impl;
 
 
 import com.alibaba.fastjson.JSONException;
 import com.alibaba.fastjson.JSONException;
+import org.dbsyncer.common.model.FailData;
 import org.dbsyncer.common.util.JsonUtil;
 import org.dbsyncer.common.util.JsonUtil;
 import org.dbsyncer.parser.flush.BufferActuator;
 import org.dbsyncer.parser.flush.BufferActuator;
 import org.dbsyncer.parser.flush.FlushService;
 import org.dbsyncer.parser.flush.FlushService;
@@ -19,7 +20,6 @@ import java.time.Instant;
 import java.util.HashMap;
 import java.util.HashMap;
 import java.util.List;
 import java.util.List;
 import java.util.Map;
 import java.util.Map;
-import java.util.stream.Collectors;
 
 
 /**
 /**
  * 持久化
  * 持久化
@@ -55,10 +55,22 @@ public class FlushServiceImpl implements FlushService {
     }
     }
 
 
     @Override
     @Override
-    public void asyncWrite(String metaId, String event, boolean success, List<Map> data, String error) {
+    public void asyncWrite(String metaId, String event, boolean success, List<Object> data, String error) {
+        if (!success) {
+            data.forEach(r -> {
+                FailData failData = (FailData) r;
+                doWrite(metaId, event, success, failData.getFailList(), failData.getError());
+            });
+        } else {
+            doWrite(metaId, event, success, data, error);
+        }
+    }
+
+    private void doWrite(String metaId, String event, boolean success, List<Object> data, String error) {
         long now = Instant.now().toEpochMilli();
         long now = Instant.now().toEpochMilli();
-        List<Map> list = data.parallelStream().map(r -> {
-            Map<String, Object> params = new HashMap();
+        data.parallelStream().forEach(r -> {
+            Map<String, Object> params = new HashMap<>();
+
             params.put(ConfigConstant.CONFIG_MODEL_ID, String.valueOf(snowflakeIdWorker.nextId()));
             params.put(ConfigConstant.CONFIG_MODEL_ID, String.valueOf(snowflakeIdWorker.nextId()));
             params.put(ConfigConstant.DATA_SUCCESS, success ? StorageDataStatusEnum.SUCCESS.getValue() : StorageDataStatusEnum.FAIL.getValue());
             params.put(ConfigConstant.DATA_SUCCESS, success ? StorageDataStatusEnum.SUCCESS.getValue() : StorageDataStatusEnum.FAIL.getValue());
             params.put(ConfigConstant.DATA_EVENT, event);
             params.put(ConfigConstant.DATA_EVENT, event);
@@ -70,10 +82,15 @@ public class FlushServiceImpl implements FlushService {
                 params.put(ConfigConstant.CONFIG_MODEL_JSON, r.toString());
                 params.put(ConfigConstant.CONFIG_MODEL_JSON, r.toString());
             }
             }
             params.put(ConfigConstant.CONFIG_MODEL_CREATE_TIME, now);
             params.put(ConfigConstant.CONFIG_MODEL_CREATE_TIME, now);
-            return params;
-        }).collect(Collectors.toList());
-
-        storageBufferActuator.offer(new StorageRequest(metaId, list));
+            int count = storageBufferActuator.offer(new StorageRequest(metaId, params));
+            if (count > 400000) {
+                try {
+                    Thread.sleep(30000);
+                    logger.info("==================>全量同步 休眠30秒:{}", count);
+                } catch (InterruptedException e) {
+                    e.printStackTrace();
+                }
+            }
+        });
     }
     }
-
 }
 }

+ 2 - 2
dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/impl/StorageBufferActuator.java

@@ -22,7 +22,7 @@ public class StorageBufferActuator extends AbstractBufferActuator<StorageRequest
 
 
     @Override
     @Override
     protected long getPeriod() {
     protected long getPeriod() {
-        return 3000;
+        return 300;
     }
     }
 
 
     @Override
     @Override
@@ -38,7 +38,7 @@ public class StorageBufferActuator extends AbstractBufferActuator<StorageRequest
     @Override
     @Override
     protected void partition(StorageRequest request, StorageResponse response) {
     protected void partition(StorageRequest request, StorageResponse response) {
         response.setMetaId(request.getMetaId());
         response.setMetaId(request.getMetaId());
-        response.getDataList().addAll(request.getList());
+        response.getDataList().add(request.getMap());
     }
     }
 
 
     @Override
     @Override

+ 5 - 6
dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/model/StorageRequest.java

@@ -2,7 +2,6 @@ package org.dbsyncer.parser.flush.model;
 
 
 import org.dbsyncer.parser.flush.BufferRequest;
 import org.dbsyncer.parser.flush.BufferRequest;
 
 
-import java.util.List;
 import java.util.Map;
 import java.util.Map;
 
 
 /**
 /**
@@ -14,18 +13,18 @@ public class StorageRequest implements BufferRequest {
 
 
     private String metaId;
     private String metaId;
 
 
-    private List<Map> list;
+    private Map<String, Object> map;
 
 
-    public StorageRequest(String metaId, List<Map> list) {
+    public StorageRequest(String metaId, Map<String, Object> map) {
         this.metaId = metaId;
         this.metaId = metaId;
-        this.list = list;
+        this.map = map;
     }
     }
 
 
     public String getMetaId() {
     public String getMetaId() {
         return metaId;
         return metaId;
     }
     }
 
 
-    public List<Map> getList() {
-        return list;
+    public Map<String, Object> getMap() {
+        return map;
     }
     }
 }
 }

+ 2 - 1
dbsyncer-storage/src/main/resources/dbsyncer_data.sql

@@ -1,12 +1,13 @@
 CREATE TABLE `dbsyncer_data` (
 CREATE TABLE `dbsyncer_data` (
   `ID` varchar(64) CHARACTER SET utf8 COLLATE utf8_bin NOT NULL COMMENT '唯一ID',
   `ID` varchar(64) CHARACTER SET utf8 COLLATE utf8_bin NOT NULL COMMENT '唯一ID',
   `SUCCESS` int(1) NOT NULL COMMENT '成功1/失败0',
   `SUCCESS` int(1) NOT NULL COMMENT '成功1/失败0',
-  `EVENT` varchar(8) CHARACTER SET utf8 COLLATE utf8_bin NOT NULL COMMENT '事件',
+  `EVENT` varchar(20) CHARACTER SET utf8 COLLATE utf8_bin NOT NULL COMMENT '事件',
   `ERROR` mediumtext CHARACTER SET utf8 COLLATE utf8_bin NULL COMMENT '异常信息',
   `ERROR` mediumtext CHARACTER SET utf8 COLLATE utf8_bin NULL COMMENT '异常信息',
   `CREATE_TIME` bigint(0) NOT NULL COMMENT '创建时间',
   `CREATE_TIME` bigint(0) NOT NULL COMMENT '创建时间',
   `JSON` mediumtext CHARACTER SET utf8 COLLATE utf8_bin NOT NULL COMMENT '同步数据',
   `JSON` mediumtext CHARACTER SET utf8 COLLATE utf8_bin NOT NULL COMMENT '同步数据',
   PRIMARY KEY (`ID`) USING BTREE,
   PRIMARY KEY (`ID`) USING BTREE,
   INDEX `IDX_SUCCESS`(`SUCCESS`) USING BTREE,
   INDEX `IDX_SUCCESS`(`SUCCESS`) USING BTREE,
+  INDEX `IDX_EVENT`(`EVENT`) USING BTREE,
   INDEX `IDX_SUCCESS_CREATE_TIME`(`SUCCESS`, `CREATE_TIME`) USING BTREE,
   INDEX `IDX_SUCCESS_CREATE_TIME`(`SUCCESS`, `CREATE_TIME`) USING BTREE,
   FULLTEXT INDEX `FULL_TEXT_ERROR`(`ERROR`)
   FULLTEXT INDEX `FULL_TEXT_ERROR`(`ERROR`)
 ) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_bin COMMENT = '同步数据表' ROW_FORMAT = Dynamic;
 ) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_bin COMMENT = '同步数据表' ROW_FORMAT = Dynamic;

+ 50 - 0
dbsyncer-web/src/main/java/org/dbsyncer/web/config/ScheduleConfig.java

@@ -0,0 +1,50 @@
+package org.dbsyncer.web.config;
+
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.scheduling.annotation.SchedulingConfigurer;
+import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
+import org.springframework.scheduling.config.ScheduledTaskRegistrar;
+
+import java.util.concurrent.RejectedExecutionHandler;
+
+/**
+ * @author yjwang
+ * @date 2022/4/29 10:27
+ */
+@Configuration
+public class ScheduleConfig implements SchedulingConfigurer {
+
+    @Override
+    public void configureTasks(ScheduledTaskRegistrar scheduledTaskRegistrar) {
+        scheduledTaskRegistrar.setTaskScheduler(taskScheduler());
+    }
+
+    @Bean(name = "taskScheduler", destroyMethod = "shutdown")
+    public ThreadPoolTaskScheduler taskScheduler() {
+        int poolSize = Runtime.getRuntime().availableProcessors() * 2;
+        ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
+        //核心线程池大小
+        scheduler.setPoolSize(poolSize);
+        //线程名字前缀
+        scheduler.setThreadNamePrefix("taskScheduler-");
+        //设置线程池关闭的时候等待所有任务都完成再继续销毁其他的Bean
+        scheduler.setWaitForTasksToCompleteOnShutdown(true);
+        //设置线程池中任务的等待时间,如果超过这个时候还没有销毁就强制销毁,以确保应用最后能够被关闭,而不是阻塞住
+        scheduler.setAwaitTerminationSeconds(60);
+        // 线程池满,拒绝策略
+        scheduler.setRejectedExecutionHandler(rejectedExecutionHandler());
+
+        return scheduler;
+    }
+
+    public RejectedExecutionHandler rejectedExecutionHandler() {
+        return (r, executor) -> {
+            try {
+                executor.getQueue().put(r);
+            } catch (InterruptedException e) {
+                e.printStackTrace();
+            }
+        };
+    }
+}

+ 12 - 2
dbsyncer-web/src/main/java/org/dbsyncer/web/config/ThreadPoolConfig.java

@@ -6,7 +6,7 @@ import org.springframework.context.annotation.Configuration;
 import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
 import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
 
 
 import java.util.concurrent.Executor;
 import java.util.concurrent.Executor;
-import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.RejectedExecutionHandler;
 
 
 /**
 /**
  * @author AE86
  * @author AE86
@@ -53,11 +53,21 @@ public class ThreadPoolConfig {
         这种策略和AbortPolicy几乎一样,也是丢弃任务,只不过他不抛出异常。
         这种策略和AbortPolicy几乎一样,也是丢弃任务,只不过他不抛出异常。
         DiscardOldestPolicy:如果执行程序尚未关闭,则位于工作队列头部的任务将被删除,然后重试执行程序(如果再次失败,则重复此过程)
         DiscardOldestPolicy:如果执行程序尚未关闭,则位于工作队列头部的任务将被删除,然后重试执行程序(如果再次失败,则重复此过程)
         该策略就稍微复杂一些,在pool没有关闭的前提下首先丢掉缓存在队列中的最早的任务,然后重新尝试运行该任务。这个策略需要适当小心*/
         该策略就稍微复杂一些,在pool没有关闭的前提下首先丢掉缓存在队列中的最早的任务,然后重新尝试运行该任务。这个策略需要适当小心*/
-        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());
+        executor.setRejectedExecutionHandler(rejectedExecutionHandler());
         executor.setWaitForTasksToCompleteOnShutdown(true);
         executor.setWaitForTasksToCompleteOnShutdown(true);
         executor.setAwaitTerminationSeconds(30);
         executor.setAwaitTerminationSeconds(30);
         executor.initialize();
         executor.initialize();
         return executor;
         return executor;
     }
     }
 
 
+    public RejectedExecutionHandler rejectedExecutionHandler() {
+        return (r, executor) -> {
+            try {
+                executor.getQueue().put(r);
+            } catch (InterruptedException e) {
+                e.printStackTrace();
+            }
+        };
+    }
+
 }
 }