Bladeren bron

修复全量同步,断点续传

AE86 2 jaren geleden
bovenliggende
commit
5e294800b5

+ 1 - 1
dbsyncer-biz/src/main/java/org/dbsyncer/biz/checker/impl/mapping/MappingChecker.java

@@ -139,7 +139,7 @@ public class MappingChecker extends AbstractChecker {
         if(StringUtil.isNotBlank(metaSnapshot)){
             Map snapshot = JsonUtil.jsonToObj(metaSnapshot, HashMap.class);
             if(!CollectionUtils.isEmpty(snapshot)){
-                meta.setMap(snapshot);
+                meta.setSnapshot(snapshot);
             }
         }
 

+ 4 - 4
dbsyncer-common/src/main/java/org/dbsyncer/common/event/Event.java

@@ -19,16 +19,16 @@ public interface Event {
     /**
      * 写入增量点事件
      *
-     * @param map
+     * @param snapshot
      */
-    void flushEvent(Map<String, String> map);
+    void flushEvent(Map<String, String> snapshot);
 
     /**
      * 强制写入增量点事件
      *
-     * @param map
+     * @param snapshot
      */
-    void forceFlushEvent(Map<String,String> map);
+    void forceFlushEvent(Map<String,String> snapshot);
 
     /**
      * 刷新事件变更时间

+ 19 - 2
dbsyncer-manager/src/main/java/org/dbsyncer/manager/puller/FullPuller.java

@@ -1,7 +1,9 @@
 package org.dbsyncer.manager.puller;
 
+import org.dbsyncer.common.util.NumberUtil;
 import org.dbsyncer.manager.Manager;
 import org.dbsyncer.parser.Parser;
+import org.dbsyncer.parser.enums.ParserEnum;
 import org.dbsyncer.parser.event.FullRefreshEvent;
 import org.dbsyncer.parser.logger.LogService;
 import org.dbsyncer.parser.logger.LogType;
@@ -73,17 +75,29 @@ public class FullPuller extends AbstractPuller implements ApplicationListener<Fu
         long now = Instant.now().toEpochMilli();
         task.setBeginTime(now);
         task.setEndTime(now);
+
+        // 获取上次同步点
+        Meta meta = manager.getMeta(task.getId());
+        Map<String, String> snapshot = meta.getSnapshot();
+        task.setPageIndex(NumberUtil.toInt(snapshot.get(ParserEnum.PAGE_INDEX.getCode()), ParserEnum.PAGE_INDEX.getDefaultValue()));
+        task.setTableGroupIndex(NumberUtil.toInt(snapshot.get(ParserEnum.TABLE_GROUP_INDEX.getCode()), ParserEnum.TABLE_GROUP_INDEX.getDefaultValue()));
         flush(task);
 
-        for (TableGroup t : list) {
+        int i = task.getTableGroupIndex();
+        while (i < list.size()){
             if (!task.isRunning()) {
                 break;
             }
-            parser.execute(task, mapping, t, executorService);
+            parser.execute(task, mapping, list.get(i), executorService);
+            task.setPageIndex(ParserEnum.PAGE_INDEX.getDefaultValue());
+            task.setTableGroupIndex(++i);
+            flush(task);
         }
 
         // 记录结束时间
         task.setEndTime(Instant.now().toEpochMilli());
+        task.setPageIndex(ParserEnum.PAGE_INDEX.getDefaultValue());
+        task.setTableGroupIndex(ParserEnum.TABLE_GROUP_INDEX.getDefaultValue());
         flush(task);
     }
 
@@ -93,6 +107,9 @@ public class FullPuller extends AbstractPuller implements ApplicationListener<Fu
 
         meta.setBeginTime(task.getBeginTime());
         meta.setEndTime(task.getEndTime());
+        Map<String, String> snapshot = meta.getSnapshot();
+        snapshot.put(ParserEnum.PAGE_INDEX.getCode(), String.valueOf(task.getPageIndex()));
+        snapshot.put(ParserEnum.TABLE_GROUP_INDEX.getCode(), String.valueOf(task.getTableGroupIndex()));
         manager.editMeta(meta);
     }
 

+ 8 - 8
dbsyncer-manager/src/main/java/org/dbsyncer/manager/puller/IncrementPuller.java

@@ -145,7 +145,7 @@ public class IncrementPuller extends AbstractPuller implements ScheduledTaskJob
             AbstractQuartzExtractor extractor = listener.getExtractor(ListenerTypeEnum.TIMING, connectorConfig.getConnectorType(), AbstractQuartzExtractor.class);
             List<Map<String, String>> commands = list.stream().map(t -> t.getCommand()).collect(Collectors.toList());
             extractor.setCommands(commands);
-            setExtractorConfig(extractor, connectorConfig, listenerConfig, meta.getMap(), new QuartzListener(mapping, list));
+            setExtractorConfig(extractor, connectorConfig, listenerConfig, meta.getSnapshot(), new QuartzListener(mapping, list));
             return extractor;
         }
 
@@ -156,7 +156,7 @@ public class IncrementPuller extends AbstractPuller implements ScheduledTaskJob
             LogListener logListener = new LogListener(mapping, list, extractor);
             logListener.getTablePicker().forEach((k, fieldPickers) -> filterTable.add(k));
             extractor.setFilterTable(filterTable);
-            setExtractorConfig(extractor, connectorConfig, listenerConfig, meta.getMap(), logListener);
+            setExtractorConfig(extractor, connectorConfig, listenerConfig, meta.getSnapshot(), logListener);
             return extractor;
         }
 
@@ -182,21 +182,21 @@ public class IncrementPuller extends AbstractPuller implements ScheduledTaskJob
         private LocalDateTime updateTime = LocalDateTime.now();
 
         @Override
-        public void flushEvent(Map<String, String> map) {
+        public void flushEvent(Map<String, String> snapshot) {
             // 30s内更新,执行写入
             if (updateTime.isAfter(LocalDateTime.now().minusSeconds(FLUSH_DELAYED_SECONDS))) {
-                if (!CollectionUtils.isEmpty(map)) {
-                    logger.debug("{}", map);
+                if (!CollectionUtils.isEmpty(snapshot)) {
+                    logger.debug("{}", snapshot);
                 }
-                forceFlushEvent(map);
+                forceFlushEvent(snapshot);
             }
         }
 
         @Override
-        public void forceFlushEvent(Map<String, String> map) {
+        public void forceFlushEvent(Map<String, String> snapshot) {
             Meta meta = manager.getMeta(metaId);
             if (null != meta) {
-                meta.setMap(map);
+                meta.setSnapshot(snapshot);
                 manager.editMeta(meta);
             }
         }

+ 2 - 25
dbsyncer-parser/src/main/java/org/dbsyncer/parser/ParserFactory.java

@@ -20,7 +20,6 @@ import org.dbsyncer.connector.model.MetaInfo;
 import org.dbsyncer.connector.model.Table;
 import org.dbsyncer.listener.enums.QuartzFilterEnum;
 import org.dbsyncer.parser.enums.ConvertEnum;
-import org.dbsyncer.parser.enums.ParserEnum;
 import org.dbsyncer.parser.event.FullRefreshEvent;
 import org.dbsyncer.parser.logger.LogService;
 import org.dbsyncer.parser.logger.LogType;
@@ -246,9 +245,6 @@ public class ParserFactory implements Parser {
         // 获取同步字段
         Picker picker = new Picker(fieldMapping);
 
-        // 检查分页参数
-        Map<String, String> params = getMeta(metaId).getMap();
-        params.putIfAbsent(ParserEnum.PAGE_INDEX.getCode(), ParserEnum.PAGE_INDEX.getDefaultValue());
         int pageSize = mapping.getReadNum();
         int batchSize = mapping.getBatchNum();
         ConnectorMapper sConnectorMapper = connectorFactory.connect(sConfig);
@@ -261,11 +257,9 @@ public class ParserFactory implements Parser {
             }
 
             // 1、获取数据源数据
-            int pageIndex = Integer.parseInt(params.get(ParserEnum.PAGE_INDEX.getCode()));
-            Result reader = connectorFactory.reader(sConnectorMapper, new ReaderConfig(command, new ArrayList<>(), pageIndex, pageSize));
+            Result reader = connectorFactory.reader(sConnectorMapper, new ReaderConfig(command, new ArrayList<>(), task.getPageIndex(), pageSize));
             List<Map> data = reader.getSuccessData();
             if (CollectionUtils.isEmpty(data)) {
-                params.clear();
                 logger.info("完成全量同步任务:{}, [{}] >> [{}]", metaId, sTableName, tTableName);
                 break;
             }
@@ -284,18 +278,14 @@ public class ParserFactory implements Parser {
             Result writer = writeBatch(batchWriter, executorService);
 
             // 6、更新结果
+            task.setPageIndex(task.getPageIndex() + 1);
             flush(task, writer);
 
             // 7、判断尾页
             if (data.size() < pageSize) {
-                params.clear();
                 logger.info("完成全量:{}, [{}] >> [{}]", metaId, sTableName, tTableName);
                 break;
             }
-
-            // 8、更新分页数
-            // TODO 记录表offset
-            params.put(ParserEnum.PAGE_INDEX.getCode(), String.valueOf(++pageIndex));
         }
     }
 
@@ -402,19 +392,6 @@ public class ParserFactory implements Parser {
         applicationContext.publishEvent(new FullRefreshEvent(applicationContext, task));
     }
 
-    /**
-     * 获取Meta(注: 没有bean拷贝, 便于直接更新缓存)
-     *
-     * @param metaId
-     * @return
-     */
-    private Meta getMeta(String metaId) {
-        Assert.hasText(metaId, "Meta id can not be empty.");
-        Meta meta = cacheService.get(metaId, Meta.class);
-        Assert.notNull(meta, "Meta can not be null.");
-        return meta;
-    }
-
     /**
      * 获取连接器
      *

+ 10 - 11
dbsyncer-parser/src/main/java/org/dbsyncer/parser/enums/ParserEnum.java

@@ -12,18 +12,24 @@ public enum ParserEnum {
     /**
      * 页数
      */
-    PAGE_INDEX("pageIndex", "1");
+    PAGE_INDEX("pageIndex", 1),
+
+    /**
+     * 执行的表映射关系索引
+     */
+    TABLE_GROUP_INDEX("tableGroupIndex", 0);
 
     /**
      * 编码
      */
     private String code;
+
     /**
      * 默认值
      */
-    private String defaultValue;
+    private int defaultValue;
 
-    ParserEnum(String code, String defaultValue) {
+    ParserEnum(String code, int defaultValue) {
         this.code = code;
         this.defaultValue = defaultValue;
     }
@@ -32,15 +38,8 @@ public enum ParserEnum {
         return code;
     }
 
-    public void setCode(String code) {
-        this.code = code;
-    }
-
-    public String getDefaultValue() {
+    public int getDefaultValue() {
         return defaultValue;
     }
 
-    public void setDefaultValue(String defaultValue) {
-        this.defaultValue = defaultValue;
-    }
 }

+ 6 - 6
dbsyncer-parser/src/main/java/org/dbsyncer/parser/model/Meta.java

@@ -27,7 +27,7 @@ public class Meta extends ConfigModel {
     private AtomicLong total;
     private AtomicLong success;
     private AtomicLong fail;
-    private Map<String, String> map;
+    private Map<String, String> snapshot;
     private long beginTime;
     private long endTime;
 
@@ -47,7 +47,7 @@ public class Meta extends ConfigModel {
         this.total = new AtomicLong(0);
         this.success = new AtomicLong(0);
         this.fail = new AtomicLong(0);
-        this.map = new LinkedHashMap<>();
+        this.snapshot = new LinkedHashMap<>();
         this.beginTime = 0L;
         this.endTime = 0L;
     }
@@ -92,12 +92,12 @@ public class Meta extends ConfigModel {
         this.fail = fail;
     }
 
-    public Map<String, String> getMap() {
-        return map;
+    public Map<String, String> getSnapshot() {
+        return snapshot;
     }
 
-    public void setMap(Map<String, String> map) {
-        this.map = map;
+    public void setSnapshot(Map<String, String> snapshot) {
+        this.snapshot = snapshot;
     }
 
     public long getBeginTime() {

+ 20 - 0
dbsyncer-parser/src/main/java/org/dbsyncer/parser/model/Task.java

@@ -6,6 +6,10 @@ public class Task {
 
     private StateEnum state;
 
+    private int tableGroupIndex;
+
+    private int pageIndex;
+
     private long beginTime;
 
     private long endTime;
@@ -34,6 +38,22 @@ public class Task {
         this.id = id;
     }
 
+    public int getTableGroupIndex() {
+        return tableGroupIndex;
+    }
+
+    public void setTableGroupIndex(int tableGroupIndex) {
+        this.tableGroupIndex = tableGroupIndex;
+    }
+
+    public int getPageIndex() {
+        return pageIndex;
+    }
+
+    public void setPageIndex(int pageIndex) {
+        this.pageIndex = pageIndex;
+    }
+
     public long getBeginTime() {
         return beginTime;
     }

+ 2 - 2
dbsyncer-web/src/main/resources/public/mapping/editIncrement.html

@@ -26,7 +26,7 @@
                 <input id="metaSnapshot" name="metaSnapshot" class="form-control hidden" type="text"/>
             </div>
             <div class="col-md-11">
-                <table th:if="${mapping?.meta?.map?.size() gt 0}" class="table table-hover">
+                <table th:if="${mapping?.meta?.snapshot?.size() gt 0}" class="table table-hover">
                     <thead>
                     <tr>
                         <th>Key</th>
@@ -35,7 +35,7 @@
                     </tr>
                     </thead>
                     <tbody id="mappingMetaSnapshotConfig">
-                    <tr th:each="item,entry:${mapping?.meta?.map}">
+                    <tr th:each="item,entry:${mapping?.meta?.snapshot}">
                         <td th:text="${entry.current.key}"></td>
                         <td th:text="${entry.current.value}"></td>
                         <td><i th:title="修改" class="fa fa-edit well-sign-green dbsyncer_pointer metaSnapshotModify"></i></td>