فهرست منبع

定时同步支持游标

AE86 2 سال پیش
والد
کامیت
1d224ae3e8

+ 0 - 7
dbsyncer-connector/src/main/java/org/dbsyncer/connector/config/ReaderConfig.java

@@ -11,13 +11,6 @@ public class ReaderConfig {
     private int pageIndex;
     private int pageIndex;
     private int pageSize;
     private int pageSize;
 
 
-    public ReaderConfig(Map<String, String> command, List<Object> args, int pageIndex, int pageSize) {
-        this.command = command;
-        this.args = args;
-        this.pageIndex = pageIndex;
-        this.pageSize = pageSize;
-    }
-
     public ReaderConfig(Map<String,String> command, List<Object> args, String cursor, int pageIndex, int pageSize) {
     public ReaderConfig(Map<String,String> command, List<Object> args, String cursor, int pageIndex, int pageSize) {
         this.command = command;
         this.command = command;
         this.args = args;
         this.args = args;

+ 21 - 11
dbsyncer-listener/src/main/java/org/dbsyncer/listener/quartz/AbstractQuartzExtractor.java

@@ -31,10 +31,8 @@ import java.util.stream.Stream;
 public abstract class AbstractQuartzExtractor extends AbstractExtractor implements ScheduledTaskJob {
 public abstract class AbstractQuartzExtractor extends AbstractExtractor implements ScheduledTaskJob {
 
 
     private final Logger logger = LoggerFactory.getLogger(getClass());
     private final Logger logger = LoggerFactory.getLogger(getClass());
-
-    private List<Map<String, String>> commands;
-    private int commandSize;
-
+    private static final String CURSOR = "cursor";
+    private List<TableGroupCommand> commands;
     private int readNum;
     private int readNum;
     private String eventFieldName;
     private String eventFieldName;
     private Set<String> update;
     private Set<String> update;
@@ -55,8 +53,6 @@ public abstract class AbstractQuartzExtractor extends AbstractExtractor implemen
 
 
     @Override
     @Override
     public void start() {
     public void start() {
-        commandSize = commands.size();
-
         readNum = listenerConfig.getReadNum();
         readNum = listenerConfig.getReadNum();
         eventFieldName = listenerConfig.getEventFieldName();
         eventFieldName = listenerConfig.getEventFieldName();
         update = Stream.of(listenerConfig.getUpdate().split(",")).collect(Collectors.toSet());
         update = Stream.of(listenerConfig.getUpdate().split(",")).collect(Collectors.toSet());
@@ -77,7 +73,7 @@ public abstract class AbstractQuartzExtractor extends AbstractExtractor implemen
         try {
         try {
             locked = taskLock.tryLock();
             locked = taskLock.tryLock();
             if (locked) {
             if (locked) {
-                for (int i = 0; i < commandSize; i++) {
+                for (int i = 0; i < commands.size(); i++) {
                     execute(commands.get(i), i);
                     execute(commands.get(i), i);
                 }
                 }
             }
             }
@@ -97,13 +93,17 @@ public abstract class AbstractQuartzExtractor extends AbstractExtractor implemen
         running = false;
         running = false;
     }
     }
 
 
-    private void execute(Map<String, String> command, int index) {
+    private void execute(TableGroupCommand tableGroupCommand, int index) {
+        final Map<String, String> command = tableGroupCommand.getCommand();
+        final String pk = tableGroupCommand.getPk();
+
         // 检查增量点
         // 检查增量点
         ConnectorMapper connectionMapper = connectorFactory.connect(connectorConfig);
         ConnectorMapper connectionMapper = connectorFactory.connect(connectorConfig);
         Point point = checkLastPoint(command, index);
         Point point = checkLastPoint(command, index);
         int pageIndex = 1;
         int pageIndex = 1;
+        String cursor = snapshot.get(index + CURSOR);
         while (running) {
         while (running) {
-            Result reader = connectorFactory.reader(connectionMapper, new ReaderConfig(point.getCommand(), point.getArgs(), pageIndex++, readNum));
+            Result reader = connectorFactory.reader(connectionMapper, new ReaderConfig(point.getCommand(), point.getArgs(), cursor, pageIndex++, readNum));
             List<Map> data = reader.getSuccessData();
             List<Map> data = reader.getSuccessData();
             if (CollectionUtils.isEmpty(data)) {
             if (CollectionUtils.isEmpty(data)) {
                 break;
                 break;
@@ -111,7 +111,7 @@ public abstract class AbstractQuartzExtractor extends AbstractExtractor implemen
 
 
             Object event = null;
             Object event = null;
             for (Map<String, Object> row : data) {
             for (Map<String, Object> row : data) {
-                if(StringUtil.isBlank(eventFieldName)){
+                if (StringUtil.isBlank(eventFieldName)) {
                     changedEvent(new RowChangedEvent(index, ConnectorConstant.OPERTION_UPDATE, row));
                     changedEvent(new RowChangedEvent(index, ConnectorConstant.OPERTION_UPDATE, row));
                     continue;
                     continue;
                 }
                 }
@@ -132,6 +132,7 @@ public abstract class AbstractQuartzExtractor extends AbstractExtractor implemen
 
 
             }
             }
             // 更新记录点
             // 更新记录点
+            cursor = getLastCursor(data, pk);
             point.refresh();
             point.refresh();
 
 
             if (data.size() < readNum) {
             if (data.size() < readNum) {
@@ -143,12 +144,21 @@ public abstract class AbstractQuartzExtractor extends AbstractExtractor implemen
         // 持久化
         // 持久化
         if (point.refreshed()) {
         if (point.refreshed()) {
             snapshot.putAll(point.getPosition());
             snapshot.putAll(point.getPosition());
+            snapshot.put(index + CURSOR, cursor);
         }
         }
 
 
     }
     }
 
 
-    public void setCommands(List<Map<String, String>> commands) {
+    public void setCommands(List<TableGroupCommand> commands) {
         this.commands = commands;
         this.commands = commands;
     }
     }
 
 
+    private String getLastCursor(List<Map> data, String pk) {
+        if (!CollectionUtils.isEmpty(data)) {
+            Object value = data.get(data.size() - 1).get(pk);
+            return value == null ? "" : String.valueOf(value);
+        }
+        return "";
+    }
+
 }
 }

+ 11 - 2
dbsyncer-listener/src/main/java/org/dbsyncer/listener/quartz/DatabaseQuartzExtractor.java

@@ -56,12 +56,14 @@ public final class DatabaseQuartzExtractor extends AbstractQuartzExtractor {
         Point point = new Point();
         Point point = new Point();
         // 存在系统参数,替换
         // 存在系统参数,替换
         String replaceQuery = query;
         String replaceQuery = query;
+        String replaceQueryCursor = command.get(ConnectorConstant.OPERTION_QUERY_CURSOR);
         for (QuartzFilterEnum quartzFilter : filterEnums) {
         for (QuartzFilterEnum quartzFilter : filterEnums) {
             final String type = quartzFilter.getType();
             final String type = quartzFilter.getType();
             final QuartzFilter f = quartzFilter.getQuartzFilter();
             final QuartzFilter f = quartzFilter.getQuartzFilter();
 
 
             // 替换字符
             // 替换字符
-            replaceQuery = StringUtil.replace(replaceQuery, "'" + type + "'", "?");
+            replaceQuery = replaceType(replaceQuery, type);
+            replaceQueryCursor = replaceType(replaceQueryCursor, type);
 
 
             // 创建参数索引key
             // 创建参数索引key
             final String key = index + type;
             final String key = index + type;
@@ -88,6 +90,9 @@ public final class DatabaseQuartzExtractor extends AbstractQuartzExtractor {
             point.setBeginValue(f.toString(val));
             point.setBeginValue(f.toString(val));
         }
         }
         point.setCommand(ConnectorConstant.OPERTION_QUERY, replaceQuery);
         point.setCommand(ConnectorConstant.OPERTION_QUERY, replaceQuery);
+        if (StringUtil.isNotBlank(replaceQueryCursor)) {
+            point.setCommand(ConnectorConstant.OPERTION_QUERY_CURSOR, replaceQueryCursor);
+        }
         if (reversed.get()) {
         if (reversed.get()) {
             point.reverseArgs();
             point.reverseArgs();
         }
         }
@@ -95,4 +100,8 @@ public final class DatabaseQuartzExtractor extends AbstractQuartzExtractor {
         return point;
         return point;
     }
     }
 
 
-}
+    private String replaceType(String replaceQuery, String type) {
+        return StringUtil.isNotBlank(replaceQuery) ? StringUtil.replace(replaceQuery, "'" + type + "'", "?") : replaceQuery;
+    }
+
+}

+ 23 - 0
dbsyncer-listener/src/main/java/org/dbsyncer/listener/quartz/TableGroupCommand.java

@@ -0,0 +1,23 @@
+package org.dbsyncer.listener.quartz;
+
+import java.util.Map;
+
+public class TableGroupCommand {
+
+    private String pk;
+
+    private Map<String, String> command;
+
+    public TableGroupCommand(String pk, Map<String, String> command) {
+        this.pk = pk;
+        this.command = command;
+    }
+
+    public String getPk() {
+        return pk;
+    }
+
+    public Map<String, String> getCommand() {
+        return command;
+    }
+}

+ 6 - 2
dbsyncer-manager/src/main/java/org/dbsyncer/manager/puller/IncrementPuller.java

@@ -15,6 +15,7 @@ import org.dbsyncer.listener.Listener;
 import org.dbsyncer.listener.config.ListenerConfig;
 import org.dbsyncer.listener.config.ListenerConfig;
 import org.dbsyncer.listener.enums.ListenerTypeEnum;
 import org.dbsyncer.listener.enums.ListenerTypeEnum;
 import org.dbsyncer.listener.quartz.AbstractQuartzExtractor;
 import org.dbsyncer.listener.quartz.AbstractQuartzExtractor;
+import org.dbsyncer.listener.quartz.TableGroupCommand;
 import org.dbsyncer.manager.Manager;
 import org.dbsyncer.manager.Manager;
 import org.dbsyncer.manager.ManagerException;
 import org.dbsyncer.manager.ManagerException;
 import org.dbsyncer.manager.config.FieldPicker;
 import org.dbsyncer.manager.config.FieldPicker;
@@ -24,6 +25,7 @@ import org.dbsyncer.parser.logger.LogType;
 import org.dbsyncer.parser.model.Connector;
 import org.dbsyncer.parser.model.Connector;
 import org.dbsyncer.parser.model.Mapping;
 import org.dbsyncer.parser.model.Mapping;
 import org.dbsyncer.parser.model.Meta;
 import org.dbsyncer.parser.model.Meta;
+import org.dbsyncer.parser.model.Picker;
 import org.dbsyncer.parser.model.TableGroup;
 import org.dbsyncer.parser.model.TableGroup;
 import org.dbsyncer.parser.util.PickerUtil;
 import org.dbsyncer.parser.util.PickerUtil;
 import org.slf4j.Logger;
 import org.slf4j.Logger;
@@ -143,8 +145,10 @@ public class IncrementPuller extends AbstractPuller implements ScheduledTaskJob
         // 默认定时抽取
         // 默认定时抽取
         if (ListenerTypeEnum.isTiming(listenerType)) {
         if (ListenerTypeEnum.isTiming(listenerType)) {
             AbstractQuartzExtractor extractor = listener.getExtractor(ListenerTypeEnum.TIMING, connectorConfig.getConnectorType(), AbstractQuartzExtractor.class);
             AbstractQuartzExtractor extractor = listener.getExtractor(ListenerTypeEnum.TIMING, connectorConfig.getConnectorType(), AbstractQuartzExtractor.class);
-            List<Map<String, String>> commands = list.stream().map(t -> t.getCommand()).collect(Collectors.toList());
-            extractor.setCommands(commands);
+            extractor.setCommands(list.stream().map(t -> {
+                Picker picker = new Picker(t.getFieldMapping());
+                return new TableGroupCommand(picker.getSourcePrimaryKeyName(), t.getCommand());
+            }).collect(Collectors.toList()));
             setExtractorConfig(extractor, connectorConfig, listenerConfig, meta.getSnapshot(), new QuartzListener(mapping, list));
             setExtractorConfig(extractor, connectorConfig, listenerConfig, meta.getSnapshot(), new QuartzListener(mapping, list));
             return extractor;
             return extractor;
         }
         }