Browse Source

add picker

AE86 5 years ago
parent
commit
1a6cb65d03

+ 79 - 0
dbsyncer-manager/src/main/java/org/dbsyncer/manager/config/FieldPicker.java

@@ -0,0 +1,79 @@
+package org.dbsyncer.manager.config;
+
+import org.dbsyncer.common.util.CollectionUtils;
+import org.dbsyncer.connector.config.Field;
+import org.dbsyncer.parser.model.FieldMapping;
+import org.dbsyncer.parser.model.TableGroup;
+import org.springframework.util.Assert;
+
+import java.util.*;
+import java.util.stream.Collectors;
+
+public class FieldPicker {
+
+    private TableGroup tableGroup;
+    private List<Field> column;
+    private List<FieldMapping> fieldMapping;
+    private List<Node> index;
+    private int indexSize;
+
+    public FieldPicker(TableGroup tableGroup, List<Field> column, List<FieldMapping> fieldMapping) {
+        this.tableGroup = tableGroup;
+        this.column = column;
+        this.fieldMapping = fieldMapping;
+        init();
+    }
+
+    public Map<String, Object> getColumns(List<Object> list) {
+        if (!CollectionUtils.isEmpty(list)) {
+            Map<String, Object> data = new HashMap<>(indexSize);
+            int size = list.size();
+            index.parallelStream().forEach(node -> {
+                if (node.i <= size) {
+                    data.put(node.name, list.get(node.i));
+                }
+            });
+            return data;
+        }
+        return Collections.EMPTY_MAP;
+    }
+
+    public TableGroup getTableGroup() {
+        return tableGroup;
+    }
+
+    private void init() {
+        // column  => [1, 86, 0, 中文, 2020-05-15T12:17:22.000+0800, 备注信息]
+        Assert.notEmpty(column, "读取字段不能为空.");
+        Assert.notEmpty(fieldMapping, "映射关系不能为空.");
+
+        // 找到同步字段 => [{source.name}]
+        Set<String> key = fieldMapping.stream().map(m -> m.getSource().getName()).collect(Collectors.toSet());
+
+        // 记录字段索引 [{"ID":0},{"NAME":1}]
+        index = new LinkedList<>();
+        int size = column.size();
+        String k = null;
+        for (int i = 0; i < size; i++) {
+            k = column.get(i).getName();
+            if (key.contains(k)) {
+                index.add(new Node(k, i));
+            }
+        }
+        Assert.notEmpty(index, "同步映射关系不能为空.");
+        this.indexSize = index.size();
+    }
+
+    final class Node {
+        // 属性
+        String name;
+        // 索引
+        int i;
+
+        public Node(String name, int i) {
+            this.name = name;
+            this.i = i;
+        }
+    }
+
+}

+ 25 - 6
dbsyncer-manager/src/main/java/org/dbsyncer/manager/puller/impl/IncrementPuller.java

@@ -1,24 +1,27 @@
 package org.dbsyncer.manager.puller.impl;
 
 import org.dbsyncer.common.event.Event;
+import org.dbsyncer.common.util.CollectionUtils;
+import org.dbsyncer.connector.config.Table;
 import org.dbsyncer.listener.DefaultExtractor;
 import org.dbsyncer.listener.Listener;
 import org.dbsyncer.manager.Manager;
+import org.dbsyncer.manager.config.FieldPicker;
 import org.dbsyncer.manager.puller.AbstractPuller;
 import org.dbsyncer.parser.Parser;
-import org.dbsyncer.parser.model.Connector;
-import org.dbsyncer.parser.model.Mapping;
-import org.dbsyncer.parser.model.Meta;
-import org.dbsyncer.parser.model.TableGroup;
+import org.dbsyncer.parser.model.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
 import org.springframework.util.Assert;
 
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
 
 /**
  * 增量同步
@@ -95,18 +98,34 @@ public class IncrementPuller extends AbstractPuller {
         private Mapping mapping;
         private List<TableGroup> list;
         private String metaId;
+        private Map<String, List<FieldPicker>> tablePicker;
 
         public DefaultListener(Mapping mapping, List<TableGroup> list) {
             this.mapping = mapping;
             this.list = list;
             this.metaId = mapping.getMetaId();
+            this.tablePicker = new LinkedHashMap<>();
+            list.forEach(t -> {
+                final Table table = t.getSourceTable();
+                final String tableName = table.getName();
+                tablePicker.putIfAbsent(tableName, new ArrayList<>());
+                tablePicker.get(tableName).add(new FieldPicker(t, table.getColumn(), t.getFieldMapping()));
+            });
         }
 
         @Override
         public void changedEvent(String tableName, String event, List<Object> before, List<Object> after) {
-            logger.info("监听数据>tableName:{},event:{},before:{}, after:{}", tableName, event, before, after);
+            logger.info("监听数据=> tableName:{}, event:{}, before:{}, after:{}", tableName, event, before, after);
+
             // 处理过程有异常向上抛
-            list.forEach(tableGroup -> parser.execute(mapping, tableGroup));
+            List<FieldPicker> pickers = tablePicker.get(tableName);
+            if (!CollectionUtils.isEmpty(pickers)) {
+                pickers.parallelStream().forEach(p -> {
+                    DataEvent data = new DataEvent(event, p.getColumns(before), p.getColumns(after));
+                    parser.execute(mapping, p.getTableGroup(), data);
+                });
+            }
+
         }
 
         @Override

+ 3 - 1
dbsyncer-parser/src/main/java/org/dbsyncer/parser/Parser.java

@@ -8,6 +8,7 @@ import org.dbsyncer.connector.enums.FilterEnum;
 import org.dbsyncer.connector.enums.OperationEnum;
 import org.dbsyncer.parser.enums.ConvertEnum;
 import org.dbsyncer.parser.model.Connector;
+import org.dbsyncer.parser.model.DataEvent;
 import org.dbsyncer.parser.model.Mapping;
 import org.dbsyncer.parser.model.TableGroup;
 
@@ -124,6 +125,7 @@ public interface Parser {
      *
      * @param mapping
      * @param tableGroup
+     * @param dataEvent
      */
-    void execute(Mapping mapping, TableGroup tableGroup);
+    void execute(Mapping mapping, TableGroup tableGroup, DataEvent dataEvent);
 }

+ 2 - 2
dbsyncer-parser/src/main/java/org/dbsyncer/parser/ParserFactory.java

@@ -225,8 +225,8 @@ public class ParserFactory implements Parser {
     }
 
     @Override
-    public void execute(Mapping mapping, TableGroup tableGroup) {
-
+    public void execute(Mapping mapping, TableGroup tableGroup, DataEvent dataEvent) {
+        logger.info("同步数据=> dataEvent:{}", dataEvent);
     }
 
     /**

+ 47 - 0
dbsyncer-parser/src/main/java/org/dbsyncer/parser/model/DataEvent.java

@@ -0,0 +1,47 @@
+package org.dbsyncer.parser.model;
+
+import org.dbsyncer.common.util.JsonUtil;
+
+import java.util.Map;
+
+public final class DataEvent {
+
+    private String event;
+    private Map<String, Object> before;
+    private Map<String, Object> after;
+
+    public DataEvent(String event, Map<String, Object> before, Map<String, Object> after) {
+        this.event = event;
+        this.before = before;
+        this.after = after;
+    }
+
+    public String getEvent() {
+        return event;
+    }
+
+    public void setEvent(String event) {
+        this.event = event;
+    }
+
+    public Map<String, Object> getBefore() {
+        return before;
+    }
+
+    public void setBefore(Map<String, Object> before) {
+        this.before = before;
+    }
+
+    public Map<String, Object> getAfter() {
+        return after;
+    }
+
+    public void setAfter(Map<String, Object> after) {
+        this.after = after;
+    }
+
+    @Override
+    public String toString() {
+        return JsonUtil.objToJson(this);
+    }
+}