|
@@ -50,7 +50,25 @@ public class FullPuller extends AbstractPuller implements ApplicationListener<Fu
|
|
|
|
|
|
@Override
|
|
|
public void start(Mapping mapping) {
|
|
|
- FullWorker worker = new FullWorker(mapping);
|
|
|
+ Thread worker = new Thread(()->{
|
|
|
+ final String metaId = mapping.getMetaId();
|
|
|
+ try {
|
|
|
+ List<TableGroup> list = manager.getSortedTableGroupAll(mapping.getId());
|
|
|
+ Assert.notEmpty(list, "映射关系不能为空");
|
|
|
+ logger.info("开始全量同步:{}, {}", metaId, mapping.getName());
|
|
|
+ map.putIfAbsent(metaId, new Task(metaId));
|
|
|
+ final ExecutorService executor = Executors.newFixedThreadPool(mapping.getThreadNum());
|
|
|
+ Task task = map.get(metaId);
|
|
|
+ doTask(task, mapping, list, executor);
|
|
|
+ } catch (Exception e) {
|
|
|
+ logger.error(e.getMessage());
|
|
|
+ logService.log(LogType.SystemLog.ERROR, e.getMessage());
|
|
|
+ } finally {
|
|
|
+ map.remove(metaId);
|
|
|
+ publishClosedEvent(metaId);
|
|
|
+ logger.info("结束全量同步:{}, {}", metaId, mapping.getName());
|
|
|
+ }
|
|
|
+ });
|
|
|
worker.setName(new StringBuilder("full-worker-").append(mapping.getId()).toString());
|
|
|
worker.setDaemon(false);
|
|
|
worker.start();
|
|
@@ -117,34 +135,4 @@ public class FullPuller extends AbstractPuller implements ApplicationListener<Fu
|
|
|
manager.editMeta(meta);
|
|
|
}
|
|
|
|
|
|
- final class FullWorker extends Thread {
|
|
|
- Mapping mapping;
|
|
|
- List<TableGroup> list;
|
|
|
-
|
|
|
- public FullWorker(Mapping mapping) {
|
|
|
- this.mapping = mapping;
|
|
|
- this.list = manager.getSortedTableGroupAll(mapping.getId());
|
|
|
- Assert.notEmpty(list, "映射关系不能为空");
|
|
|
- }
|
|
|
-
|
|
|
- @Override
|
|
|
- public void run() {
|
|
|
- final String metaId = mapping.getMetaId();
|
|
|
- logger.info("开始全量同步:{}, {}", metaId, mapping.getName());
|
|
|
- try {
|
|
|
- map.putIfAbsent(metaId, new Task(metaId));
|
|
|
- final ExecutorService executor = Executors.newFixedThreadPool(mapping.getThreadNum());
|
|
|
- Task task = map.get(metaId);
|
|
|
- doTask(task, mapping, list, executor);
|
|
|
- } catch (Exception e) {
|
|
|
- logger.error(e.getMessage());
|
|
|
- logService.log(LogType.SystemLog.ERROR, e.getMessage());
|
|
|
- } finally {
|
|
|
- map.remove(metaId);
|
|
|
- publishClosedEvent(metaId);
|
|
|
- logger.info("结束全量同步:{}, {}", metaId, mapping.getName());
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
}
|