Selaa lähdekoodia

优化定时任务,同步运行状态

AE86 3 vuotta sitten
vanhempi
säilyke
ab0295ccba

+ 15 - 8
dbsyncer-listener/src/main/java/org/dbsyncer/listener/quartz/AbstractQuartzExtractor.java

@@ -17,7 +17,8 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
@@ -42,7 +43,8 @@ public abstract class AbstractQuartzExtractor extends AbstractExtractor implemen
     private Set<String> delete;
     private String taskKey;
     private long period;
-    private AtomicBoolean running;
+    private volatile boolean running;
+    private final Lock lock = new ReentrantLock(true);
 
     /**
      * 获取增量参数
@@ -65,7 +67,7 @@ public abstract class AbstractQuartzExtractor extends AbstractExtractor implemen
 
         taskKey = UUIDUtil.getUUID();
         period = listenerConfig.getPeriod();
-        running = new AtomicBoolean();
+        running = true;
         run();
         scheduledTaskService.start(taskKey, period * 1000, this);
         logger.info("启动定时任务:{} >> {}秒", taskKey, period);
@@ -73,24 +75,29 @@ public abstract class AbstractQuartzExtractor extends AbstractExtractor implemen
 
     @Override
     public void run() {
+        final Lock taskLock = lock;
+        boolean locked = false;
         try {
-            if (running.compareAndSet(false, true)) {
-                // 依次执行同步映射关系
+            locked = taskLock.tryLock();
+            if (locked) {
                 for (int i = 0; i < commandSize; i++) {
                     execute(commands.get(i), i);
                 }
-                running.compareAndSet(true, false);
             }
         } catch (Exception e) {
-            running.compareAndSet(true, false);
             errorEvent(e);
             logger.error(e.getMessage());
+        } finally {
+            if (locked) {
+                taskLock.unlock();
+            }
         }
     }
 
     @Override
     public void close() {
         scheduledTaskService.stop(taskKey);
+        running = false;
     }
 
     private void execute(Map<String, String> command, int index) {
@@ -98,7 +105,7 @@ public abstract class AbstractQuartzExtractor extends AbstractExtractor implemen
         ConnectorMapper connectionMapper = connectorFactory.connect(connectorConfig);
         Point point = checkLastPoint(command, index);
         int pageIndex = 1;
-        for (; ; ) {
+        while (running) {
             Result reader = connectorFactory.reader(connectionMapper, new ReaderConfig(point.getCommand(), point.getArgs(), pageIndex++, readNum));
             List<Map> data = reader.getSuccessData();
             if (CollectionUtils.isEmpty(data)) {