AE86 пре 2 година
родитељ
комит
24a293b21e

+ 5 - 0
dbsyncer-common/src/main/java/org/dbsyncer/common/event/Event.java

@@ -30,6 +30,11 @@ public interface Event {
      */
     void forceFlushEvent(Map<String,String> map);
 
+    /**
+     * 刷新事件变更时间
+     */
+    void refreshFlushEventUpdateTime();
+
     /**
      * 异常事件
      *

+ 13 - 7
dbsyncer-manager/src/main/java/org/dbsyncer/manager/puller/IncrementPuller.java

@@ -37,10 +37,10 @@ import org.springframework.util.Assert;
 
 import javax.annotation.PostConstruct;
 import java.time.Instant;
+import java.time.LocalDateTime;
 import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executor;
-import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
 
@@ -176,16 +176,17 @@ public class IncrementPuller extends AbstractPuller implements ScheduledTaskJob
     }
 
     abstract class AbstractListener implements Event {
+        private static final int FLUSH_DELAYED_SECONDS = 30;
         protected Mapping mapping;
         protected String metaId;
-        protected AtomicBoolean changed = new AtomicBoolean();
+        private LocalDateTime updateTime = LocalDateTime.now();
 
         @Override
         public void flushEvent(Map<String, String> map) {
-            // 如果有变更,执行更新
-            if (changed.compareAndSet(true, false)) {
+            // 30s内更新,执行写入
+            if (updateTime.isAfter(LocalDateTime.now().minusSeconds(FLUSH_DELAYED_SECONDS))) {
                 if (!CollectionUtils.isEmpty(map)) {
-                    logger.info("{}", map);
+                    logger.debug("{}", map);
                 }
                 forceFlushEvent(map);
             }
@@ -200,6 +201,11 @@ public class IncrementPuller extends AbstractPuller implements ScheduledTaskJob
             }
         }
 
+        @Override
+        public void refreshFlushEventUpdateTime() {
+            updateTime = LocalDateTime.now();
+        }
+
         @Override
         public void errorEvent(Exception e) {
             logService.log(LogType.TableGroupLog.INCREMENT_FAILED, e.getMessage());
@@ -251,7 +257,7 @@ public class IncrementPuller extends AbstractPuller implements ScheduledTaskJob
             parser.execute(mapping, tableGroup, rowChangedEvent);
 
             // 标记有变更记录
-            changed.compareAndSet(false, true);
+            refreshFlushEventUpdateTime();
         }
     }
 
@@ -314,7 +320,7 @@ public class IncrementPuller extends AbstractPuller implements ScheduledTaskJob
                     }
                 });
                 // 标记有变更记录
-                changed.compareAndSet(false, true);
+                refreshFlushEventUpdateTime();
                 eventCounter.set(0);
                 return;
             }

+ 2 - 7
dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/AbstractFlushStrategy.java

@@ -47,16 +47,11 @@ public abstract class AbstractFlushStrategy implements FlushStrategy {
     }
 
     protected void refreshTotal(String metaId, Result writer) {
-        Meta meta = getMeta(metaId);
-        meta.getFail().getAndAdd(writer.getFailData().size());
-        meta.getSuccess().getAndAdd(writer.getSuccessData().size());
-    }
-
-    protected Meta getMeta(String metaId) {
         Assert.hasText(metaId, "Meta id can not be empty.");
         Meta meta = cacheService.get(metaId, Meta.class);
         Assert.notNull(meta, "Meta can not be null.");
-        return meta;
+        meta.getFail().getAndAdd(writer.getFailData().size());
+        meta.getSuccess().getAndAdd(writer.getSuccessData().size());
     }
 
 }