
Simplify mapper implementation & add TODO

AE86 1 year ago
parent
commit
f175836d5e

+ 0 - 51
dbsyncer-common/src/main/java/org/dbsyncer/common/model/DDLConvertContext.java

@@ -1,51 +0,0 @@
-package org.dbsyncer.common.model;
-
-import org.dbsyncer.common.spi.ConnectorMapper;
-
-public class DDLConvertContext extends AbstractConvertContext {
-
-    String sourceSql;
-
-    String targetSql;
-
-    String tableName;
-
-    String originFiledName;
-
-    String targetFileName;
-
-    String originType;
-
-    String targetType;
-
-    public DDLConvertContext(ConnectorMapper sourceConnectorMapper, ConnectorMapper targetConnectorMapper, String sourceTableName, String targetTableName, String event,String sourceSql) {
-        super.init(sourceConnectorMapper, targetConnectorMapper, sourceTableName, targetTableName, event, null, null);
-        this.sourceSql = sourceSql;
-    }
-
-    public String getSourceSql() {
-        return sourceSql;
-    }
-
-    public void setSourceSql(String sourceSql) {
-        this.sourceSql = sourceSql;
-    }
-
-    public String getTargetSql() {
-        return targetSql;
-    }
-
-    public void setTargetSql(String targetSql) {
-        this.targetSql = targetSql;
-    }
-
-    /**
-     * 从源sql转化为目标源sql
-     */
-    public void convertSql(){
-        //TODO 从源sql转化为目标源sql
-        //获取目标源数据库类型
-
-        setTargetSql(sourceSql);
-    }
-}

+ 11 - 2
dbsyncer-connector/src/main/java/org/dbsyncer/connector/Connector.java

@@ -96,6 +96,17 @@ public interface Connector<M, C> {
      */
     Result writer(M connectorMapper, WriterBatchConfig config);
 
+    /**
+     * 执行DDL命令
+     *
+     * @param connectorMapper
+     * @param ddlConfig
+     * @return
+     */
+    default Result writerDDL(M connectorMapper, DDLConfig ddlConfig){
+        throw new ConnectorException("Unsupported method.");
+    }
+
     /**
      * 获取数据源同步参数
      *
@@ -111,6 +122,4 @@ public interface Connector<M, C> {
      * @return
      */
     Map<String, String> getTargetCommand(CommandConfig commandConfig);
-
-    Result writerDDL(M connectorMapper, DDLConfig ddlConfig);
 }
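
The new default makes DDL execution opt-in: connector types that do not override it (ES, Kafka, File below) now fail fast with a ConnectorException instead of returning null. A minimal caller-side sketch; names are illustrative, not part of this commit.

    // Sketch: only connectors that override writerDDL can execute DDL.
    void tryWriteDDL(Connector connector, Object connectorMapper, DDLConfig ddlConfig) {
        try {
            // database connectors override this (see AbstractDatabaseConnector below)
            connector.writerDDL(connectorMapper, ddlConfig);
        } catch (ConnectorException e) {
            // "Unsupported method." - this connector type cannot execute DDL
        }
    }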

+ 7 - 7
dbsyncer-connector/src/main/java/org/dbsyncer/connector/ConnectorFactory.java

@@ -172,6 +172,13 @@ public class ConnectorFactory implements DisposableBean {
         return result;
     }
 
+    public Result writerDDL(ConnectorMapper connectorMapper, DDLConfig ddlConfig) {
+        Connector connector = getConnector(connectorMapper);
+        Result result = connector.writerDDL(connectorMapper, ddlConfig);
+        Assert.notNull(result, "Connector writer batch result can not null");
+        return result;
+    }
+
     public Connector getConnector(ConnectorMapper connectorMapper) {
         return getConnector(connectorMapper.getConnectorType());
     }
@@ -195,11 +202,4 @@ public class ConnectorFactory implements DisposableBean {
         Assert.notNull(connectorMapper, "ConnectorMapper can not be null.");
         getConnector(connectorMapper).disconnect(connectorMapper);
     }
-
-    public Result executeSql(ConnectorMapper connectorMapper, DDLConfig ddlConfig) {
-        Connector connector = getConnector(connectorMapper);
-        Result result = connector.writerDDL(connectorMapper, ddlConfig);
-        Assert.notNull(result, "Connector writer batch result can not null");
-        return result;
-    }
 }
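
A minimal sketch of pushing a DDL statement through the factory, mirroring the call added to GeneralBufferActuator later in this commit; the target connector config is assumed to be valid.

    // Sketch: connect to the target connector, then hand it the rewritten DDL.
    Result applyDDL(ConnectorFactory connectorFactory, AbstractConnectorConfig targetConfig, String sql) {
        ConnectorMapper targetMapper = connectorFactory.connect(targetConfig);
        DDLConfig ddlConfig = new DDLConfig();
        ddlConfig.setSql(sql);
        // throws ConnectorException if the target connector type does not support DDL
        return connectorFactory.writerDDL(targetMapper, ddlConfig);
    }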

+ 8 - 32
dbsyncer-connector/src/main/java/org/dbsyncer/connector/config/DDLConfig.java

@@ -1,40 +1,16 @@
 package org.dbsyncer.connector.config;
 
 public class DDLConfig {
+    /**
+     * 执行命令
+     */
+    private String sql;
 
-    String tableName;
-
-    String event;
-
-    String targetSql;
-
-    public DDLConfig(String tableName, String event, String targetSql) {
-        this.tableName = tableName;
-        this.event = event;
-        this.targetSql = targetSql;
-    }
-
-    public String getTableName() {
-        return tableName;
-    }
-
-    public void setTableName(String tableName) {
-        this.tableName = tableName;
-    }
-
-    public String getEvent() {
-        return event;
-    }
-
-    public void setEvent(String event) {
-        this.event = event;
-    }
-
-    public String getTargetSql() {
-        return targetSql;
+    public String getSql() {
+        return sql;
     }
 
-    public void setTargetSql(String targetSql) {
-        this.targetSql = targetSql;
+    public void setSql(String sql) {
+        this.sql = sql;
     }
 }
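
DDLConfig is now just a carrier for the SQL to run on the target; a trivial usage sketch (the statement is an example, not from this commit).

    DDLConfig ddlConfig = new DDLConfig();
    ddlConfig.setSql("ALTER TABLE `user` ADD COLUMN `remark` VARCHAR(64)");
    // connectors read it back via getSql() when executing (see AbstractDatabaseConnector below)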

+ 9 - 5
dbsyncer-connector/src/main/java/org/dbsyncer/connector/database/AbstractDatabaseConnector.java

@@ -642,12 +642,16 @@ public abstract class AbstractDatabaseConnector extends AbstractConnector implem
 
     @Override
     public Result writerDDL(DatabaseConnectorMapper connectorMapper, DDLConfig config) {
-        String event = config.getEvent();
-        // 1、获取SQL
-        String executeSql = config.getTargetSql();
-        Assert.hasText(executeSql, "执行SQL语句不能为空.");
         Result result = new Result();
-        connectorMapper.executeDDL(executeSql);
+        try {
+            Assert.hasText(config.getSql(), "执行SQL语句不能为空.");
+            connectorMapper.execute(databaseTemplate -> {
+                databaseTemplate.execute(config.getSql());
+                return true;
+            });
+        } catch (Exception e) {
+            result.getError().append(String.format("执行ddl: %s, 异常:%s", config.getSql(), e.getMessage()));
+        }
         return result;
     }
 }
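
Failures no longer propagate as exceptions; they accumulate in the Result error buffer, so callers should check it after the call. A short sketch; the length() check assumes the error is a StringBuffer, as the append() usage above suggests.

    Result result = connector.writerDDL(connectorMapper, ddlConfig);
    // writerDDL no longer throws on failure: inspect the accumulated error text instead
    if (result.getError().length() > 0) {
        // handle/log the failure - the DDL was not applied on the target
    }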

+ 0 - 15
dbsyncer-connector/src/main/java/org/dbsyncer/connector/database/DatabaseConnectorMapper.java

@@ -37,21 +37,6 @@ public class DatabaseConnectorMapper implements ConnectorMapper<DatabaseConfig,
         }
     }
 
-    public void executeDDL(String sql){
-        Connection connection = null;
-        try {
-            connection = getConnection();
-            new DatabaseTemplate((SimpleConnection) connection).execute(sql);
-        } catch (EmptyResultDataAccessException e) {
-            throw e;
-        } catch (Exception e) {
-            logger.error(e.getMessage());
-            throw new ConnectorException(e.getMessage(), e.getCause());
-        } finally {
-            DatabaseUtil.close(connection);
-        }
-    }
-
     @Override
     public DatabaseConfig getConfig() {
         return config;

+ 0 - 6
dbsyncer-connector/src/main/java/org/dbsyncer/connector/es/ESConnector.java

@@ -9,7 +9,6 @@ import org.dbsyncer.connector.AbstractConnector;
 import org.dbsyncer.connector.Connector;
 import org.dbsyncer.connector.ConnectorException;
 import org.dbsyncer.connector.config.CommandConfig;
-import org.dbsyncer.connector.config.DDLConfig;
 import org.dbsyncer.connector.config.ESConfig;
 import org.dbsyncer.connector.config.ReaderConfig;
 import org.dbsyncer.connector.config.WriterBatchConfig;
@@ -259,11 +258,6 @@ public final class ESConnector extends AbstractConnector implements Connector<ES
         return Collections.EMPTY_MAP;
     }
 
-    @Override
-    public Result writerDDL(ESConnectorMapper connectorMapper, DDLConfig ddlConfig) {
-        return null;
-    }
-
     private void parseProperties(List<Field> fields, Map<String, Object> sourceMap) {
         Map<String, Object> properties = (Map<String, Object>) sourceMap.get(ESUtil.PROPERTIES);
         if (CollectionUtils.isEmpty(properties)) {

+ 5 - 7
dbsyncer-connector/src/main/java/org/dbsyncer/connector/file/FileConnector.java

@@ -10,7 +10,6 @@ import org.dbsyncer.connector.AbstractConnector;
 import org.dbsyncer.connector.Connector;
 import org.dbsyncer.connector.ConnectorException;
 import org.dbsyncer.connector.config.CommandConfig;
-import org.dbsyncer.connector.config.DDLConfig;
 import org.dbsyncer.connector.config.FileConfig;
 import org.dbsyncer.connector.config.ReaderConfig;
 import org.dbsyncer.connector.config.WriterBatchConfig;
@@ -22,7 +21,11 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.util.Assert;
 
-import java.io.*;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.FileReader;
+import java.io.IOException;
+import java.io.OutputStream;
 import java.net.InetAddress;
 import java.net.UnknownHostException;
 import java.util.ArrayList;
@@ -207,9 +210,4 @@ public final class FileConnector extends AbstractConnector implements Connector<
         return command;
     }
 
-    @Override
-    public Result writerDDL(FileConnectorMapper connectorMapper, DDLConfig ddlConfig) {
-        return null;
-    }
-
 }

+ 0 - 5
dbsyncer-connector/src/main/java/org/dbsyncer/connector/kafka/KafkaConnector.java

@@ -8,7 +8,6 @@ import org.dbsyncer.connector.AbstractConnector;
 import org.dbsyncer.connector.Connector;
 import org.dbsyncer.connector.ConnectorException;
 import org.dbsyncer.connector.config.CommandConfig;
-import org.dbsyncer.connector.config.DDLConfig;
 import org.dbsyncer.connector.config.KafkaConfig;
 import org.dbsyncer.connector.config.ReaderConfig;
 import org.dbsyncer.connector.config.WriterBatchConfig;
@@ -112,8 +111,4 @@ public class KafkaConnector extends AbstractConnector implements Connector<Kafka
         return Collections.EMPTY_MAP;
     }
 
-    @Override
-    public Result writerDDL(KafkaConnectorMapper connectorMapper, DDLConfig ddlConfig) {
-        return null;
-    }
 }

+ 0 - 17
dbsyncer-parser/src/main/java/org/dbsyncer/parser/ParserFactory.java

@@ -3,7 +3,6 @@ package org.dbsyncer.parser;
 import org.dbsyncer.cache.CacheService;
 import org.dbsyncer.common.event.ChangedEvent;
 import org.dbsyncer.common.model.AbstractConnectorConfig;
-import org.dbsyncer.common.model.DDLConvertContext;
 import org.dbsyncer.common.model.FullConvertContext;
 import org.dbsyncer.common.model.Result;
 import org.dbsyncer.common.spi.ConnectorMapper;
@@ -13,7 +12,6 @@ import org.dbsyncer.common.util.JsonUtil;
 import org.dbsyncer.common.util.StringUtil;
 import org.dbsyncer.connector.ConnectorFactory;
 import org.dbsyncer.connector.config.CommandConfig;
-import org.dbsyncer.connector.config.DDLConfig;
 import org.dbsyncer.connector.config.ReaderConfig;
 import org.dbsyncer.connector.config.WriterBatchConfig;
 import org.dbsyncer.connector.constant.ConnectorConstant;
@@ -405,19 +403,4 @@ public class ParserFactory implements Parser {
         return getConnector(connectorId).getConfig();
     }
 
-    public Result writeSql(DDLConvertContext context, Executor generalExecutor) {
-        Result result = new Result();
-        // 终止同步数据到目标源库
-        if (context.isTerminated()) {
-            result.getSuccessData();
-            return result;
-        }
-        String tableName = context.getSourceTableName();
-        String event = context.getEvent();
-        ConnectorMapper connectorMapper = context.getTargetConnectorMapper();
-        result =connectorFactory.executeSql(connectorMapper,new DDLConfig(tableName,event,
-                context.getTargetSql()));
-        return result;
-
-    }
 }

+ 15 - 0
dbsyncer-parser/src/main/java/org/dbsyncer/parser/ddl/DDLParser.java

@@ -0,0 +1,15 @@
+package org.dbsyncer.parser.ddl;
+
+import org.dbsyncer.connector.config.DDLConfig;
+
+public interface DDLParser {
+
+    /**
+     * 解析DDL配置
+     *
+     * @param sql
+     * @param targetTableName
+     * @return
+     */
+    DDLConfig parseDDlConfig(String sql, String targetTableName);
+}
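
A sketch of how the interface is meant to be consumed, mirroring the usage added to GeneralBufferActuator below; the injected field name is illustrative.

    @Resource
    private DDLParser ddlParser;

    // rewrite the captured source DDL for the mapped target table, then execute it
    DDLConfig targetDDLConfig = ddlParser.parseDDlConfig(sourceSql, targetTableName);
    Result result = connectorFactory.writerDDL(targetConnectorMapper, targetDDLConfig);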

+ 22 - 0
dbsyncer-parser/src/main/java/org/dbsyncer/parser/ddl/impl/DDLParserImpl.java

@@ -0,0 +1,22 @@
+package org.dbsyncer.parser.ddl.impl;
+
+import org.dbsyncer.connector.config.DDLConfig;
+import org.dbsyncer.parser.ddl.DDLParser;
+import org.springframework.stereotype.Component;
+
+@Component
+public class DDLParserImpl implements DDLParser {
+
+    @Override
+    public DDLConfig parseDDlConfig(String sql, String targetTableName) {
+        DDLConfig ddlConfig = new DDLConfig();
+        // TODO life 替换为目标库执行SQL
+//        try {
+//            Alter alter = (Alter) CCJSqlParserUtil.parse(sql);
+//        } catch (JSQLParserException e) {
+//            logger.error(e.getMessage(), e);
+//        }
+        ddlConfig.setSql(sql);
+        return ddlConfig;
+    }
+}
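
A hedged sketch of what the TODO might become once JSQLParser is wired back in: point the parsed ALTER statement at the mapped target table and forward the rewritten SQL, falling back to the raw SQL on parse failure. The class name is hypothetical; the JSQLParser types are the same ones removed from GeneralBufferActuator in this commit.

    import net.sf.jsqlparser.JSQLParserException;
    import net.sf.jsqlparser.parser.CCJSqlParserUtil;
    import net.sf.jsqlparser.statement.alter.Alter;
    import org.dbsyncer.connector.config.DDLConfig;
    import org.dbsyncer.parser.ddl.DDLParser;

    // Hypothetical alternative implementation, not part of this commit.
    public class JsqlParserDDLParser implements DDLParser {

        @Override
        public DDLConfig parseDDlConfig(String sql, String targetTableName) {
            DDLConfig ddlConfig = new DDLConfig();
            try {
                Alter alter = (Alter) CCJSqlParserUtil.parse(sql);
                // rename the table to the mapped target table
                alter.getTable().setName(targetTableName);
                ddlConfig.setSql(alter.toString());
            } catch (JSQLParserException e) {
                // fall back to forwarding the source SQL unchanged
                ddlConfig.setSql(sql);
            }
            return ddlConfig;
        }
    }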

+ 38 - 44
dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/impl/GeneralBufferActuator.java

@@ -1,27 +1,23 @@
 package org.dbsyncer.parser.flush.impl;
 
-import net.sf.jsqlparser.JSQLParserException;
-import net.sf.jsqlparser.parser.CCJSqlParserUtil;
-import net.sf.jsqlparser.statement.alter.Alter;
-import net.sf.jsqlparser.statement.alter.AlterExpression;
 import org.dbsyncer.cache.CacheService;
 import org.dbsyncer.common.config.GeneralBufferConfig;
 import org.dbsyncer.common.event.RefreshOffsetEvent;
 import org.dbsyncer.common.model.AbstractConnectorConfig;
-import org.dbsyncer.common.model.DDLConvertContext;
 import org.dbsyncer.common.model.IncrementConvertContext;
 import org.dbsyncer.common.model.Result;
 import org.dbsyncer.common.spi.ConnectorMapper;
 import org.dbsyncer.common.util.CollectionUtils;
 import org.dbsyncer.common.util.StringUtil;
 import org.dbsyncer.connector.ConnectorFactory;
+import org.dbsyncer.connector.config.DDLConfig;
 import org.dbsyncer.connector.constant.ConnectorConstant;
-import org.dbsyncer.connector.database.DatabaseConnectorMapper;
+import org.dbsyncer.connector.enums.ConnectorEnum;
 import org.dbsyncer.parser.ParserFactory;
+import org.dbsyncer.parser.ddl.DDLParser;
 import org.dbsyncer.parser.flush.AbstractBufferActuator;
 import org.dbsyncer.parser.model.BatchWriter;
 import org.dbsyncer.parser.model.Connector;
-import org.dbsyncer.parser.model.FieldMapping;
 import org.dbsyncer.parser.model.Mapping;
 import org.dbsyncer.parser.model.Picker;
 import org.dbsyncer.parser.model.TableGroup;
@@ -75,6 +71,9 @@ public class GeneralBufferActuator extends AbstractBufferActuator<WriterRequest,
     @Resource
     private ApplicationContext applicationContext;
 
+    @Resource
+    private DDLParser ddlParser;
+
     @PostConstruct
     public void init() {
         setConfig(generalBufferConfig);
@@ -109,7 +108,7 @@ public class GeneralBufferActuator extends AbstractBufferActuator<WriterRequest,
 
     @Override
     protected void pull(WriterResponse response) {
-        // 1、获取配置信息
+        // 0、获取配置信息
         final TableGroup tableGroup = cacheService.get(response.getTableGroupId(), TableGroup.class);
         final Mapping mapping = cacheService.get(tableGroup.getMappingId(), Mapping.class);
         final TableGroup group = PickerUtil.mergeTableGroupConfig(mapping, tableGroup);
@@ -119,43 +118,9 @@ public class GeneralBufferActuator extends AbstractBufferActuator<WriterRequest,
         final Picker picker = new Picker(group.getFieldMapping());
         final List<Map> sourceDataList = response.getDataList();
 
-        // TODO ddl解析
+        // 1、ddl解析
         if (isDDLEvent(response.getEvent())) {
-            applicationContext.publishEvent(new RefreshOffsetEvent(applicationContext, response.getOffsetList()));
-            final ConnectorMapper sConnectorMapper = connectorFactory.connect(getConnectorConfig(mapping.getSourceConnectorId()));
-            final ConnectorMapper tConnectorMapper = connectorFactory.connect(getConnectorConfig(mapping.getTargetConnectorId()));
-            //转换到目标数据源的sql语句
-            String sourceSql = response.getSql();
-            DDLConvertContext ddlConvertContext = new DDLConvertContext(sConnectorMapper, tConnectorMapper, sourceTableName, targetTableName, event,sourceSql);
-//            ddlConvertContext.convertSql();
-            //先确定转换的sql,源数据库类型的sql-》目标数据库类型的sql
-            //如果都是db,那么db只是对应的字段进行映射,意思是只替换映射字段
-            if (sConnectorMapper instanceof DatabaseConnectorMapper && tConnectorMapper instanceof  DatabaseConnectorMapper){
-                try {
-                    Alter alter = (Alter) CCJSqlParserUtil.parse(sourceSql);
-                    for (AlterExpression alterExpression:alter.getAlterExpressions()) {
-                        for (AlterExpression.ColumnDataType columnDataType:alterExpression.getColDataTypeList()) {
-                            String columName = columnDataType.getColumnName();
-                            columName = columName.replaceAll("`","");
-                            for (FieldMapping fieldMapping :group.getFieldMapping()) {
-                                if (fieldMapping.getSource().getName().equals(columName)){
-                                    columnDataType.setColumnName(fieldMapping.getTarget().getName());
-                                    break;
-                                }
-                            }
-                        }
-                        ddlConvertContext.setTargetSql(alter.toString());
-                    }
-                } catch (JSQLParserException e) {
-//                    throw new RuntimeException(e);
-                    ddlConvertContext.convertSql();
-                }
-            }else{//如果有一方不是db,解析sql,进行填充
-                ddlConvertContext.convertSql();
-            }
-            //进行sql执行
-            Result result = parserFactory.writeSql(ddlConvertContext, generalExecutor);
-            flushStrategy.flushIncrementData(mapping.getMetaId(), result, event);
+            parseDDl(response, mapping, group);
             return;
         }
 
@@ -196,6 +161,35 @@ public class GeneralBufferActuator extends AbstractBufferActuator<WriterRequest,
         return StringUtil.equals(event, ConnectorConstant.OPERTION_ALTER);
     }
 
+    /**
+     * 解析DDL
+     *
+     * @param response
+     * @param mapping
+     * @param group
+     */
+    private void parseDDl(WriterResponse response, Mapping mapping, TableGroup group) {
+        AbstractConnectorConfig sConnConfig = getConnectorConfig(mapping.getSourceConnectorId());
+        AbstractConnectorConfig tConnConfig = getConnectorConfig(mapping.getTargetConnectorId());
+        // 0.生成目标表执行SQL(暂支持MySQL) fixme AE86 暂内测MySQL作为试运行版本
+        if (StringUtil.equals(sConnConfig.getConnectorType(), tConnConfig.getConnectorType()) && StringUtil.equals(ConnectorEnum.MYSQL.getType(), tConnConfig.getConnectorType())) {
+            final String targetTableName = group.getTargetTable().getName();
+            DDLConfig targetDDLConfig = ddlParser.parseDDlConfig(response.getSql(), targetTableName);
+            final ConnectorMapper tConnectorMapper = connectorFactory.connect(tConnConfig);
+            Result result = connectorFactory.writerDDL(tConnectorMapper, targetDDLConfig);
+            result.setTableGroupId(group.getId());
+            result.setTargetTableGroupName(targetTableName);
+            applicationContext.publishEvent(new RefreshOffsetEvent(applicationContext, response.getOffsetList()));
+            flushStrategy.flushIncrementData(mapping.getMetaId(), result, response.getEvent());
+        }
+        // TODO life
+        // 1.获取目标表最新的属性字段
+        // 2.更新TableGroup.targetTable
+        // 3.更新表字段映射(添加相似字段)
+        // 4.更新TableGroup.command
+        // 5.合并驱动配置
+    }
+
     /**
      * 获取连接器配置
      *
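
The column-name mapping removed from pull() above is not yet covered by the new DDLParser; a sketch of how it could be reapplied once field mappings are passed along, reusing the same JSQLParser types as the removed code. The helper name and placement are hypothetical.

    // Hypothetical helper: rename columns in the parsed ALTER statement according to the
    // table group's field mappings (mirrors the logic removed from pull() above).
    // Needs net.sf.jsqlparser.statement.alter.Alter / AlterExpression and
    // org.dbsyncer.parser.model.FieldMapping.
    private void renameColumns(Alter alter, List<FieldMapping> fieldMappings) {
        for (AlterExpression expression : alter.getAlterExpressions()) {
            if (expression.getColDataTypeList() == null) {
                continue;
            }
            for (AlterExpression.ColumnDataType columnDataType : expression.getColDataTypeList()) {
                String columnName = columnDataType.getColumnName().replaceAll("`", "");
                for (FieldMapping fieldMapping : fieldMappings) {
                    if (fieldMapping.getSource().getName().equals(columnName)) {
                        columnDataType.setColumnName(fieldMapping.getTarget().getName());
                        break;
                    }
                }
            }
        }
    }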

+ 2 - 2
dbsyncer-web/src/main/resources/application.properties

@@ -46,10 +46,10 @@ dbsyncer.parser.table.group.buffer-period-millisecond=300
 #storage
 # 是否使用MySQL存储配置(false-关闭; true-开启)
 # false: 保存磁盘/data/config(驱动配置)|data(按驱动分别存储增量数据)|log(系统日志)}
-dbsyncer.storage.support.mysql.enabled=true
+dbsyncer.storage.support.mysql.enabled=false
 dbsyncer.storage.support.mysql.config.url=jdbc:mysql://127.0.0.1:3306/dbsyncer?rewriteBatchedStatements=true&seUnicode=true&characterEncoding=UTF8&serverTimezone=Asia/Shanghai&useSSL=false&verifyServerCertificate=false&autoReconnect=true
 dbsyncer.storage.support.mysql.config.username=root
-dbsyncer.storage.support.mysql.config.password=131496
+dbsyncer.storage.support.mysql.config.password=123
 # [StorageBufferActuator]线程数
 dbsyncer.storage.thread-core-size=4
 # [StorageBufferActuator]线程池队列