Explorar o código

fixbug 修复死锁 https://gitee.com/ghi/dbsyncer/issues/I5JFAZ

AE86 %!s(int64=2) %!d(string=hai) anos
pai
achega
c0ce08ea3c

+ 1 - 0
dbsyncer-biz/src/main/java/org/dbsyncer/biz/checker/impl/mapping/MappingChecker.java

@@ -93,6 +93,7 @@ public class MappingChecker extends AbstractChecker {
         // 全量配置
         mapping.setReadNum(NumberUtil.toInt(params.get("readNum"), mapping.getReadNum()));
         mapping.setBatchNum(NumberUtil.toInt(params.get("batchNum"), mapping.getBatchNum()));
+        mapping.setThreadNum(NumberUtil.toInt(params.get("threadNum"), mapping.getThreadNum()));
 
         // 增量配置(日志/定时)
         String incrementStrategy = params.get("incrementStrategy");

+ 2 - 2
dbsyncer-manager/src/main/java/org/dbsyncer/manager/ManagerFactory.java

@@ -369,7 +369,7 @@ public class ManagerFactory implements Manager, ApplicationListener<ClosedEvent>
         // 标记运行中
         changeMetaState(mapping.getMetaId(), MetaEnum.RUNNING);
 
-        puller.asyncStart(mapping);
+        puller.start(mapping);
     }
 
     @Override
@@ -412,4 +412,4 @@ public class ManagerFactory implements Manager, ApplicationListener<ClosedEvent>
         return puller;
     }
 
-}
+}

+ 39 - 24
dbsyncer-manager/src/main/java/org/dbsyncer/manager/puller/FullPuller.java

@@ -20,6 +20,8 @@ import java.time.Instant;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 
 /**
  * 全量同步
@@ -45,28 +47,11 @@ public class FullPuller extends AbstractPuller implements ApplicationListener<Fu
     private Map<String, Task> map = new ConcurrentHashMap<>();
 
     @Override
-    public void asyncStart(Mapping mapping) {
-        final String mappingId = mapping.getId();
-        final String metaId = mapping.getMetaId();
-        map.putIfAbsent(metaId, new Task(metaId));
-
-        try {
-            List<TableGroup> list = manager.getTableGroupAll(mappingId);
-            Assert.notEmpty(list, "映射关系不能为空");
-
-            // 执行任务
-            logger.info("启动任务:{}", metaId);
-            Task task = map.get(metaId);
-            doTask(task, mapping, list);
-
-        } catch (Exception e) {
-            logger.error(e.getMessage());
-            logService.log(LogType.SystemLog.ERROR, e.getMessage());
-        } finally {
-            map.remove(metaId);
-            publishClosedEvent(metaId);
-            logger.info("结束任务:{}", metaId);
-        }
+    public void start(Mapping mapping) {
+        FullWorker worker = new FullWorker(mapping);
+        worker.setName(new StringBuilder("full-worker-").append(mapping.getId()).toString());
+        worker.setDaemon(false);
+        worker.start();
     }
 
     @Override
@@ -83,7 +68,7 @@ public class FullPuller extends AbstractPuller implements ApplicationListener<Fu
         flush(event.getTask());
     }
 
-    private void doTask(Task task, Mapping mapping, List<TableGroup> list) {
+    private void doTask(Task task, Mapping mapping, List<TableGroup> list, ExecutorService executorService) {
         // 记录开始时间
         long now = Instant.now().toEpochMilli();
         task.setBeginTime(now);
@@ -94,7 +79,7 @@ public class FullPuller extends AbstractPuller implements ApplicationListener<Fu
             if (!task.isRunning()) {
                 break;
             }
-            parser.execute(task, mapping, t);
+            parser.execute(task, mapping, t, executorService);
         }
 
         // 记录结束时间
@@ -111,4 +96,34 @@ public class FullPuller extends AbstractPuller implements ApplicationListener<Fu
         manager.editMeta(meta);
     }
 
+    final class FullWorker extends Thread {
+        Mapping mapping;
+        List<TableGroup> list;
+
+        public FullWorker(Mapping mapping) {
+            this.mapping = mapping;
+            this.list = manager.getTableGroupAll(mapping.getId());
+            Assert.notEmpty(list, "映射关系不能为空");
+        }
+
+        @Override
+        public void run() {
+            final String metaId = mapping.getMetaId();
+            logger.info("开始全量同步:{}, {}", metaId, mapping.getName());
+            try {
+                map.putIfAbsent(metaId, new Task(metaId));
+                final ExecutorService executor = Executors.newFixedThreadPool(mapping.getThreadNum());
+                Task task = map.get(metaId);
+                doTask(task, mapping, list, executor);
+            } catch (Exception e) {
+                logger.error(e.getMessage());
+                logService.log(LogType.SystemLog.ERROR, e.getMessage());
+            } finally {
+                map.remove(metaId);
+                publishClosedEvent(metaId);
+                logger.info("结束全量同步:{}, {}", metaId, mapping.getName());
+            }
+        }
+    }
+
 }

+ 26 - 26
dbsyncer-manager/src/main/java/org/dbsyncer/manager/puller/IncrementPuller.java

@@ -5,12 +5,10 @@ import org.dbsyncer.common.event.RowChangedEvent;
 import org.dbsyncer.common.scheduled.ScheduledTaskJob;
 import org.dbsyncer.common.scheduled.ScheduledTaskService;
 import org.dbsyncer.common.util.CollectionUtils;
-import org.dbsyncer.common.util.StringUtil;
 import org.dbsyncer.connector.ConnectorFactory;
 import org.dbsyncer.connector.config.ConnectorConfig;
 import org.dbsyncer.connector.model.Field;
 import org.dbsyncer.connector.model.Table;
-import org.dbsyncer.connector.constant.ConnectorConstant;
 import org.dbsyncer.listener.AbstractExtractor;
 import org.dbsyncer.listener.Extractor;
 import org.dbsyncer.listener.Listener;
@@ -86,32 +84,34 @@ public class IncrementPuller extends AbstractPuller implements ScheduledTaskJob
     }
 
     @Override
-    public void asyncStart(Mapping mapping) {
+    public void start(Mapping mapping) {
         final String mappingId = mapping.getId();
         final String metaId = mapping.getMetaId();
-        try {
-            Connector connector = manager.getConnector(mapping.getSourceConnectorId());
-            Assert.notNull(connector, "连接器不能为空.");
-            List<TableGroup> list = manager.getTableGroupAll(mappingId);
-            Assert.notEmpty(list, "映射关系不能为空.");
-            Meta meta = manager.getMeta(metaId);
-            Assert.notNull(meta, "Meta不能为空.");
-            AbstractExtractor extractor = getExtractor(mapping, connector, list, meta);
-
-            long now = Instant.now().toEpochMilli();
-            meta.setBeginTime(now);
-            meta.setEndTime(now);
-            manager.editMeta(meta);
-            map.putIfAbsent(metaId, extractor);
-
-            // 执行任务
-            logger.info("启动成功:{}", metaId);
-            map.get(metaId).start();
-        } catch (Exception e) {
-            close(metaId);
-            logService.log(LogType.TableGroupLog.INCREMENT_FAILED, e.getMessage());
-            logger.error("运行异常,结束任务{}:{}", metaId, e.getMessage());
-        }
+        logger.info("开始增量同步:{}, {}", metaId, mapping.getName());
+        Connector connector = manager.getConnector(mapping.getSourceConnectorId());
+        Assert.notNull(connector, "连接器不能为空.");
+        List<TableGroup> list = manager.getTableGroupAll(mappingId);
+        Assert.notEmpty(list, "映射关系不能为空.");
+        Meta meta = manager.getMeta(metaId);
+        Assert.notNull(meta, "Meta不能为空.");
+
+        Thread worker = new Thread(()->{
+            try {
+                long now = Instant.now().toEpochMilli();
+                meta.setBeginTime(now);
+                meta.setEndTime(now);
+                manager.editMeta(meta);
+                map.putIfAbsent(metaId, getExtractor(mapping, connector, list, meta));
+                map.get(metaId).start();
+            } catch (Exception e) {
+                close(metaId);
+                logService.log(LogType.TableGroupLog.INCREMENT_FAILED, e.getMessage());
+                logger.error("运行异常,结束增量同步{}:{}", metaId, e.getMessage());
+            }
+        });
+        worker.setName(new StringBuilder("increment-worker-").append(mapping.getId()).toString());
+        worker.setDaemon(false);
+        worker.start();
     }
 
     @Override

+ 1 - 3
dbsyncer-manager/src/main/java/org/dbsyncer/manager/puller/Puller.java

@@ -1,12 +1,10 @@
 package org.dbsyncer.manager.puller;
 
 import org.dbsyncer.parser.model.Mapping;
-import org.springframework.scheduling.annotation.Async;
 
 public interface Puller {
 
-    @Async("taskExecutor")
-    void asyncStart(Mapping mapping);
+    void start(Mapping mapping);
 
     void close(String metaId);
 

+ 3 - 1
dbsyncer-parser/src/main/java/org/dbsyncer/parser/Parser.java

@@ -16,6 +16,7 @@ import org.dbsyncer.storage.enums.StorageDataStatusEnum;
 
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ExecutorService;
 
 /**
  * @author AE86
@@ -148,8 +149,9 @@ public interface Parser {
      * @param task
      * @param mapping
      * @param tableGroup
+     * @param executorService
      */
-    void execute(Task task, Mapping mapping, TableGroup tableGroup);
+    void execute(Task task, Mapping mapping, TableGroup tableGroup, ExecutorService executorService);
 
     /**
      * 增量同步

+ 17 - 4
dbsyncer-parser/src/main/java/org/dbsyncer/parser/ParserFactory.java

@@ -5,7 +5,6 @@ import org.dbsyncer.common.event.RowChangedEvent;
 import org.dbsyncer.common.model.Result;
 import org.dbsyncer.common.util.CollectionUtils;
 import org.dbsyncer.common.util.JsonUtil;
-import org.dbsyncer.common.util.StringUtil;
 import org.dbsyncer.connector.ConnectorFactory;
 import org.dbsyncer.connector.ConnectorMapper;
 import org.dbsyncer.connector.config.CommandConfig;
@@ -49,6 +48,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
 
 /**
  * @author AE86
@@ -227,7 +227,7 @@ public class ParserFactory implements Parser {
     }
 
     @Override
-    public void execute(Task task, Mapping mapping, TableGroup tableGroup) {
+    public void execute(Task task, Mapping mapping, TableGroup tableGroup, ExecutorService executorService) {
         final String metaId = task.getId();
         final String sourceConnectorId = mapping.getSourceConnectorId();
         final String targetConnectorId = mapping.getTargetConnectorId();
@@ -280,7 +280,8 @@ public class ParserFactory implements Parser {
             pluginFactory.convert(group.getPlugin(), data, target);
 
             // 5、写入目标源
-            Result writer = writeBatch(new BatchWriter(tConnectorMapper, command, sTableName, ConnectorConstant.OPERTION_INSERT, picker.getTargetFields(), target, batchSize));
+            BatchWriter batchWriter = new BatchWriter(tConnectorMapper, command, sTableName, ConnectorConstant.OPERTION_INSERT, picker.getTargetFields(), target, batchSize);
+            Result writer = writeBatch(batchWriter, executorService);
 
             // 6、更新结果
             flush(task, writer);
@@ -288,11 +289,12 @@ public class ParserFactory implements Parser {
             // 7、判断尾页
             if (data.size() < pageSize) {
                 params.clear();
-                logger.info("完成全量同步任务:{}, [{}] >> [{}]", metaId, sTableName, tTableName);
+                logger.info("完成全量:{}, [{}] >> [{}]", metaId, sTableName, tTableName);
                 break;
             }
 
             // 8、更新分页数
+            // TODO 记录表offset
             params.put(ParserEnum.PAGE_INDEX.getCode(), String.valueOf(++pageIndex));
         }
     }
@@ -323,6 +325,17 @@ public class ParserFactory implements Parser {
      */
     @Override
     public Result writeBatch(BatchWriter batchWriter) {
+        return writeBatch(batchWriter, taskExecutor);
+    }
+
+    /**
+     * 批量写入
+     *
+     * @param batchWriter
+     * @param taskExecutor
+     * @return
+     */
+    private Result writeBatch(BatchWriter batchWriter, Executor taskExecutor) {
         List<Map> dataList = batchWriter.getDataList();
         int batchSize = batchWriter.getBatchSize();
         String tableName = batchWriter.getTableName();

+ 13 - 2
dbsyncer-parser/src/main/java/org/dbsyncer/parser/model/Mapping.java

@@ -46,6 +46,11 @@ public class Mapping extends AbstractConfigModel {
     // 单次写入
     private int batchNum = 1000;
 
+    /**
+     * 线程数
+     */
+    private int threadNum = Runtime.getRuntime().availableProcessors() * 2;
+
     public String getSourceConnectorId() {
         return sourceConnectorId;
     }
@@ -118,9 +123,15 @@ public class Mapping extends AbstractConfigModel {
         return batchNum;
     }
 
-    public Mapping setBatchNum(int batchNum) {
+    public void setBatchNum(int batchNum) {
         this.batchNum = batchNum;
-        return this;
     }
 
+    public int getThreadNum() {
+        return threadNum;
+    }
+
+    public void setThreadNum(int threadNum) {
+        this.threadNum = threadNum;
+    }
 }

+ 6 - 1
dbsyncer-web/src/main/resources/public/mapping/editFull.html

@@ -19,7 +19,12 @@
                     <input type="number" name="batchNum" class="form-control" min="1" dbsyncer-valid="require" th:value="${mapping?.batchNum}">
                 </div>
             </div>
-            <div class="col-md-4"></div>
+            <div class="col-md-4">
+                <label class="col-sm-3 control-label text-right">线程数<strong class="driverVerifcateRequired">*</strong></label>
+                <div class="col-sm-9">
+                    <input type="number" name="threadNum" class="form-control" min="1" dbsyncer-valid="require" th:value="${mapping?.threadNum}">
+                </div>
+            </div>
         </div>
     </div>
 </div>