
Write logs synchronously

AE86 3 years ago
parent
commit
7f78489a78

+ 1 - 3
dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/AbstractBufferActuator.java

@@ -85,16 +85,14 @@ public abstract class AbstractBufferActuator<Request, Response> implements Buffe
 
     @Override
     public void offer(BufferRequest request) {
-        int size = 0;
         if (running) {
             temp.offer((Request) request);
-            size = temp.size();
         } else {
             buffer.offer((Request) request);
-            size = buffer.size();
         }
 
         // TODO Temporary workaround: production outpaces consumption, so throttle the production rate
+        int size = temp.size() + buffer.size();
         if (size >= CAPACITY) {
             try {
                 TimeUnit.SECONDS.sleep(30);

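Note: after this change, backpressure in offer() is computed from the combined size of both queues rather than only the queue just written to. The resulting method, reconstructed from the hunk above as a sketch (running, temp, buffer and CAPACITY come from the surrounding class; the interrupt handling around sleep is assumed, since the hunk is cut off there):

    @Override
    public void offer(BufferRequest request) {
        if (running) {
            temp.offer((Request) request);
        } else {
            buffer.offer((Request) request);
        }

        // TODO Temporary workaround: production outpaces consumption, so throttle producers.
        int size = temp.size() + buffer.size();
        if (size >= CAPACITY) {
            try {
                // Block the producer for 30s once both queues together reach capacity.
                TimeUnit.SECONDS.sleep(30);
            } catch (InterruptedException e) {
                // Assumed handling; the original catch block is not visible in the hunk above.
                Thread.currentThread().interrupt();
            }
        }
    }
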
+ 6 - 2
dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/AbstractFlushStrategy.java

@@ -3,6 +3,7 @@ package org.dbsyncer.parser.flush;
 import org.dbsyncer.cache.CacheService;
 import org.dbsyncer.common.model.Result;
 import org.dbsyncer.common.util.CollectionUtils;
+import org.dbsyncer.common.util.StringUtil;
 import org.dbsyncer.parser.model.Meta;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.util.Assert;
@@ -14,6 +15,8 @@ import org.springframework.util.Assert;
  */
 public abstract class AbstractFlushStrategy implements FlushStrategy {
 
+    private static final int MAX_ERROR_LENGTH = 1000;
+
     @Autowired
     private FlushService flushService;
 
@@ -34,10 +37,11 @@ public abstract class AbstractFlushStrategy implements FlushStrategy {
         refreshTotal(metaId, result);
 
         if (!CollectionUtils.isEmpty(result.getFailData())) {
-            flushService.asyncWrite(metaId, event, false, result.getFailData(), result.getError().toString());
+            final String error = StringUtil.substring(result.getError().toString(), 0, MAX_ERROR_LENGTH);
+            flushService.write(metaId, event, false, result.getFailData(), error);
         }
         if (!CollectionUtils.isEmpty(result.getSuccessData())) {
-            flushService.asyncWrite(metaId, event, true, result.getSuccessData(), "");
+            flushService.write(metaId, event, true, result.getSuccessData(), "");
         }
     }
 

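The error-truncation logic, previously inside FlushServiceImpl, now runs once per flush here before the data is handed to the now-synchronous write(). The resulting failure branch, as a sketch (StringUtil.substring is assumed to clamp the error text to MAX_ERROR_LENGTH characters, in the style of Apache Commons' StringUtils.substring; that behaviour is not shown in this diff):

    if (!CollectionUtils.isEmpty(result.getFailData())) {
        // Truncate the error message once, then persist the failed rows with it.
        final String error = StringUtil.substring(result.getError().toString(), 0, MAX_ERROR_LENGTH);
        flushService.write(metaId, event, false, result.getFailData(), error);
    }
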
+ 1 - 2
dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/FlushService.java

@@ -24,6 +24,5 @@ public interface FlushService {
      * @param success
      * @param data
      */
-    @Async("taskExecutor")
-    void asyncWrite(String metaId, String event, boolean success, List<Map> data, String error);
+    void write(String metaId, String event, boolean success, List<Map> data, String error);
 }

+ 12 - 18
dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/impl/FlushServiceImpl.java

@@ -2,7 +2,6 @@ package org.dbsyncer.parser.flush.impl;
 
 import com.alibaba.fastjson.JSONException;
 import org.dbsyncer.common.util.JsonUtil;
-import org.dbsyncer.common.util.StringUtil;
 import org.dbsyncer.parser.flush.BufferActuator;
 import org.dbsyncer.parser.flush.FlushService;
 import org.dbsyncer.parser.flush.model.StorageRequest;
@@ -20,7 +19,6 @@ import java.time.Instant;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.stream.Collectors;
 
 /**
  * Persistence
@@ -36,8 +34,6 @@ public class FlushServiceImpl implements FlushService {
 
     private final Logger logger = LoggerFactory.getLogger(getClass());
 
-    private static final int MAX_ERROR_LENGTH = 1000;
-
     @Autowired
     private StorageService storageService;
 
@@ -58,25 +54,23 @@ public class FlushServiceImpl implements FlushService {
     }
 
     @Override
-    public void asyncWrite(String metaId, String event, boolean success, List<Map> data, String error) {
+    public void write(String metaId, String event, boolean success, List<Map> data, String error) {
         long now = Instant.now().toEpochMilli();
-        List<Map> list = data.stream().map(r -> {
-            Map<String, Object> params = new HashMap();
-            params.put(ConfigConstant.CONFIG_MODEL_ID, String.valueOf(snowflakeIdWorker.nextId()));
-            params.put(ConfigConstant.DATA_SUCCESS, success ? StorageDataStatusEnum.SUCCESS.getValue() : StorageDataStatusEnum.FAIL.getValue());
-            params.put(ConfigConstant.DATA_EVENT, event);
-            params.put(ConfigConstant.DATA_ERROR, StringUtil.substring(error, 0, MAX_ERROR_LENGTH));
+        data.forEach(r -> {
+            Map<String, Object> row = new HashMap();
+            row.put(ConfigConstant.CONFIG_MODEL_ID, String.valueOf(snowflakeIdWorker.nextId()));
+            row.put(ConfigConstant.DATA_SUCCESS, success ? StorageDataStatusEnum.SUCCESS.getValue() : StorageDataStatusEnum.FAIL.getValue());
+            row.put(ConfigConstant.DATA_EVENT, event);
+            row.put(ConfigConstant.DATA_ERROR, error);
             try {
-                params.put(ConfigConstant.CONFIG_MODEL_JSON, JsonUtil.objToJson(r));
+                row.put(ConfigConstant.CONFIG_MODEL_JSON, JsonUtil.objToJson(r));
             } catch (JSONException e) {
                 logger.warn("可能存在Blob或inputStream大文件类型, 无法序列化:{}", r);
-                params.put(ConfigConstant.CONFIG_MODEL_JSON, r.toString());
+                row.put(ConfigConstant.CONFIG_MODEL_JSON, r.toString());
             }
-            params.put(ConfigConstant.CONFIG_MODEL_CREATE_TIME, now);
-            return params;
-        }).collect(Collectors.toList());
-
-        storageBufferActuator.offer(new StorageRequest(metaId, list));
+            row.put(ConfigConstant.CONFIG_MODEL_CREATE_TIME, now);
+            storageBufferActuator.offer(new StorageRequest(metaId, row));
+        });
     }
 
 }

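write() is now synchronous at the service level: instead of collecting all rows into one list and offering a single request, it offers one StorageRequest per row to storageBufferActuator, which does the batching. A hedged usage sketch (the metaId, event and record contents are invented for illustration; write() itself is from the hunk above):

    // Hypothetical caller, for illustration only.
    Map<String, Object> record = new HashMap<>();
    record.put("ID", "1");
    record.put("NAME", "demo");
    List<Map> failData = new ArrayList<>();
    failData.add(record);

    // Each entry in failData becomes one storage row and one StorageRequest
    // queued on the storage buffer actuator.
    flushService.write("meta-001", "UPDATE", false, failData, "duplicate key");
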
+ 1 - 1
dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/impl/StorageBufferActuator.java

@@ -38,7 +38,7 @@ public class StorageBufferActuator extends AbstractBufferActuator<StorageRequest
     @Override
     protected void partition(StorageRequest request, StorageResponse response) {
         response.setMetaId(request.getMetaId());
-        response.getDataList().addAll(request.getList());
+        response.getDataList().add(request.getRow());
     }
 
     @Override

+ 5 - 6
dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/model/StorageRequest.java

@@ -2,7 +2,6 @@ package org.dbsyncer.parser.flush.model;
 
 import org.dbsyncer.parser.flush.BufferRequest;
 
-import java.util.List;
 import java.util.Map;
 
 /**
@@ -14,18 +13,18 @@ public class StorageRequest implements BufferRequest {
 
     private String metaId;
 
-    private List<Map> list;
+    private Map row;
 
-    public StorageRequest(String metaId, List<Map> list) {
+    public StorageRequest(String metaId, Map row) {
         this.metaId = metaId;
-        this.list = list;
+        this.row = row;
     }
 
     public String getMetaId() {
         return metaId;
     }
 
-    public List<Map> getList() {
-        return list;
+    public Map getRow() {
+        return row;
     }
 }
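
StorageRequest now carries a single row instead of a list, matching the per-row offer() calls in FlushServiceImpl and the single add() in StorageBufferActuator.partition(). A minimal usage sketch (the metaId and row contents are illustrative only):

    Map<String, Object> row = new HashMap<>();
    row.put(ConfigConstant.CONFIG_MODEL_ID, "123");
    StorageRequest request = new StorageRequest("meta-001", row);

    // Consumed in StorageBufferActuator.partition():
    // response.getDataList().add(request.getRow());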