Ver Fonte

simple code

AE86 há 1 ano atrás
pai
commit
619999fa3e

+ 4 - 2
dbsyncer-connector/src/main/java/org/dbsyncer/connector/ConnectorFactory.java

@@ -47,9 +47,11 @@ public class ConnectorFactory implements DisposableBean {
     private void init() {
         Map<String, ConnectorService> beans = applicationContext.getBeansOfType(ConnectorService.class);
         if (!CollectionUtils.isEmpty(beans)) {
-            beans.values().forEach(s -> service.putIfAbsent(s.getConnectorType(), s));
+            beans.values().forEach(s -> {
+                service.putIfAbsent(s.getConnectorType(), s);
+                connectorTypes.add(s.getConnectorType());
+            });
         }
-        service.values().forEach(s -> connectorTypes.add(s.getConnectorType()));
     }
 
     @Override

+ 1 - 5
dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/impl/GeneralBufferActuator.java

@@ -145,7 +145,7 @@ public class GeneralBufferActuator extends AbstractBufferActuator<WriterRequest,
 
         // 5、批量执行同步
         BatchWriter batchWriter = new BatchWriter(tConnectorInstance, group.getCommand(), targetTableName, event, picker.getTargetFields(), targetDataList, generalBufferConfig.getBufferWriterCount());
-        Result result = parserComponent.writeBatch(context, batchWriter, generalExecutor);
+        Result result = parserComponent.writeBatch(context, batchWriter, getExecutor());
 
         // 6.发布刷新增量点事件
         applicationContext.publishEvent(new RefreshOffsetEvent(applicationContext, response.getOffsetList()));
@@ -233,8 +233,4 @@ public class GeneralBufferActuator extends AbstractBufferActuator<WriterRequest,
         return conn.getConfig();
     }
 
-    public void setGeneralExecutor(Executor generalExecutor) {
-        this.generalExecutor = generalExecutor;
-    }
-
 }

+ 6 - 1
dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/impl/TableGroupBufferActuator.java

@@ -10,6 +10,7 @@ import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
 import org.springframework.stereotype.Component;
 
 import javax.annotation.Resource;
+import java.util.concurrent.Executor;
 
 /**
  * 表执行器(根据表消费数据,多线程批量写,按序执行)
@@ -45,6 +46,11 @@ public final class TableGroupBufferActuator extends GeneralBufferActuator implem
         return running;
     }
 
+    @Override
+    public Executor getExecutor() {
+        return threadPoolTaskExecutor;
+    }
+
     public void buildConfig() {
         super.setConfig(tableGroupBufferConfig);
         super.buildLock();
@@ -54,7 +60,6 @@ public final class TableGroupBufferActuator extends GeneralBufferActuator implem
         int queueCapacity = tableGroupBufferConfig.getThreadQueueCapacity();
         String threadNamePrefix = new StringBuilder("TableGroupExecutor-").append(tableGroupId).append(StringUtil.SYMBOL).toString();
         threadPoolTaskExecutor = ThreadPoolUtil.newThreadPoolTaskExecutor(coreSize, coreSize, queueCapacity, 30, threadNamePrefix);
-        setGeneralExecutor(threadPoolTaskExecutor);
         running = true;
         scheduledTaskService.start(taskKey, tableGroupBufferConfig.getBufferPeriodMillisecond(), this);
     }