Przeglądaj źródła

修复中止阻塞任务

Signed-off-by: AE86 <836391306@qq.com>
AE86 1 rok temu
rodzic
commit
ec467f91ef

+ 4 - 4
dbsyncer-manager/src/main/java/org/dbsyncer/manager/puller/IncrementPuller.java

@@ -200,7 +200,7 @@ public class IncrementPuller extends AbstractPuller implements ApplicationListen
 
     abstract class AbstractConsumer<E extends ChangedEvent> implements Watcher {
         protected Meta meta;
-        protected Map<String, TableGroupBufferActuator> router = new ConcurrentHashMap<>();
+        private Map<String, TableGroupBufferActuator> router = new ConcurrentHashMap<>();
         private final int MAX_BUFFER_ACTUATOR_SIZE = 10;
 
         public abstract void onChange(E e);
@@ -246,15 +246,16 @@ public class IncrementPuller extends AbstractPuller implements ApplicationListen
                 return;
             }
             router.computeIfAbsent(tableGroupId, k -> {
+                TableGroupBufferActuator newBufferActuator = null;
                 try {
-                    TableGroupBufferActuator newBufferActuator = (TableGroupBufferActuator) tableGroupBufferActuator.clone();
+                    newBufferActuator = (TableGroupBufferActuator) tableGroupBufferActuator.clone();
                     newBufferActuator.setTableGroupId(tableGroupId);
                     newBufferActuator.buildConfig();
                     return newBufferActuator;
                 } catch (CloneNotSupportedException ex) {
                     logger.error(ex.getMessage(), ex);
                 }
-                return null;
+                return newBufferActuator;
             });
         }
     }
@@ -294,7 +295,6 @@ public class IncrementPuller extends AbstractPuller implements ApplicationListen
                 tablePicker.get(tableName).add(new FieldPicker(group, group.getFilter(), table.getColumn(), group.getFieldMapping()));
                 buildBufferActuator(group.getId());
             });
-
         }
 
         @Override

+ 18 - 1
dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/AbstractBufferActuator.java

@@ -1,8 +1,11 @@
 package org.dbsyncer.parser.flush;
 
+import org.dbsyncer.cache.CacheService;
 import org.dbsyncer.common.config.BufferActuatorConfig;
 import org.dbsyncer.common.scheduled.ScheduledTaskJob;
 import org.dbsyncer.common.scheduled.ScheduledTaskService;
+import org.dbsyncer.parser.enums.MetaEnum;
+import org.dbsyncer.parser.model.Meta;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.util.Assert;
@@ -49,6 +52,9 @@ public abstract class AbstractBufferActuator<Request extends BufferRequest, Resp
     @Resource
     private ScheduledTaskService scheduledTaskService;
 
+    @Resource
+    private CacheService cacheService;
+
     public AbstractBufferActuator() {
         int level = 5;
         Class<?> aClass = getClass();
@@ -93,6 +99,17 @@ public abstract class AbstractBufferActuator<Request extends BufferRequest, Resp
      */
     protected abstract void partition(Request request, Response response);
 
+    /**
+     * 驱动是否运行中
+     *
+     * @param request
+     * @return
+     */
+    protected boolean isRunning(BufferRequest request) {
+        Meta meta = cacheService.get(request.getMetaId(), Meta.class);
+        return meta != null && MetaEnum.isRunning(meta.getState());
+    }
+
     /**
      * 是否跳过分区处理
      *
@@ -117,7 +134,7 @@ public abstract class AbstractBufferActuator<Request extends BufferRequest, Resp
             try {
                 // 公平锁,有序执行,容量上限,阻塞重试
                 queueLock.lock();
-                while (!queue.offer((Request) request)) {
+                while (isRunning(request) && !queue.offer((Request) request)) {
                     logger.warn("[{}]缓存队列容量已达上限[{}], 正在阻塞重试.", this.getClass().getSimpleName(), getQueueCapacity());
                     try {
                         isFull.await(OFFER_INTERVAL.toMillis(), TimeUnit.MILLISECONDS);

+ 6 - 0
dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/BufferRequest.java

@@ -7,4 +7,10 @@ package org.dbsyncer.parser.flush;
  */
 public interface BufferRequest {
 
+    /**
+     * 获取驱动ID
+     *
+     * @return
+     */
+    String getMetaId();
 }

+ 10 - 0
dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/impl/TableGroupBufferActuator.java

@@ -6,6 +6,7 @@ import org.dbsyncer.common.scheduled.ScheduledTaskService;
 import org.dbsyncer.common.util.StringUtil;
 import org.dbsyncer.common.util.ThreadPoolUtil;
 import org.dbsyncer.common.util.UUIDUtil;
+import org.dbsyncer.parser.flush.BufferRequest;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
@@ -37,11 +38,18 @@ public final class TableGroupBufferActuator extends GeneralBufferActuator implem
 
     private String tableGroupId;
 
+    private volatile boolean running;
+
     @Override
     public void init() {
         // nothing to do
     }
 
+    @Override
+    protected boolean isRunning(BufferRequest request) {
+        return running;
+    }
+
     public void buildConfig() {
         BufferActuatorConfig actuatorConfig = super.getBufferActuatorConfig();
         try {
@@ -49,6 +57,7 @@ public final class TableGroupBufferActuator extends GeneralBufferActuator implem
             // TODO 暂定容量上限
             newConfig.setQueueCapacity(50000);
             setBufferActuatorConfig(newConfig);
+            running = true;
             super.buildBufferActuatorConfig();
             taskKey = UUIDUtil.getUUID();
             String threadNamePrefix = new StringBuilder("writeExecutor-").append(tableGroupId).append(StringUtil.SYMBOL).toString();
@@ -66,6 +75,7 @@ public final class TableGroupBufferActuator extends GeneralBufferActuator implem
     }
 
     public void stop() {
+        running = false;
         if (threadPoolTaskExecutor != null) {
             threadPoolTaskExecutor.shutdown();
         }

+ 1 - 0
dbsyncer-parser/src/main/java/org/dbsyncer/parser/model/StorageRequest.java

@@ -20,6 +20,7 @@ public class StorageRequest implements BufferRequest {
         this.row = row;
     }
 
+    @Override
     public String getMetaId() {
         return metaId;
     }

+ 5 - 0
dbsyncer-parser/src/main/java/org/dbsyncer/parser/model/WriterRequest.java

@@ -24,6 +24,11 @@ public class WriterRequest extends AbstractWriter implements BufferRequest {
         this.changedOffset = event.getChangedOffset();
     }
 
+    @Override
+    public String getMetaId() {
+        return changedOffset.getMetaId();
+    }
+    
     public Map getRow() {
         return row;
     }