AE86 3 лет назад
Родитель
Сommit
5accefd011

+ 2 - 2
dbsyncer-common/src/main/java/org/dbsyncer/common/scheduled/ScheduledTaskServiceImpl.java

@@ -34,13 +34,13 @@ public class ScheduledTaskServiceImpl implements ScheduledTaskService, Disposabl
 
     @Override
     public void start(String key, String cron, ScheduledTaskJob job) {
-        logger.info("{}-[{}], Started task [{}]", key, cron, job.getClass().getName());
+        logger.info("[{}], Started task [{}]", cron, job.getClass().getName());
         apply(key, () -> taskScheduler.schedule(job, (trigger) -> new CronTrigger(cron).nextExecutionTime(trigger)));
     }
 
     @Override
     public void start(String key, long period, ScheduledTaskJob job) {
-        logger.info("[period={}], Started task [{}]", period, key);
+        logger.info("[period={}], Started task [{}]", period, job.getClass().getName());
         apply(key, () -> taskScheduler.scheduleAtFixedRate(job, period));
     }
 

+ 1 - 1
dbsyncer-manager/src/main/java/org/dbsyncer/manager/puller/IncrementPuller.java

@@ -80,7 +80,7 @@ public class IncrementPuller extends AbstractPuller implements ScheduledTaskJob
 
     @PostConstruct
     private void init() {
-        scheduledTaskService.start("*/10 * * * * ?", this);
+        scheduledTaskService.start(10000, this);
     }
 
     @Override

+ 21 - 10
dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/AbstractBufferActuator.java

@@ -14,6 +14,8 @@ import java.util.LinkedHashMap;
 import java.util.Map;
 import java.util.Queue;
 import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
 
 /**
  * @author AE86
@@ -31,7 +33,7 @@ public abstract class AbstractBufferActuator<Request, Response> implements Buffe
 
     private Queue<Request> temp = new ConcurrentLinkedQueue();
 
-    private final Object LOCK = new Object();
+    private final Lock lock = new ReentrantLock(true);
 
     private volatile boolean running;
 
@@ -91,14 +93,23 @@ public abstract class AbstractBufferActuator<Request, Response> implements Buffe
         if (running) {
             return;
         }
-        synchronized (LOCK) {
-            if (running) {
-                return;
+
+        final Lock bufferLock = lock;
+        boolean locked = false;
+        try {
+            locked = bufferLock.tryLock();
+            if (locked && !running) {
+                running = true;
+                flush(buffer);
+                running = false;
+                flush(temp);
+            }
+        } catch (Exception e) {
+            logger.error(e.getMessage());
+        } finally {
+            if (locked) {
+                bufferLock.unlock();
             }
-            running = true;
-            flush(buffer);
-            running = false;
-            flush(temp);
         }
     }
 
@@ -119,9 +130,9 @@ public abstract class AbstractBufferActuator<Request, Response> implements Buffe
                 try {
                     pull((Response) flushTask);
                 } catch (Exception e) {
-                    logger.error("[{}]-flush异常{}", key);
+                    logger.error("[{}]异常{}", key);
                 }
-                logger.info("[{}]-flush{}条,耗时{}秒", key, flushTask.getTaskSize(), (Instant.now().toEpochMilli() - now) / 1000);
+                logger.info("[{}]{}条,耗时{}秒", key, flushTask.getTaskSize(), (Instant.now().toEpochMilli() - now) / 1000);
             });
             map.clear();
         }