1
0
AE86 1 жил өмнө
parent
commit
d7b654cb66

+ 27 - 27
dbsyncer-common/src/main/java/org/dbsyncer/common/event/PageChangedEvent.java → dbsyncer-common/src/main/java/org/dbsyncer/common/event/ScanChangedEvent.java

@@ -1,28 +1,28 @@
-/**
- * DBSyncer Copyright 2019-2024 All Rights Reserved.
- */
-package org.dbsyncer.common.event;
-
-import java.util.Map;
-
-/**
- * 分页变更事件
- *
- * @version 1.0.0
- * @Author AE86
- * @Date 2023-08-20 20:00
- */
-public final class PageChangedEvent extends CommonChangedEvent {
-
-    private int tableGroupIndex;
-
-    public PageChangedEvent(int index, String event, Map<String, Object> changedRow) {
-        this.tableGroupIndex = index;
-        setEvent(event);
-        setChangedRow(changedRow);
-    }
-
-    public int getTableGroupIndex() {
-        return tableGroupIndex;
-    }
+/**
+ * DBSyncer Copyright 2019-2024 All Rights Reserved.
+ */
+package org.dbsyncer.common.event;
+
+import java.util.Map;
+
+/**
+ * 定时扫表变更事件
+ *
+ * @version 1.0.0
+ * @Author AE86
+ * @Date 2023-08-20 20:00
+ */
+public final class ScanChangedEvent extends CommonChangedEvent {
+
+    private int tableGroupIndex;
+
+    public ScanChangedEvent(int index, String event, Map<String, Object> changedRow) {
+        this.tableGroupIndex = index;
+        setEvent(event);
+        setChangedRow(changedRow);
+    }
+
+    public int getTableGroupIndex() {
+        return tableGroupIndex;
+    }
 }

+ 5 - 5
dbsyncer-listener/src/main/java/org/dbsyncer/listener/quartz/AbstractQuartzExtractor.java

@@ -1,6 +1,6 @@
 package org.dbsyncer.listener.quartz;
 
-import org.dbsyncer.common.event.PageChangedEvent;
+import org.dbsyncer.common.event.ScanChangedEvent;
 import org.dbsyncer.common.model.Result;
 import org.dbsyncer.common.scheduled.ScheduledTaskJob;
 import org.dbsyncer.common.spi.ConnectorMapper;
@@ -119,21 +119,21 @@ public abstract class AbstractQuartzExtractor extends AbstractExtractor implemen
 
             for (Map<String, Object> row : data) {
                 if (forceUpdate) {
-                    changeEvent(new PageChangedEvent(index, ConnectorConstant.OPERTION_UPDATE, row));
+                    changeEvent(new ScanChangedEvent(index, ConnectorConstant.OPERTION_UPDATE, row));
                     continue;
                 }
 
                 Object eventValue = row.get(eventFieldName);
                 if (update.contains(eventValue)) {
-                    changeEvent(new PageChangedEvent(index, ConnectorConstant.OPERTION_UPDATE, row));
+                    changeEvent(new ScanChangedEvent(index, ConnectorConstant.OPERTION_UPDATE, row));
                     continue;
                 }
                 if (insert.contains(eventValue)) {
-                    changeEvent(new PageChangedEvent(index, ConnectorConstant.OPERTION_INSERT, row));
+                    changeEvent(new ScanChangedEvent(index, ConnectorConstant.OPERTION_INSERT, row));
                     continue;
                 }
                 if (delete.contains(eventValue)) {
-                    changeEvent(new PageChangedEvent(index, ConnectorConstant.OPERTION_DELETE, row));
+                    changeEvent(new ScanChangedEvent(index, ConnectorConstant.OPERTION_DELETE, row));
                 }
             }
             // 更新记录点

+ 1 - 0
dbsyncer-manager/src/main/java/org/dbsyncer/manager/puller/FullPuller.java

@@ -145,6 +145,7 @@ public class FullPuller extends AbstractPuller implements ApplicationListener<Fu
 
         meta.setBeginTime(task.getBeginTime());
         meta.setEndTime(task.getEndTime());
+        meta.setUpdateTime(Instant.now().toEpochMilli());
         Map<String, String> snapshot = meta.getSnapshot();
         snapshot.put(ParserEnum.PAGE_INDEX.getCode(), String.valueOf(task.getPageIndex()));
         snapshot.put(ParserEnum.CURSOR.getCode(), StringUtil.join(task.getCursors(), ","));

+ 4 - 4
dbsyncer-manager/src/main/java/org/dbsyncer/manager/puller/IncrementPuller.java

@@ -2,7 +2,7 @@ package org.dbsyncer.manager.puller;
 
 import org.dbsyncer.common.event.ChangedEvent;
 import org.dbsyncer.common.event.ChangedOffset;
-import org.dbsyncer.common.event.PageChangedEvent;
+import org.dbsyncer.common.event.ScanChangedEvent;
 import org.dbsyncer.common.event.RefreshOffsetEvent;
 import org.dbsyncer.common.event.RowChangedEvent;
 import org.dbsyncer.common.event.Watcher;
@@ -143,7 +143,7 @@ public class IncrementPuller extends AbstractPuller implements ApplicationListen
     @Override
     public void run() {
         // 定时同步增量信息
-        map.forEach((k, v) -> v.flushEvent());
+        map.values().forEach(extractor -> extractor.flushEvent());
     }
 
     private AbstractExtractor getExtractor(Mapping mapping, Connector connector, List<TableGroup> list, Meta meta) throws InstantiationException, IllegalAccessException {
@@ -220,7 +220,7 @@ public class IncrementPuller extends AbstractPuller implements ApplicationListen
         }
     }
 
-    final class QuartzConsumer extends AbstractConsumer<PageChangedEvent> {
+    final class QuartzConsumer extends AbstractConsumer<ScanChangedEvent> {
         private List<FieldPicker> tablePicker = new LinkedList<>();
 
         public QuartzConsumer(Meta meta, Mapping mapping, List<TableGroup> tableGroups) {
@@ -229,7 +229,7 @@ public class IncrementPuller extends AbstractPuller implements ApplicationListen
         }
 
         @Override
-        public void onChange(PageChangedEvent event) {
+        public void onChange(ScanChangedEvent event) {
             final FieldPicker picker = tablePicker.get(event.getTableGroupIndex());
             TableGroup tableGroup = picker.getTableGroup();
             event.setSourceTableName(tableGroup.getSourceTable().getName());

+ 1 - 1
dbsyncer-manager/src/main/java/org/dbsyncer/manager/template/OperationTemplate.java

@@ -160,7 +160,7 @@ public final class OperationTemplate {
         }
     }
 
-    static class Group {
+    class Group {
 
         private List<String> index;