AE86 преди 3 години
родител
ревизия
7b78462e0b

+ 2 - 0
dbsyncer-listener/src/main/java/org/dbsyncer/listener/quartz/ScheduledTaskService.java

@@ -17,6 +17,8 @@ public interface ScheduledTaskService {
      */
     void start(String key, String cron, ScheduledTaskJob job);
 
+    void start(String key, long period, ScheduledTaskJob job);
+
     void stop(String key);
 
 }

+ 12 - 0
dbsyncer-listener/src/main/java/org/dbsyncer/listener/quartz/ScheduledTaskServiceImpl.java

@@ -41,6 +41,18 @@ public class ScheduledTaskServiceImpl implements ScheduledTaskService {
         map.putIfAbsent(key, taskScheduler.schedule(job, (trigger) -> new CronTrigger(cron).nextExecutionTime(trigger) ));
     }
 
+    @Override
+    public void start(String key, long period, ScheduledTaskJob job) {
+        //校验任务key是否已经启动
+        final ScheduledFuture scheduledFuture = map.get(key);
+        if (null != scheduledFuture && !scheduledFuture.isCancelled()) {
+            logger.warn(">>>>>> 当前任务已经启动,无需重复启动!");
+            return;
+        }
+        //获取需要定时调度的接口
+        map.putIfAbsent(key, taskScheduler.scheduleAtFixedRate(job, period));
+    }
+
     @Override
     public void stop(String key) {
         ScheduledFuture job = map.get(key);

+ 1 - 2
dbsyncer-listener/src/main/java/org/dbsyncer/listener/sqlserver/SqlServerExtractor.java

@@ -55,7 +55,6 @@ public class SqlServerExtractor extends AbstractExtractor implements ScheduledTa
     private static Set<SqlServerChangeTable> changeTables;
     private Connection connection;
     private String taskKey;
-    private String cron = "*/1 * * * * ?";
     private Worker worker;
     private Lsn lastLsn;
 
@@ -86,7 +85,7 @@ public class SqlServerExtractor extends AbstractExtractor implements ScheduledTa
             worker.start();
 
             taskKey = UUIDUtil.getUUID();
-            scheduledTaskService.start(taskKey, cron, this);
+            scheduledTaskService.start(taskKey, 300, this);
             connected = true;
         } catch (Exception e) {
             close();