1
0
AE86 3 жил өмнө
parent
commit
d47686a31e

+ 22 - 16
dbsyncer-listener/src/main/java/org/dbsyncer/listener/quartz/ScheduledTaskServiceImpl.java

@@ -1,5 +1,6 @@
 package org.dbsyncer.listener.quartz;
 
+import org.dbsyncer.listener.ListenerException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -31,26 +32,12 @@ public class ScheduledTaskServiceImpl implements ScheduledTaskService {
 
     @Override
     public void start(String key, String cron, ScheduledTaskJob job) {
-        //校验任务key是否已经启动
-        final ScheduledFuture scheduledFuture = map.get(key);
-        if (null != scheduledFuture && !scheduledFuture.isCancelled()) {
-            logger.warn(">>>>>> 当前任务已经启动,无需重复启动!");
-            return;
-        }
-        //获取需要定时调度的接口
-        map.putIfAbsent(key, taskScheduler.schedule(job, (trigger) -> new CronTrigger(cron).nextExecutionTime(trigger) ));
+        apply(key, () -> taskScheduler.schedule(job, (trigger) -> new CronTrigger(cron).nextExecutionTime(trigger)));
     }
 
     @Override
     public void start(String key, long period, ScheduledTaskJob job) {
-        //校验任务key是否已经启动
-        final ScheduledFuture scheduledFuture = map.get(key);
-        if (null != scheduledFuture && !scheduledFuture.isCancelled()) {
-            logger.warn(">>>>>> 当前任务已经启动,无需重复启动!");
-            return;
-        }
-        //获取需要定时调度的接口
-        map.putIfAbsent(key, taskScheduler.scheduleAtFixedRate(job, period));
+        apply(key, () -> taskScheduler.scheduleAtFixedRate(job, period));
     }
 
     @Override
@@ -63,4 +50,23 @@ public class ScheduledTaskServiceImpl implements ScheduledTaskService {
         }
     }
 
+    private void apply(String key, ScheduledFutureMapper scheduledFutureMapper) {
+        final ScheduledFuture scheduledFuture = map.get(key);
+        if (null != scheduledFuture && !scheduledFuture.isCancelled()) {
+            String msg = String.format(">>>>>> 任务已启动 %s  >>>>>>", key);
+            logger.error(msg);
+            throw new ListenerException(msg);
+        }
+        map.putIfAbsent(key, scheduledFutureMapper.apply());
+    }
+
+    private interface ScheduledFutureMapper {
+        /**
+         * 返回定时任务
+         *
+         * @return
+         */
+        ScheduledFuture apply();
+    }
+
 }

+ 0 - 4
dbsyncer-listener/src/main/java/org/dbsyncer/listener/sqlserver/SqlServerExtractor.java

@@ -388,10 +388,6 @@ public class SqlServerExtractor extends AbstractExtractor {
             while (!isInterrupted() && connected) {
                 try {
                     connect();
-                } catch (SQLException e) {
-                    logger.error(e.getMessage());
-                }
-                try {
                     Lsn stopLsn = queryAndMap(GET_MAX_LSN, rs -> new Lsn(rs.getBytes(1)));
                     if (!stopLsn.isAvailable()) {
                         TimeUnit.MICROSECONDS.sleep(DEFAULT_POLL_INTERVAL_MILLIS);

+ 8 - 8
dbsyncer-manager/src/main/java/org/dbsyncer/manager/puller/impl/IncrementPuller.java

@@ -32,11 +32,11 @@ import org.dbsyncer.parser.util.PickerUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.DisposableBean;
-import org.springframework.beans.factory.InitializingBean;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
 import org.springframework.util.Assert;
 
+import javax.annotation.PostConstruct;
 import java.time.Instant;
 import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
@@ -53,7 +53,7 @@ import java.util.stream.Collectors;
  * @date 2020/04/26 15:28
  */
 @Component
-public class IncrementPuller extends AbstractPuller implements ScheduledTaskJob, InitializingBean, DisposableBean {
+public class IncrementPuller extends AbstractPuller implements ScheduledTaskJob, DisposableBean {
 
     private final Logger logger = LoggerFactory.getLogger(getClass());
 
@@ -82,6 +82,12 @@ public class IncrementPuller extends AbstractPuller implements ScheduledTaskJob,
 
     private Map<String, Extractor> map = new ConcurrentHashMap<>();
 
+    @PostConstruct
+    private void init() {
+        key = UUIDUtil.getUUID();
+        scheduledTaskService.start(key, "*/10 * * * * ?", this);
+    }
+
     @Override
     public void asyncStart(Mapping mapping) {
         final String mappingId = mapping.getId();
@@ -130,12 +136,6 @@ public class IncrementPuller extends AbstractPuller implements ScheduledTaskJob,
         map.forEach((k, v) -> v.flushEvent());
     }
 
-    @Override
-    public void afterPropertiesSet() {
-        key = UUIDUtil.getUUID();
-        scheduledTaskService.start(key, "*/10 * * * * ?", this);
-    }
-
     @Override
     public void destroy() {
         scheduledTaskService.stop(key);

+ 1 - 0
dbsyncer-storage/src/main/java/org/dbsyncer/storage/support/DiskStorageServiceImpl.java

@@ -175,5 +175,6 @@ public class DiskStorageServiceImpl extends AbstractStorageService {
         for (Map.Entry<String, Shard> m : map.entrySet()) {
             m.getValue().close();
         }
+        map.clear();
     }
 }

+ 3 - 3
dbsyncer-storage/src/main/java/org/dbsyncer/storage/support/MysqlStorageServiceImpl.java

@@ -358,7 +358,7 @@ public class MysqlStorageServiceImpl extends AbstractStorageService {
         this.config = config;
     }
 
-    class FieldPair {
+    final class FieldPair {
         String columnName;
         String labelName;
 
@@ -373,7 +373,7 @@ public class MysqlStorageServiceImpl extends AbstractStorageService {
         }
     }
 
-    class FieldBuilder {
+    final class FieldBuilder {
         Map<String, FieldPair> fieldPairMap = new ConcurrentHashMap<>();
         Map<String, Field> fieldMap = new ConcurrentHashMap<>();
         List<FieldPair> fieldPairs;
@@ -419,7 +419,7 @@ public class MysqlStorageServiceImpl extends AbstractStorageService {
         }
     }
 
-    class Executor {
+    final class Executor {
         private String table;
         private String query;
         private String insert;