1
0
穿云 3 сар өмнө
parent
commit
d133bf1f94

+ 0 - 9
dbsyncer-connector/dbsyncer-connector-oracle/src/main/java/org/dbsyncer/connector/oracle/cdc/DqlOracleListener.java

@@ -4,9 +4,6 @@
 package org.dbsyncer.connector.oracle.cdc;
 
 import org.dbsyncer.sdk.listener.ChangedEvent;
-import org.dbsyncer.sdk.model.Field;
-
-import java.util.List;
 
 /**
  * @Author AE86
@@ -26,10 +23,4 @@ public class DqlOracleListener extends OracleListener {
         super.sendDqlChangedEvent(event);
     }
 
-    @Override
-    protected Integer[] getPrimaryKeyIndexArray(List<Field> column, List<String> primaryKeys) {
-        // ROW_ID
-        return new Integer[]{0};
-    }
-
 }

+ 4 - 0
dbsyncer-sdk/src/main/java/org/dbsyncer/sdk/connector/database/AbstractDQLConnector.java

@@ -85,4 +85,8 @@ public abstract class AbstractDQLConnector extends AbstractDatabaseConnector {
         map.put(SqlBuilderEnum.QUERY_COUNT.getName(), queryCount.toString());
         return map;
     }
+
+    public MetaInfo getTableMetaInfo(DatabaseConnectorInstance connectorInstance, String tableNamePattern) {
+        return super.getMetaInfo(connectorInstance, tableNamePattern);
+    }
 }

+ 62 - 29
dbsyncer-sdk/src/main/java/org/dbsyncer/sdk/listener/AbstractDatabaseListener.java

@@ -5,7 +5,7 @@ package org.dbsyncer.sdk.listener;
 
 import org.dbsyncer.common.util.CollectionUtils;
 import org.dbsyncer.common.util.StringUtil;
-import org.dbsyncer.sdk.connector.database.AbstractDatabaseConnector;
+import org.dbsyncer.sdk.connector.database.AbstractDQLConnector;
 import org.dbsyncer.sdk.connector.database.DatabaseConnectorInstance;
 import org.dbsyncer.sdk.constant.ConnectorConstant;
 import org.dbsyncer.sdk.listener.event.RowChangedEvent;
@@ -20,6 +20,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
 
 /**
  * @author AE86
@@ -70,6 +71,9 @@ public abstract class AbstractDatabaseListener extends AbstractListener<Database
                             return;
                         }
                         break;
+                    case ConnectorConstant.OPERTION_DELETE:
+                        getPKData(dqlMapper, changedEvent.getDataList());
+                        break;
                     default:
                         break;
                 }
@@ -85,7 +89,7 @@ public abstract class AbstractDatabaseListener extends AbstractListener<Database
      */
     protected void postProcessDqlBeforeInitialization() {
         DatabaseConnectorInstance instance = (DatabaseConnectorInstance) connectorInstance;
-        AbstractDatabaseConnector service = (AbstractDatabaseConnector) connectorService;
+        AbstractDQLConnector service = (AbstractDQLConnector) connectorService;
         String quotation = service.buildSqlWithQuotation();
 
         // <用户表, MY_USER>
@@ -96,14 +100,24 @@ public abstract class AbstractDatabaseListener extends AbstractListener<Database
         for (Table t : sourceTable) {
             String sql = t.getSql();
             String sqlName = t.getName();
-            List<String> primaryKeys = PrimaryKeyUtil.findTablePrimaryKeys(t);
             String tableName = tableMap.get(sqlName);
             Assert.hasText(sql, "The sql is null.");
             Assert.hasText(tableName, "The tableName is null.");
 
-            MetaInfo metaInfo = service.getMetaInfo(instance, sqlName);
-            final List<Field> column = metaInfo.getColumn();
-            Assert.notEmpty(column, String.format("The column of table name '%s' is empty.", sqlName));
+            MetaInfo tableMetaInfo = service.getTableMetaInfo(instance, tableName);
+            List<Field> tableColumns = tableMetaInfo.getColumn();
+            Assert.notEmpty(tableColumns, String.format("The column of table name '%s' is empty.", tableName));
+            List<Field> primaryFields = PrimaryKeyUtil.findPrimaryKeyFields(tableColumns);
+            Assert.notEmpty(primaryFields, String.format("主表 %s 缺少主键.", tableName));
+            List<String> primaryKeys = primaryFields.stream().map(Field::getName).collect(Collectors.toList());
+            Map<String, Integer> tablePKIndexMap = new HashMap<>(primaryKeys.size());
+            List<Integer> tablePKIndex = getPKIndex(tableColumns, tablePKIndexMap);
+
+            MetaInfo sqlMetaInfo = service.getMetaInfo(instance, sqlName);
+            final List<Field> sqlColumns = sqlMetaInfo.getColumn();
+            Assert.notEmpty(sqlColumns, String.format("The column of table name '%s' is empty.", sqlName));
+            Map<Integer, Integer> sqlPKIndexMap = getPKIndexMap(sqlColumns, tablePKIndexMap);
+            Assert.notEmpty(sqlPKIndexMap, String.format("表 %s 缺少主键.", sqlName));
 
             sql = sql.replace("\t", " ");
             sql = sql.replace("\r", " ");
@@ -114,7 +128,7 @@ public abstract class AbstractDatabaseListener extends AbstractListener<Database
             boolean notContainsWhere = !StringUtil.contains(temp, " WHERE ");
             querySql.append(notContainsWhere ? " WHERE " : StringUtil.EMPTY);
             PrimaryKeyUtil.buildSql(querySql, primaryKeys, quotation, " AND ", " = ? ", notContainsWhere);
-            DqlMapper dqlMapper = new DqlMapper(instance, sqlName, querySql.toString(), column, getPrimaryKeyIndexArray(column, primaryKeys));
+            DqlMapper dqlMapper = new DqlMapper(instance, sqlName, querySql.toString(), sqlColumns, tablePKIndex, sqlPKIndexMap);
             if (!dqlMap.containsKey(tableName)) {
                 dqlMap.putIfAbsent(tableName, new ArrayList<>());
             }
@@ -124,34 +138,33 @@ public abstract class AbstractDatabaseListener extends AbstractListener<Database
         }
     }
 
-    /**
-     * 获取主表主键索引
-     *
-     * @param column
-     * @param primaryKeys
-     * @return
-     */
-    protected Integer[] getPrimaryKeyIndexArray(List<Field> column, List<String> primaryKeys) {
-        List<Integer> indexList = new ArrayList<>();
-        for (Field f : column) {
-            if (primaryKeys.contains(f.getName())) {
-                indexList.add(column.indexOf(f));
+    private Map<Integer, Integer> getPKIndexMap(List<Field> column, Map<String, Integer> tablePKIndexMap) {
+        Map<Integer, Integer> map = new HashMap<>();
+        for (int i = 0; i < column.size(); i++) {
+            final int index = i;
+            tablePKIndexMap.computeIfPresent(column.get(i).getName(), (k, v) -> map.put(index, v));
+        }
+        return map;
+    }
+
+    private List<Integer> getPKIndex(List<Field> column, Map<String, Integer> tablePKIndexMap) {
+        List<Integer> list = new ArrayList<>();
+        for (int i = 0; i < column.size(); i++) {
+            if (column.get(i).isPk()) {
+                list.add(i);
+                tablePKIndexMap.put(column.get(i).getName(), i);
             }
         }
-        Assert.isTrue(!CollectionUtils.isEmpty(indexList), "The primaryKeys is invalid.");
-        Object[] indexArray = indexList.toArray();
-        Integer[] newIndexArray = new Integer[indexArray.length];
-        System.arraycopy(indexArray, 0, newIndexArray, 0, indexArray.length);
-        return newIndexArray;
+        return list;
     }
 
     private void queryDqlData(DqlMapper dqlMapper, List<Object> data) {
         if (!CollectionUtils.isEmpty(data)) {
             Map<String, Object> row = dqlMapper.instance.execute(databaseTemplate -> {
-                int size = dqlMapper.primaryKeyIndexArray.length;
+                int size = dqlMapper.tablePKIndex.size();
                 Object[] args = new Object[size];
                 for (int i = 0; i < size; i++) {
-                    args[i] = data.get(dqlMapper.primaryKeyIndexArray[i]);
+                    args[i] = data.get(dqlMapper.tablePKIndex.get(i));
                 }
                 return databaseTemplate.queryForMap(dqlMapper.sql, args);
             });
@@ -162,19 +175,39 @@ public abstract class AbstractDatabaseListener extends AbstractListener<Database
         }
     }
 
+    private void getPKData(DqlMapper dqlMapper, List<Object> data) {
+        if (!CollectionUtils.isEmpty(data)) {
+            int size = dqlMapper.column.size();
+            List<Object> row = new ArrayList<>(size);
+            for (int i = 0; i < size; i++) {
+                if (dqlMapper.sqlPKIndexMap.containsKey(i)) {
+                    row.add(data.get(dqlMapper.sqlPKIndexMap.get(i)));
+                    continue;
+                }
+                row.add(null);
+            }
+            if (!CollectionUtils.isEmpty(row)) {
+                data.clear();
+                data.addAll(row);
+            }
+        }
+    }
+
     final class DqlMapper {
         DatabaseConnectorInstance instance;
         String sqlName;
         String sql;
         List<Field> column;
-        Integer[] primaryKeyIndexArray;
+        List<Integer> tablePKIndex;
+        Map<Integer, Integer> sqlPKIndexMap;
 
-        public DqlMapper(DatabaseConnectorInstance instance, String sqlName, String sql, List<Field> column, Integer[] primaryKeyIndexArray) {
+        public DqlMapper(DatabaseConnectorInstance instance, String sqlName, String sql, List<Field> column, List<Integer> tablePKIndex, Map<Integer, Integer> sqlPKIndexMap) {
             this.instance = instance;
             this.sqlName = sqlName;
             this.sql = sql;
             this.column = column;
-            this.primaryKeyIndexArray = primaryKeyIndexArray;
+            this.tablePKIndex = tablePKIndex;
+            this.sqlPKIndexMap = sqlPKIndexMap;
         }
     }
 

+ 1 - 1
dbsyncer-sdk/src/main/java/org/dbsyncer/sdk/util/PrimaryKeyUtil.java

@@ -37,7 +37,7 @@ public abstract class PrimaryKeyUtil {
         }
 
         // 获取表同步的主键字段
-        List<String> primaryKeys = findPrimaryKeyFields(table.getColumn()).stream().map(f -> f.getName()).collect(Collectors.toList());
+        List<String> primaryKeys = findPrimaryKeyFields(table.getColumn()).stream().map(Field::getName).collect(Collectors.toList());
 
         // 如果存在表字段映射关系,没有配置主键则抛出异常提示
         if (!CollectionUtils.isEmpty(table.getColumn()) && CollectionUtils.isEmpty(primaryKeys)) {