瀏覽代碼

update flush

AE86 5 年之前
父節點
當前提交
b32ff47d5f

+ 1 - 3
dbsyncer-listener/src/main/java/org/dbsyncer/listener/extractor/MysqlExtractor.java

@@ -7,9 +7,7 @@ import org.dbsyncer.connector.constant.ConnectorConstant;
 import org.dbsyncer.listener.DefaultExtractor;
 import org.dbsyncer.listener.ListenerException;
 import org.dbsyncer.listener.config.Host;
-import org.dbsyncer.listener.mysql.binlog.BinlogEventListener;
-import org.dbsyncer.listener.mysql.binlog.BinlogEventV4;
-import org.dbsyncer.listener.mysql.binlog.BinlogRemoteClient;
+import org.dbsyncer.listener.mysql.binlog.*;
 import org.dbsyncer.listener.mysql.binlog.impl.event.*;
 import org.dbsyncer.listener.mysql.common.glossary.Column;
 import org.dbsyncer.listener.mysql.common.glossary.Pair;

+ 11 - 22
dbsyncer-manager/src/main/java/org/dbsyncer/manager/puller/impl/IncrementPuller.java

@@ -1,7 +1,6 @@
 package org.dbsyncer.manager.puller.impl;
 
 import org.dbsyncer.common.event.Event;
-import org.dbsyncer.common.event.IncrementRefreshEvent;
 import org.dbsyncer.common.util.CollectionUtils;
 import org.dbsyncer.connector.config.Table;
 import org.dbsyncer.listener.DefaultExtractor;
@@ -14,7 +13,6 @@ import org.dbsyncer.parser.model.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.context.ApplicationListener;
 import org.springframework.stereotype.Component;
 import org.springframework.util.Assert;
 
@@ -32,7 +30,7 @@ import java.util.concurrent.ConcurrentHashMap;
  * @date 2020/04/26 15:28
  */
 @Component
-public class IncrementPuller extends AbstractPuller implements ApplicationListener<IncrementRefreshEvent> {
+public class IncrementPuller extends AbstractPuller {
 
     private final Logger logger = LoggerFactory.getLogger(getClass());
 
@@ -89,29 +87,11 @@ public class IncrementPuller extends AbstractPuller implements ApplicationListen
         }
     }
 
-    @Override
-    public void onApplicationEvent(IncrementRefreshEvent event) {
-        flush(event.getMetaId());
-    }
-
     private void finished(String metaId) {
         map.remove(metaId);
         publishClosedEvent(metaId);
     }
 
-    private void flush(String metaId){
-        // TODO 更新待优化,存在性能问题
-        logger.info("flushEvent");
-        DefaultExtractor extractor = map.get(metaId);
-        if (null != extractor) {
-            Meta meta = manager.getMeta(metaId);
-            if (null != meta) {
-                meta.setMap(extractor.getMap());
-                manager.editMeta(meta);
-            }
-        }
-    }
-
     final class DefaultListener implements Event {
 
         private Mapping mapping;
@@ -149,7 +129,16 @@ public class IncrementPuller extends AbstractPuller implements ApplicationListen
 
         @Override
         public void flushEvent() {
-            flush(metaId);
+            // TODO 更新待优化,存在性能问题
+            DefaultExtractor extractor = map.get(metaId);
+            if (null != extractor) {
+                logger.info("flushEvent map:{}", extractor.getMap());
+                Meta meta = manager.getMeta(metaId);
+                if (null != meta) {
+                    meta.setMap(extractor.getMap());
+                    manager.editMeta(meta);
+                }
+            }
         }
 
     }

+ 0 - 2
dbsyncer-parser/src/main/java/org/dbsyncer/parser/ParserFactory.java

@@ -2,7 +2,6 @@ package org.dbsyncer.parser;
 
 import org.dbsyncer.cache.CacheService;
 import org.dbsyncer.common.event.FullRefreshEvent;
-import org.dbsyncer.common.event.IncrementRefreshEvent;
 import org.dbsyncer.common.model.Result;
 import org.dbsyncer.common.model.Task;
 import org.dbsyncer.common.util.CollectionUtils;
@@ -257,7 +256,6 @@ public class ParserFactory implements Parser {
 
         // 5、更新结果
         flush(metaId, writer, 1);
-        applicationContext.publishEvent(new IncrementRefreshEvent(applicationContext, metaId));
     }
 
     /**