AE86 5 tahun lalu
induk
melakukan
5ba9822e22

+ 9 - 0
dbsyncer-listener/src/main/java/org/dbsyncer/listener/QuartzFilter.java

@@ -24,4 +24,13 @@ public interface QuartzFilter {
      * @return
      */
     String toString(Object value);
+
+    /**
+     * 是否开始字段
+     *
+     * @return
+     */
+    default boolean begin(){
+        return true;
+    }
 }

+ 55 - 4
dbsyncer-listener/src/main/java/org/dbsyncer/listener/enums/QuartzFilterEnum.java

@@ -16,9 +16,9 @@ import java.util.Date;
 public enum QuartzFilterEnum {
 
     /**
-     * 时间戳
+     * 时间戳(开始)
      */
-    TIME_STAMP("$timestamp$", "系统时间戳", new QuartzFilter() {
+    TIME_STAMP_BEGIN("$timestamp_begin$", "系统时间戳(开始)", new QuartzFilter() {
         @Override
         public Object getObject() {
             return new Timestamp(Instant.now().toEpochMilli());
@@ -36,9 +36,34 @@ public enum QuartzFilterEnum {
         }
     }),
     /**
-     * 日期
+     * 时间戳(结束)
      */
-    DATE("$date$", "系统日期", new QuartzFilter() {
+    TIME_STAMP_END("$timestamp_end$", "系统时间戳(结束)", new QuartzFilter() {
+        @Override
+        public Object getObject() {
+            return new Timestamp(Instant.now().toEpochMilli());
+        }
+
+        @Override
+        public Object getObject(String s) {
+            return new Timestamp(Long.parseLong(s));
+        }
+
+        @Override
+        public String toString(Object value) {
+            Timestamp ts = (Timestamp) value;
+            return String.valueOf(ts.getTime());
+        }
+
+        @Override
+        public boolean begin() {
+            return false;
+        }
+    }),
+    /**
+     * 日期(开始)
+     */
+    DATE_BEGIN("$date_begin$", "系统日期(开始)", new QuartzFilter() {
         @Override
         public Object getObject() {
             return new Date();
@@ -46,6 +71,7 @@ public enum QuartzFilterEnum {
 
         @Override
         public Object getObject(String s) {
+            // TODO 日期转换
             DateTimeFormatter.ofPattern("yyyy-MM-dd");
             return null;
         }
@@ -54,6 +80,31 @@ public enum QuartzFilterEnum {
         public String toString(Object value) {
             return String.valueOf(value);
         }
+    }),
+    /**
+     * 日期(结束)
+     */
+    DATE_END("$date_end$", "系统日期(结束)", new QuartzFilter() {
+        @Override
+        public Object getObject() {
+            return new Date();
+        }
+
+        @Override
+        public Object getObject(String s) {
+            DateTimeFormatter.ofPattern("yyyy-MM-dd");
+            return null;
+        }
+
+        @Override
+        public String toString(Object value) {
+            return String.valueOf(value);
+        }
+
+        @Override
+        public boolean begin() {
+            return false;
+        }
     });
 
     private String type;

+ 46 - 16
dbsyncer-listener/src/main/java/org/dbsyncer/listener/quartz/QuartzExtractor.java

@@ -46,6 +46,7 @@ public class QuartzExtractor extends AbstractExtractor implements ScheduledTaskJ
     @Override
     public void start() {
         init();
+        run();
         scheduledTaskService.start(taskKey, cron, this);
         logger.info("启动定时任务:{} >> {}", taskKey, cron);
     }
@@ -101,12 +102,14 @@ public class QuartzExtractor extends AbstractExtractor implements ScheduledTaskJ
                 }
 
             }
+            // 更新记录点
+            point.refresh();
+
         }
 
-        // 更新记录点
-        Map<String, String> position = point.getPosition();
-        if (!CollectionUtils.isEmpty(position)) {
-            position.forEach((k, v) -> map.put(k, v));
+        // 持久化
+        if (point.refreshed()) {
+            point.getPosition().forEach((k, v) -> map.put(k, v));
             logger.info("增量点:{}", map);
         }
 
@@ -114,7 +117,7 @@ public class QuartzExtractor extends AbstractExtractor implements ScheduledTaskJ
 
     private Point checkLastPoint(Map<String, String> command, int index) {
         // 检查是否存在系统参数
-        String query = command.get(ConnectorConstant.OPERTION_QUERY);
+        final String query = command.get(ConnectorConstant.OPERTION_QUERY);
         List<QuartzFilterEnum> filterEnums = Stream.of(QuartzFilterEnum.values()).filter(f -> {
             Assert.isTrue(appearNotMoreThanOnce(query, f.getType()), String.format("系统参数%s存在多个.", f.getType()));
             return StringUtils.contains(query, f.getType());
@@ -125,28 +128,40 @@ public class QuartzExtractor extends AbstractExtractor implements ScheduledTaskJ
 
         Point point = new Point();
         // 存在系统参数,替换
+        String replaceQuery = query;
         for (QuartzFilterEnum quartzFilter : filterEnums) {
             final String type = quartzFilter.getType();
             final QuartzFilter f = quartzFilter.getQuartzFilter();
 
             // 替换字符
-            String replaceQuery = StringUtils.replace(query, "'" + type + "'", "?");
-            point.setCommand(ConnectorConstant.OPERTION_QUERY, replaceQuery);
+            replaceQuery = StringUtils.replace(replaceQuery, "'" + type + "'", "?");
 
-            // 设置参数
+            // 创建参数索引key
             final String key = index + type;
+
+            // 为空参数
             if (!map.containsKey(key)) {
                 final Object val = f.getObject();
+                final String valStr = f.toString(val);
                 point.addArg(val);
-                point.setKey(key, f.toString(val));
+                map.put(key, valStr);
                 continue;
             }
 
             // 读取历史增量点
-            Object val = f.getObject(map.get(key));
+            // 开始位置
+            if(f.begin()){
+                Object val = f.getObject(map.get(key));
+                point.addArg(val);
+                point.setBeginKey(key);
+                continue;
+            }
+            // 结束位置(刷新)
+            Object val = f.getObject();
             point.addArg(val);
-            point.setKey(key, f.toString(f.getObject()));
+            point.setBeginValue(f.toString(val));
         }
+        point.setCommand(ConnectorConstant.OPERTION_QUERY, replaceQuery);
 
         return point;
     }
@@ -186,6 +201,9 @@ public class QuartzExtractor extends AbstractExtractor implements ScheduledTaskJ
         private Map<String, String> position;
         private Map<String, String> command;
         private List<Object> args;
+        private String beginKey;
+        private String beginValue;
+        private boolean refreshed;
 
         public Point() {
             this.position = new HashMap<>();
@@ -198,10 +216,6 @@ public class QuartzExtractor extends AbstractExtractor implements ScheduledTaskJ
             this.args = args;
         }
 
-        public void setKey(String key, String value) {
-            position.put(key, value);
-        }
-
         public void setCommand(String key, String value) {
             command.put(key, value);
         }
@@ -210,6 +224,15 @@ public class QuartzExtractor extends AbstractExtractor implements ScheduledTaskJ
             args.add(val);
         }
 
+        public void refresh() {
+            position.put(beginKey, beginValue);
+            refreshed = true;
+        }
+
+        public boolean refreshed() {
+            return refreshed;
+        }
+
         public Map<String, String> getPosition() {
             return position;
         }
@@ -219,9 +242,16 @@ public class QuartzExtractor extends AbstractExtractor implements ScheduledTaskJ
         }
 
         public List<Object> getArgs() {
-            return args;
+            return new ArrayList<>(args);
         }
 
+        public void setBeginKey(String beginKey) {
+            this.beginKey = beginKey;
+        }
+
+        public void setBeginValue(String beginValue) {
+            this.beginValue = beginValue;
+        }
     }
 
 }