AE86 5 年 前
コミット
91eee9174c

+ 8 - 0
dbsyncer-biz/src/main/java/org/dbsyncer/biz/checker/impl/tablegroup/TableGroupChecker.java

@@ -62,6 +62,9 @@ public class TableGroupChecker extends AbstractChecker {
         tableGroup.setSourceTable(getTable(mapping.getSourceConnectorId(), sourceTable));
         tableGroup.setTargetTable(getTable(mapping.getTargetConnectorId(), targetTable));
 
+        // 生成command
+        setCommand(mapping, tableGroup);
+
         // 修改基本配置
         this.modifyConfigModel(tableGroup, params);
         return tableGroup;
@@ -150,4 +153,9 @@ public class TableGroupChecker extends AbstractChecker {
         col.forEach(f -> map.put(f.getName(), f));
         return map;
     }
+
+    private void setCommand(Mapping mapping, TableGroup tableGroup) {
+        Map<String, String> command = manager.getCommand(mapping.getSourceConnectorId(), mapping.getTargetConnectorId(), tableGroup);
+        tableGroup.setCommand(command);
+    }
 }

+ 19 - 0
dbsyncer-connector/src/main/java/org/dbsyncer/connector/Connector.java

@@ -2,8 +2,10 @@ package org.dbsyncer.connector;
 
 import org.dbsyncer.connector.config.ConnectorConfig;
 import org.dbsyncer.connector.config.MetaInfo;
+import org.dbsyncer.connector.template.CommandTemplate;
 
 import java.util.List;
+import java.util.Map;
 
 /**
  * 连接器基础功能
@@ -38,4 +40,21 @@ public interface Connector {
      * @return
      */
     MetaInfo getMetaInfo(ConnectorConfig config, String tableName);
+
+    /**
+     * 获取数据源同步参数
+     *
+     * @param commandTemplate
+     * @return
+     */
+    Map<String, String> getSourceCommand(CommandTemplate commandTemplate);
+
+    /**
+     * 获取目标源同步参数
+     *
+     * @param commandTemplate
+     * @return
+     */
+    Map<String, String> getTargetCommand(CommandTemplate commandTemplate);
+
 }

+ 22 - 0
dbsyncer-connector/src/main/java/org/dbsyncer/connector/ConnectorFactory.java

@@ -3,9 +3,12 @@ package org.dbsyncer.connector;
 import org.dbsyncer.connector.config.ConnectorConfig;
 import org.dbsyncer.connector.config.MetaInfo;
 import org.dbsyncer.connector.enums.ConnectorEnum;
+import org.dbsyncer.connector.template.CommandTemplate;
 import org.springframework.util.Assert;
 
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
 /**
  * 连接器工厂
@@ -51,6 +54,24 @@ public class ConnectorFactory {
         return getConnector(type).getMetaInfo(config, tableName);
     }
 
+    /**
+     * 获取连接器同步参数
+     *
+     * @param sourceCommandTemplate
+     * @param targetCommandTemplate
+     * @return
+     */
+    public Map<String, String> getCommand(CommandTemplate sourceCommandTemplate, CommandTemplate targetCommandTemplate) {
+        String sType = sourceCommandTemplate.getType();
+        String tType = targetCommandTemplate.getType();
+        Map<String, String> map = new HashMap<>();
+        Map<String, String> sCmd = getConnector(sType).getSourceCommand(sourceCommandTemplate);
+        Map<String, String> tCmd = getConnector(tType).getTargetCommand(targetCommandTemplate);
+        map.putAll(sCmd);
+        map.putAll(tCmd);
+        return map;
+    }
+
     /**
      * 获取连接器
      *
@@ -62,4 +83,5 @@ public class ConnectorFactory {
         Assert.hasText(connectorType, "ConnectorType can not be empty.");
         return ConnectorEnum.getConnector(connectorType);
     }
+
 }

+ 6 - 30
dbsyncer-connector/src/main/java/org/dbsyncer/connector/constant/ConnectorConstant.java

@@ -27,6 +27,12 @@ public class ConnectorConstant {
      */
     public static final String OPERTION_QUERY = "QUERY";
 
+    /**
+     * 查询最近记录点
+     * <p>例如:SELECT MAX(MY_TEST.LAST_TIME) FROM MY_TEST</p>
+     */
+    public static final String OPERTION_QUERY_MAX = "QUERY_MAX";
+
     /**
      * 查询表达式and
      */
@@ -37,34 +43,4 @@ public class ConnectorConstant {
      */
     public static final String OPERTION_QUERY_OR = "or";
 
-    /**
-     * 日志_增量查询
-     * <p>例如:WHERE USER.ID = ?</p>
-     */
-    public static final String OPERTION_QUERY_LOG = "QUERY_LOG";
-
-    /**
-     * 定时器_增量查询
-     * <p>例如:SELECT ASD_TEST.* FROM ASD_TEST</p>
-     */
-    public static final String OPERTION_QUERY_QUARTZ = "QUERY_QUARTZ";
-
-    /**
-     * 定时器_增量查询条件:增量字段 < 增量时间 <= 增量字段
-     * <p>T1.LASTDATE > '2017-11-10 11:07:41' AND T1.LASTDATE <= '2017-11-10 11:30:01' ORDER BY LASTDATE</p>
-     */
-    public static final String OPERTION_QUERY_QUARTZ_RANGE = "QUERY_QUARTZ_RANGE";
-
-    /**
-     * 定时器_增量查询条件:增量字段<= 增量时间
-     * <p>T1.LASTDATE <= '2017-11-10 11:07:41' ORDER BY T1.LASTDATE</p>
-     */
-    public static final String OPERTION_QUERY_QUARTZ_ALL = "QUERY_QUARTZ_ALL";
-
-    /**
-     * 定时器_增量查询最后记录点
-     * <p>例如:SELECT MAX(T1.LASTDATE) FROM ASD_TEST T1</p>
-     */
-    public static final String OPERTION_QUERY_QUARTZ_MAX = "QUERY_QUARTZ_MAX";
-
 }

+ 11 - 0
dbsyncer-connector/src/main/java/org/dbsyncer/connector/database/AbstractDatabaseConnector.java

@@ -2,6 +2,7 @@ package org.dbsyncer.connector.database;
 
 import org.apache.commons.lang.StringUtils;
 import org.dbsyncer.common.util.CollectionUtils;
+import org.dbsyncer.connector.template.CommandTemplate;
 import org.dbsyncer.connector.ConnectorException;
 import org.dbsyncer.connector.config.*;
 import org.dbsyncer.connector.enums.OperationEnum;
@@ -85,6 +86,16 @@ public abstract class AbstractDatabaseConnector implements Database {
         return metaInfo;
     }
 
+    @Override
+    public Map<String, String> getSourceCommand(CommandTemplate commandTemplate) {
+        return null;
+    }
+
+    @Override
+    public Map<String, String> getTargetCommand(CommandTemplate commandTemplate) {
+        return null;
+    }
+
     @Override
     public JdbcTemplate getJdbcTemplate(DatabaseConfig config) {
         return DatabaseUtil.getJdbcTemplate(config);

+ 1 - 1
dbsyncer-connector/src/main/java/org/dbsyncer/connector/database/sqlbuilder/SqlBuilderQueryQuartz.java → dbsyncer-connector/src/main/java/org/dbsyncer/connector/database/sqlbuilder/SqlBuilderQueryMax.java

@@ -14,7 +14,7 @@ import java.util.List;
  * @version 1.0.0
  * @date 2019/9/27 0:20
  */
-public class SqlBuilderQueryQuartz implements SqlBuilder {
+public class SqlBuilderQueryMax implements SqlBuilder {
 
     @Override
     public String buildSql(DatabaseConfig config, String tableName, String pk, List<String> filedNames, String queryFilter,

+ 2 - 2
dbsyncer-connector/src/main/java/org/dbsyncer/connector/enums/SqlBuilderEnum.java

@@ -30,9 +30,9 @@ public enum SqlBuilderEnum {
      */
     QUERY(ConnectorConstant.OPERTION_QUERY, new SqlBuilderQuery()),
     /**
-     * 查询定时SQL生成器
+     * 查询SQL最新记录点
      */
-    QUERY_QUARTZ(ConnectorConstant.OPERTION_QUERY_QUARTZ, new SqlBuilderQueryQuartz());
+    QUERY_MAX(ConnectorConstant.OPERTION_QUERY_MAX, new SqlBuilderQueryMax());
 
     // SQL构造器名称
     private String name;

+ 11 - 0
dbsyncer-connector/src/main/java/org/dbsyncer/connector/ldap/LdapConnector.java

@@ -1,6 +1,7 @@
 package org.dbsyncer.connector.ldap;
 
 import org.apache.commons.lang.StringUtils;
+import org.dbsyncer.connector.template.CommandTemplate;
 import org.dbsyncer.connector.config.ConnectorConfig;
 import org.dbsyncer.connector.config.LdapConfig;
 import org.dbsyncer.connector.config.MetaInfo;
@@ -43,6 +44,16 @@ public final class LdapConnector implements Ldap {
 		return null;
 	}
 
+	@Override
+	public Map<String, String> getSourceCommand(CommandTemplate commandTemplate) {
+		return null;
+	}
+
+	@Override
+	public Map<String, String> getTargetCommand(CommandTemplate commandTemplate) {
+		return null;
+	}
+
 	@Override
 	public LdapTemplate getLdapTemplate(LdapConfig config)
 			throws AuthenticationException, CommunicationException, javax.naming.NamingException {

+ 12 - 0
dbsyncer-connector/src/main/java/org/dbsyncer/connector/redis/RedisConnector.java

@@ -1,6 +1,7 @@
 package org.dbsyncer.connector.redis;
 
 import org.apache.commons.lang.StringUtils;
+import org.dbsyncer.connector.template.CommandTemplate;
 import org.dbsyncer.connector.config.ConnectorConfig;
 import org.dbsyncer.connector.config.MetaInfo;
 import org.dbsyncer.connector.config.RedisConfig;
@@ -12,6 +13,7 @@ import redis.clients.jedis.JedisPool;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 
 public final class RedisConnector implements Redis {
 
@@ -47,6 +49,16 @@ public final class RedisConnector implements Redis {
         return null;
     }
 
+    @Override
+    public Map<String, String> getSourceCommand(CommandTemplate commandTemplate) {
+        return null;
+    }
+
+    @Override
+    public Map<String, String> getTargetCommand(CommandTemplate commandTemplate) {
+        return null;
+    }
+
     @Override
     public RedisTemplate getRedisTemplate(RedisConfig config) {
         return this.getRedisTemplate(config, null, null, null);

+ 46 - 0
dbsyncer-connector/src/main/java/org/dbsyncer/connector/template/CommandTemplate.java

@@ -0,0 +1,46 @@
+package org.dbsyncer.connector.template;
+
+import org.dbsyncer.connector.config.Filter;
+import org.dbsyncer.connector.config.Table;
+
+import java.util.List;
+
+/**
+ * 查询模板
+ *
+ * @author AE86
+ * @version 1.0.0
+ * @date 2019/9/16 23:59
+ */
+public class CommandTemplate {
+
+    private String type;
+
+    private Table table;
+
+    private List<Filter> filter;
+
+    public CommandTemplate(String type, Table table) {
+        this.type = type;
+        this.table = table;
+    }
+
+    public CommandTemplate(String type, Table table, List<Filter> filter) {
+        this.type = type;
+        this.table = table;
+        this.filter = filter;
+    }
+
+    public String getType() {
+        return type;
+    }
+
+    public Table getTable() {
+        return table;
+    }
+
+    public List<Filter> getFilter() {
+        return filter;
+    }
+
+}

+ 3 - 0
dbsyncer-manager/src/main/java/org/dbsyncer/manager/Manager.java

@@ -13,6 +13,7 @@ import org.dbsyncer.parser.model.TableGroup;
 import org.dbsyncer.plugin.config.Plugin;
 
 import java.util.List;
+import java.util.Map;
 
 /**
  * @author AE86
@@ -60,6 +61,8 @@ public interface Manager {
 
     List<TableGroup> getTableGroupAll(String mappingId);
 
+    Map<String, String> getCommand(String sourceConnectorId, String targetConnectorId, TableGroup tableGroup);
+
     // ConnectorEnum
     List<ConnectorEnum> getConnectorEnumAll();
 

+ 6 - 0
dbsyncer-manager/src/main/java/org/dbsyncer/manager/ManagerFactory.java

@@ -25,6 +25,7 @@ import org.springframework.context.event.ContextRefreshedEvent;
 import org.springframework.stereotype.Component;
 
 import java.util.List;
+import java.util.Map;
 
 /**
  * @author AE86
@@ -282,6 +283,11 @@ public class ManagerFactory implements Manager, ApplicationListener<ContextRefre
         });
     }
 
+    @Override
+    public Map<String, String> getCommand(String sourceConnectorId, String targetConnectorId, TableGroup tableGroup) {
+        return parser.getCommand(sourceConnectorId, targetConnectorId, tableGroup);
+    }
+
     @Override
     public List<ConnectorEnum> getConnectorEnumAll() {
         return parser.getConnectorEnumAll();

+ 12 - 0
dbsyncer-parser/src/main/java/org/dbsyncer/parser/Parser.java

@@ -11,6 +11,7 @@ import org.dbsyncer.parser.model.Mapping;
 import org.dbsyncer.parser.model.TableGroup;
 
 import java.util.List;
+import java.util.Map;
 
 /**
  * @author AE86
@@ -44,6 +45,16 @@ public interface Parser {
      */
     MetaInfo getMetaInfo(String connectorId, String tableName);
 
+    /**
+     * 获取映射关系执行命令
+     *
+     * @param sourceConnectorId
+     * @param targetConnectorId
+     * @param tableGroup
+     * @return
+     */
+    Map<String, String> getCommand(String sourceConnectorId, String targetConnectorId, TableGroup tableGroup);
+
     /**
      * 解析连接器配置为Connector
      *
@@ -95,4 +106,5 @@ public interface Parser {
      * @return
      */
     List<ConvertEnum> getConvertEnumAll();
+
 }

+ 18 - 26
dbsyncer-parser/src/main/java/org/dbsyncer/parser/ParserFactory.java

@@ -1,16 +1,17 @@
 package org.dbsyncer.parser;
 
-import org.apache.commons.lang.StringUtils;
 import org.dbsyncer.cache.CacheService;
 import org.dbsyncer.common.util.JsonUtil;
+import org.dbsyncer.connector.template.CommandTemplate;
 import org.dbsyncer.connector.ConnectorFactory;
 import org.dbsyncer.connector.config.ConnectorConfig;
+import org.dbsyncer.connector.config.Filter;
 import org.dbsyncer.connector.config.MetaInfo;
+import org.dbsyncer.connector.config.Table;
 import org.dbsyncer.connector.enums.ConnectorEnum;
 import org.dbsyncer.connector.enums.FilterEnum;
 import org.dbsyncer.connector.enums.OperationEnum;
 import org.dbsyncer.parser.enums.ConvertEnum;
-import org.dbsyncer.parser.model.ConfigModel;
 import org.dbsyncer.parser.model.Connector;
 import org.dbsyncer.parser.model.Mapping;
 import org.dbsyncer.parser.model.TableGroup;
@@ -25,6 +26,7 @@ import org.springframework.util.Assert;
 
 import java.util.Arrays;
 import java.util.List;
+import java.util.Map;
 
 /**
  * @author AE86
@@ -61,6 +63,20 @@ public class ParserFactory implements Parser {
         return connectorFactory.getMetaInfo(config, tableName);
     }
 
+    @Override
+    public Map<String, String> getCommand(String sourceConnectorId, String targetConnectorId, TableGroup tableGroup) {
+        String sType = getConnectorConfig(sourceConnectorId).getConnectorType();
+        String tType = getConnectorConfig(targetConnectorId).getConnectorType();
+        Table sourceTable = tableGroup.getSourceTable();
+        Table targetTable = tableGroup.getTargetTable();
+        List<Filter> filter = tableGroup.getFilter();
+        final CommandTemplate sourceCmdTemplate = new CommandTemplate(sType, sourceTable, filter);
+        final CommandTemplate targetCmdTemplate = new CommandTemplate(tType, targetTable);
+        // 获取连接器同步参数
+        Map<String, String> command = connectorFactory.getCommand(sourceCmdTemplate, targetCmdTemplate);
+        return command;
+    }
+
     @Override
     public Connector parseConnector(String json) {
         try {
@@ -125,18 +141,6 @@ public class ParserFactory implements Parser {
         return Arrays.asList(ConvertEnum.values());
     }
 
-    /**
-     * 验证连接器是否可用
-     *
-     * @param connectorId
-     */
-    private void aliveConnector(String connectorId) {
-        ConnectorConfig config = getConnectorConfig(connectorId);
-        if (!connectorFactory.isAlive(config)) {
-            throw new ParserException("无法连接,请检查服务是否正常.");
-        }
-    }
-
     /**
      * 获取连接配置
      *
@@ -150,16 +154,4 @@ public class ParserFactory implements Parser {
         return conn.getConfig();
     }
 
-    private void setConfigModel(ConfigModel model, String type) {
-        Assert.notNull(model, "ConfigModel can not be null.");
-        Assert.hasText(type, "ConfigModel type can not be empty.");
-        Assert.hasText(model.getName(), "ConfigModel name can not be empty.");
-
-        model.setId(StringUtils.isEmpty(model.getId()) ? String.valueOf(snowflakeIdWorker.nextId()) : model.getId());
-        model.setType(type);
-        long now = System.currentTimeMillis();
-        model.setCreateTime(null == model.getCreateTime() ? now : model.getCreateTime());
-        model.setUpdateTime(now);
-    }
-
 }

+ 4 - 6
dbsyncer-parser/src/main/resources/TableGroup.json

@@ -1,9 +1,9 @@
 {
   "id":"tableGroupId_0",
   "type":"tableGroup",
-  "name": "",
-  "createDateTime": "2019-10-08 21:32:00",
-  "updateDateTime": "2019-10-08 21:35:00",
+  "name": "tableGroup",
+  "createTime": "2019-10-08 21:32:00",
+  "updateTime": "2019-10-08 21:35:00",
   "mappingId": "11111",
   "sourceTable":{
     "name":"MY_USER",
@@ -48,9 +48,7 @@
     "INSERT":"INSERT INTO MY_USER(id,name)values(?,?)",
     "UPDATE":"UPDATE MY_USER set name=? where MY_USER.id=?",
     "DELETE":"DELETE MY_USER WHERE MY_USER.id=?",
-    "QUARTZ_RANGE":"select xxx",
-    "QUARTZ_ALL":"select xxx",
-    "QUARTZ_MAX":"select xxx"
+    "QUERY_MAX":"select xxx"
   },
   "filter": [
     {