AE86 2 年之前
父节点
当前提交
f148aa2f26

+ 12 - 8
dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/AbstractBufferActuator.java

@@ -77,10 +77,11 @@ public abstract class AbstractBufferActuator<Request, Response> implements Buffe
     /**
      * 是否跳过分区处理
      *
-     * @param request
+     * @param nextRequest
+     * @param response
      * @return
      */
-    protected boolean skipPartition(Request request){
+    protected boolean skipPartition(Request nextRequest, Response response){
         return false;
     }
 
@@ -126,17 +127,19 @@ public abstract class AbstractBufferActuator<Request, Response> implements Buffe
     private void flush(Queue<Request> queue) throws IllegalAccessException, InstantiationException {
         if (!queue.isEmpty()) {
             AtomicLong batchCounter = new AtomicLong();
-            final Map<String, BufferResponse> map = new LinkedHashMap<>();
+            final Map<String, Response> map = new LinkedHashMap<>();
             while (!queue.isEmpty() && batchCounter.get() < bufferActuatorConfig.getBatchCount()) {
                 Request poll = queue.poll();
                 String key = getPartitionKey(poll);
                 if (!map.containsKey(key)) {
-                    map.putIfAbsent(key, (BufferResponse) responseClazz.newInstance());
+                    map.putIfAbsent(key, responseClazz.newInstance());
                 }
-                partition(poll, (Response) map.get(key));
+                Response response = map.get(key);
+                partition(poll, response);
                 batchCounter.incrementAndGet();
 
-                if(skipPartition(poll)){
+                Request next = queue.peek();
+                if(null != next && skipPartition(next, response)){
                     break;
                 }
             }
@@ -144,11 +147,12 @@ public abstract class AbstractBufferActuator<Request, Response> implements Buffe
             map.forEach((key, flushTask) -> {
                 long now = Instant.now().toEpochMilli();
                 try {
-                    pull((Response) flushTask);
+                    pull(flushTask);
                 } catch (Exception e) {
                     logger.error("[{}]异常{}", key);
                 }
-                logger.info("[{}]{}条,耗时{}毫秒", key, flushTask.getTaskSize(), (Instant.now().toEpochMilli() - now));
+                final BufferResponse task = (BufferResponse) flushTask;
+                logger.info("[{}{}]{}条,耗时{}毫秒", key, task.getSuffixName(), task.getTaskSize(), (Instant.now().toEpochMilli() - now));
             });
             map.clear();
         }

+ 6 - 0
dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/BufferResponse.java

@@ -14,4 +14,10 @@ public interface BufferResponse {
      */
     int getTaskSize();
 
+    /**
+     * 获取后缀名称
+     *
+     * @return
+     */
+    String getSuffixName();
 }

+ 4 - 4
dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/impl/WriterBufferActuator.java

@@ -45,7 +45,7 @@ public class WriterBufferActuator extends AbstractBufferActuator<WriterRequest,
 
     @Override
     protected String getPartitionKey(WriterRequest request) {
-        return new StringBuilder(request.getTableGroupId()).append("-").append(request.getEvent()).toString();
+        return request.getTableGroupId();
     }
 
     @Override
@@ -83,9 +83,9 @@ public class WriterBufferActuator extends AbstractBufferActuator<WriterRequest,
     }
 
     @Override
-    protected boolean skipPartition(WriterRequest request) {
-        // 并发场景,同一条数据可能连续触发Insert > Delete > Insert,出现Delete事件跳过分区处理
-        return ConnectorConstant.OPERTION_DELETE.equals(request.getEvent());
+    protected boolean skipPartition(WriterRequest nextRequest, WriterResponse response) {
+        // 并发场景,同一条数据可能连续触发Insert > Delete > Insert,批处理任务中出现不同事件时,跳过分区处理
+        return !StringUtil.equals(nextRequest.getEvent(), response.getEvent());
     }
 
     /**

+ 6 - 0
dbsyncer-parser/src/main/java/org/dbsyncer/parser/model/StorageResponse.java

@@ -13,6 +13,7 @@ import java.util.Map;
  */
 public class StorageResponse implements BufferResponse {
 
+    private static final String EMPTY = "";
     private String metaId;
     private List<Map> dataList = new LinkedList<>();
 
@@ -36,4 +37,9 @@ public class StorageResponse implements BufferResponse {
     public int getTaskSize() {
         return dataList.size();
     }
+
+    @Override
+    public String getSuffixName() {
+        return EMPTY;
+    }
 }

+ 6 - 0
dbsyncer-parser/src/main/java/org/dbsyncer/parser/model/WriterResponse.java

@@ -13,6 +13,7 @@ import java.util.Map;
  */
 public class WriterResponse extends AbstractWriter implements BufferResponse {
 
+    private static final String SYMBOL = "-";
     private List<Map> dataList = new LinkedList<>();
     private List<String> messageIds = new LinkedList<>();
 
@@ -23,6 +24,11 @@ public class WriterResponse extends AbstractWriter implements BufferResponse {
         return dataList.size();
     }
 
+    @Override
+    public String getSuffixName() {
+        return SYMBOL.concat(getEvent());
+    }
+
     public List<Map> getDataList() {
         return dataList;
     }