Sfoglia il codice sorgente

拆分工厂类,策略,新增了modify语句,add语句,drop语句,change语句支持

life 1 anno fa
parent
commit
4639d142a3

+ 9 - 7
dbsyncer-connector/src/main/java/org/dbsyncer/connector/config/DDLConfig.java

@@ -1,7 +1,9 @@
 package org.dbsyncer.connector.config;
 
+import java.util.LinkedList;
 import java.util.List;
 import net.sf.jsqlparser.statement.alter.AlterOperation;
+import org.dbsyncer.connector.enums.DDLOperationEnum;
 import org.dbsyncer.connector.model.Field;
 
 public class DDLConfig {
@@ -10,11 +12,11 @@ public class DDLConfig {
      */
     private String sql;
 
-    private AlterOperation operation;
+    private DDLOperationEnum ddlOperationEnum;
 
-    private List<Field> addFields;
+    private List<Field> addFields = new LinkedList<>();
 
-    private List<Field> removeFields;
+    private List<Field> removeFields = new LinkedList<>();
 
     //记录源表的源字段名称
     private String sourceColumnName;
@@ -62,11 +64,11 @@ public class DDLConfig {
         this.changedColumnName = changedColumnName;
     }
 
-    public AlterOperation getOperation() {
-        return operation;
+    public DDLOperationEnum getDdlOperationEnum() {
+        return ddlOperationEnum;
     }
 
-    public void setOperation(AlterOperation operation) {
-        this.operation = operation;
+    public void setDdlOperationEnum(DDLOperationEnum ddlOperationEnum) {
+        this.ddlOperationEnum = ddlOperationEnum;
     }
 }

+ 13 - 0
dbsyncer-connector/src/main/java/org/dbsyncer/connector/enums/DDLOperationEnum.java

@@ -0,0 +1,13 @@
+package org.dbsyncer.connector.enums;
+
+
+/**
+ * @author life
+ */
+
+public enum DDLOperationEnum {
+    ALTER_MODIFY,
+    ALTER_ADD,
+    ALTER_DROP,
+    ALTER_CHANGE;
+}

+ 15 - 8
dbsyncer-listener/src/main/java/org/dbsyncer/listener/mysql/MySQLExtractor.java

@@ -11,6 +11,9 @@ import com.github.shyiko.mysql.binlog.event.TableMapEventData;
 import com.github.shyiko.mysql.binlog.event.UpdateRowsEventData;
 import com.github.shyiko.mysql.binlog.event.WriteRowsEventData;
 import com.github.shyiko.mysql.binlog.network.ServerException;
+import net.sf.jsqlparser.JSQLParserException;
+import net.sf.jsqlparser.parser.CCJSqlParserUtil;
+import net.sf.jsqlparser.statement.alter.Alter;
 import org.dbsyncer.common.column.Lexer;
 import org.dbsyncer.common.event.ChangedOffset;
 import org.dbsyncer.common.event.DDLChangedEvent;
@@ -308,15 +311,19 @@ public class MySQLExtractor extends AbstractDatabaseExtractor {
 
         private void parseDDL(QueryEventData data) {
             if (StringUtil.startsWith(data.getSql(), ConnectorConstant.OPERTION_ALTER)) {
-                // ALTER TABLE `test`.`my_user` MODIFY COLUMN `name` varchar(128) CHARACTER SET utf8 COLLATE utf8_bin NULL DEFAULT NULL AFTER `id`
-                Lexer lexer = new Lexer(data.getSql());
-                lexer.nextToken('.');
-                lexer.nextToken('`');
-                String tableName = lexer.nextToken('`');
-                if (isFilterTable(data.getDatabase(), tableName)) {
-                    logger.info("sql:{}", data.getSql());
-                    changeEvent(new DDLChangedEvent(data.getDatabase(), tableName, ConnectorConstant.OPERTION_ALTER, data.getSql(), client.getBinlogFilename(), client.getBinlogPosition()));
+                try {
+                    Alter alter = (Alter) CCJSqlParserUtil.parse(data.getSql());
+                    String tableName =alter.getTable().getName();
+                    tableName = StringUtil.replace(tableName,"`","");
+                    // ALTER TABLE `test`.`my_user` MODIFY COLUMN `name` varchar(128) CHARACTER SET utf8 COLLATE utf8_bin NULL DEFAULT NULL AFTER `id`
+                    if (isFilterTable(data.getDatabase(), tableName)) {
+                        logger.info("sql:{}", data.getSql());
+                        changeEvent(new DDLChangedEvent(data.getDatabase(), tableName, ConnectorConstant.OPERTION_ALTER, data.getSql(), client.getBinlogFilename(), client.getBinlogPosition()));
+                    }
+                } catch (JSQLParserException e) {
+                    throw new RuntimeException(e);
                 }
+
             }
         }
 

+ 149 - 0
dbsyncer-parser/src/main/java/org/dbsyncer/parser/ddl/alter/AlterStrategy.java

@@ -0,0 +1,149 @@
+package org.dbsyncer.parser.ddl.alter;
+
+import java.util.LinkedList;
+import java.util.List;
+import net.sf.jsqlparser.statement.alter.Alter;
+import net.sf.jsqlparser.statement.alter.AlterExpression;
+import net.sf.jsqlparser.statement.alter.AlterOperation;
+import org.dbsyncer.common.util.StringUtil;
+import org.dbsyncer.connector.config.DDLConfig;
+import org.dbsyncer.connector.enums.DDLOperationEnum;
+import org.dbsyncer.connector.model.Field;
+import org.dbsyncer.parser.ddl.strategy.JsqlParserStrategy;
+import org.dbsyncer.parser.model.FieldMapping;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * @author life
+ */
+public class AlterStrategy implements JsqlParserStrategy {
+
+    private final Logger logger = LoggerFactory.getLogger(getClass());
+
+    Alter alter;
+
+    DDLConfig ddlConfig;
+
+    List<FieldMapping> fieldMappingList;
+
+    String targetName;
+
+    public AlterStrategy(Alter alter, DDLConfig ddlConfig, List<FieldMapping> fieldMappingList,
+            String targetName) {
+        this.alter = alter;
+        this.ddlConfig = ddlConfig;
+        this.fieldMappingList = fieldMappingList;
+        this.targetName = targetName;
+    }
+
+    @Override
+    public void parser() {
+        // 替换成目标表名
+        alter.getTable().setName(targetName);
+        for (AlterExpression expression : alter.getAlterExpressions()) {
+            AlterOperation alterOperation = expression.getOperation();
+            if (alterOperation == AlterOperation.MODIFY) {//修改属性
+                ddlConfig.setDdlOperationEnum(DDLOperationEnum.ALTER_MODIFY);
+                parseModify(expression);
+            } else if (AlterOperation.ADD == alterOperation) {//新增字段,只需要替换表名不需要替换sql
+                ddlConfig.setDdlOperationEnum(DDLOperationEnum.ALTER_ADD);
+                parseAdd(expression);
+            }else if (AlterOperation.CHANGE == alterOperation){
+                ddlConfig.setDdlOperationEnum(DDLOperationEnum.ALTER_CHANGE);
+                parseChange(expression);
+            }else if (AlterOperation.DROP == alterOperation){
+                ddlConfig.setDdlOperationEnum(DDLOperationEnum.ALTER_DROP);
+                parseDrop(expression);
+            }
+            ddlConfig.setSql(alter.toString());
+        }
+
+    }
+
+    //解析drop
+    //example: ALTER TABLE test_table DROP dis;
+    private void parseDrop(AlterExpression expression) {
+        String columName = expression.getColumnName();
+        columName = StringUtil.replace(columName, "`", "");
+        Field field = new Field(columName,null,0);
+        //需要把列替换成目标的列名
+        String finalColumName = columName;
+        FieldMapping fieldMapping =  fieldMappingList.stream().filter(x->x.getSource().getName().equals(
+                finalColumName)).findFirst().orElse(null);
+        if (fieldMapping !=null){
+            expression.setColumnName(fieldMapping.getTarget().getName());
+        }
+        //加入还是原名
+        ddlConfig.getRemoveFields().add(field);
+    }
+
+    //解析change属性
+    //exampleSql: ALTER TABLE test_table CHANGE duan1  duan2 INT(10)
+    private void parseChange(AlterExpression expression) {
+        String oldColumnName = expression.getColumnOldName();
+        oldColumnName =StringUtil.replace(oldColumnName,"`","");
+        ddlConfig.setSourceColumnName(oldColumnName);
+        String finalOldColumnName = oldColumnName;
+        FieldMapping fieldMapping = fieldMappingList.stream().filter(x->x.getSource().getName().equals(
+                finalOldColumnName)).findFirst().orElse(null);
+        if (fieldMapping != null) {
+            expression.setColumnOldName(fieldMapping.getTarget().getName());
+            for (AlterExpression.ColumnDataType columnDataType : expression.getColDataTypeList()) {
+                ddlConfig.setChangedColumnName(columnDataType.getColumnName());
+            }
+        }
+
+    }
+
+    //解析add的属性
+    //exampleSql: ALTER TABLE cost ADD duan INT after(before) `tmp`;
+    private void parseAdd(AlterExpression expression) {
+        for (AlterExpression.ColumnDataType columnDataType : expression.getColDataTypeList()) {
+            boolean findColumn = false;
+            List<String> columnSpecs = new LinkedList<>();
+            for (String spe:columnDataType.getColumnSpecs()) {//对一before,after进行处理
+                spe = StringUtil.replace(spe,"`","");
+                if (findColumn){
+                    //对before(after)字段进行映射
+                    String finalSpe = spe;
+                    FieldMapping fieldMapping = fieldMappingList.stream().filter(x->x.getSource().getName().equals(
+                            finalSpe)).findFirst().get();
+                    columnSpecs.add(fieldMapping.getTarget().getName());
+                    findColumn = false;
+                    continue;
+                }
+
+                if (StringUtil.equalsIgnoreCase(spe,"before") || StringUtil.equalsIgnoreCase(spe,"after")){
+                   findColumn =true;
+                }
+                columnSpecs.add(spe);
+            }
+            columnDataType.setColumnSpecs(columnSpecs);
+            String columName = columnDataType.getColumnName();
+            columName = StringUtil.replace(columName, "`", "");
+            Field field = new Field(columName,columnDataType.getColDataType().getDataType(),0);//感觉不需要都行,只需要名称,后续可以自己刷新
+            ddlConfig.getAddFields().add(field);
+        }
+    }
+
+    //解析modify的属性
+    //exampleSql: ALTER TABLE `test`.`test_table` MODIFY COLUMN `test` varchar(251) NULL DEFAULT NULL
+    private void parseModify(AlterExpression expression){
+        //先查找到当前的表和目标的表对应的字段
+        for (AlterExpression.ColumnDataType columnDataType : expression.getColDataTypeList()) {
+            String columName = columnDataType.getColumnName();
+            columName = StringUtil.replace(columName, "`", "");
+            for (FieldMapping fieldMapping : fieldMappingList) {
+                if (StringUtil.equals(fieldMapping.getSource().getName(), columName)) {
+                    //TODO life 找到目标的表名,先是alter进行属性替换,然后config记录新的
+                    columnDataType.setColumnName(
+                            fieldMapping.getTarget().getName());//alter语法树进行替换
+                    //因为只是修改属性,所以表名称没有变化
+                    ddlConfig.setSourceColumnName(fieldMapping.getSource().getName());
+                    ddlConfig.setChangedColumnName(fieldMapping.getSource().getName());
+                }
+            }
+        }
+    }
+}

+ 50 - 0
dbsyncer-parser/src/main/java/org/dbsyncer/parser/ddl/factory/ParserFactory.java

@@ -0,0 +1,50 @@
+package org.dbsyncer.parser.ddl.factory;
+
+import java.util.List;
+import org.dbsyncer.connector.Connector;
+import org.dbsyncer.connector.ConnectorFactory;
+import org.dbsyncer.connector.config.DDLConfig;
+import org.dbsyncer.connector.database.Database;
+import org.dbsyncer.parser.ParserException;
+import org.dbsyncer.parser.model.FieldMapping;
+
+/**
+ * @author life
+ */
+public abstract class ParserFactory {
+
+    public DDLConfig ddlConfig;
+
+    public List<FieldMapping> fieldMappingList;
+
+    ConnectorFactory connectorFactory;
+
+    String targetConnectorType;
+
+    String targetTableName;
+
+    public ParserFactory(DDLConfig ddlConfig, List<FieldMapping> fieldMappingList,
+            ConnectorFactory connectorFactory, String targetConnectorType, String targetTableName) {
+        this.ddlConfig = ddlConfig;
+        this.fieldMappingList = fieldMappingList;
+        this.connectorFactory = connectorFactory;
+        this.targetConnectorType = targetConnectorType;
+        this.targetTableName = targetTableName;
+    }
+
+    public void parser(String sql){
+        Connector connector = connectorFactory.getConnector(targetConnectorType);
+        // 暂支持关系型数据库解析
+        if (!(connector instanceof Database)) {
+            throw new ParserException("暂支持关系型数据库解析");
+        }
+        Database database = (Database) connector;
+        String quotation = database.buildSqlWithQuotation();
+        String tableName = new StringBuilder(quotation).append(targetTableName).append(quotation).toString();
+        parser(sql,tableName);
+    }
+
+
+    public abstract void parser(String sql,String targetName);
+
+}

+ 5 - 43
dbsyncer-parser/src/main/java/org/dbsyncer/parser/ddl/impl/DDLParserImpl.java

@@ -7,12 +7,15 @@ import net.sf.jsqlparser.statement.Statement;
 import net.sf.jsqlparser.statement.alter.Alter;
 import net.sf.jsqlparser.statement.alter.AlterExpression;
 import net.sf.jsqlparser.statement.alter.AlterOperation;
+import org.dbsyncer.common.util.StringUtil;
 import org.dbsyncer.connector.Connector;
 import org.dbsyncer.connector.ConnectorFactory;
 import org.dbsyncer.connector.config.DDLConfig;
 import org.dbsyncer.connector.database.Database;
 import org.dbsyncer.parser.ParserException;
 import org.dbsyncer.parser.ddl.DDLParser;
+import org.dbsyncer.parser.ddl.factory.ParserFactory;
+import org.dbsyncer.parser.ddl.jsql.JsqParserFactory;
 import org.dbsyncer.parser.model.FieldMapping;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -42,51 +45,10 @@ public class DDLParserImpl implements DDLParser {
     @Override
     public DDLConfig parseDDlConfig(String sql, String targetConnectorType, String targetTableName,
             List<FieldMapping> fieldMappingList) {
-        Connector connector = connectorFactory.getConnector(targetConnectorType);
-        // 暂支持关系型数据库解析
-        if (!(connector instanceof Database)) {
-            throw new ParserException("暂支持关系型数据库解析");
-        }
-
         // 替换为目标库执行SQL
         DDLConfig ddlConfig = new DDLConfig();
-        try {
-            Statement statement = CCJSqlParserUtil.parse(sql);
-            if (statement instanceof Alter) {
-                Alter alter = (Alter) statement;
-                Database database = (Database) connector;
-                String quotation = database.buildSqlWithQuotation();
-                String tableName = new StringBuilder(quotation).append(targetTableName).append(quotation).toString();
-                // 替换成目标表名
-                alter.getTable().setName(tableName);
-                logger.info("转换目标源sql:{}", alter);
-                for (AlterExpression expression: alter.getAlterExpressions()) {
-                    AlterOperation alterOperation = expression.getOperation();
-                    if (alterOperation == AlterOperation.MODIFY){//修改属性
-                        //先查找到当前的表和目标的表对应的字段
-                        for (AlterExpression.ColumnDataType columnDataType:expression.getColDataTypeList()) {
-                            String columName = columnDataType.getColumnName();
-                            columName = columName.replaceAll("`","");
-                            for (FieldMapping fieldMapping :fieldMappingList) {
-                                if (fieldMapping.getSource().getName().equals(columName)){
-                                    //TODO life 找到目标的表名,先是alter进行属性替换,然后config记录新的
-                                    columnDataType.setColumnName(fieldMapping.getTarget().getName());//alter语法树进行替换
-                                    //因为只是修改属性,所以表名称没有变化
-                                    ddlConfig.setSourceColumnName(fieldMapping.getSource().getName());
-                                    ddlConfig.setChangedColumnName(fieldMapping.getSource().getName());
-                                    break;
-                                }
-                            }
-                        }
-                    }else if (AlterOperation.ADD == alterOperation){//新增字段
-
-                    }
-                }
-                ddlConfig.setSql(alter.toString());
-            }
-        } catch (JSQLParserException e) {
-            logger.error(e.getMessage(), e);
-        }
+        ParserFactory parserFactory = new JsqParserFactory(ddlConfig,fieldMappingList,connectorFactory,targetConnectorType,targetTableName);
+        parserFactory.parser(sql);
         return ddlConfig;
     }
 

+ 40 - 0
dbsyncer-parser/src/main/java/org/dbsyncer/parser/ddl/jsql/JsqParserFactory.java

@@ -0,0 +1,40 @@
+package org.dbsyncer.parser.ddl.jsql;
+
+import java.util.List;
+import net.sf.jsqlparser.JSQLParserException;
+import net.sf.jsqlparser.parser.CCJSqlParserUtil;
+import net.sf.jsqlparser.statement.Statement;
+import net.sf.jsqlparser.statement.alter.Alter;
+import org.dbsyncer.connector.ConnectorFactory;
+import org.dbsyncer.connector.config.DDLConfig;
+import org.dbsyncer.parser.ddl.alter.AlterStrategy;
+import org.dbsyncer.parser.ddl.factory.ParserFactory;
+import org.dbsyncer.parser.model.FieldMapping;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * @author life
+ */
+public class JsqParserFactory extends ParserFactory {
+
+    private final Logger logger = LoggerFactory.getLogger(getClass());
+    public JsqParserFactory(DDLConfig ddlConfig, List<FieldMapping> fieldMappingList,
+            ConnectorFactory connectorFactory, String targetConnectorType, String targetTableName) {
+        super(ddlConfig, fieldMappingList, connectorFactory, targetConnectorType, targetTableName);
+    }
+
+    @Override
+    public void parser(String sql, String targetName) {
+        try {
+            Statement statement = CCJSqlParserUtil.parse(sql);
+            if (statement instanceof Alter) {
+                AlterStrategy alterStrategy = new AlterStrategy((Alter) statement,ddlConfig,fieldMappingList,targetName);
+                alterStrategy.parser();
+            }
+        } catch (JSQLParserException e) {
+            logger.error(e.getMessage(), e);
+        }
+    }
+
+}

+ 13 - 0
dbsyncer-parser/src/main/java/org/dbsyncer/parser/ddl/strategy/JsqlParserStrategy.java

@@ -0,0 +1,13 @@
+package org.dbsyncer.parser.ddl.strategy;
+
+import java.util.List;
+import org.dbsyncer.connector.config.DDLConfig;
+import org.dbsyncer.parser.model.FieldMapping;
+
+/**
+ * @author life
+ */
+public interface JsqlParserStrategy extends Strategy{
+
+    public void parser();
+}

+ 5 - 0
dbsyncer-parser/src/main/java/org/dbsyncer/parser/ddl/strategy/Strategy.java

@@ -0,0 +1,5 @@
+package org.dbsyncer.parser.ddl.strategy;
+
+public interface Strategy {
+
+}

+ 60 - 20
dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/impl/GeneralBufferActuator.java

@@ -15,6 +15,7 @@ import org.dbsyncer.connector.ConnectorFactory;
 import org.dbsyncer.connector.config.DDLConfig;
 import org.dbsyncer.connector.constant.ConnectorConstant;
 import org.dbsyncer.connector.enums.ConnectorEnum;
+import org.dbsyncer.connector.enums.DDLOperationEnum;
 import org.dbsyncer.connector.model.Field;
 import org.dbsyncer.connector.model.MetaInfo;
 import org.dbsyncer.parser.Parser;
@@ -203,7 +204,7 @@ public class GeneralBufferActuator extends AbstractBufferActuator<WriterRequest,
 
                 // TODO life
                 // 1.获取目标表最新的属性字段
-                MetaInfo tagertMetaInfo = parser.getMetaInfo(mapping.getTargetConnectorId(), targetTableName);
+                MetaInfo targetMetaInfo = parser.getMetaInfo(mapping.getTargetConnectorId(), targetTableName);
                 MetaInfo originMetaInfo = parser.getMetaInfo(mapping.getSourceConnectorId(),tableGroup.getSourceTable().getName());
                 // 1.1 参考 org.dbsyncer.biz.checker.impl.tablegroup.TableGroupChecker.refreshTableFields
                 //上面已经是刷新了
@@ -213,29 +214,13 @@ public class GeneralBufferActuator extends AbstractBufferActuator<WriterRequest,
 
                 // 2.更新TableGroup.targetTable
                 tableGroup.getSourceTable().setColumn(originMetaInfo.getColumn());
-                tableGroup.getTargetTable().setColumn(tagertMetaInfo.getColumn());
+                tableGroup.getTargetTable().setColumn(targetMetaInfo.getColumn());
 
                 // 3.更新表字段映射(根据保留的更改的属性,进行更改)
                 List<FieldMapping> fieldMappingList = tableGroup.getFieldMapping(); // 获得刚开始filedMapping
                 List<FieldMapping> targetMappingList = new LinkedList<>();//替换完成后的filedMapping
-                for (FieldMapping fieldMapping:fieldMappingList) {
-                    String fieldSourceName = fieldMapping.getSource().getName();
-                    String filedTargetName = fieldMapping.getTarget().getName();
-                    //找到更改的源表的名称,也就是找到了对应的映射关系,这样就可以从源表找到更改后的名称进行对应,
-                    if (fieldSourceName.equals(targetDDLConfig.getSourceColumnName())){
-                        //找到源表的字段
-                        if (targetDDLConfig.getSourceColumnName().equals(targetDDLConfig.getChangedColumnName())){//说明字段名没有改变,只是改变了属性
-                            Field source = originMetaInfo.getColumn().stream()
-                                    .filter(x->x.getName().equals(fieldSourceName)).findFirst().get();
-                            Field target = tagertMetaInfo.getColumn().stream()
-                                    .filter(x->x.getName().equals(filedTargetName)).findFirst().get();
-                            //替换
-                            targetMappingList.add(new FieldMapping(source,target)) ;
-                            continue;
-                        }
-                    }
-                    targetMappingList.add(fieldMapping);
-                }
+                refreshFiledMapping(targetMappingList,fieldMappingList,originMetaInfo,targetMetaInfo,targetDDLConfig);
+
                 tableGroup.setFieldMapping(targetMappingList);
                 // 4.合并驱动配置 & 更新TableGroup.command 合并驱动应该不需要了,我只是把该替换的地方替换掉了,原来的还是保持一致,应该需要更新TableGroup.command
                 // 4.1 参考 org.dbsyncer.biz.checker.impl.tablegroup.TableGroupChecker.mergeConfig
@@ -253,6 +238,61 @@ public class GeneralBufferActuator extends AbstractBufferActuator<WriterRequest,
         }
     }
 
+
+    //根据原来的映射关系和更改的字段进行新关系的映射组合
+    private void refreshFiledMapping(List<FieldMapping> targetMappingList, List<FieldMapping> fieldMappingList,MetaInfo originMetaInfo,
+            MetaInfo targetMetaInfo,DDLConfig targetDDLConfig) {
+        //处理映射关系
+        for (FieldMapping fieldMapping:fieldMappingList) {
+            String fieldSourceName = fieldMapping.getSource().getName();
+            String filedTargetName = fieldMapping.getTarget().getName();
+            //找到更改的源表的名称,也就是找到了对应的映射关系,这样就可以从源表找到更改后的名称进行对应,
+            if (fieldSourceName.equals(targetDDLConfig.getSourceColumnName())){
+                //找到源表的字段
+                if (targetDDLConfig.getDdlOperationEnum() == DDLOperationEnum.ALTER_MODIFY){//说明字段名没有改变,只是改变了属性
+                    Field source = originMetaInfo.getColumn().stream()
+                            .filter(x->x.getName().equals(fieldSourceName)).findFirst().get();
+                    Field target = targetMetaInfo.getColumn().stream()
+                            .filter(x->x.getName().equals(filedTargetName)).findFirst().get();
+                    //替换
+                    targetMappingList.add(new FieldMapping(source,target)) ;
+                    continue;
+                }else if (targetDDLConfig.getDdlOperationEnum() == DDLOperationEnum.ALTER_CHANGE){//改变名称
+                    Field source = originMetaInfo.getColumn().stream()
+                            .filter(x->x.getName().equals(targetDDLConfig.getChangedColumnName())).findFirst().get();
+                    Field target = targetMetaInfo.getColumn().stream()
+                            .filter(x->x.getName().equals(targetDDLConfig.getChangedColumnName())).findFirst().get();
+                    //替换
+                    targetMappingList.add(new FieldMapping(source,target)) ;
+                    continue;
+                }
+            }
+            targetMappingList.add(fieldMapping);
+        }
+
+        if (DDLOperationEnum.ALTER_ADD == targetDDLConfig.getDdlOperationEnum()){
+            //处理新增的映射关系
+            List<Field> addFields = targetDDLConfig.getAddFields();
+            for (Field field:addFields) {
+                Field source = originMetaInfo.getColumn().stream()
+                        .filter(x->x.getName().equals(field.getName())).findFirst().get();
+                Field target = targetMetaInfo.getColumn().stream()
+                        .filter(x->x.getName().equals(field.getName())).findFirst().get();
+                FieldMapping fieldMapping =new FieldMapping(source,target);
+                targetMappingList.add(fieldMapping);
+            }
+        }
+
+        if (DDLOperationEnum.ALTER_DROP == targetDDLConfig.getDdlOperationEnum()){
+            //处理删除字段的映射关系
+            List<Field> removeFields = targetDDLConfig.getRemoveFields();
+            for (Field field:removeFields) {
+                targetMappingList.removeIf(x->x.getSource().getName().equals(field.getName()));
+            }
+        }
+
+    }
+
     /**
      * 获取连接器配置
      *