Prechádzať zdrojové kódy

防止log消息挤压

AE86 4 rokov pred
rodič
commit
5c1c14355c

+ 4 - 0
dbsyncer-listener/src/main/java/org/dbsyncer/listener/AbstractExtractor.java

@@ -5,6 +5,8 @@ import org.dbsyncer.common.event.RowChangedEvent;
 import org.dbsyncer.common.util.CollectionUtils;
 import org.dbsyncer.connector.config.ConnectorConfig;
 import org.dbsyncer.listener.config.ListenerConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.List;
 import java.util.Map;
@@ -17,6 +19,7 @@ import java.util.concurrent.CopyOnWriteArrayList;
  */
 public abstract class AbstractExtractor implements Extractor {
 
+    private final Logger logger = LoggerFactory.getLogger(getClass());
     protected ConnectorConfig connectorConfig;
     protected ListenerConfig listenerConfig;
     protected Map<String, String> map;
@@ -64,6 +67,7 @@ public abstract class AbstractExtractor implements Extractor {
     @Override
     public void forceFlushEvent(){
         if (!CollectionUtils.isEmpty(watcher)) {
+            logger.info("Force flush:{}", map);
             watcher.forEach(w -> w.forceFlushEvent(map));
         }
     }

+ 0 - 1
dbsyncer-listener/src/main/java/org/dbsyncer/listener/mysql/MysqlExtractor.java

@@ -233,7 +233,6 @@ public class MysqlExtractor extends AbstractExtractor {
                 RotateEventData data = event.getData();
                 refresh(data.getBinlogFilename(), data.getBinlogPosition());
                 forceFlushEvent();
-                logger.info("Force flush:{}", map);
                 return;
             }
         }

+ 18 - 6
dbsyncer-manager/src/main/java/org/dbsyncer/manager/puller/impl/IncrementPuller.java

@@ -43,6 +43,7 @@ import java.time.Instant;
 import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
 
 /**
@@ -165,7 +166,7 @@ public class IncrementPuller extends AbstractPuller implements ScheduledTaskJob,
             AbstractExtractor extractor = listener.getExtractor(connectorType, AbstractExtractor.class);
             PrimaryKeyMappingStrategy strategy = PrimaryKeyMappingEnum.getPrimaryKeyMappingStrategy(connectorType);
 
-            setExtractorConfig(extractor, connectorConfig, listenerConfig, meta.getMap(), new LogListener(mapping, list, strategy));
+            setExtractorConfig(extractor, connectorConfig, listenerConfig, meta.getMap(), new LogListener(mapping, list, strategy, extractor));
             return extractor;
         }
         return null;
@@ -282,13 +283,18 @@ public class IncrementPuller extends AbstractPuller implements ScheduledTaskJob,
      */
     final class LogListener extends AbstractListener {
 
+        private Extractor extractor;
         private Map<String, List<FieldPicker>> tablePicker;
+        private AtomicInteger eventCounter;
+        private static final int MAX_LOG_CACHE_SIZE = 128;
 
-        public LogListener(Mapping mapping, List<TableGroup> list, PrimaryKeyMappingStrategy strategy) {
+        public LogListener(Mapping mapping, List<TableGroup> list, PrimaryKeyMappingStrategy strategy, Extractor extractor) {
             this.mapping = mapping;
             this.metaId = mapping.getMetaId();
-            this.tablePicker = new LinkedHashMap<>();
             this.strategy = strategy;
+            this.extractor = extractor;
+            this.tablePicker = new LinkedHashMap<>();
+            this.eventCounter = new AtomicInteger();
             list.forEach(t -> {
                 final Table table = t.getSourceTable();
                 final String tableName = table.getName();
@@ -301,9 +307,7 @@ public class IncrementPuller extends AbstractPuller implements ScheduledTaskJob,
 
         @Override
         public void changedLogEvent(RowChangedEvent rowChangedEvent) {
-            logger.info("tableName:{}, event:{}, beforeData:{}, afterData:{}, rowId:{}", rowChangedEvent.getTableName(),
-                    rowChangedEvent.getEvent(),
-                    rowChangedEvent.getBeforeData(), rowChangedEvent.getAfterData(), rowChangedEvent.getRowId());
+            logger.info(rowChangedEvent.toString());
 
             // 处理过程有异常向上抛
             List<FieldPicker> pickers = tablePicker.get(rowChangedEvent.getTableName());
@@ -320,6 +324,14 @@ public class IncrementPuller extends AbstractPuller implements ScheduledTaskJob,
                 });
                 // 标记有变更记录
                 changed.compareAndSet(false, true);
+                eventCounter.set(0);
+                return;
+            }
+
+            // 防止挤压无效的增量数据,刷新最新的有效记录点
+            if(eventCounter.incrementAndGet() >= MAX_LOG_CACHE_SIZE){
+                extractor.forceFlushEvent();
+                eventCounter.set(0);
             }
         }