穿云 hai 3 meses
pai
achega
5711fee4af

+ 0 - 5
dbsyncer-parser/src/main/java/org/dbsyncer/parser/consumer/ParserConsumer.java

@@ -9,7 +9,6 @@ import org.dbsyncer.parser.ProfileComponent;
 import org.dbsyncer.parser.flush.impl.BufferActuatorRouter;
 import org.dbsyncer.parser.model.Meta;
 import org.dbsyncer.parser.model.TableGroup;
-import org.dbsyncer.sdk.enums.ChangedEventTypeEnum;
 import org.dbsyncer.sdk.listener.ChangedEvent;
 import org.dbsyncer.sdk.listener.Watcher;
 
@@ -39,10 +38,6 @@ public final class ParserConsumer implements Watcher {
     @Override
     public void changeEvent(ChangedEvent event) {
         event.getChangedOffset().setMetaId(metaId);
-        // TODO 如果是DDL,阻塞等待队列消费完成
-        if (ChangedEventTypeEnum.isDDL(event.getType())) {
-
-        }
         bufferActuatorRouter.execute(metaId, event);
     }
 

+ 1 - 1
dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/AbstractBufferActuator.java

@@ -106,7 +106,7 @@ public abstract class AbstractBufferActuator<Request extends BufferRequest, Resp
      * @param request
      * @return
      */
-    protected boolean isRunning(BufferRequest request) {
+    public boolean isRunning(BufferRequest request) {
         Meta meta = profileComponent.getMeta(request.getMetaId());
         return meta != null && MetaEnum.isRunning(meta.getState());
     }

+ 22 - 4
dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/impl/BufferActuatorRouter.java

@@ -4,9 +4,10 @@
 package org.dbsyncer.parser.flush.impl;
 
 import org.dbsyncer.common.config.TableGroupBufferConfig;
-import org.dbsyncer.parser.flush.BufferActuator;
+import org.dbsyncer.parser.flush.AbstractBufferActuator;
 import org.dbsyncer.parser.model.TableGroup;
 import org.dbsyncer.parser.model.WriterRequest;
+import org.dbsyncer.sdk.enums.ChangedEventTypeEnum;
 import org.dbsyncer.sdk.listener.ChangedEvent;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -18,6 +19,7 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 
 /**
@@ -39,7 +41,7 @@ public final class BufferActuatorRouter implements DisposableBean {
     private TableGroupBufferActuator tableGroupBufferActuator;
 
     @Resource
-    private BufferActuator generalBufferActuator;
+    private GeneralBufferActuator generalBufferActuator;
 
     /**
      * 驱动缓存执行路由列表
@@ -50,13 +52,13 @@ public final class BufferActuatorRouter implements DisposableBean {
         if (router.containsKey(metaId)) {
             router.computeIfPresent(metaId, (k, processor) -> {
                 processor.computeIfPresent(event.getSourceTableName(), (x, actuator) -> {
-                    actuator.offer(new WriterRequest(event));
+                    offer(actuator, event);
                     return actuator;
                 });
                 return processor;
             });
         }
-        generalBufferActuator.offer(new WriterRequest(event));
+        offer(generalBufferActuator, event);
     }
 
     public void bind(String metaId, List<TableGroup> tableGroups) {
@@ -92,6 +94,22 @@ public final class BufferActuatorRouter implements DisposableBean {
         router.remove(metaId);
     }
 
+    private void offer(AbstractBufferActuator actuator, ChangedEvent event) {
+        if (ChangedEventTypeEnum.isDDL(event.getType())) {
+            WriterRequest request = new WriterRequest(event);
+            // DDL事件,阻塞等待队列消费完成
+            while (actuator.getQueue().isEmpty() && actuator.isRunning(request)){
+                actuator.offer(request);
+                try {
+                    TimeUnit.MILLISECONDS.sleep(10);
+                } catch (InterruptedException ex) {
+                    logger.error(ex.getMessage(), ex);
+                }
+            }
+        }
+        actuator.offer(new WriterRequest(event));
+    }
+
     @Override
     public void destroy() {
         router.values().forEach(map -> map.values().forEach(TableGroupBufferActuator::stop));

+ 1 - 1
dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/impl/TableGroupBufferActuator.java

@@ -44,7 +44,7 @@ public class TableGroupBufferActuator extends GeneralBufferActuator implements C
     }
 
     @Override
-    protected boolean isRunning(BufferRequest request) {
+    public boolean isRunning(BufferRequest request) {
         return running;
     }