AE86 5 éve
szülő
commit
821d5328bf

+ 16 - 1
dbsyncer-parser/src/main/java/org/dbsyncer/parser/ParserFactory.java

@@ -1,5 +1,6 @@
 package org.dbsyncer.parser;
 
+import org.apache.commons.lang.StringUtils;
 import org.dbsyncer.cache.CacheService;
 import org.dbsyncer.common.event.FullRefreshEvent;
 import org.dbsyncer.common.model.Result;
@@ -13,11 +14,13 @@ import org.dbsyncer.connector.enums.FilterEnum;
 import org.dbsyncer.connector.enums.OperationEnum;
 import org.dbsyncer.parser.enums.ConvertEnum;
 import org.dbsyncer.parser.enums.ParserEnum;
+import org.dbsyncer.parser.flush.FlushService;
 import org.dbsyncer.parser.model.*;
 import org.dbsyncer.parser.util.ConvertUtil;
 import org.dbsyncer.parser.util.PickerUtil;
 import org.dbsyncer.plugin.PluginFactory;
 import org.dbsyncer.plugin.config.Plugin;
+import org.dbsyncer.storage.constant.ConfigConstant;
 import org.json.JSONException;
 import org.json.JSONObject;
 import org.slf4j.Logger;
@@ -53,6 +56,9 @@ public class ParserFactory implements Parser {
     @Autowired
     private CacheService cacheService;
 
+    @Autowired
+    private FlushService flushService;
+
     @Autowired
     private ApplicationContext applicationContext;
 
@@ -282,7 +288,16 @@ public class ParserFactory implements Parser {
         // print process
         logger.info("任务:{}, 成功:{}, 失败:{}", metaId, meta.getSuccess(), meta.getFail());
 
-        // TODO 记录错误日志
+        // 记录错误数据
+        if(!writer.getFailData().isEmpty()){
+            flushService.asyncWrite(metaId, writer.getFailData());
+        }
+        // 记录错误日志
+        String error = writer.getError().toString();
+        if(StringUtils.isNotBlank(error)){
+            flushService.asyncWrite(metaId, error);
+        }
+
     }
 
     /**

+ 28 - 0
dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/FlushService.java

@@ -0,0 +1,28 @@
+package org.dbsyncer.parser.flush;
+
+import org.springframework.scheduling.annotation.Async;
+
+import java.util.Map;
+import java.util.Queue;
+
+public interface FlushService {
+
+    /**
+     * 记录错误日志
+     *
+     * @param metaId
+     * @param error
+     */
+    @Async("taskExecutor")
+    void asyncWrite(String metaId, String error);
+
+    /**
+     * 记录错误数据
+     *
+     * @param metaId
+     * @param failData
+     */
+    @Async("taskExecutor")
+    void asyncWrite(String metaId, Queue<Map<String,Object>> failData);
+
+}

+ 59 - 0
dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/FlushServiceImpl.java

@@ -0,0 +1,59 @@
+package org.dbsyncer.parser.flush;
+
+import org.dbsyncer.common.util.JsonUtil;
+import org.dbsyncer.storage.SnowflakeIdWorker;
+import org.dbsyncer.storage.StorageService;
+import org.dbsyncer.storage.constant.ConfigConstant;
+import org.dbsyncer.storage.enums.StorageEnum;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+import java.util.*;
+
+/**
+ * 持久化
+ * <p>全量或增量数据</p>
+ * <p>系统日志</p>
+ *
+ * @author AE86
+ * @version 1.0.0
+ * @date 2020/05/19 18:38
+ */
+@Component
+public class FlushServiceImpl implements FlushService {
+
+    private final Logger logger = LoggerFactory.getLogger(getClass());
+
+    @Autowired
+    private StorageService    storageService;
+
+    @Autowired
+    private SnowflakeIdWorker snowflakeIdWorker;
+
+    @Override
+    public void asyncWrite(String metaId, String error) {
+        Map<String, Object> params = new HashMap();
+        params.put(ConfigConstant.CONFIG_MODEL_ID, snowflakeIdWorker.nextId());
+        params.put(ConfigConstant.CONFIG_MODEL_TYPE, metaId);
+        params.put(ConfigConstant.CONFIG_MODEL_JSON, error);
+        params.put(ConfigConstant.CONFIG_MODEL_CREATE_TIME, System.currentTimeMillis());
+        storageService.addLog(StorageEnum.LOG, params);
+    }
+
+    @Override
+    public void asyncWrite(String metaId, Queue<Map<String, Object>> data) {
+        List<Map> list = new LinkedList<>();
+        long now = System.currentTimeMillis();
+        Map<String, Object> params = null;
+        while (!data.isEmpty()){
+            params = new HashMap();
+            params.put(ConfigConstant.CONFIG_MODEL_ID, snowflakeIdWorker.nextId());
+            params.put(ConfigConstant.CONFIG_MODEL_JSON, JsonUtil.objToJson(data.poll()));
+            params.put(ConfigConstant.CONFIG_MODEL_CREATE_TIME, now);
+            list.add(params);
+        }
+        storageService.addData(StorageEnum.DATA, metaId, list);
+    }
+}

+ 36 - 0
dbsyncer-storage/src/main/java/org/dbsyncer/storage/AbstractStorageService.java

@@ -33,6 +33,22 @@ public abstract class AbstractStorageService implements StorageService, Applicat
 
     public abstract void delete(String collectionId, String id) throws IOException;
 
+    /**
+     * 记录日志
+     *
+     * @param collectionId
+     * @param params
+     */
+    public abstract void insertLog(String collectionId, Map<String,Object> params) throws IOException;
+
+    /**
+     * 记录错误数据
+     *
+     * @param collectionId
+     * @param list
+     */
+    public abstract void insertData(String collectionId, List<Map> list) throws IOException;
+
     @Override
     public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
         map = applicationContext.getBeansOfType(Strategy.class);
@@ -100,6 +116,26 @@ public abstract class AbstractStorageService implements StorageService, Applicat
         }
     }
 
+    @Override
+    public void addLog(StorageEnum type, Map<String, Object> params) {
+        try {
+            insertLog(getCollectionId(type, null), params);
+        } catch (IOException e) {
+            logger.error(e.getMessage());
+            throw new StorageException(e);
+        }
+    }
+
+    @Override
+    public void addData(StorageEnum type, String collectionId, List<Map> list) {
+        try {
+            insertData(getCollectionId(type, collectionId), list);
+        } catch (IOException e) {
+            logger.error(e.getMessage());
+            throw new StorageException(e);
+        }
+    }
+
     private String getCollectionId(StorageEnum type, String collectionId) {
         Assert.notNull(type, "StorageEnum can not be null.");
         Strategy strategy = map.get(type.getType().concat("Strategy"));

+ 16 - 0
dbsyncer-storage/src/main/java/org/dbsyncer/storage/StorageService.java

@@ -29,4 +29,20 @@ public interface StorageService {
 
     void remove(StorageEnum type, String id, String collectionId);
 
+    /**
+     * 记录日志
+     *
+     * @param log
+     * @param params
+     */
+    void addLog(StorageEnum log, Map<String,Object> params);
+
+    /**
+     * 记录数据
+     *
+     * @param data
+     * @param collectionId
+     * @param list
+     */
+    void addData(StorageEnum data, String collectionId, List<Map> list);
 }

+ 13 - 0
dbsyncer-storage/src/main/java/org/dbsyncer/storage/support/DiskStorageServiceImpl.java

@@ -96,6 +96,19 @@ public class DiskStorageServiceImpl extends AbstractStorageService {
         map.get(collectionId).delete(new Term(ConfigConstant.CONFIG_MODEL_ID, id));
     }
 
+    @Override
+    public void insertLog(String collectionId, Map<String, Object> params) throws IOException {
+        createShardIfNotExist(collectionId);
+        // TODO 实现日志写入
+    }
+
+    @Override
+    public void insertData(String collectionId, List<Map> list) throws IOException {
+        createShardIfNotExist(collectionId);
+        // TODO 实现数据写入
+
+    }
+
     /**
      * 如果不存在分片则创建(线程安全)
      *<p>/data/config</p>

+ 10 - 0
dbsyncer-storage/src/main/java/org/dbsyncer/storage/support/MysqlStorageServiceImpl.java

@@ -37,4 +37,14 @@ public class MysqlStorageServiceImpl extends AbstractStorageService {
     public void delete(String collectionId, String id) throws IOException {
 
     }
+
+    @Override
+    public void insertLog(String collectionId, Map<String, Object> params) throws IOException {
+
+    }
+
+    @Override
+    public void insertData(String collectionId, List<Map> list) throws IOException {
+
+    }
 }