Browse Source

修复表并发锁

Signed-off-by: AE86 <836391306@qq.com>
AE86 1 year ago
parent
commit
f9f913248a

+ 13 - 4
dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/AbstractBufferActuator.java

@@ -39,9 +39,9 @@ public abstract class AbstractBufferActuator<Request extends BufferRequest, Resp
 
     private final Logger logger = LoggerFactory.getLogger(getClass());
     private Class<Response> responseClazz;
-    private final Lock taskLock = new ReentrantLock();
-    private final Lock queueLock = new ReentrantLock(true);
-    private final Condition isFull = queueLock.newCondition();
+    private Lock taskLock;
+    private Lock queueLock;
+    private Condition isFull;
     private final Duration offerInterval = Duration.of(500, ChronoUnit.MILLIS);
     private BufferActuatorConfig config;
     private BlockingQueue<Request> queue;
@@ -71,6 +71,7 @@ public abstract class AbstractBufferActuator<Request extends BufferRequest, Resp
      */
     protected void buildConfig() {
         Assert.notNull(config, "请先配置缓存执行器,setConfig(BufferActuatorConfig config)");
+        buildLock();
         buildQueueConfig();
         scheduledTaskService.start(config.getBufferPeriodMillisecond(), this);
     }
@@ -83,6 +84,15 @@ public abstract class AbstractBufferActuator<Request extends BufferRequest, Resp
         logger.info("{} initialized with queue capacity: {}", this.getClass().getSimpleName(), config.getBufferQueueCapacity());
     }
 
+    /**
+     * 初始化锁
+     */
+    protected void buildLock(){
+        taskLock = new ReentrantLock();
+        queueLock = new ReentrantLock();
+        isFull = queueLock.newCondition();
+    }
+
     /**
      * 生成分区key
      *
@@ -135,7 +145,6 @@ public abstract class AbstractBufferActuator<Request extends BufferRequest, Resp
                 // 公平锁,有序执行,容量上限,阻塞重试
                 queueLock.lock();
                 while (isRunning(request) && !queue.offer((Request) request)) {
-                    logger.warn("[{}]缓存队列容量已达上限[{}], 正在阻塞重试.", this.getClass().getSimpleName(), getQueueCapacity());
                     try {
                         isFull.await(offerInterval.toMillis(), TimeUnit.MILLISECONDS);
                     } catch (InterruptedException e) {

+ 1 - 0
dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/impl/TableGroupBufferActuator.java

@@ -47,6 +47,7 @@ public final class TableGroupBufferActuator extends GeneralBufferActuator implem
 
     public void buildConfig() {
         super.setConfig(tableGroupBufferConfig);
+        super.buildLock();
         super.buildQueueConfig();
         taskKey = UUIDUtil.getUUID();
         int coreSize = tableGroupBufferConfig.getThreadCoreSize();