Browse Source

支持Dql连接器一对多表监听

Signed-off-by: AE86 <836391306@qq.com>
AE86 2 years ago
parent
commit
137e8550b5

+ 38 - 14
dbsyncer-listener/src/main/java/org/dbsyncer/listener/AbstractDatabaseExtractor.java

@@ -25,7 +25,11 @@ import java.util.concurrent.ConcurrentHashMap;
  */
 public abstract class AbstractDatabaseExtractor extends AbstractExtractor {
 
-    private Map<String, DqlMapper> dqlMap = new ConcurrentHashMap<>();
+    /**
+     * 自定义SQL,支持1对多
+     * <p>MY_USER > [用户表1, 用户表2]
+     */
+    private Map<String, List<DqlMapper>> dqlMap = new ConcurrentHashMap<>();
 
     /**
      * 发送增量事件
@@ -42,19 +46,28 @@ public abstract class AbstractDatabaseExtractor extends AbstractExtractor {
      * @param event
      */
     protected void sendDqlChangedEvent(RowChangedEvent event) {
-        if (null != event) {
-            DqlMapper dqlMapper = dqlMap.get(event.getSourceTableName());
-            if (null != dqlMapper) {
+        if (null == event) {
+            return;
+        }
+        List<DqlMapper> dqlMappers = dqlMap.get(event.getSourceTableName());
+        if (CollectionUtils.isEmpty(dqlMappers)) {
+            return;
+        }
+
+        boolean processed = false;
+        for (DqlMapper dqlMapper : dqlMappers) {
+            if (!processed) {
                 switch (event.getEvent()) {
                     case ConnectorConstant.OPERTION_UPDATE:
                     case ConnectorConstant.OPERTION_INSERT:
-                        event.setDataList(queryDqlData(dqlMapper, event.getDataList()));
+                        queryDqlData(dqlMapper, event.getDataList());
                         break;
                     default:
                         break;
                 }
-                changedEvent(event);
+                processed = true;
             }
+            changedEvent(new RowChangedEvent(dqlMapper.sqlName, event.getEvent(), event.getDataList()));
         }
     }
 
@@ -66,9 +79,11 @@ public abstract class AbstractDatabaseExtractor extends AbstractExtractor {
         AbstractDatabaseConnector connector = (AbstractDatabaseConnector) connectorFactory.getConnector(mapper);
         String quotation = connector.buildSqlWithQuotation();
 
+        // <用户表, MY_USER>
         Map<String, String> tableMap = new HashMap<>();
         mapper.getConfig().getSqlTables().forEach(s -> tableMap.put(s.getSqlName(), s.getTable()));
-
+        // 清空默认表名
+        filterTable.clear();
         for (Table t : sourceTable) {
             String sql = t.getSql();
             String sqlName = t.getName();
@@ -89,8 +104,13 @@ public abstract class AbstractDatabaseExtractor extends AbstractExtractor {
             boolean notContainsWhere = !StringUtil.contains(sql, " WHERE ");
             querySql.append(notContainsWhere ? " WHERE " : " AND ");
             PrimaryKeyUtil.buildSql(querySql, primaryKeys, quotation, " AND ", " = ? ", notContainsWhere);
-            DqlMapper dqlMapper = new DqlMapper(mapper, querySql.toString(), column, getPrimaryKeyIndexArray(column, primaryKeys));
-            dqlMap.putIfAbsent(tableName, dqlMapper);
+            DqlMapper dqlMapper = new DqlMapper(mapper, sqlName, querySql.toString(), column, getPrimaryKeyIndexArray(column, primaryKeys));
+            if (!dqlMap.containsKey(tableName)) {
+                dqlMap.putIfAbsent(tableName, new ArrayList<>());
+            }
+            dqlMap.get(tableName).add(dqlMapper);
+            // 注册监听表名
+            filterTable.add(tableName);
         }
     }
 
@@ -109,10 +129,13 @@ public abstract class AbstractDatabaseExtractor extends AbstractExtractor {
             }
         }
         Assert.isTrue(!CollectionUtils.isEmpty(indexList), "The primaryKeys is invalid.");
-        return (Integer[]) indexList.toArray();
+        Object[] indexArray = indexList.toArray();
+        Integer[] newIndexArray = new Integer[indexArray.length];
+        System.arraycopy(indexArray, 0, newIndexArray, 0, indexArray.length);
+        return newIndexArray;
     }
 
-    private List<Object> queryDqlData(DqlMapper dqlMapper, List<Object> data) {
+    private void queryDqlData(DqlMapper dqlMapper, List<Object> data) {
         if (!CollectionUtils.isEmpty(data)) {
             Map<String, Object> row = dqlMapper.mapper.execute(databaseTemplate -> {
                 int size = dqlMapper.primaryKeyIndexArray.length;
@@ -127,20 +150,21 @@ public abstract class AbstractDatabaseExtractor extends AbstractExtractor {
                 dqlMapper.column.forEach(field -> data.add(row.get(field.getName())));
             }
         }
-        return data;
     }
 
     final class DqlMapper {
         DatabaseConnectorMapper mapper;
+        String sqlName;
         String sql;
         List<Field> column;
         Integer[] primaryKeyIndexArray;
 
-        public DqlMapper(DatabaseConnectorMapper mapper, String sql, List<Field> column, Integer[] primaryKeyIndexArray) {
+        public DqlMapper(DatabaseConnectorMapper mapper, String sqlName, String sql, List<Field> column, Integer[] primaryKeyIndexArray) {
             this.mapper = mapper;
+            this.sqlName = sqlName;
+            this.sql = sql;
             this.column = column;
             this.primaryKeyIndexArray = primaryKeyIndexArray;
-            this.sql = sql;
         }
     }
 

+ 4 - 10
dbsyncer-manager/src/main/java/org/dbsyncer/manager/model/FieldPicker.java

@@ -20,10 +20,9 @@ import java.util.stream.Collectors;
 public class FieldPicker {
 
     private TableGroup tableGroup;
-    private List<Field> pkList;
     private List<Node> index;
     private int indexSize;
-    private boolean filterSwitch;
+    private boolean enabledFilter;
     private List<Filter> add;
     private List<Filter> or;
 
@@ -31,9 +30,8 @@ public class FieldPicker {
         this.tableGroup = tableGroup;
     }
 
-    public FieldPicker(TableGroup tableGroup, List<Field> pkList, List<Filter> filter, List<Field> column, List<FieldMapping> fieldMapping) {
+    public FieldPicker(TableGroup tableGroup, List<Filter> filter, List<Field> column, List<FieldMapping> fieldMapping) {
         this.tableGroup = tableGroup;
-        this.pkList = pkList;
         init(filter, column, fieldMapping);
     }
 
@@ -58,7 +56,7 @@ public class FieldPicker {
      * @return
      */
     public boolean filter(Map<String, Object> row) {
-        if (!filterSwitch) {
+        if (!enabledFilter) {
             return true;
         }
         // where (id > 1 and id < 100) or (id = 100 or id =101)
@@ -124,7 +122,7 @@ public class FieldPicker {
         Assert.notEmpty(fieldMapping, "映射关系不能为空.");
 
         // 解析过滤条件
-        if ((filterSwitch = !CollectionUtils.isEmpty(filter))) {
+        if ((enabledFilter = !CollectionUtils.isEmpty(filter))) {
             add = filter.stream().filter(f -> StringUtil.equals(f.getOperation(), OperationEnum.AND.getName())).collect(
                     Collectors.toList());
             or = filter.stream().filter(f -> StringUtil.equals(f.getOperation(), OperationEnum.OR.getName())).collect(Collectors.toList());
@@ -148,10 +146,6 @@ public class FieldPicker {
         return tableGroup;
     }
 
-    public String getPk() {
-        return pkList.get(0).getName();
-    }
-
     final class Node {
         // 属性
         String name;

+ 1 - 2
dbsyncer-manager/src/main/java/org/dbsyncer/manager/puller/IncrementPuller.java

@@ -298,10 +298,9 @@ public class IncrementPuller extends AbstractPuller implements ScheduledTaskJob
             tableGroups.forEach(t -> {
                 final Table table = t.getSourceTable();
                 final String tableName = table.getName();
-                List<Field> pkList = t.getTargetTable().getColumn().stream().filter(field -> field.isPk()).collect(Collectors.toList());
                 tablePicker.putIfAbsent(tableName, new ArrayList<>());
                 TableGroup group = PickerUtil.mergeTableGroupConfig(mapping, t);
-                tablePicker.get(tableName).add(new FieldPicker(group, pkList, group.getFilter(), table.getColumn(), group.getFieldMapping()));
+                tablePicker.get(tableName).add(new FieldPicker(group, group.getFilter(), table.getColumn(), group.getFieldMapping()));
             });
         }