Bladeren bron

修复并发写乱序 https://gitee.com/ghi/dbsyncer/issues/I5KK5C

AE86 2 jaren geleden
bovenliggende
commit
6a6e2e9aa9

+ 14 - 0
dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/AbstractBufferActuator.java

@@ -74,6 +74,16 @@ public abstract class AbstractBufferActuator<Request, Response> implements Buffe
      */
     protected abstract void pull(Response response);
 
+    /**
+     * 是否跳过分区处理
+     *
+     * @param request
+     * @return
+     */
+    protected boolean skipPartition(Request request){
+        return false;
+    }
+
     @Override
     public Queue getQueue() {
         return buffer;
@@ -125,6 +135,10 @@ public abstract class AbstractBufferActuator<Request, Response> implements Buffe
                 }
                 partition(poll, (Response) map.get(key));
                 batchCounter.incrementAndGet();
+
+                if(skipPartition(poll)){
+                    break;
+                }
             }
 
             map.forEach((key, flushTask) -> {

+ 7 - 0
dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/impl/WriterBufferActuator.java

@@ -7,6 +7,7 @@ import org.dbsyncer.common.util.StringUtil;
 import org.dbsyncer.connector.ConnectorFactory;
 import org.dbsyncer.connector.ConnectorMapper;
 import org.dbsyncer.connector.config.ConnectorConfig;
+import org.dbsyncer.connector.constant.ConnectorConstant;
 import org.dbsyncer.parser.ParserFactory;
 import org.dbsyncer.parser.flush.AbstractBufferActuator;
 import org.dbsyncer.parser.model.*;
@@ -81,6 +82,12 @@ public class WriterBufferActuator extends AbstractBufferActuator<WriterRequest,
         parserStrategy.complete(response.getMessageIds());
     }
 
+    @Override
+    protected boolean skipPartition(WriterRequest request) {
+        // 并发场景,同一条数据可能连续触发Insert > Delete > Insert,出现Delete事件跳过分区处理
+        return ConnectorConstant.OPERTION_DELETE.equals(request.getEvent());
+    }
+
     /**
      * 获取连接器配置
      *