Преглед изворни кода

修复延迟同步未更新总数 & 优化binlog批量读取数限制

Signed-off-by: AE86 <836391306@qq.com>
AE86 пре 2 година
родитељ
комит
79996d3d99

+ 0 - 5
dbsyncer-common/src/main/java/org/dbsyncer/common/event/Event.java

@@ -30,11 +30,6 @@ public interface Event {
      */
      */
     void forceFlushEvent(Map<String,String> snapshot);
     void forceFlushEvent(Map<String,String> snapshot);
 
 
-    /**
-     * 刷新事件变更时间
-     */
-    void refreshFlushEventUpdateTime();
-
     /**
     /**
      * 异常事件
      * 异常事件
      *
      *

+ 4 - 12
dbsyncer-manager/src/main/java/org/dbsyncer/manager/puller/IncrementPuller.java

@@ -32,6 +32,7 @@ import org.springframework.stereotype.Component;
 import org.springframework.util.Assert;
 import org.springframework.util.Assert;
 
 
 import javax.annotation.PostConstruct;
 import javax.annotation.PostConstruct;
+import java.sql.Timestamp;
 import java.time.Instant;
 import java.time.Instant;
 import java.time.LocalDateTime;
 import java.time.LocalDateTime;
 import java.util.*;
 import java.util.*;
@@ -178,12 +179,13 @@ public class IncrementPuller extends AbstractPuller implements ScheduledTaskJob
         private static final int FLUSH_DELAYED_SECONDS = 30;
         private static final int FLUSH_DELAYED_SECONDS = 30;
         protected Mapping mapping;
         protected Mapping mapping;
         protected String metaId;
         protected String metaId;
-        private LocalDateTime updateTime = LocalDateTime.now();
 
 
         @Override
         @Override
         public void flushEvent(Map<String, String> snapshot) {
         public void flushEvent(Map<String, String> snapshot) {
             // 30s内更新,执行写入
             // 30s内更新,执行写入
-            if (updateTime.isAfter(LocalDateTime.now().minusSeconds(FLUSH_DELAYED_SECONDS))) {
+            Meta meta = manager.getMeta(metaId);
+            LocalDateTime lastSeconds = LocalDateTime.now().minusSeconds(FLUSH_DELAYED_SECONDS);
+            if(meta.getUpdateTime() > Timestamp.valueOf(lastSeconds).getTime()){
                 if (!CollectionUtils.isEmpty(snapshot)) {
                 if (!CollectionUtils.isEmpty(snapshot)) {
                     logger.debug("{}", snapshot);
                     logger.debug("{}", snapshot);
                 }
                 }
@@ -200,11 +202,6 @@ public class IncrementPuller extends AbstractPuller implements ScheduledTaskJob
             }
             }
         }
         }
 
 
-        @Override
-        public void refreshFlushEventUpdateTime() {
-            updateTime = LocalDateTime.now();
-        }
-
         @Override
         @Override
         public void errorEvent(Exception e) {
         public void errorEvent(Exception e) {
             logService.log(LogType.TableGroupLog.INCREMENT_FAILED, e.getMessage());
             logService.log(LogType.TableGroupLog.INCREMENT_FAILED, e.getMessage());
@@ -253,9 +250,6 @@ public class IncrementPuller extends AbstractPuller implements ScheduledTaskJob
 
 
             // 处理过程有异常向上抛
             // 处理过程有异常向上抛
             parser.execute(mapping, tableGroup, rowChangedEvent);
             parser.execute(mapping, tableGroup, rowChangedEvent);
-
-            // 标记有变更记录
-            refreshFlushEventUpdateTime();
         }
         }
     }
     }
 
 
@@ -314,8 +308,6 @@ public class IncrementPuller extends AbstractPuller implements ScheduledTaskJob
                         parser.execute(mapping, picker.getTableGroup(), rowChangedEvent);
                         parser.execute(mapping, picker.getTableGroup(), rowChangedEvent);
                     }
                     }
                 });
                 });
-                // 标记有变更记录
-                refreshFlushEventUpdateTime();
                 eventCounter.set(0);
                 eventCounter.set(0);
                 return;
                 return;
             }
             }

+ 3 - 0
dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/AbstractFlushStrategy.java

@@ -10,6 +10,8 @@ import org.dbsyncer.parser.strategy.FlushStrategy;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.util.Assert;
 import org.springframework.util.Assert;
 
 
+import java.time.Instant;
+
 /**
 /**
  * @author AE86
  * @author AE86
  * @version 1.0.0
  * @version 1.0.0
@@ -56,6 +58,7 @@ public abstract class AbstractFlushStrategy implements FlushStrategy {
         Assert.notNull(meta, "Meta can not be null.");
         Assert.notNull(meta, "Meta can not be null.");
         meta.getFail().getAndAdd(writer.getFailData().size());
         meta.getFail().getAndAdd(writer.getFailData().size());
         meta.getSuccess().getAndAdd(writer.getSuccessData().size());
         meta.getSuccess().getAndAdd(writer.getSuccessData().size());
+        meta.setUpdateTime(Instant.now().toEpochMilli());
     }
     }
 
 
 }
 }

+ 7 - 2
dbsyncer-storage/src/main/java/org/dbsyncer/storage/binlog/AbstractBinlogRecorder.java

@@ -154,7 +154,8 @@ public abstract class AbstractBinlogRecorder<Message> implements BinlogRecorder,
 
 
         @Override
         @Override
         public void run() {
         public void run() {
-            if (running || (binlogRecorderConfig.getBatchCount() * 2) + getQueue().size() >= getQueueCapacity()) {
+            // 读取任务数 >= 1/2缓存同步队列容量则继续等待
+            if (running || binlogRecorderConfig.getBatchCount() + getQueue().size() >= getQueueCapacity() / 2) {
                 return;
                 return;
             }
             }
 
 
@@ -205,13 +206,14 @@ public abstract class AbstractBinlogRecorder<Message> implements BinlogRecorder,
             final List<Message> messages = new ArrayList<>(size);
             final List<Message> messages = new ArrayList<>(size);
             final List<Document> updateDocs = new ArrayList<>(size);
             final List<Document> updateDocs = new ArrayList<>(size);
             final Term[] deleteIds = new Term[size];
             final Term[] deleteIds = new Term[size];
+            boolean existProcessing = false;
             for (int i = 0; i < size; i++) {
             for (int i = 0; i < size; i++) {
                 Map row = list.get(i);
                 Map row = list.get(i);
                 String id = (String) row.get(BinlogConstant.BINLOG_ID);
                 String id = (String) row.get(BinlogConstant.BINLOG_ID);
                 Integer status = (Integer) row.get(BinlogConstant.BINLOG_STATUS);
                 Integer status = (Integer) row.get(BinlogConstant.BINLOG_STATUS);
                 BytesRef ref = (BytesRef) row.get(BinlogConstant.BINLOG_CONTENT);
                 BytesRef ref = (BytesRef) row.get(BinlogConstant.BINLOG_CONTENT);
                 if (BinlogConstant.PROCESSING == status) {
                 if (BinlogConstant.PROCESSING == status) {
-                    logger.warn("存在超时未处理数据,正在重试,建议优化配置参数[max-processing-seconds={}].", binlogRecorderConfig.getMaxProcessingSeconds());
+                    existProcessing = true;
                 }
                 }
                 deleteIds[i] = new Term(BinlogConstant.BINLOG_ID, id);
                 deleteIds[i] = new Term(BinlogConstant.BINLOG_ID, id);
                 String newId = String.valueOf(snowflakeIdWorker.nextId());
                 String newId = String.valueOf(snowflakeIdWorker.nextId());
@@ -225,6 +227,9 @@ public abstract class AbstractBinlogRecorder<Message> implements BinlogRecorder,
                     logger.error(e.getMessage());
                     logger.error(e.getMessage());
                 }
                 }
             }
             }
+            if (existProcessing) {
+                logger.warn("存在超时未处理数据,正在重试,建议优化配置参数[max-processing-seconds={}].", binlogRecorderConfig.getMaxProcessingSeconds());
+            }
 
 
             // 如果在更新消息状态的过程中服务被中止,为保证数据的安全性,重启后消息可能会同步2次)
             // 如果在更新消息状态的过程中服务被中止,为保证数据的安全性,重启后消息可能会同步2次)
             shard.insertBatch(updateDocs);
             shard.insertBatch(updateDocs);

+ 1 - 1
dbsyncer-web/src/main/resources/application.properties

@@ -27,7 +27,7 @@ dbsyncer.plugin.notify.mail.username=your mail username
 dbsyncer.plugin.notify.mail.password=your mail authorization code
 dbsyncer.plugin.notify.mail.password=your mail authorization code
 
 
 #storage
 #storage
-dbsyncer.storage.binlog.recorder.batch-count=1000
+dbsyncer.storage.binlog.recorder.batch-count=3000
 dbsyncer.storage.binlog.recorder.max-processing-seconds=120
 dbsyncer.storage.binlog.recorder.max-processing-seconds=120
 dbsyncer.storage.binlog.recorder.queue-capacity=10000
 dbsyncer.storage.binlog.recorder.queue-capacity=10000
 dbsyncer.storage.binlog.recorder.writer-period-millisecond=500
 dbsyncer.storage.binlog.recorder.writer-period-millisecond=500