AE86 vor 5 Jahren
Ursprung
Commit
fa70f741f6

+ 1 - 1
dbsyncer-connector/src/main/java/org/dbsyncer/connector/ConnectorFactory.java

@@ -78,7 +78,7 @@ public class ConnectorFactory {
      * @param connectorType
      * @return
      */
-    private Connector getConnector(String connectorType) {
+    public Connector getConnector(String connectorType) {
         // 获取连接器类型
         Assert.hasText(connectorType, "ConnectorType can not be empty.");
         return ConnectorEnum.getConnector(connectorType);

+ 3 - 8
dbsyncer-manager/src/main/java/org/dbsyncer/manager/extractor/impl/FullExtractor.java

@@ -42,24 +42,19 @@ public class FullExtractor extends AbstractExtractor {
     @Override
     public void asyncStart(Mapping mapping) {
         final String mappingId = mapping.getId();
-        final String sourceConnectorId = mapping.getSourceConnectorId();
         final String metaId = mapping.getMetaId();
         int batchNum = mapping.getBatchNum();
         int threadNum = mapping.getThreadNum();
         map.putIfAbsent(metaId, new Task(metaId, batchNum, threadNum));
 
         try {
-            Connector connector = manager.getConnector(sourceConnectorId);
-            Assert.notNull(connector, "数据源连接器不能为空.");
-            ConnectorConfig config = connector.getConfig();
-            Assert.notNull(config, "连接器配置不能为空.");
             List<TableGroup> list = manager.getTableGroupAll(mappingId);
             Assert.notEmpty(list, "映射关系为空");
 
             // 执行任务
             logger.info("启动任务:{}", metaId);
             Task task = map.get(metaId);
-            doTask(task, config, list);
+            doTask(task, mapping, list);
 
         } catch (Exception e) {
             logger.error(e.getMessage());
@@ -78,7 +73,7 @@ public class FullExtractor extends AbstractExtractor {
         }
     }
 
-    private void doTask(Task task, ConnectorConfig config, List<TableGroup> list) {
+    private void doTask(Task task, Mapping mapping, List<TableGroup> list) {
         // 记录开始时间
         task.setBeginTime(System.currentTimeMillis());
         flush(task);
@@ -87,7 +82,7 @@ public class FullExtractor extends AbstractExtractor {
             if (!task.isRunning()) {
                 break;
             }
-            parser.execute(task, config, t);
+            parser.execute(task, mapping, t);
         }
 
         // 记录结束时间

+ 3 - 2
dbsyncer-parser/src/main/java/org/dbsyncer/parser/Parser.java

@@ -8,6 +8,7 @@ import org.dbsyncer.connector.enums.FilterEnum;
 import org.dbsyncer.connector.enums.OperationEnum;
 import org.dbsyncer.parser.enums.ConvertEnum;
 import org.dbsyncer.parser.model.Connector;
+import org.dbsyncer.parser.model.Mapping;
 import org.dbsyncer.parser.model.TableGroup;
 
 import java.util.List;
@@ -105,8 +106,8 @@ public interface Parser {
      * 全量同步
      *
      * @param task
-     * @param config
+     * @param mapping
      * @param tableGroup
      */
-    void execute(Task task, ConnectorConfig config, TableGroup tableGroup);
+    void execute(Task task, Mapping mapping, TableGroup tableGroup);
 }

+ 17 - 5
dbsyncer-parser/src/main/java/org/dbsyncer/parser/ParserFactory.java

@@ -15,8 +15,8 @@ import org.dbsyncer.connector.enums.OperationEnum;
 import org.dbsyncer.parser.enums.ConvertEnum;
 import org.dbsyncer.parser.model.Connector;
 import org.dbsyncer.parser.model.FieldMapping;
+import org.dbsyncer.parser.model.Mapping;
 import org.dbsyncer.parser.model.TableGroup;
-import org.dbsyncer.storage.SnowflakeIdWorker;
 import org.json.JSONException;
 import org.json.JSONObject;
 import org.slf4j.Logger;
@@ -48,9 +48,6 @@ public class ParserFactory implements Parser {
     @Autowired
     private CacheService cacheService;
 
-    @Autowired
-    private SnowflakeIdWorker snowflakeIdWorker;
-
     @Override
     public boolean alive(ConnectorConfig config) {
         return connectorFactory.isAlive(config);
@@ -143,12 +140,27 @@ public class ParserFactory implements Parser {
     }
 
     @Override
-    public void execute(Task task, ConnectorConfig config, TableGroup tableGroup) {
+    public void execute(Task task, Mapping mapping, TableGroup tableGroup) {
+        final String sourceConnectorId = mapping.getSourceConnectorId();
+        final String targetConnectorId = mapping.getTargetConnectorId();
+        ConnectorConfig sConfig = getConnectorConfig(sourceConnectorId);
+        ConnectorConfig tConfig = getConnectorConfig(targetConnectorId);
+        Assert.notNull(sConfig, "数据源配置不能为空.");
+        Assert.notNull(tConfig, "目标源配置不能为空.");
+
         try {
             for (int i = 0; i < 10; i++) {
                 if (!task.isRunning()) {
                     break;
                 }
+
+                // TODO 全量同步任务
+                // 1、获取数据源数据
+                // 2、值映射
+                // 3、参数转换
+                // 4、插件转换
+                // 5、写入目标源
+
                 logger.info("模拟迁移5s");
                 TimeUnit.SECONDS.sleep(5);
             }