AE86 1 năm trước cách đây
mục cha
commit
afb8648d8f

+ 1 - 1
dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/AbstractBufferActuator.java

@@ -123,7 +123,7 @@ public abstract class AbstractBufferActuator<Request extends BufferRequest, Resp
      *
      * @param response
      */
-    protected abstract void pull(Response response);
+    public abstract void pull(Response response);
 
     /**
      * 批量处理分区数据

+ 6 - 2
dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/impl/GeneralBufferActuator.java

@@ -120,9 +120,9 @@ public class GeneralBufferActuator extends AbstractBufferActuator<WriterRequest,
     }
 
     @Override
-    protected void pull(WriterResponse response) {
+    public void pull(WriterResponse response) {
         // 0、获取配置信息
-        final TableGroup tableGroup = profileComponent.getTableGroup(response.getTableGroupId());
+        final TableGroup tableGroup = getTableGroup(response.getTableGroupId());
         final Mapping mapping = profileComponent.getMapping(tableGroup.getMappingId());
         final TableGroup group = PickerUtil.mergeTableGroupConfig(mapping, tableGroup);
 
@@ -176,6 +176,10 @@ public class GeneralBufferActuator extends AbstractBufferActuator<WriterRequest,
         return generalExecutor;
     }
 
+    public TableGroup getTableGroup(String tableGroupId) {
+        return profileComponent.getTableGroup(tableGroupId);
+    }
+
     /**
      * 解析DDL
      *

+ 1 - 1
dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/impl/StorageBufferActuator.java

@@ -63,7 +63,7 @@ public final class StorageBufferActuator extends AbstractBufferActuator<StorageR
     }
 
     @Override
-    protected void pull(StorageResponse response) {
+    public void pull(StorageResponse response) {
         storageExecutor.execute(() -> storageService.addBatch(StorageEnum.DATA, response.getMetaId(), response.getDataList()));
     }