AE86 před 3 roky
rodič
revize
d437312e26

+ 0 - 43
dbsyncer-common/src/main/java/org/dbsyncer/common/model/FailData.java

@@ -1,43 +0,0 @@
-package org.dbsyncer.common.model;
-
-import java.util.List;
-
-/**
- * <p></p>
- *
- * @author yjwang
- * @date 2022/5/10 19:01
- */
-public class FailData<T> {
-
-    private List<T> failList;
-
-    private String error;
-
-    public FailData() {
-        super();
-    }
-
-    public FailData(List<T> failList, String error) {
-        super();
-        this.failList = failList;
-        this.error = error;
-    }
-
-    public void setFailList(List<T> failList) {
-        this.failList = failList;
-    }
-
-    public List<T> getFailList() {
-        return failList;
-    }
-
-    public void setError(String error) {
-        this.error = error;
-    }
-
-    public String getError() {
-        return error;
-    }
-}
-

+ 7 - 18
dbsyncer-common/src/main/java/org/dbsyncer/common/model/Result.java

@@ -13,12 +13,12 @@ public class Result<T> {
     /**
      * 错误数据
      */
-    private final List<FailData<T>> failData = new LinkedList<>();
+    private final List<T> failData = new LinkedList<>();
 
     /**
      * 错误日志
      */
-    private final StringBuffer error = new StringBuffer();
+    private StringBuffer error = new StringBuffer();
 
     private final Object LOCK = new Object();
 
@@ -33,7 +33,7 @@ public class Result<T> {
         return successData;
     }
 
-    public List<FailData<T>> getFailData() {
+    public List<T> getFailData() {
         return failData;
     }
 
@@ -44,9 +44,9 @@ public class Result<T> {
     /**
      * 线程安全添加集合
      *
-     * @param failData 失败数据
+     * @param failData
      */
-    public void addFailData(List<FailData<T>> failData) {
+    public void addFailData(List failData) {
         synchronized (LOCK) {
             this.failData.addAll(failData);
         }
@@ -55,20 +55,9 @@ public class Result<T> {
     /**
      * 线程安全添加集合
      *
-     * @param failData 失败数据
+     * @param successData
      */
-    public void addFailData(FailData<T> failData) {
-        synchronized (LOCK) {
-            this.failData.add(failData);
-        }
-    }
-
-    /**
-     * 线程安全添加集合
-     *
-     * @param successData 成功数据
-     */
-    public void addSuccessData(List<T> successData) {
+    public void addSuccessData(List successData) {
         synchronized (LOCK) {
             this.successData.addAll(successData);
         }

+ 2 - 2
dbsyncer-connector/src/main/java/org/dbsyncer/connector/database/AbstractDatabaseConnector.java

@@ -1,6 +1,5 @@
 package org.dbsyncer.connector.database;
 
-import org.dbsyncer.common.model.FailData;
 import org.dbsyncer.common.model.Result;
 import org.dbsyncer.common.util.CollectionUtils;
 import org.dbsyncer.common.util.StringUtil;
@@ -144,7 +143,8 @@ public abstract class AbstractDatabaseConnector extends AbstractConnector
                     })
             );
         } catch (Exception e) {
-            result.addFailData(new FailData(data, e.getMessage()));
+            result.addFailData(data);
+            result.getError().append(e.getMessage());
         }
 
         if (null != execute) {

+ 3 - 8
dbsyncer-connector/src/main/java/org/dbsyncer/connector/es/ESConnector.java

@@ -1,6 +1,5 @@
 package org.dbsyncer.connector.es;
 
-import org.dbsyncer.common.model.FailData;
 import org.dbsyncer.common.model.Result;
 import org.dbsyncer.common.util.CollectionUtils;
 import org.dbsyncer.common.util.JsonUtil;
@@ -9,10 +8,7 @@ import org.dbsyncer.connector.AbstractConnector;
 import org.dbsyncer.connector.Connector;
 import org.dbsyncer.connector.ConnectorException;
 import org.dbsyncer.connector.ConnectorMapper;
-import org.dbsyncer.connector.config.CommandConfig;
-import org.dbsyncer.connector.config.ESConfig;
-import org.dbsyncer.connector.config.ReaderConfig;
-import org.dbsyncer.connector.config.WriterBatchConfig;
+import org.dbsyncer.connector.config.*;
 import org.dbsyncer.connector.constant.ConnectorConstant;
 import org.dbsyncer.connector.enums.ESFieldTypeEnum;
 import org.dbsyncer.connector.enums.FilterEnum;
@@ -198,9 +194,8 @@ public final class ESConnector extends AbstractConnector implements Connector<ES
             result.addSuccessData(data);
         } catch (Exception e) {
             // 记录错误数据
-//            result.addFailData(data);
-//            result.getError().append(e.getMessage()).append(System.lineSeparator());
-            result.addFailData(new FailData(data, e.getMessage()));
+            result.addFailData(data);
+            result.getError().append(e.getMessage()).append(System.lineSeparator());
             logger.error(e.getMessage());
         }
         return result;

+ 3 - 8
dbsyncer-connector/src/main/java/org/dbsyncer/connector/kafka/KafkaConnector.java

@@ -1,6 +1,5 @@
 package org.dbsyncer.connector.kafka;
 
-import org.dbsyncer.common.model.FailData;
 import org.dbsyncer.common.model.Result;
 import org.dbsyncer.common.util.CollectionUtils;
 import org.dbsyncer.common.util.JsonUtil;
@@ -8,10 +7,7 @@ import org.dbsyncer.connector.AbstractConnector;
 import org.dbsyncer.connector.Connector;
 import org.dbsyncer.connector.ConnectorException;
 import org.dbsyncer.connector.ConnectorMapper;
-import org.dbsyncer.connector.config.CommandConfig;
-import org.dbsyncer.connector.config.KafkaConfig;
-import org.dbsyncer.connector.config.ReaderConfig;
-import org.dbsyncer.connector.config.WriterBatchConfig;
+import org.dbsyncer.connector.config.*;
 import org.dbsyncer.connector.model.Field;
 import org.dbsyncer.connector.model.MetaInfo;
 import org.dbsyncer.connector.model.Table;
@@ -93,9 +89,8 @@ public class KafkaConnector extends AbstractConnector implements Connector<Kafka
             result.addSuccessData(data);
         } catch (Exception e) {
             // 记录错误数据
-//            result.addFailData(data);
-//            result.getError().append(e.getMessage()).append(System.lineSeparator());
-            result.addFailData(new FailData(data, e.getMessage()));
+            result.addFailData(data);
+            result.getError().append(e.getMessage()).append(System.lineSeparator());
             logger.error(e.getMessage());
         }
         return result;

+ 0 - 1
dbsyncer-listener/src/main/java/org/dbsyncer/listener/AbstractExtractor.java

@@ -48,7 +48,6 @@ public abstract class AbstractExtractor implements Extractor {
     @Override
     public void changedEvent(RowChangedEvent event) {
         if(null != event){
-//            taskExecutor.execute(() ->);
             watcher.forEach(w -> w.changedEvent(event));
         }
     }

+ 3 - 16
dbsyncer-parser/src/main/java/org/dbsyncer/parser/ParserFactory.java

@@ -8,10 +8,7 @@ import org.dbsyncer.common.util.JsonUtil;
 import org.dbsyncer.common.util.StringUtil;
 import org.dbsyncer.connector.ConnectorFactory;
 import org.dbsyncer.connector.ConnectorMapper;
-import org.dbsyncer.connector.config.CommandConfig;
-import org.dbsyncer.connector.config.ConnectorConfig;
-import org.dbsyncer.connector.config.ReaderConfig;
-import org.dbsyncer.connector.config.WriterBatchConfig;
+import org.dbsyncer.connector.config.*;
 import org.dbsyncer.connector.constant.ConnectorConstant;
 import org.dbsyncer.connector.enums.ConnectorEnum;
 import org.dbsyncer.connector.enums.FilterEnum;
@@ -317,17 +314,7 @@ public class ParserFactory implements Parser {
         pluginFactory.convert(tableGroup.getPlugin(), eventName, data, target);
 
         // 4、写入缓冲执行器
-        int size = writerBufferActuator.offer(new WriterRequest(tableGroup.getId(), target, mapping.getMetaId(),
-                mapping.getTargetConnectorId(), event.getSourceTableName(), event.getTargetTableName(),
-                eventName, picker.getTargetFields(), tableGroup.getCommand()));
-        if (size >= 400000) {
-            try {
-                Thread.sleep(30000);
-                logger.info("暂停30秒:{}", size);
-            } catch (InterruptedException e) {
-                e.printStackTrace();
-            }
-        }
+        writerBufferActuator.offer(new WriterRequest(tableGroup.getId(), target, mapping.getMetaId(), mapping.getTargetConnectorId(), event.getSourceTableName(), event.getTargetTableName(), eventName, picker.getTargetFields(), tableGroup.getCommand()));
     }
 
     /**
@@ -375,7 +362,7 @@ public class ParserFactory implements Parser {
                     Result w = connectorFactory.writer(batchWriter.getConnectorMapper(), new WriterBatchConfig(tableName, event, command, fields, data, forceUpdate));
                     result.addSuccessData(w.getSuccessData());
                     result.addFailData(w.getFailData());
-//                    result.getError().append(w.getError());
+                    result.getError().append(w.getError());
                 } catch (Exception e) {
                     logger.error(e.getMessage());
                 } finally {

+ 21 - 6
dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/AbstractBufferActuator.java

@@ -12,6 +12,7 @@ import java.util.LinkedHashMap;
 import java.util.Map;
 import java.util.Queue;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
@@ -28,9 +29,11 @@ public abstract class AbstractBufferActuator<Request, Response> implements Buffe
     @Autowired
     private ScheduledTaskService scheduledTaskService;
 
-    private Queue<Request> buffer = new LinkedBlockingQueue();
+    private static final int CAPACITY = 10_0000;
 
-    private Queue<Request> temp = new LinkedBlockingQueue();
+    private Queue<Request> buffer = new LinkedBlockingQueue(CAPACITY);
+
+    private Queue<Request> temp = new LinkedBlockingQueue(CAPACITY);
 
     private final Lock lock = new ReentrantLock(true);
 
@@ -81,13 +84,25 @@ public abstract class AbstractBufferActuator<Request, Response> implements Buffe
     protected abstract void pull(Response response);
 
     @Override
-    public int offer(BufferRequest request) {
+    public void offer(BufferRequest request) {
+        int size = 0;
         if (running) {
             temp.offer((Request) request);
-            return temp.size();
+            size = temp.size();
+        } else {
+            buffer.offer((Request) request);
+            size = temp.size();
+        }
+
+        // TODO 临时解决方案:生产大于消费问题,限制生产速度
+        if (size >= CAPACITY) {
+            try {
+                TimeUnit.SECONDS.sleep(30);
+                logger.warn("当前任务队列大小{}已达上限{},请稍等{}秒", size, CAPACITY, 30);
+            } catch (InterruptedException e) {
+                logger.error(e.getMessage());
+            }
         }
-        buffer.offer((Request) request);
-        return buffer.size();
     }
 
     @Override

+ 1 - 10
dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/AbstractFlushStrategy.java

@@ -1,7 +1,6 @@
 package org.dbsyncer.parser.flush;
 
 import org.dbsyncer.cache.CacheService;
-import org.dbsyncer.common.model.FailData;
 import org.dbsyncer.common.model.Result;
 import org.dbsyncer.common.util.CollectionUtils;
 import org.dbsyncer.parser.model.Meta;
@@ -44,7 +43,7 @@ public abstract class AbstractFlushStrategy implements FlushStrategy {
 
     protected void refreshTotal(String metaId, Result writer) {
         Meta meta = getMeta(metaId);
-        meta.getFail().getAndAdd(getFailDataSize(writer));
+        meta.getFail().getAndAdd(writer.getFailData().size());
         meta.getSuccess().getAndAdd(writer.getSuccessData().size());
     }
 
@@ -55,12 +54,4 @@ public abstract class AbstractFlushStrategy implements FlushStrategy {
         return meta;
     }
 
-    protected int getFailDataSize(Result<Object> writer) {
-        int failCount = 0;
-        for (FailData<Object> failData : writer.getFailData()) {
-            failCount = failCount + failData.getFailList().size();
-        }
-        return failCount;
-    }
-
 }

+ 1 - 1
dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/BufferActuator.java

@@ -7,6 +7,6 @@ package org.dbsyncer.parser.flush;
  */
 public interface BufferActuator {
 
-    int offer(BufferRequest request);
+    void offer(BufferRequest request);
 
 }

+ 3 - 2
dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/FlushService.java

@@ -3,6 +3,7 @@ package org.dbsyncer.parser.flush;
 import org.springframework.scheduling.annotation.Async;
 
 import java.util.List;
+import java.util.Map;
 
 public interface FlushService {
 
@@ -23,6 +24,6 @@ public interface FlushService {
      * @param success
      * @param data
      */
-//    @Async("taskExecutor")
-    void asyncWrite(String metaId, String event, boolean success, List<Object> data, String error);
+    @Async("taskExecutor")
+    void asyncWrite(String metaId, String event, boolean success, List<Map> data, String error);
 }

+ 9 - 26
dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/impl/FlushServiceImpl.java

@@ -1,7 +1,6 @@
 package org.dbsyncer.parser.flush.impl;
 
 import com.alibaba.fastjson.JSONException;
-import org.dbsyncer.common.model.FailData;
 import org.dbsyncer.common.util.JsonUtil;
 import org.dbsyncer.parser.flush.BufferActuator;
 import org.dbsyncer.parser.flush.FlushService;
@@ -20,6 +19,7 @@ import java.time.Instant;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
 
 /**
  * 持久化
@@ -55,22 +55,10 @@ public class FlushServiceImpl implements FlushService {
     }
 
     @Override
-    public void asyncWrite(String metaId, String event, boolean success, List<Object> data, String error) {
-        if (!success) {
-            data.forEach(r -> {
-                FailData failData = (FailData) r;
-                doWrite(metaId, event, success, failData.getFailList(), failData.getError());
-            });
-        } else {
-            doWrite(metaId, event, success, data, error);
-        }
-    }
-
-    private void doWrite(String metaId, String event, boolean success, List<Object> data, String error) {
+    public void asyncWrite(String metaId, String event, boolean success, List<Map> data, String error) {
         long now = Instant.now().toEpochMilli();
-        data.parallelStream().forEach(r -> {
-            Map<String, Object> params = new HashMap<>();
-
+        List<Map> list = data.parallelStream().map(r -> {
+            Map<String, Object> params = new HashMap();
             params.put(ConfigConstant.CONFIG_MODEL_ID, String.valueOf(snowflakeIdWorker.nextId()));
             params.put(ConfigConstant.DATA_SUCCESS, success ? StorageDataStatusEnum.SUCCESS.getValue() : StorageDataStatusEnum.FAIL.getValue());
             params.put(ConfigConstant.DATA_EVENT, event);
@@ -82,15 +70,10 @@ public class FlushServiceImpl implements FlushService {
                 params.put(ConfigConstant.CONFIG_MODEL_JSON, r.toString());
             }
             params.put(ConfigConstant.CONFIG_MODEL_CREATE_TIME, now);
-            int count = storageBufferActuator.offer(new StorageRequest(metaId, params));
-            if (count > 400000) {
-                try {
-                    Thread.sleep(30000);
-                    logger.info("==================>全量同步 休眠30秒:{}", count);
-                } catch (InterruptedException e) {
-                    e.printStackTrace();
-                }
-            }
-        });
+            return params;
+        }).collect(Collectors.toList());
+
+        storageBufferActuator.offer(new StorageRequest(metaId, list));
     }
+
 }

+ 2 - 2
dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/impl/StorageBufferActuator.java

@@ -22,7 +22,7 @@ public class StorageBufferActuator extends AbstractBufferActuator<StorageRequest
 
     @Override
     protected long getPeriod() {
-        return 300;
+        return 500;
     }
 
     @Override
@@ -38,7 +38,7 @@ public class StorageBufferActuator extends AbstractBufferActuator<StorageRequest
     @Override
     protected void partition(StorageRequest request, StorageResponse response) {
         response.setMetaId(request.getMetaId());
-        response.getDataList().add(request.getMap());
+        response.getDataList().addAll(request.getList());
     }
 
     @Override

+ 6 - 5
dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/model/StorageRequest.java

@@ -2,6 +2,7 @@ package org.dbsyncer.parser.flush.model;
 
 import org.dbsyncer.parser.flush.BufferRequest;
 
+import java.util.List;
 import java.util.Map;
 
 /**
@@ -13,18 +14,18 @@ public class StorageRequest implements BufferRequest {
 
     private String metaId;
 
-    private Map<String, Object> map;
+    private List<Map> list;
 
-    public StorageRequest(String metaId, Map<String, Object> map) {
+    public StorageRequest(String metaId, List<Map> list) {
         this.metaId = metaId;
-        this.map = map;
+        this.list = list;
     }
 
     public String getMetaId() {
         return metaId;
     }
 
-    public Map<String, Object> getMap() {
-        return map;
+    public List<Map> getList() {
+        return list;
     }
 }