瀏覽代碼

add filter

AE86 5 年之前
父節點
當前提交
59cef96624

+ 7 - 0
dbsyncer-connector/src/main/java/org/dbsyncer/connector/CompareFilter.java

@@ -0,0 +1,7 @@
+package org.dbsyncer.connector;
+
+public interface CompareFilter {
+
+    boolean compare(String value, String filterValue);
+
+}

+ 35 - 7
dbsyncer-connector/src/main/java/org/dbsyncer/connector/enums/FilterEnum.java

@@ -1,5 +1,10 @@
 package org.dbsyncer.connector.enums;
 
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.lang.math.NumberUtils;
+import org.dbsyncer.connector.CompareFilter;
+import org.dbsyncer.connector.ConnectorException;
+
 /**
  * 运算符表达式类型
  *
@@ -12,37 +17,60 @@ public enum FilterEnum {
     /**
      * 等于
      */
-    EQUAL("="),
+    EQUAL("=", (value, filterValue) -> StringUtils.equals(value, filterValue)),
     /**
      * 不等于
      */
-    NOT_EQUAL("!="),
+    NOT_EQUAL("!=", (value, filterValue) -> !StringUtils.equals(value, filterValue)),
     /**
      * 大于
      */
-    GT(">"),
+    GT(">", (value, filterValue) -> NumberUtils.toInt(value) > NumberUtils.toInt(filterValue)),
     /**
      * 小于
      */
-    LT("<"),
+    LT("<", (value, filterValue) -> NumberUtils.toInt(value) < NumberUtils.toInt(filterValue)),
     /**
      * 大于等于
      */
-    GT_AND_EQUAL(">="),
+    GT_AND_EQUAL(">=", (value, filterValue) -> NumberUtils.toInt(value) >= NumberUtils.toInt(filterValue)),
     /**
      * 小于等于
      */
-    LT_AND_EQUAL("<=");
+    LT_AND_EQUAL("<=", (value, filterValue) -> NumberUtils.toInt(value) <= NumberUtils.toInt(filterValue));
 
     // 运算符名称
     private String name;
+    // 比较器
+    private CompareFilter compareFilter;
 
-    FilterEnum(String name) {
+    FilterEnum(String name, CompareFilter compareFilter) {
         this.name = name;
+        this.compareFilter = compareFilter;
+    }
+
+    /**
+     * 获取比较器
+     *
+     * @param filterName
+     * @return
+     * @throws ConnectorException
+     */
+    public static CompareFilter getCompareFilter(String filterName) throws ConnectorException {
+        for (FilterEnum e : FilterEnum.values()) {
+            if (StringUtils.equals(filterName, e.getName())) {
+                return e.getCompareFilter();
+            }
+        }
+        throw new ConnectorException(String.format("FilterEnum name \"%s\" does not exist.", filterName));
     }
 
     public String getName() {
         return name;
     }
 
+    public CompareFilter getCompareFilter() {
+        return compareFilter;
+    }
+
 }

+ 0 - 29
dbsyncer-listener/src/main/java/org/dbsyncer/listener/config/TableCommandConfig.java

@@ -1,29 +0,0 @@
-package org.dbsyncer.listener.config;
-
-import java.util.Map;
-
-/**
- * @version 1.0.0
- * @Author AE86
- * @Date 2020-05-25 23:52
- */
-public class TableCommandConfig {
-
-    private String table;
-
-    private Map<String, String> command;
-
-    public TableCommandConfig(String table, Map<String, String> command) {
-        this.table = table;
-        this.command = command;
-    }
-
-    public String getTable() {
-        return table;
-    }
-
-    public Map<String, String> getCommand() {
-        return command;
-    }
-
-}

+ 2 - 1
dbsyncer-listener/src/main/java/org/dbsyncer/listener/config/ExtractorConfig.java → dbsyncer-manager/src/main/java/org/dbsyncer/manager/config/ExtractorConfig.java

@@ -1,7 +1,8 @@
-package org.dbsyncer.listener.config;
+package org.dbsyncer.manager.config;
 
 import org.dbsyncer.common.event.Event;
 import org.dbsyncer.connector.config.ConnectorConfig;
+import org.dbsyncer.listener.config.ListenerConfig;
 
 import java.util.Map;
 

+ 44 - 3
dbsyncer-manager/src/main/java/org/dbsyncer/manager/config/FieldPicker.java

@@ -1,8 +1,12 @@
 package org.dbsyncer.manager.config;
 
+import org.apache.commons.lang.StringUtils;
 import org.dbsyncer.common.util.CollectionUtils;
+import org.dbsyncer.connector.CompareFilter;
 import org.dbsyncer.connector.config.Field;
 import org.dbsyncer.connector.config.Filter;
+import org.dbsyncer.connector.enums.FilterEnum;
+import org.dbsyncer.connector.enums.OperationEnum;
 import org.dbsyncer.parser.model.DataEvent;
 import org.dbsyncer.parser.model.FieldMapping;
 import org.dbsyncer.parser.model.TableGroup;
@@ -16,6 +20,9 @@ public class FieldPicker {
     private TableGroup tableGroup;
     private List<Node> index;
     private int indexSize;
+    private boolean filterSwitch;
+    private List<Filter> add;
+    private List<Filter> or;
 
     public FieldPicker(TableGroup tableGroup, List<Filter> filter) {
         this.tableGroup = tableGroup;
@@ -51,12 +58,46 @@ public class FieldPicker {
      * @return
      */
     public boolean filter(DataEvent data) {
-        // TODO 过滤
-        //Map<String, Object> row = data.getData();
-        return false;
+        if (!filterSwitch) {
+            return true;
+        }
+        final Map<String, Object> row = data.getData();
+        // where (id > 1 and id < 100) or (id = 100 or id =101)
+        // 或 关系(成立任意条件)
+        CompareFilter filter = null;
+        Object value = null;
+        for (Filter f: or) {
+            value = row.get(f.getName());
+            if(null == value){
+                continue;
+            }
+            filter = FilterEnum.getCompareFilter(f.getFilter());
+            if(filter.compare(String.valueOf(value), f.getValue())){
+                return true;
+            }
+        }
+        // 并 关系(成立所有条件)
+        for (Filter f: add) {
+            value = row.get(f.getName());
+            if(null == value){
+                continue;
+            }
+            filter = FilterEnum.getCompareFilter(f.getFilter());
+            if(!filter.compare(String.valueOf(value), f.getValue())){
+                return false;
+            }
+        }
+
+        return true;
     }
 
     private void init(List<Filter> filter, List<Field> column, List<FieldMapping> fieldMapping) {
+        // 解析过滤条件
+        if ((filterSwitch == !CollectionUtils.isEmpty(filter))) {
+            add = filter.stream().filter(f -> StringUtils.equals(f.getOperation(), OperationEnum.AND.getName())).collect(Collectors.toList());
+            or = filter.stream().filter(f -> StringUtils.equals(f.getOperation(), OperationEnum.OR.getName())).collect(Collectors.toList());
+        }
+
         // column  => [1, 86, 0, 中文, 2020-05-15T12:17:22.000+0800, 备注信息]
         Assert.notEmpty(column, "读取字段不能为空.");
         Assert.notEmpty(fieldMapping, "映射关系不能为空.");

+ 3 - 3
dbsyncer-manager/src/main/java/org/dbsyncer/manager/puller/impl/IncrementPuller.java

@@ -8,13 +8,13 @@ import org.dbsyncer.connector.config.ConnectorConfig;
 import org.dbsyncer.connector.config.Table;
 import org.dbsyncer.listener.AbstractExtractor;
 import org.dbsyncer.listener.Listener;
-import org.dbsyncer.listener.config.ExtractorConfig;
 import org.dbsyncer.listener.config.ListenerConfig;
 import org.dbsyncer.listener.enums.ListenerTypeEnum;
 import org.dbsyncer.listener.quartz.QuartzExtractor;
 import org.dbsyncer.listener.quartz.ScheduledTaskJob;
 import org.dbsyncer.listener.quartz.ScheduledTaskService;
 import org.dbsyncer.manager.Manager;
+import org.dbsyncer.manager.config.ExtractorConfig;
 import org.dbsyncer.manager.config.FieldPicker;
 import org.dbsyncer.manager.puller.AbstractPuller;
 import org.dbsyncer.parser.Parser;
@@ -237,7 +237,7 @@ public class IncrementPuller extends AbstractPuller implements ScheduledTaskJob,
 
             // 处理过程有异常向上抛
             DataEvent data = new DataEvent(event, before, after);
-            if(picker.filter(data)){
+            if (picker.filter(data)) {
                 parser.execute(mapping, picker.getTableGroup(), data);
             }
 
@@ -291,7 +291,7 @@ public class IncrementPuller extends AbstractPuller implements ScheduledTaskJob,
             if (!CollectionUtils.isEmpty(pickers)) {
                 pickers.parallelStream().forEach(picker -> {
                     DataEvent data = new DataEvent(event, picker.getColumns(before), picker.getColumns(after));
-                    if(picker.filter(data)){
+                    if (picker.filter(data)) {
                         parser.execute(mapping, picker.getTableGroup(), data);
                     }
                 });