ソースを参照

统一主键获取方式

Signed-off-by: AE86 <836391306@qq.com>
AE86 2 年 前
コミット
4861d68242

+ 10 - 3
dbsyncer-common/src/main/java/org/dbsyncer/common/model/NotifyMessage.java

@@ -24,27 +24,34 @@ public class NotifyMessage {
      */
     private String receiver;
 
+    public static NotifyMessage newBuilder() {
+        return new NotifyMessage();
+    }
+
     public String getTitle() {
         return title;
     }
 
-    public void setTitle(String title) {
+    public NotifyMessage setTitle(String title) {
         this.title = title;
+        return this;
     }
 
     public String getContent() {
         return content;
     }
 
-    public void setContent(String content) {
+    public NotifyMessage setContent(String content) {
         this.content = content;
+        return this;
     }
 
     public String getReceiver() {
         return receiver;
     }
 
-    public void setReceiver(String receiver) {
+    public NotifyMessage setReceiver(String receiver) {
         this.receiver = receiver;
+        return this;
     }
 }

+ 4 - 32
dbsyncer-connector/src/main/java/org/dbsyncer/connector/database/AbstractDatabaseConnector.java

@@ -18,6 +18,7 @@ import org.dbsyncer.connector.model.Filter;
 import org.dbsyncer.connector.model.MetaInfo;
 import org.dbsyncer.connector.model.Table;
 import org.dbsyncer.connector.util.DatabaseUtil;
+import org.dbsyncer.connector.util.PrimaryKeyUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.jdbc.support.rowset.ResultSetWrappingSqlRowSet;
@@ -212,7 +213,7 @@ public abstract class AbstractDatabaseConnector extends AbstractConnector implem
 
         // 获取查询数据行是否存在
         String tableName = commandConfig.getTable().getName();
-        String pk = findOriginalTablePrimaryKey(commandConfig);
+        String pk = PrimaryKeyUtil.findOriginalTablePrimaryKey(commandConfig.getTable());
         StringBuilder queryCount = new StringBuilder("SELECT COUNT(1) FROM ").append(schema).append(quotation).append(tableName).append(quotation)
                 .append(" WHERE ").append(quotation).append(pk).append(quotation).append(" = ?");
         String queryCountExist = ConnectorConstant.OPERTION_QUERY_COUNT_EXIST;
@@ -267,35 +268,6 @@ public abstract class AbstractDatabaseConnector extends AbstractConnector implem
         return config.getSchema();
     }
 
-    /**
-     * 返回主键名称
-     *
-     * @param commandConfig
-     * @return
-     */
-    protected String findOriginalTablePrimaryKey(CommandConfig commandConfig) {
-        // 获取自定义主键
-        String pk = commandConfig.getTable().getPrimaryKey();
-        if (StringUtil.isNotBlank(pk)) {
-            return pk;
-        }
-
-        // 获取表原始主键
-        Table table = commandConfig.getOriginalTable();
-        if (null != table) {
-            List<Field> column = table.getColumn();
-            if (!CollectionUtils.isEmpty(column)) {
-                for (Field c : column) {
-                    if (c.isPk()) {
-                        return c.getName();
-                    }
-                }
-            }
-        }
-
-        throw new ConnectorException(String.format("The primary key of table '%s' is null.", commandConfig.getTable().getName()));
-    }
-
     /**
      * 获取表列表
      *
@@ -384,7 +356,7 @@ public abstract class AbstractDatabaseConnector extends AbstractConnector implem
      */
     protected String getQueryCountSql(CommandConfig commandConfig, String schema, String quotation, String queryFilterSql) {
         String table = commandConfig.getTable().getName();
-        String pk = findOriginalTablePrimaryKey(commandConfig);
+        String pk = PrimaryKeyUtil.findOriginalTablePrimaryKey(commandConfig.getTable());
         StringBuilder queryCount = new StringBuilder();
         queryCount.append("SELECT COUNT(1) FROM (SELECT 1 FROM ").append(schema).append(quotation).append(table).append(quotation);
         if (StringUtil.isNotBlank(queryFilterSql)) {
@@ -510,7 +482,7 @@ public abstract class AbstractDatabaseConnector extends AbstractConnector implem
             throw new ConnectorException("Table name can not be empty.");
         }
         if (StringUtil.isBlank(pk)) {
-            pk = findOriginalTablePrimaryKey(commandConfig);
+            pk = PrimaryKeyUtil.findOriginalTablePrimaryKey(table);
         }
 
         SqlBuilderConfig config = new SqlBuilderConfig(this, schema, tableName, pk, fields, queryFilterSQL, buildSqlWithQuotation());

+ 43 - 0
dbsyncer-connector/src/main/java/org/dbsyncer/connector/util/PrimaryKeyUtil.java

@@ -0,0 +1,43 @@
+package org.dbsyncer.connector.util;
+
+import org.dbsyncer.common.util.CollectionUtils;
+import org.dbsyncer.common.util.StringUtil;
+import org.dbsyncer.connector.ConnectorException;
+import org.dbsyncer.connector.model.Field;
+import org.dbsyncer.connector.model.Table;
+
+import java.util.List;
+
+public abstract class PrimaryKeyUtil {
+
+    /**
+     * 返回主键名称
+     *
+     * @param table
+     * @return
+     */
+    public static String findOriginalTablePrimaryKey(Table table) {
+        if (null == table) {
+            return null;
+        }
+
+        // 获取自定义主键
+        String pk = table.getPrimaryKey();
+        if (StringUtil.isNotBlank(pk)) {
+            return pk;
+        }
+
+        // 获取表原始主键
+        List<Field> column = table.getColumn();
+        if (!CollectionUtils.isEmpty(column)) {
+            for (Field c : column) {
+                if (c.isPk()) {
+                    return c.getName();
+                }
+            }
+        }
+
+        throw new ConnectorException(String.format("The primary key of table '%s' is null.", table.getName()));
+    }
+
+}

+ 14 - 4
dbsyncer-manager/src/main/java/org/dbsyncer/manager/puller/IncrementPuller.java

@@ -9,6 +9,7 @@ import org.dbsyncer.common.util.CollectionUtils;
 import org.dbsyncer.connector.ConnectorFactory;
 import org.dbsyncer.connector.model.Field;
 import org.dbsyncer.connector.model.Table;
+import org.dbsyncer.connector.util.PrimaryKeyUtil;
 import org.dbsyncer.listener.AbstractExtractor;
 import org.dbsyncer.listener.Extractor;
 import org.dbsyncer.listener.Listener;
@@ -22,7 +23,10 @@ import org.dbsyncer.manager.model.FieldPicker;
 import org.dbsyncer.parser.Parser;
 import org.dbsyncer.parser.logger.LogService;
 import org.dbsyncer.parser.logger.LogType;
-import org.dbsyncer.parser.model.*;
+import org.dbsyncer.parser.model.Connector;
+import org.dbsyncer.parser.model.Mapping;
+import org.dbsyncer.parser.model.Meta;
+import org.dbsyncer.parser.model.TableGroup;
 import org.dbsyncer.parser.util.PickerUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -35,7 +39,13 @@ import javax.annotation.PostConstruct;
 import java.sql.Timestamp;
 import java.time.Instant;
 import java.time.LocalDateTime;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executor;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -143,8 +153,8 @@ public class IncrementPuller extends AbstractPuller implements ScheduledTaskJob
         if (ListenerTypeEnum.isTiming(listenerType)) {
             AbstractQuartzExtractor extractor = listener.getExtractor(ListenerTypeEnum.TIMING, connectorConfig.getConnectorType(), AbstractQuartzExtractor.class);
             extractor.setCommands(list.stream().map(t -> {
-                Picker picker = new Picker(t.getFieldMapping());
-                return new TableGroupCommand(picker.getSourcePrimaryKeyName(t.getSourceTable()), t.getCommand());
+                String pk = PrimaryKeyUtil.findOriginalTablePrimaryKey(t.getSourceTable());
+                return new TableGroupCommand(pk, t.getCommand());
             }).collect(Collectors.toList()));
             setExtractorConfig(extractor, connectorConfig, listenerConfig, meta.getSnapshot(), new QuartzListener(mapping, list));
             return extractor;

+ 2 - 1
dbsyncer-parser/src/main/java/org/dbsyncer/parser/ParserFactory.java

@@ -22,6 +22,7 @@ import org.dbsyncer.connector.enums.OperationEnum;
 import org.dbsyncer.connector.model.Field;
 import org.dbsyncer.connector.model.MetaInfo;
 import org.dbsyncer.connector.model.Table;
+import org.dbsyncer.connector.util.PrimaryKeyUtil;
 import org.dbsyncer.listener.enums.QuartzFilterEnum;
 import org.dbsyncer.parser.enums.ConvertEnum;
 import org.dbsyncer.parser.event.FullRefreshEvent;
@@ -247,7 +248,7 @@ public class ParserFactory implements Parser {
         Assert.notEmpty(fieldMapping, String.format("数据源表[%s]同步到目标源表[%s], 映射关系不能为空.", sTableName, tTableName));
         // 获取同步字段
         Picker picker = new Picker(fieldMapping);
-        String pk = picker.getSourcePrimaryKeyName(tableGroup.getSourceTable());
+        String pk = PrimaryKeyUtil.findOriginalTablePrimaryKey(tableGroup.getSourceTable());
 
         int pageSize = mapping.getReadNum();
         int batchSize = mapping.getBatchNum();

+ 0 - 14
dbsyncer-parser/src/main/java/org/dbsyncer/parser/model/Picker.java

@@ -2,8 +2,6 @@ package org.dbsyncer.parser.model;
 
 import org.dbsyncer.common.util.CollectionUtils;
 import org.dbsyncer.connector.model.Field;
-import org.dbsyncer.connector.model.Table;
-import org.springframework.util.Assert;
 
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -57,18 +55,6 @@ public class Picker {
         }
     }
 
-    public String getSourcePrimaryKeyName(Table sourceTable) {
-        for (Field f : sourceFields) {
-            if (null != f && f.isPk()) {
-                return f.getName();
-            }
-        }
-
-        String primaryKey = sourceTable.getPrimaryKey();
-        Assert.hasText(primaryKey, "主键为空");
-        return primaryKey;
-    }
-
     public List<Field> getSourceFields() {
         return sourceFields.stream().filter(f -> null != f).collect(Collectors.toList());
     }