AE86 5 年之前
父節點
當前提交
c7a34ccf02

+ 0 - 1
dbsyncer-listener/src/main/java/org/dbsyncer/listener/extractor/MysqlExtractor.java

@@ -118,7 +118,6 @@ public class MysqlExtractor extends DefaultExtractor {
         client.setBinlogPosition(nextPosition);
 
         // nextPosition
-        logger.info("{}:{}", client.getBinlogFileName(), client.getBinlogPosition());
         map.put(BINLOG_FILENAME, client.getBinlogFileName());
         map.put(BINLOG_POSITION, String.valueOf(client.getBinlogPosition()));
         flushEvent();

+ 6 - 14
dbsyncer-parser/src/main/java/org/dbsyncer/parser/ParserFactory.java

@@ -1,6 +1,5 @@
 package org.dbsyncer.parser;
 
-import org.apache.commons.lang.StringUtils;
 import org.dbsyncer.cache.CacheService;
 import org.dbsyncer.common.event.FullRefreshEvent;
 import org.dbsyncer.common.model.Result;
@@ -9,6 +8,7 @@ import org.dbsyncer.common.util.CollectionUtils;
 import org.dbsyncer.common.util.JsonUtil;
 import org.dbsyncer.connector.ConnectorFactory;
 import org.dbsyncer.connector.config.*;
+import org.dbsyncer.connector.constant.ConnectorConstant;
 import org.dbsyncer.connector.enums.ConnectorEnum;
 import org.dbsyncer.connector.enums.FilterEnum;
 import org.dbsyncer.connector.enums.OperationEnum;
@@ -230,7 +230,7 @@ public class ParserFactory implements Parser {
 
     @Override
     public void execute(Mapping mapping, TableGroup tableGroup, DataEvent dataEvent) {
-        logger.info("同步数据=> dataEvent:{}", dataEvent);
+        logger.info("{}", dataEvent);
         final String metaId = mapping.getMetaId();
 
         ConnectorConfig tConfig = getConnectorConfig(mapping.getTargetConnectorId());
@@ -262,7 +262,7 @@ public class ParserFactory implements Parser {
         // 5、更新结果
         List<Map<String, Object>> list = new ArrayList<>(1);
         list.add(target);
-        flush(metaId, writer, list);
+        flush(metaId, writer, event, list);
     }
 
     /**
@@ -273,22 +273,20 @@ public class ParserFactory implements Parser {
      * @param data
      */
     private void flush(Task task, Result writer, List<Map<String, Object>> data) {
-        flush(task.getId(), writer, data);
+        flush(task.getId(), writer, ConnectorConstant.OPERTION_DELETE, data);
 
         // 发布刷新事件给FullExtractor
         task.setEndTime(System.currentTimeMillis());
         applicationContext.publishEvent(new FullRefreshEvent(applicationContext, task));
     }
 
-    private void flush(String metaId, Result writer, List<Map<String, Object>> data) {
+    private void flush(String metaId, Result writer, String event, List<Map<String, Object>> data) {
         // 引用传递
         long total = data.size();
         long fail = writer.getFail().get();
         Meta meta = getMeta(metaId);
         meta.getFail().getAndAdd(fail);
         meta.getSuccess().getAndAdd(total - fail);
-        // print process
-        logger.info("任务:{}, 成功:{}, 失败:{}", metaId, meta.getSuccess(), meta.getFail());
 
         // 记录错误数据
         Queue<Map<String, Object>> failData = writer.getFailData();
@@ -297,14 +295,8 @@ public class ParserFactory implements Parser {
             data.clear();
             data.addAll(failData);
         }
-        flushService.asyncWrite(metaId, success, data);
-
-        // 记录错误日志
         String error = writer.getError().toString();
-        if (StringUtils.isNotBlank(error)) {
-            flushService.asyncWrite(metaId, error);
-        }
-
+        flushService.asyncWrite(metaId, event, success, data, error);
     }
 
     /**

+ 3 - 2
dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/FlushService.java

@@ -14,15 +14,16 @@ public interface FlushService {
      * @param error
      */
     @Async("taskExecutor")
-    void asyncWrite(String metaId, String error);
+    void asyncWrite(String type, String error);
 
     /**
      * 记录数据
      *
      * @param metaId
+     * @param event
      * @param success
      * @param data
      */
     @Async("taskExecutor")
-    void asyncWrite(String metaId, boolean success, List<Map<String, Object>> data);
+    void asyncWrite(String metaId, String event, boolean success, List<Map<String, Object>> data, String error);
 }

+ 5 - 3
dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/FlushServiceImpl.java

@@ -34,22 +34,24 @@ public class FlushServiceImpl implements FlushService {
     private SnowflakeIdWorker snowflakeIdWorker;
 
     @Override
-    public void asyncWrite(String metaId, String error) {
+    public void asyncWrite(String type, String error) {
         Map<String, Object> params = new HashMap();
         params.put(ConfigConstant.CONFIG_MODEL_ID, snowflakeIdWorker.nextId());
-        params.put(ConfigConstant.CONFIG_MODEL_TYPE, metaId);
+        params.put(ConfigConstant.CONFIG_MODEL_TYPE, type);
         params.put(ConfigConstant.CONFIG_MODEL_JSON, error);
         params.put(ConfigConstant.CONFIG_MODEL_CREATE_TIME, System.currentTimeMillis());
         storageService.addLog(StorageEnum.LOG, params);
     }
 
     @Override
-    public void asyncWrite(String metaId, boolean success, List<Map<String, Object>> data) {
+    public void asyncWrite(String metaId, String event, boolean success, List<Map<String, Object>> data, String error) {
         long now = System.currentTimeMillis();
         List<Map> list = data.parallelStream().map(r -> {
             Map<String, Object> params = new HashMap();
             params.put(ConfigConstant.CONFIG_MODEL_ID, snowflakeIdWorker.nextId());
             params.put(ConfigConstant.DATA_SUCCESS, success);
+            params.put(ConfigConstant.DATA_EVENT, event);
+            params.put(ConfigConstant.DATA_ERROR, error);
             params.put(ConfigConstant.CONFIG_MODEL_JSON, JsonUtil.objToJson(r));
             params.put(ConfigConstant.CONFIG_MODEL_CREATE_TIME, now);
             return params;

+ 2 - 0
dbsyncer-storage/src/main/java/org/dbsyncer/storage/constant/ConfigConstant.java

@@ -29,5 +29,7 @@ public class ConfigConstant {
      * 数据
      */
     public static final String DATA_SUCCESS = "success";
+    public static final String DATA_EVENT = "event";
+    public static final String DATA_ERROR = "error";
 
 }