Ver código fonte

新增执行器路由

AE86 1 ano atrás
pai
commit
e9b1a65737

+ 0 - 13
dbsyncer-common/src/main/java/org/dbsyncer/common/config/FlushExecutorConfig.java

@@ -1,13 +0,0 @@
-package org.dbsyncer.common.config;
-
-/**
- * 持久化线程池配置
- *
- * @author AE86
- * @version 1.0.0
- * @date 2020-04-26 23:40
- */
-@Deprecated
-public class FlushExecutorConfig {
-
-}

+ 0 - 11
dbsyncer-common/src/main/java/org/dbsyncer/common/config/IncrementDataConfig.java

@@ -1,11 +0,0 @@
-package org.dbsyncer.common.config;
-
-/**
- * @author AE86
- * @version 1.0.0
- * @date 2022/9/21 22:20
- */
-@Deprecated
-public class IncrementDataConfig {
-
-}

+ 0 - 11
dbsyncer-common/src/main/java/org/dbsyncer/common/config/WriteExecutorConfig.java

@@ -1,11 +0,0 @@
-package org.dbsyncer.common.config;
-
-/**
- * @author AE86
- * @version 1.0.0
- * @date 2020-04-26 23:40
- */
-@Deprecated
-public class WriteExecutorConfig {
-
-}

+ 0 - 6
dbsyncer-common/src/main/java/org/dbsyncer/common/event/Watcher.java

@@ -38,10 +38,4 @@ public interface Watcher {
      * @return
      */
     long getMetaUpdateTime();
-
-    /**
-     * 关闭事件
-     *
-     */
-    void closeEvent();
 }

+ 1 - 1
dbsyncer-connector/src/main/java/org/dbsyncer/connector/database/DatabaseTemplate.java

@@ -596,7 +596,7 @@ public class DatabaseTemplate implements JdbcOperations {
         } catch (SQLException ex) {
             // Release Connection early, to avoid potential connection pool deadlock
             // in the case when the exception translator hasn't been initialized yet.
-            if (psc instanceof ParameterDisposer) {
+             if (psc instanceof ParameterDisposer) {
                 ((ParameterDisposer) psc).cleanupParameters();
             }
             String sql = getSql(psc);

+ 0 - 5
dbsyncer-listener/src/main/java/org/dbsyncer/listener/AbstractExtractor.java

@@ -44,11 +44,6 @@ public abstract class AbstractExtractor implements Extractor {
         this.watcher = watcher;
     }
 
-    @Override
-    public void closeEvent() {
-        watcher.closeEvent();
-    }
-
     @Override
     public void changeEvent(ChangedEvent event) {
         if (null != event) {

+ 0 - 5
dbsyncer-listener/src/main/java/org/dbsyncer/listener/Extractor.java

@@ -23,11 +23,6 @@ public interface Extractor {
      */
     void register(Watcher watcher);
 
-    /**
-     * 关闭事件
-     */
-    void closeEvent();
-
     /**
      * 数据变更事件
      *

+ 90 - 0
dbsyncer-manager/src/main/java/org/dbsyncer/manager/puller/BufferActuatorRouter.java

@@ -0,0 +1,90 @@
+package org.dbsyncer.manager.puller;
+
+import org.dbsyncer.common.config.TableGroupBufferConfig;
+import org.dbsyncer.common.event.ChangedEvent;
+import org.dbsyncer.parser.Parser;
+import org.dbsyncer.parser.flush.impl.TableGroupBufferActuator;
+import org.dbsyncer.parser.model.WriterRequest;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.DisposableBean;
+import org.springframework.stereotype.Component;
+
+import javax.annotation.Resource;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+
+@Component
+public final class BufferActuatorRouter implements DisposableBean {
+
+    private final Logger logger = LoggerFactory.getLogger(getClass());
+
+    @Resource
+    private TableGroupBufferConfig tableGroupBufferConfig;
+
+    @Resource
+    private TableGroupBufferActuator tableGroupBufferActuator;
+
+    @Resource
+    private Parser parser;
+
+    /**
+     * 驱动缓存执行路由列表
+     */
+    private Map<String, Map<String, TableGroupBufferActuator>> router = new ConcurrentHashMap<>();
+
+    public void execute(String metaId, String tableGroupId, ChangedEvent event) {
+        if (router.containsKey(metaId) && router.get(metaId).containsKey(tableGroupId)) {
+            router.get(metaId).get(tableGroupId).offer(new WriterRequest(tableGroupId, event));
+            return;
+        }
+        parser.execute(tableGroupId, event);
+    }
+
+    public void bind(String metaId, String tableGroupId) {
+        router.computeIfAbsent(metaId, k -> new ConcurrentHashMap<>());
+
+        // TODO 暂定执行器上限,待替换为LRU模型
+        if (router.get(metaId).size() >= tableGroupBufferConfig.getMaxBufferActuatorSize()) {
+            return;
+        }
+
+        router.get(metaId).computeIfAbsent(tableGroupId, k -> {
+            TableGroupBufferActuator newBufferActuator = null;
+            try {
+                newBufferActuator = (TableGroupBufferActuator) tableGroupBufferActuator.clone();
+                newBufferActuator.setTableGroupId(tableGroupId);
+                newBufferActuator.buildConfig();
+            } catch (CloneNotSupportedException ex) {
+                logger.error(ex.getMessage(), ex);
+            }
+            return newBufferActuator;
+        });
+    }
+
+    public void unbind(String metaId) {
+        if (router.containsKey(metaId)) {
+            router.get(metaId).values().forEach(TableGroupBufferActuator::stop);
+            router.remove(metaId);
+        }
+    }
+
+    @Override
+    public void destroy() {
+        router.values().forEach(map -> map.values().forEach(TableGroupBufferActuator::stop));
+        router.clear();
+    }
+
+    public AtomicLong getQueueSize() {
+        AtomicLong total = new AtomicLong();
+        router.values().forEach(map -> map.values().forEach(actuator -> total.addAndGet(actuator.getQueue().size())));
+        return total;
+    }
+
+    public AtomicLong getQueueCapacity() {
+        AtomicLong total = new AtomicLong();
+        router.values().forEach(map -> map.values().forEach(actuator -> total.addAndGet(actuator.getQueueCapacity())));
+        return total;
+    }
+}

+ 1 - 1
dbsyncer-manager/src/main/java/org/dbsyncer/manager/puller/FullPuller.java

@@ -36,7 +36,7 @@ import java.util.concurrent.Executors;
  * @date 2020/04/26 15:28
  */
 @Component
-public class FullPuller extends AbstractPuller implements ApplicationListener<FullRefreshEvent> {
+public final class FullPuller extends AbstractPuller implements ApplicationListener<FullRefreshEvent> {
 
     private final Logger logger = LoggerFactory.getLogger(getClass());
 

+ 9 - 44
dbsyncer-manager/src/main/java/org/dbsyncer/manager/puller/IncrementPuller.java

@@ -1,6 +1,5 @@
 package org.dbsyncer.manager.puller;
 
-import org.dbsyncer.common.config.TableGroupBufferConfig;
 import org.dbsyncer.common.event.ChangedEvent;
 import org.dbsyncer.common.event.ChangedOffset;
 import org.dbsyncer.common.event.RefreshOffsetEvent;
@@ -23,15 +22,12 @@ import org.dbsyncer.listener.quartz.TableGroupQuartzCommand;
 import org.dbsyncer.manager.Manager;
 import org.dbsyncer.manager.ManagerException;
 import org.dbsyncer.manager.model.FieldPicker;
-import org.dbsyncer.parser.Parser;
-import org.dbsyncer.parser.flush.impl.TableGroupBufferActuator;
 import org.dbsyncer.parser.logger.LogService;
 import org.dbsyncer.parser.logger.LogType;
 import org.dbsyncer.parser.model.Connector;
 import org.dbsyncer.parser.model.Mapping;
 import org.dbsyncer.parser.model.Meta;
 import org.dbsyncer.parser.model.TableGroup;
-import org.dbsyncer.parser.model.WriterRequest;
 import org.dbsyncer.parser.util.PickerUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -60,16 +56,10 @@ import java.util.stream.Collectors;
  * @date 2020/04/26 15:28
  */
 @Component
-public class IncrementPuller extends AbstractPuller implements ApplicationListener<RefreshOffsetEvent>, ScheduledTaskJob {
+public final class IncrementPuller extends AbstractPuller implements ApplicationListener<RefreshOffsetEvent>, ScheduledTaskJob {
 
     private final Logger logger = LoggerFactory.getLogger(getClass());
 
-    @Resource
-    private TableGroupBufferConfig tableGroupBufferConfig;
-
-    @Resource
-    private Parser parser;
-
     @Resource
     private Listener listener;
 
@@ -86,7 +76,7 @@ public class IncrementPuller extends AbstractPuller implements ApplicationListen
     private ConnectorFactory connectorFactory;
 
     @Resource
-    private TableGroupBufferActuator tableGroupBufferActuator;
+    private BufferActuatorRouter bufferActuatorRouter;
 
     private Map<String, Extractor> map = new ConcurrentHashMap<>();
 
@@ -130,7 +120,7 @@ public class IncrementPuller extends AbstractPuller implements ApplicationListen
     public void close(String metaId) {
         Extractor extractor = map.get(metaId);
         if (null != extractor) {
-            extractor.closeEvent();
+            bufferActuatorRouter.unbind(metaId);
             extractor.close();
         }
         map.remove(metaId);
@@ -204,8 +194,6 @@ public class IncrementPuller extends AbstractPuller implements ApplicationListen
 
     abstract class AbstractConsumer<E extends ChangedEvent> implements Watcher {
         protected Meta meta;
-        private Map<String, TableGroupBufferActuator> router = new ConcurrentHashMap<>();
-        private final int MAX_BUFFER_ACTUATOR_SIZE = 10;
 
         public abstract void onChange(E e);
 
@@ -231,35 +219,12 @@ public class IncrementPuller extends AbstractPuller implements ApplicationListen
             return meta.getUpdateTime();
         }
 
-        @Override
-        public void closeEvent() {
-            router.values().forEach(bufferActuator -> bufferActuator.stop());
+        protected void bind(String tableGroupId){
+            bufferActuatorRouter.bind(meta.getId(), tableGroupId);
         }
 
-        protected void execute(String tableGroupId, ChangedEvent event) {
-            if (router.containsKey(tableGroupId)) {
-                router.get(tableGroupId).offer(new WriterRequest(tableGroupId, event));
-                return;
-            }
-            parser.execute(tableGroupId, event);
-        }
-
-        protected void buildBufferActuator(String tableGroupId) {
-            // TODO 暂定执行器上限,待替换为LRU模型
-            if (router.size() >= tableGroupBufferConfig.getMaxBufferActuatorSize()) {
-                return;
-            }
-            router.computeIfAbsent(tableGroupId, k -> {
-                TableGroupBufferActuator newBufferActuator = null;
-                try {
-                    newBufferActuator = (TableGroupBufferActuator) tableGroupBufferActuator.clone();
-                    newBufferActuator.setTableGroupId(tableGroupId);
-                    newBufferActuator.buildConfig();
-                } catch (CloneNotSupportedException ex) {
-                    logger.error(ex.getMessage(), ex);
-                }
-                return newBufferActuator;
-            });
+        protected void execute(String tableGroupId, E event){
+            bufferActuatorRouter.execute(meta.getId(), tableGroupId, event);
         }
     }
 
@@ -270,7 +235,7 @@ public class IncrementPuller extends AbstractPuller implements ApplicationListen
             this.meta = meta;
             tableGroups.forEach(t -> {
                 tablePicker.add(new FieldPicker(PickerUtil.mergeTableGroupConfig(mapping, t)));
-                buildBufferActuator(t.getId());
+                bind(t.getId());
             });
         }
 
@@ -296,7 +261,7 @@ public class IncrementPuller extends AbstractPuller implements ApplicationListen
                 tablePicker.putIfAbsent(tableName, new ArrayList<>());
                 TableGroup group = PickerUtil.mergeTableGroupConfig(mapping, t);
                 tablePicker.get(tableName).add(new FieldPicker(group, group.getFilter(), table.getColumn(), group.getFieldMapping()));
-                buildBufferActuator(group.getId());
+                bind(group.getId());
             });
         }
 

+ 8 - 2
dbsyncer-monitor/src/main/java/org/dbsyncer/monitor/MonitorFactory.java

@@ -8,6 +8,7 @@ import org.dbsyncer.common.util.NumberUtil;
 import org.dbsyncer.common.util.StringUtil;
 import org.dbsyncer.connector.constant.ConnectorConstant;
 import org.dbsyncer.manager.Manager;
+import org.dbsyncer.manager.puller.BufferActuatorRouter;
 import org.dbsyncer.monitor.enums.MetricEnum;
 import org.dbsyncer.monitor.enums.StatisticEnum;
 import org.dbsyncer.monitor.enums.TaskMetricEnum;
@@ -65,6 +66,9 @@ public class MonitorFactory implements Monitor, ScheduledTaskJob {
     @Resource
     private BufferActuator storageBufferActuator;
 
+    @Resource
+    private BufferActuatorRouter bufferActuatorRouter;
+
     @Resource
     private ScheduledTaskService scheduledTaskService;
 
@@ -196,8 +200,10 @@ public class MonitorFactory implements Monitor, ScheduledTaskJob {
         report.setInsert(mappingReportMetric.getInsert());
         report.setUpdate(mappingReportMetric.getUpdate());
         report.setDelete(mappingReportMetric.getDelete());
-        report.setQueueUp(generalBufferActuator.getQueue().size());
-        report.setQueueCapacity(generalBufferActuator.getQueueCapacity());
+        // 堆积数据(通用执行器 + 表执行器)
+        report.setQueueUp(bufferActuatorRouter.getQueueSize().addAndGet(generalBufferActuator.getQueue().size()));
+        // 容量(通用执行器 + 表执行器)
+        report.setQueueCapacity( bufferActuatorRouter.getQueueCapacity().addAndGet(generalBufferActuator.getQueueCapacity()));
         return report;
     }
 

+ 1 - 1
dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/AbstractBufferActuator.java

@@ -80,7 +80,7 @@ public abstract class AbstractBufferActuator<Request extends BufferRequest, Resp
      */
     protected void buildQueueConfig() {
         this.queue = new LinkedBlockingQueue(config.getBufferQueueCapacity());
-        logger.info("初始化{}容量:{}", this.getClass().getSimpleName(), config.getBufferQueueCapacity());
+        logger.info("{} initialized with queue capacity: {}", this.getClass().getSimpleName(), config.getBufferQueueCapacity());
     }
 
     /**

+ 3 - 3
dbsyncer-plugin/src/main/java/org/dbsyncer/plugin/PluginFactory.java

@@ -65,7 +65,7 @@ public class PluginFactory implements DisposableBean {
                 String pluginId = createPluginId(s.getClass().getName(), s.getVersion());
                 service.putIfAbsent(pluginId, s);
                 plugins.add(new Plugin(s.getName(), s.getClass().getName(), s.getVersion(), "", true));
-                logger.info("初始化插件, {}_{} {}", s.getName(), s.getVersion(), s.getClass().getName());
+                logger.info("{}_{} {}", s.getName(), s.getVersion(), s.getClass().getName());
                 try {
                     s.init();
                 } catch (Exception e) {
@@ -167,7 +167,7 @@ public class PluginFactory implements DisposableBean {
                 }
                 service.putIfAbsent(pluginId, s);
                 plugins.add(new Plugin(s.getName(), s.getClass().getName(), s.getVersion(), fileName, false));
-                logger.info("初始化插件 {}, {}_{} {}", fileName, s.getName(), s.getVersion(), s.getClass().getName());
+                logger.info("{}, {}_{} {}", fileName, s.getName(), s.getVersion(), s.getClass().getName());
                 try {
                     s.init();
                 } catch (Exception e) {
@@ -183,7 +183,7 @@ public class PluginFactory implements DisposableBean {
     @Override
     public void destroy() {
         service.values().forEach(s -> {
-            logger.info("关闭插件, {}_{} {}", s.getName(), s.getVersion(), s.getClass().getName());
+            logger.info("{}_{} {}", s.getName(), s.getVersion(), s.getClass().getName());
             try {
                 s.close();
             } catch (Exception e) {