AE86 cách đây 5 năm
mục cha
commit
7af330648a

+ 1 - 1
dbsyncer-connector/src/main/java/org/dbsyncer/connector/database/AbstractDatabaseConnector.java

@@ -256,7 +256,7 @@ public abstract class AbstractDatabaseConnector implements Database {
                 }
             });
             if (0 == update) {
-                throw new ConnectorException("写入失败");
+                throw new ConnectorException(String.format("执行%s操作失败:%s", event, data));
             }
         } catch (Exception e) {
             // 记录错误数据

+ 18 - 11
dbsyncer-parser/src/main/java/org/dbsyncer/parser/ParserFactory.java

@@ -20,7 +20,6 @@ import org.dbsyncer.parser.util.ConvertUtil;
 import org.dbsyncer.parser.util.PickerUtil;
 import org.dbsyncer.plugin.PluginFactory;
 import org.dbsyncer.plugin.config.Plugin;
-import org.dbsyncer.storage.constant.ConfigConstant;
 import org.json.JSONException;
 import org.json.JSONObject;
 import org.slf4j.Logger;
@@ -222,7 +221,7 @@ public class ParserFactory implements Parser {
             Result writer = writeBatch(tConfig, command, picker.getTargetFields(), target, threadSize, batchSize);
 
             // 6、更新结果
-            flush(task, writer, target.size());
+            flush(task, writer, target);
 
             // 7、更新分页数
             params.put(ParserEnum.PAGE_INDEX.getCode(), String.valueOf(++pageIndex));
@@ -261,7 +260,9 @@ public class ParserFactory implements Parser {
         Result writer = connectorFactory.writer(tConfig, picker.getTargetFields(), command, event, target);
 
         // 5、更新结果
-        flush(metaId, writer, 1);
+        List<Map<String, Object>> list = new ArrayList<>(1);
+        list.add(target);
+        flush(metaId, writer, list);
     }
 
     /**
@@ -269,18 +270,19 @@ public class ParserFactory implements Parser {
      *
      * @param task
      * @param writer
-     * @param total
+     * @param data
      */
-    private void flush(Task task, Result writer, long total) {
-        flush(task.getId(), writer, total);
+    private void flush(Task task, Result writer, List<Map<String, Object>> data) {
+        flush(task.getId(), writer, data);
 
         // 发布刷新事件给FullExtractor
         task.setEndTime(System.currentTimeMillis());
         applicationContext.publishEvent(new FullRefreshEvent(applicationContext, task));
     }
 
-    private void flush(String metaId, Result writer, long total) {
+    private void flush(String metaId, Result writer, List<Map<String, Object>> data) {
         // 引用传递
+        long total = data.size();
         long fail = writer.getFail().get();
         Meta meta = getMeta(metaId);
         meta.getFail().getAndAdd(fail);
@@ -289,12 +291,17 @@ public class ParserFactory implements Parser {
         logger.info("任务:{}, 成功:{}, 失败:{}", metaId, meta.getSuccess(), meta.getFail());
 
         // 记录错误数据
-        if(!writer.getFailData().isEmpty()){
-            flushService.asyncWrite(metaId, writer.getFailData());
+        Queue<Map<String, Object>> failData = writer.getFailData();
+        boolean success = CollectionUtils.isEmpty(failData);
+        if (!success) {
+            data.clear();
+            data.addAll(failData);
         }
+        flushService.asyncWrite(metaId, success, data);
+
         // 记录错误日志
         String error = writer.getError().toString();
-        if(StringUtils.isNotBlank(error)){
+        if (StringUtils.isNotBlank(error)) {
             flushService.asyncWrite(metaId, error);
         }
 
@@ -340,7 +347,7 @@ public class ParserFactory implements Parser {
      * @return
      */
     private Result writeBatch(ConnectorConfig config, Map<String, String> command, List<Field> fields, List<Map<String, Object>> target,
-                                int threadSize, int batchSize) {
+                              int threadSize, int batchSize) {
         // 总数
         int total = target.size();
         // 单次任务

+ 5 - 5
dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/FlushService.java

@@ -2,8 +2,8 @@ package org.dbsyncer.parser.flush;
 
 import org.springframework.scheduling.annotation.Async;
 
+import java.util.List;
 import java.util.Map;
-import java.util.Queue;
 
 public interface FlushService {
 
@@ -17,12 +17,12 @@ public interface FlushService {
     void asyncWrite(String metaId, String error);
 
     /**
-     * 记录错误数据
+     * 记录数据
      *
      * @param metaId
-     * @param failData
+     * @param success
+     * @param data
      */
     @Async("taskExecutor")
-    void asyncWrite(String metaId, Queue<Map<String,Object>> failData);
-
+    void asyncWrite(String metaId, boolean success, List<Map<String, Object>> data);
 }

+ 9 - 9
dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/FlushServiceImpl.java

@@ -11,6 +11,7 @@ import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
 
 import java.util.*;
+import java.util.stream.Collectors;
 
 /**
  * 持久化
@@ -27,7 +28,7 @@ public class FlushServiceImpl implements FlushService {
     private final Logger logger = LoggerFactory.getLogger(getClass());
 
     @Autowired
-    private StorageService    storageService;
+    private StorageService storageService;
 
     @Autowired
     private SnowflakeIdWorker snowflakeIdWorker;
@@ -43,17 +44,16 @@ public class FlushServiceImpl implements FlushService {
     }
 
     @Override
-    public void asyncWrite(String metaId, Queue<Map<String, Object>> data) {
-        List<Map> list = new LinkedList<>();
+    public void asyncWrite(String metaId, boolean success, List<Map<String, Object>> data) {
         long now = System.currentTimeMillis();
-        Map<String, Object> params = null;
-        while (!data.isEmpty()){
-            params = new HashMap();
+        List<Map> list = data.parallelStream().map(r -> {
+            Map<String, Object> params = new HashMap();
             params.put(ConfigConstant.CONFIG_MODEL_ID, snowflakeIdWorker.nextId());
-            params.put(ConfigConstant.CONFIG_MODEL_JSON, JsonUtil.objToJson(data.poll()));
+            params.put(ConfigConstant.DATA_SUCCESS, success);
+            params.put(ConfigConstant.CONFIG_MODEL_JSON, JsonUtil.objToJson(r));
             params.put(ConfigConstant.CONFIG_MODEL_CREATE_TIME, now);
-            list.add(params);
-        }
+            return params;
+        }).collect(Collectors.toList());
         storageService.addData(StorageEnum.DATA, metaId, list);
     }
 }

+ 5 - 0
dbsyncer-storage/src/main/java/org/dbsyncer/storage/constant/ConfigConstant.java

@@ -25,4 +25,9 @@ public class ConfigConstant {
     public static final String TABLE_GROUP = "tableGroup";
     public static final String META = "meta";
 
+    /**
+     * 数据
+     */
+    public static final String DATA_SUCCESS = "success";
+
 }

+ 4 - 2
dbsyncer-storage/src/main/java/org/dbsyncer/storage/support/DiskStorageServiceImpl.java

@@ -98,14 +98,16 @@ public class DiskStorageServiceImpl extends AbstractStorageService {
 
     @Override
     public void insertLog(String collectionId, Map<String, Object> params) throws IOException {
-        createShardIfNotExist(collectionId);
+//        createShardIfNotExist(collectionId);
         // TODO 实现日志写入
+        logger.info(params.toString());
     }
 
     @Override
     public void insertData(String collectionId, List<Map> list) throws IOException {
-        createShardIfNotExist(collectionId);
+//        createShardIfNotExist(collectionId);
         // TODO 实现数据写入
+        logger.info(list.toString());
 
     }