AE86 5 年 前
コミット
77ff63f861

+ 3 - 3
dbsyncer-biz/src/main/java/org/dbsyncer/biz/checker/impl/connector/ConnectorChecker.java

@@ -3,7 +3,7 @@ package org.dbsyncer.biz.checker.impl.connector;
 import org.dbsyncer.biz.BizException;
 import org.dbsyncer.biz.checker.AbstractChecker;
 import org.dbsyncer.biz.checker.ConnectorConfigChecker;
-import org.dbsyncer.biz.util.CheckerTypeUtil;
+import org.dbsyncer.common.util.StringUtil;
 import org.dbsyncer.connector.config.ConnectorConfig;
 import org.dbsyncer.connector.enums.ConnectorEnum;
 import org.dbsyncer.manager.Manager;
@@ -56,7 +56,7 @@ public class ConnectorChecker extends AbstractChecker implements ApplicationCont
         setConfig(connector, connectorType);
 
         // 配置连接器配置
-        String type = CheckerTypeUtil.getCheckerType(connectorType);
+        String type = StringUtil.toLowerCaseFirstOne(connectorType).concat("ConfigChecker");
         ConnectorConfigChecker checker = map.get(type);
         Assert.notNull(checker, "Checker can not be null.");
         checker.modify(connector, params);
@@ -83,7 +83,7 @@ public class ConnectorChecker extends AbstractChecker implements ApplicationCont
 
         // 配置连接器配置
         ConnectorConfig config = connector.getConfig();
-        String type = CheckerTypeUtil.getCheckerType(config.getConnectorType());
+        String type = StringUtil.toLowerCaseFirstOne(config.getConnectorType()).concat("ConfigChecker");
         ConnectorConfigChecker checker = map.get(type);
         Assert.notNull(checker, "Checker can not be null.");
         checker.modify(connector, params);

+ 3 - 3
dbsyncer-biz/src/main/java/org/dbsyncer/biz/checker/impl/mapping/LogConfigChecker.java

@@ -2,7 +2,7 @@ package org.dbsyncer.biz.checker.impl.mapping;
 
 import org.dbsyncer.biz.checker.MappingConfigChecker;
 import org.dbsyncer.biz.checker.MappingLogConfigChecker;
-import org.dbsyncer.biz.util.CheckerTypeUtil;
+import org.dbsyncer.common.util.StringUtil;
 import org.dbsyncer.connector.config.ConnectorConfig;
 import org.dbsyncer.listener.config.ListenerConfig;
 import org.dbsyncer.listener.enums.ListenerEnum;
@@ -43,9 +43,9 @@ public class LogConfigChecker implements MappingConfigChecker, ApplicationContex
         String connectorId = mapping.getSourceConnectorId();
         Connector connector = manager.getConnector(connectorId);
         ConnectorConfig config = connector.getConfig();
-        String type = CheckerTypeUtil.getCheckerType(config.getConnectorType() + "Log");
+        String type = StringUtil.toLowerCaseFirstOne(config.getConnectorType()).concat("LogConfigChecker");
         MappingLogConfigChecker checker = map.get(type);
-        if(null != checker){
+        if (null != checker) {
             checker.modify(mapping, params);
         }
         ListenerConfig listener = mapping.getListener();

+ 2 - 3
dbsyncer-biz/src/main/java/org/dbsyncer/biz/checker/impl/mapping/MappingChecker.java

@@ -5,12 +5,11 @@ import org.apache.commons.lang.math.NumberUtils;
 import org.dbsyncer.biz.checker.AbstractChecker;
 import org.dbsyncer.biz.checker.MappingConfigChecker;
 import org.dbsyncer.biz.checker.impl.tablegroup.TableGroupChecker;
-import org.dbsyncer.biz.util.CheckerTypeUtil;
 import org.dbsyncer.common.util.CollectionUtils;
+import org.dbsyncer.common.util.StringUtil;
 import org.dbsyncer.listener.config.ListenerConfig;
 import org.dbsyncer.listener.enums.ListenerEnum;
 import org.dbsyncer.manager.Manager;
-import org.dbsyncer.parser.enums.MetaEnum;
 import org.dbsyncer.parser.enums.ModelEnum;
 import org.dbsyncer.parser.model.ConfigModel;
 import org.dbsyncer.parser.model.Mapping;
@@ -111,7 +110,7 @@ public class MappingChecker extends AbstractChecker implements ApplicationContex
         // 增量配置(日志/定时)
         String incrementStrategy = params.get("incrementStrategy");
         Assert.hasText(incrementStrategy, "MappingChecker check params incrementStrategy is empty");
-        String type = CheckerTypeUtil.getCheckerType(incrementStrategy);
+        String type = StringUtil.toLowerCaseFirstOne(incrementStrategy).concat("ConfigChecker");
         MappingConfigChecker checker = map.get(type);
         Assert.notNull(checker, "Checker can not be null.");
         checker.modify(mapping, params);

+ 0 - 30
dbsyncer-biz/src/main/java/org/dbsyncer/biz/util/CheckerTypeUtil.java

@@ -1,30 +0,0 @@
-package org.dbsyncer.biz.util;
-
-import org.apache.commons.lang.StringUtils;
-
-public abstract class CheckerTypeUtil {
-
-    /**
-     * 获取检查器类型
-     *
-     * @param type
-     * @return
-     */
-    public static String getCheckerType(String type) {
-        return toLowerCaseFirstOne(type).concat("ConfigChecker");
-    }
-
-    /**
-     * 首字母转小写
-     *
-     * @param s
-     * @return
-     */
-    private static String toLowerCaseFirstOne(String s) {
-        if (StringUtils.isBlank(s) || Character.isLowerCase(s.charAt(0))) {
-            return s;
-        }
-        return new StringBuilder().append(Character.toLowerCase(s.charAt(0))).append(s.substring(1)).toString();
-    }
-
-}

+ 25 - 0
dbsyncer-common/src/main/java/org/dbsyncer/common/util/StringUtil.java

@@ -0,0 +1,25 @@
+package org.dbsyncer.common.util;
+
+import org.apache.commons.lang.StringUtils;
+
+import java.time.LocalDateTime;
+import java.time.format.DateTimeFormatter;
+
+public abstract class StringUtil {
+    public StringUtil() {
+    }
+
+    /**
+     * 首字母转小写
+     *
+     * @param s
+     * @return
+     */
+    public static String toLowerCaseFirstOne(String s) {
+        if (StringUtils.isBlank(s) || Character.isLowerCase(s.charAt(0))) {
+            return s;
+        }
+        return new StringBuilder().append(Character.toLowerCase(s.charAt(0))).append(s.substring(1)).toString();
+    }
+
+}

+ 6 - 0
dbsyncer-manager/src/main/java/org/dbsyncer/manager/extractor/Increment.java

@@ -0,0 +1,6 @@
+package org.dbsyncer.manager.extractor;
+
+public interface Increment {
+
+    void close(String metaId);
+}

+ 2 - 2
dbsyncer-manager/src/main/java/org/dbsyncer/manager/extractor/impl/FullExtractor.java

@@ -37,7 +37,7 @@ public class FullExtractor extends AbstractExtractor implements ApplicationListe
     @Autowired
     private Manager manager;
 
-    protected Map<String, Task> map = new ConcurrentHashMap<>();
+    private Map<String, Task> map = new ConcurrentHashMap<>();
 
     @Override
     public void asyncStart(Mapping mapping) {
@@ -47,7 +47,7 @@ public class FullExtractor extends AbstractExtractor implements ApplicationListe
 
         try {
             List<TableGroup> list = manager.getTableGroupAll(mappingId);
-            Assert.notEmpty(list, "映射关系为空");
+            Assert.notEmpty(list, "映射关系不能为空");
 
             // 执行任务
             logger.info("启动任务:{}", metaId);

+ 61 - 2
dbsyncer-manager/src/main/java/org/dbsyncer/manager/extractor/impl/IncrementExtractor.java

@@ -1,10 +1,27 @@
 package org.dbsyncer.manager.extractor.impl;
 
+import org.dbsyncer.common.task.Task;
+import org.dbsyncer.common.util.StringUtil;
 import org.dbsyncer.listener.Listener;
+import org.dbsyncer.listener.config.ListenerConfig;
+import org.dbsyncer.manager.Manager;
 import org.dbsyncer.manager.extractor.AbstractExtractor;
+import org.dbsyncer.manager.extractor.Increment;
+import org.dbsyncer.parser.model.Connector;
 import org.dbsyncer.parser.model.Mapping;
+import org.dbsyncer.parser.model.TableGroup;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.BeansException;
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.ApplicationContext;
+import org.springframework.context.ApplicationContextAware;
 import org.springframework.stereotype.Component;
+import org.springframework.util.Assert;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 
 /**
  * 增量同步
@@ -14,18 +31,60 @@ import org.springframework.stereotype.Component;
  * @date 2020/04/26 15:28
  */
 @Component
-public class IncrementExtractor extends AbstractExtractor {
+public class IncrementExtractor extends AbstractExtractor implements ApplicationContextAware {
+
+    private final Logger logger = LoggerFactory.getLogger(getClass());
 
     @Autowired
     private Listener listener;
 
+    @Autowired
+    private Manager manager;
+
+    private Map<String, Increment> pull;
+
+    private Map<String, Task> map = new ConcurrentHashMap<>();
+
+    @Override
+    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
+        pull = applicationContext.getBeansOfType(Increment.class);
+    }
+
     @Override
     public void asyncStart(Mapping mapping) {
+        final String metaId = mapping.getMetaId();
+        map.putIfAbsent(metaId, new Task(metaId));
+
+        try {
+            ListenerConfig listener = mapping.getListener();
+            // log / timing
+            String type = listener.getListenerType() + "Increment";
+            Increment increment = this.pull.get(type);
+            Assert.notNull(increment, "未知的同步方式.");
+
+            Connector connector = manager.getConnector(mapping.getSourceConnectorId());
+            Assert.notNull(connector, "连接器不能为空.");
 
+            // 执行任务
+            logger.info("启动任务:{}", metaId);
+            Task task = map.get(metaId);
+
+        } catch (Exception e) {
+            // TODO 记录错误日志
+            logger.error(e.getMessage());
+        } finally {
+            map.remove(metaId);
+            publishClosedEvent(metaId);
+            logger.info("结束任务:{}", metaId);
+        }
     }
 
     @Override
     public void close(String metaId) {
-
+        Task task = map.get(metaId);
+        if (null != task) {
+            task.stop();
+        }
     }
+
 }

+ 20 - 0
dbsyncer-manager/src/main/java/org/dbsyncer/manager/extractor/increment/LogIncrement.java

@@ -0,0 +1,20 @@
+package org.dbsyncer.manager.extractor.increment;
+
+import org.dbsyncer.manager.extractor.Increment;
+import org.springframework.stereotype.Component;
+
+/**
+ * 监听日志同步
+ *
+ * @author AE86
+ * @version 1.0.0
+ * @date 2020/05/05 15:28
+ */
+@Component
+public class LogIncrement implements Increment {
+
+    @Override
+    public void close(String metaId) {
+
+    }
+}

+ 20 - 0
dbsyncer-manager/src/main/java/org/dbsyncer/manager/extractor/increment/TimingIncrement.java

@@ -0,0 +1,20 @@
+package org.dbsyncer.manager.extractor.increment;
+
+import org.dbsyncer.manager.extractor.Increment;
+import org.springframework.stereotype.Component;
+
+/**
+ * 定时同步
+ *
+ * @author AE86
+ * @version 1.0.0
+ * @date 2020/05/05 15:28
+ */
+@Component
+public class TimingIncrement implements Increment {
+
+    @Override
+    public void close(String metaId) {
+
+    }
+}

+ 1 - 1
dbsyncer-parser/src/main/java/org/dbsyncer/parser/ParserFactory.java

@@ -353,7 +353,7 @@ public class ParserFactory implements Parser {
         ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
         executor.setCorePoolSize(threadSize);
         executor.setMaxPoolSize(threadSize);
-        executor.setQueueCapacity(threadSize * 2);
+        executor.setQueueCapacity(0);
         executor.setKeepAliveSeconds(30);
         executor.setAwaitTerminationSeconds(30);
         executor.setThreadNamePrefix("ParserExecutor");

+ 6 - 6
dbsyncer-web/src/main/resources/templates/index/index.html

@@ -158,13 +158,13 @@
                     <td th:text="${m?.mappingName}"></td>
                     <td th:text="${m?.model}"></td>
                     <td>
-                        <span th:if="${m?.model eq '全量' and m?.total gt 0 and (m?.success + m?.fail) gt 0}">
-                            总数:[[${m?.total}]],
-                            进度:[[${#numbers.formatDecimal(((m?.success + m?.fail) / m?.total * 100.00),0 ,2)}]]%,
-                            耗时:[[${(m?.endTime - m?.beginTime) / 1000}]]秒,
+                        总数:[[${m?.total}]]
+                        <span th:if="${m?.model eq '全量' and (m?.success + m?.fail) gt 0}">
+                            ,进度:[[${#numbers.formatDecimal(((m?.success + m?.fail) / m?.total * 100.00),0 ,2)}]]%
+                            ,耗时:[[${(m?.endTime - m?.beginTime) / 1000}]]秒
                         </span>
-                        <span th:if="${m?.success gt 0}">成功:[[${m?.success}]],</span>
-                        <span th:if="${m?.fail gt 0}">失败:[[${m?.fail}]] <a href="javascript:;" class="label label-danger">日志</a></span>
+                        <span th:if="${m?.success gt 0}">,成功:[[${m?.success}]]</span>
+                        <span th:if="${m?.fail gt 0}">,失败:[[${m?.fail}]] <a href="javascript:;" class="label label-danger">日志</a></span>
                     </td>
                     <td>
                         <span th:if="${m?.state eq 0}" class="label label-info">未运行</span>

+ 2 - 2
dbsyncer-web/src/main/resources/templates/mapping/editIncrement.html

@@ -10,10 +10,10 @@
             <div class="col-md-4">
                 <div class="row text-center" id="mappingIncrementStrategyConfig">
                     <div class="col-sm-6">
-                        <input type="radio" name="incrementStrategy" value="log" th:checked="${'log' eq mapping?.listener?.listenerType}" /> 日志
+                        <input type="radio" name="incrementStrategy" value="timing" th:checked="${'timing' eq mapping?.listener?.listenerType}" /> 定时
                     </div>
                     <div class="col-sm-6">
-                        <input type="radio" name="incrementStrategy" value="timing" th:checked="${'timing' eq mapping?.listener?.listenerType}" /> 定时
+                        <input type="radio" name="incrementStrategy" value="log" th:checked="${'log' eq mapping?.listener?.listenerType}" /> 日志
                     </div>
                 </div>
             </div>