AE86 4 роки тому
батько
коміт
420bf62418
17 змінених файлів з 176 додано та 115 видалено
  1. 19 7
      dbsyncer-biz/src/main/java/org/dbsyncer/biz/checker/impl/connector/OracleConfigChecker.java
  2. 28 10
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/config/Field.java
  3. 11 28
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/config/SqlBuilderConfig.java
  4. 16 14
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/database/AbstractDatabaseConnector.java
  5. 31 3
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/database/AbstractSetter.java
  6. 12 0
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/database/setter/BlobSetter.java
  7. 10 0
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/database/setter/ClobSetter.java
  8. 7 2
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/database/setter/TimestampSetter.java
  9. 4 3
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/database/sqlbuilder/SqlBuilderInsert.java
  10. 13 11
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/database/sqlbuilder/SqlBuilderQuery.java
  11. 4 3
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/database/sqlbuilder/SqlBuilderUpdate.java
  12. 1 0
      dbsyncer-listener/src/main/java/org/dbsyncer/listener/oracle/dcn/DBChangeNotification.java
  13. 6 11
      dbsyncer-manager/src/main/java/org/dbsyncer/manager/puller/impl/IncrementPuller.java
  14. 1 2
      dbsyncer-parser/src/main/java/org/dbsyncer/parser/Parser.java
  15. 2 4
      dbsyncer-parser/src/main/java/org/dbsyncer/parser/ParserFactory.java
  16. 1 1
      dbsyncer-parser/src/main/java/org/dbsyncer/parser/util/PickerUtil.java
  17. 10 16
      dbsyncer-storage/src/main/java/org/dbsyncer/storage/support/MysqlStorageServiceImpl.java

+ 19 - 7
dbsyncer-biz/src/main/java/org/dbsyncer/biz/checker/impl/connector/OracleConfigChecker.java

@@ -12,6 +12,7 @@ import org.springframework.stereotype.Component;
 
 import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
 
 /**
  * <p>1、增量同步时,目标源必须有一个主键字段用于接收ROW_ID值。</p>
@@ -30,7 +31,7 @@ public class OracleConfigChecker extends AbstractDataBaseConfigChecker {
      * 默认ROWID列名称
      */
     private static final String ROW_ID_NAME = "ORACLE_ROW_ID";
-    private static final String ROW_ID      = "ROWIDTOCHAR(ROWID) as " + ROW_ID_NAME;
+    private static final String ROW_ID      = "ROWIDTOCHAR(ROWID)";
 
     @Autowired
     private Manager manager;
@@ -39,7 +40,7 @@ public class OracleConfigChecker extends AbstractDataBaseConfigChecker {
     public void dealIncrementStrategy(Mapping mapping, TableGroup tableGroup) {
         // TODO 模拟测试
         Map<String, String> params = mapping.getParams();
-        mapping.getParams().put(ROW_ID_NAME, "RID");
+        mapping.getParams().put(ROW_ID_NAME, "row_id");
 
         // 没有定义目标源ROW_ID字段
         if (CollectionUtils.isEmpty(mapping.getParams()) || !params.containsKey(ROW_ID_NAME)) {
@@ -57,13 +58,24 @@ public class OracleConfigChecker extends AbstractDataBaseConfigChecker {
         }
 
         List<Field> sourceColumn = tableGroup.getSourceTable().getColumn();
-        Field sourceField = new Field(ROW_ID, "VARCHAR2", 12, false, true);
-        sourceColumn.add(0, sourceField);
+        List<Field> sFieldList = sourceColumn.stream().filter(f -> StringUtils.endsWithIgnoreCase(f.getName(), ROW_ID)).collect(Collectors.toList());
+        Field sourceField = new Field(ROW_ID, "VARCHAR2", 12, false, ROW_ID_NAME, true);
+        if(CollectionUtils.isEmpty(sFieldList)){
+            sourceColumn.add(0, sourceField);
+        }
 
         List<Field> targetColumn = tableGroup.getTargetTable().getColumn();
-        targetColumn.parallelStream().forEach(f -> f.setPk(false));
-        Field targetField = new Field(targetRowIdName, "VARCHAR2", 12, true, false);
-        targetColumn.add(0, targetField);
+        Field targetField = null;
+        for(Field f : targetColumn){
+            f.setPk(StringUtils.endsWithIgnoreCase(f.getName(), targetRowIdName));
+            if(f.isPk()){
+                targetField = f;
+            }
+        }
+        if(null == targetField){
+            targetField = new Field(targetRowIdName, "VARCHAR2", 12, true);
+            targetColumn.add(0, targetField);
+        }
 
         tableGroup.getFieldMapping().add(0, new FieldMapping(sourceField, targetField));
     }

+ 28 - 10
dbsyncer-connector/src/main/java/org/dbsyncer/connector/config/Field.java

@@ -1,6 +1,5 @@
 package org.dbsyncer.connector.config;
 
-import com.alibaba.fastjson.annotation.JSONField;
 import org.dbsyncer.common.util.JsonUtil;
 
 /**
@@ -33,14 +32,23 @@ public class Field {
     private boolean pk;
 
     /**
-     * 是否数据源表字段
+     * 字段别名
      */
-    @JSONField(serialize = false)
-    private boolean sourceTable;
+    private String labelName;
+
+    /**
+     * 是否系统字段
+     */
+    private boolean unmodifiabled;
 
     public Field() {
     }
 
+    public Field(String name, String labelName) {
+        this.name = name;
+        this.labelName = labelName;
+    }
+
     public Field(String name, String typeName, int type) {
         this.name = name;
         this.typeName = typeName;
@@ -54,12 +62,13 @@ public class Field {
         this.pk = pk;
     }
 
-    public Field(String name, String typeName, int type, boolean pk, boolean sourceTable) {
+    public Field(String name, String typeName, int type, boolean pk, String labelName, boolean unmodifiabled) {
         this.name = name;
         this.typeName = typeName;
         this.type = type;
         this.pk = pk;
-        this.sourceTable = sourceTable;
+        this.labelName = labelName;
+        this.unmodifiabled = unmodifiabled;
     }
 
     public String getName() {
@@ -94,12 +103,21 @@ public class Field {
         this.pk = pk;
     }
 
-    public boolean isSourceTable() {
-        return sourceTable;
+    public String getLabelName() {
+        return labelName;
+    }
+
+    public Field setLabelName(String labelName) {
+        this.labelName = labelName;
+        return this;
+    }
+
+    public boolean isUnmodifiabled() {
+        return unmodifiabled;
     }
 
-    public Field setSourceTable(boolean sourceTable) {
-        this.sourceTable = sourceTable;
+    public Field setUnmodifiabled(boolean unmodifiabled) {
+        this.unmodifiabled = unmodifiabled;
         return this;
     }
 

+ 11 - 28
dbsyncer-connector/src/main/java/org/dbsyncer/connector/config/SqlBuilderConfig.java

@@ -6,37 +6,24 @@ import java.util.List;
 
 public class SqlBuilderConfig {
 
-    private Database     database;
+    private Database    database;
     // 表名
-    private String       tableName;
+    private String      tableName;
     // 主键
-    private String       pk;
-    // 字段名称
-    private List<String> filedNames;
-    // 字段别名
-    private List<String> labelNames;
+    private String      pk;
+    // 字段
+    private List<Field> fields;
     // 过滤条件
-    private String       queryFilter;
+    private String      queryFilter;
     // 引号
-    private String       quotation;
+    private String      quotation;
 
-    public SqlBuilderConfig(Database database, String tableName, String pk, List<String> filedNames, String queryFilter,
+    public SqlBuilderConfig(Database database, String tableName, String pk, List<Field> fields, String queryFilter,
                             String quotation) {
         this.database = database;
         this.tableName = tableName;
         this.pk = pk;
-        this.filedNames = filedNames;
-        this.queryFilter = queryFilter;
-        this.quotation = quotation;
-    }
-
-    public SqlBuilderConfig(Database database, String tableName, String pk, List<String> filedNames,
-                            List<String> labelNames, String queryFilter, String quotation) {
-        this.database = database;
-        this.tableName = tableName;
-        this.pk = pk;
-        this.filedNames = filedNames;
-        this.labelNames = labelNames;
+        this.fields = fields;
         this.queryFilter = queryFilter;
         this.quotation = quotation;
     }
@@ -53,12 +40,8 @@ public class SqlBuilderConfig {
         return pk;
     }
 
-    public List<String> getFiledNames() {
-        return filedNames;
-    }
-
-    public List<String> getLabelNames() {
-        return labelNames;
+    public List<Field> getFields() {
+        return fields;
     }
 
     public String getQueryFilter() {

+ 16 - 14
dbsyncer-connector/src/main/java/org/dbsyncer/connector/database/AbstractDatabaseConnector.java

@@ -91,7 +91,7 @@ public abstract class AbstractDatabaseConnector implements Database {
         Map<String, String> map = new HashMap<>();
 
         String query = ConnectorConstant.OPERTION_QUERY;
-        map.put(query, buildSql(query, table, null, queryFilterSql));
+        map.put(query, buildSql(query, table, commandConfig.getOriginalTable(), queryFilterSql));
 
         // 获取查询总数SQL
         String quotation = buildSqlWithQuotation();
@@ -476,25 +476,27 @@ public abstract class AbstractDatabaseConnector implements Database {
         }
         List<Field> column = table.getColumn();
         if (CollectionUtils.isEmpty(column)) {
-            return null;
+            throw new ConnectorException("Column can not be null.");
         }
-        // 获取主键
         String pk = null;
-        // 去掉重复的查询字段
-        List<String> filedNames = new ArrayList<>();
+        Set<String> mark = new HashSet<>();
+        List<Field> fields = new ArrayList<>();
         for (Field c : column) {
+            String name = c.getName();
+            if(StringUtils.isBlank(name)){
+                throw new ConnectorException("The field name can not be empty.");
+            }
             if (c.isPk()) {
-                pk = c.getName();
+                pk = name;
             }
-            String name = c.getName();
-            // 如果没有重复
-            if (StringUtils.isNotBlank(name) && !filedNames.contains(name)) {
-                filedNames.add(name);
+            if (!mark.contains(name)) {
+                fields.add(c);
+                mark.add(name);
             }
         }
-        if (CollectionUtils.isEmpty(filedNames)) {
-            logger.error("The filedNames can not be empty.");
-            throw new ConnectorException("The filedNames can not be empty.");
+        if (CollectionUtils.isEmpty(fields)) {
+            logger.error("The fields can not be empty.");
+            throw new ConnectorException("The fields can not be empty.");
         }
         String tableName = table.getName();
         if (StringUtils.isBlank(tableName)) {
@@ -505,7 +507,7 @@ public abstract class AbstractDatabaseConnector implements Database {
             pk = DatabaseUtil.findTablePrimaryKey(originalTable, "");
         }
 
-        SqlBuilderConfig config = new SqlBuilderConfig(this, tableName, pk, filedNames, queryFilterSQL, buildSqlWithQuotation());
+        SqlBuilderConfig config = new SqlBuilderConfig(this, tableName, pk, fields, queryFilterSQL, buildSqlWithQuotation());
         return SqlBuilderEnum.getSqlBuilder(type).buildSql(config);
     }
 

+ 31 - 3
dbsyncer-connector/src/main/java/org/dbsyncer/connector/database/AbstractSetter.java

@@ -1,10 +1,21 @@
 package org.dbsyncer.connector.database;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.reflect.ParameterizedType;
 import java.sql.PreparedStatement;
 import java.sql.SQLException;
 
 public abstract class AbstractSetter<T> implements Setter {
 
+    private final Logger   logger = LoggerFactory.getLogger(getClass());
+    private final Class<T> parameterClazz;
+
+    public AbstractSetter() {
+        parameterClazz = (Class<T>) ((ParameterizedType) getClass().getGenericSuperclass()).getActualTypeArguments()[0];
+    }
+
     /**
      * 实现字段类型参数设置
      *
@@ -14,20 +25,37 @@ public abstract class AbstractSetter<T> implements Setter {
      */
     protected abstract void set(PreparedStatement ps, int i, T val) throws SQLException;
 
+    /**
+     * 参数类型不匹配
+     *
+     * @param val
+     * @return
+     */
+    protected void setIfValueTypeNotMatch(PreparedStatement ps, int i, int type, Object val) throws SQLException {
+        ps.setNull(i, type);
+    }
+
     @Override
     public void set(PreparedStatement ps, int i, int type, Object val) {
         try {
             if (null == val) {
                 ps.setNull(i, type);
-            } else {
-                set(ps, i, (T) val);
+                return;
             }
+
+            if (val.getClass().equals(parameterClazz)) {
+                set(ps, i, (T) (val));
+                return;
+            }
+
+            setIfValueTypeNotMatch(ps, i, type, val);
         } catch (Exception e) {
+            logger.error("Set preparedStatement error: {}", e.getMessage());
             try {
                 ps.setNull(i, type);
             } catch (SQLException e1) {
             }
         }
     }
-    
+
 }

+ 12 - 0
dbsyncer-connector/src/main/java/org/dbsyncer/connector/database/setter/BlobSetter.java

@@ -1,5 +1,6 @@
 package org.dbsyncer.connector.database.setter;
 
+import org.dbsyncer.connector.ConnectorException;
 import org.dbsyncer.connector.database.AbstractSetter;
 
 import java.sql.Blob;
@@ -13,4 +14,15 @@ public class BlobSetter extends AbstractSetter<Blob> {
         ps.setBlob(i, val);
     }
 
+    @Override
+    protected void setIfValueTypeNotMatch(PreparedStatement ps, int i, int type, Object val) throws SQLException {
+        // 存放jpg等文件
+        if (val instanceof Blob) {
+            Blob blob = (Blob) val;
+            ps.setBlob(i, blob);
+            return;
+        }
+        throw new ConnectorException(String.format("BlobSetter can not find type [%s], val [%s]", type, val));
+    }
+
 }

+ 10 - 0
dbsyncer-connector/src/main/java/org/dbsyncer/connector/database/setter/ClobSetter.java

@@ -1,5 +1,6 @@
 package org.dbsyncer.connector.database.setter;
 
+import org.dbsyncer.connector.ConnectorException;
 import org.dbsyncer.connector.database.AbstractSetter;
 
 import java.sql.Clob;
@@ -13,4 +14,13 @@ public class ClobSetter extends AbstractSetter<Clob> {
         ps.setClob(i, val);
     }
 
+    @Override
+    protected void setIfValueTypeNotMatch(PreparedStatement ps, int i, int type, Object val) throws SQLException {
+        if(val instanceof Clob) {
+            Clob clob = (Clob) val;
+            ps.setClob(i, clob);
+            return;
+        }
+        throw new ConnectorException(String.format("ClobSetter can not find type [%s], val [%s]", type, val));
+    }
 }

+ 7 - 2
dbsyncer-connector/src/main/java/org/dbsyncer/connector/database/setter/TimestampSetter.java

@@ -6,10 +6,15 @@ import java.sql.PreparedStatement;
 import java.sql.SQLException;
 import java.sql.Timestamp;
 
-public class TimestampSetter extends AbstractSetter {
+public class TimestampSetter extends AbstractSetter<Timestamp> {
 
     @Override
-    protected void set(PreparedStatement ps, int i, Object val) throws SQLException {
+    protected void set(PreparedStatement ps, int i, Timestamp val) throws SQLException {
+        ps.setTimestamp(i, val);
+    }
+
+    @Override
+    protected void setIfValueTypeNotMatch(PreparedStatement ps, int i, int type, Object val) throws SQLException {
         ps.setTimestamp(i, Timestamp.valueOf(String.valueOf(val)));
     }
 

+ 4 - 3
dbsyncer-connector/src/main/java/org/dbsyncer/connector/database/sqlbuilder/SqlBuilderInsert.java

@@ -1,5 +1,6 @@
 package org.dbsyncer.connector.database.sqlbuilder;
 
+import org.dbsyncer.connector.config.Field;
 import org.dbsyncer.connector.config.SqlBuilderConfig;
 import org.dbsyncer.connector.database.AbstractSqlBuilder;
 
@@ -15,17 +16,17 @@ public class SqlBuilderInsert extends AbstractSqlBuilder {
     @Override
     public String buildSql(SqlBuilderConfig config) {
         String tableName = config.getTableName();
-        List<String> filedNames = config.getFiledNames();
+        List<Field> fields = config.getFields();
         String quotation = config.getQuotation();
 
         StringBuilder sql = new StringBuilder();
         StringBuilder fs = new StringBuilder();
         StringBuilder vs = new StringBuilder();
-        int size = filedNames.size();
+        int size = fields.size();
         int end = size - 1;
         for (int i = 0; i < size; i++) {
             // "USERNAME"
-            fs.append(quotation).append(filedNames.get(i)).append(quotation);
+            fs.append(quotation).append(fields.get(i).getName()).append(quotation);
             vs.append("?");
             //如果不是最后一个字段
             if (i < end) {

+ 13 - 11
dbsyncer-connector/src/main/java/org/dbsyncer/connector/database/sqlbuilder/SqlBuilderQuery.java

@@ -1,7 +1,7 @@
 package org.dbsyncer.connector.database.sqlbuilder;
 
 import org.apache.commons.lang.StringUtils;
-import org.dbsyncer.common.util.CollectionUtils;
+import org.dbsyncer.connector.config.Field;
 import org.dbsyncer.connector.config.SqlBuilderConfig;
 import org.dbsyncer.connector.database.AbstractSqlBuilder;
 import org.dbsyncer.connector.database.Database;
@@ -25,23 +25,25 @@ public class SqlBuilderQuery extends AbstractSqlBuilder {
     @Override
     public String buildQuerySql(SqlBuilderConfig config) {
         String tableName = config.getTableName();
-        List<String> filedNames = config.getFiledNames();
-        List<String> labelNames = config.getLabelNames();
-        boolean appendLabel = !CollectionUtils.isEmpty(labelNames);
+        List<Field> fields = config.getFields();
         String quotation = config.getQuotation();
         String queryFilter = config.getQueryFilter();
 
         StringBuilder sql = new StringBuilder();
-        int size = filedNames.size();
+        int size = fields.size();
         int end = size - 1;
+        Field field = null;
         for (int i = 0; i < size; i++) {
-            // "USERNAME"
-            sql.append(quotation).append(filedNames.get(i)).append(quotation);
+            field = fields.get(i);
+            if (field.isUnmodifiabled()) {
+                sql.append(field.getName());
+            } else {
+                sql.append(quotation).append(field.getName()).append(quotation);
+            }
 
-            // label
-            if(appendLabel){
-                // name as "myName"
-                sql.append(" as \"").append(labelNames.get(i)).append("\"");
+            // "USERNAME" as "myName"
+            if (StringUtils.isNotEmpty(field.getLabelName())) {
+                sql.append(" as ").append(quotation).append(field.getLabelName()).append(quotation);
             }
 
             //如果不是最后一个字段

+ 4 - 3
dbsyncer-connector/src/main/java/org/dbsyncer/connector/database/sqlbuilder/SqlBuilderUpdate.java

@@ -1,5 +1,6 @@
 package org.dbsyncer.connector.database.sqlbuilder;
 
+import org.dbsyncer.connector.config.Field;
 import org.dbsyncer.connector.config.SqlBuilderConfig;
 import org.dbsyncer.connector.database.AbstractSqlBuilder;
 import org.slf4j.Logger;
@@ -19,15 +20,15 @@ public class SqlBuilderUpdate extends AbstractSqlBuilder {
     @Override
     public String buildSql(SqlBuilderConfig config) {
         String tableName = config.getTableName();
-        List<String> filedNames = config.getFiledNames();
+        List<Field> fields = config.getFields();
         String quotation = config.getQuotation();
         StringBuilder sql = new StringBuilder();
-        int size = filedNames.size();
+        int size = fields.size();
         int end = size - 1;
         sql.append("UPDATE ").append(quotation).append(tableName).append(quotation).append(" SET ");
         for (int i = 0; i < size; i++) {
             // "USERNAME"=?
-            sql.append(quotation).append(filedNames.get(i)).append(quotation).append("=?");
+            sql.append(quotation).append(fields.get(i).getName()).append(quotation).append("=?");
             //如果不是最后一个字段
             if (i < end) {
                 sql.append(",");

+ 1 - 0
dbsyncer-listener/src/main/java/org/dbsyncer/listener/oracle/dcn/DBChangeNotification.java

@@ -234,6 +234,7 @@ public class DBChangeNotification {
                 if(rs.next()){
                     final int size = rs.getMetaData().getColumnCount();
                     do{
+                        data.add(rowId);
                         for (int i = 1; i <= size; i++) {
                             data.add(rs.getObject(i));
                         }

+ 6 - 11
dbsyncer-manager/src/main/java/org/dbsyncer/manager/puller/impl/IncrementPuller.java

@@ -151,9 +151,8 @@ public class IncrementPuller extends AbstractPuller implements ScheduledTaskJob,
         if (ListenerTypeEnum.isTiming(listenerType)) {
             QuartzExtractor extractor = listener.getExtractor(listenerType, QuartzExtractor.class);
             List<Map<String, String>> commands = list.stream().map(t -> t.getCommand()).collect(Collectors.toList());
-            PrimaryKeyMappingStrategy strategy = PrimaryKeyMappingEnum.DEFAULT.getPrimaryKeyMappingStrategy();
 
-            setExtractorConfig(extractor, connectorConfig, listenerConfig, meta.getMap(), new QuartzListener(mapping, list, strategy));
+            setExtractorConfig(extractor, connectorConfig, listenerConfig, meta.getMap(), new QuartzListener(mapping, list));
             extractor.setConnectorFactory(connectorFactory);
             extractor.setScheduledTaskService(scheduledTaskService);
             extractor.setCommands(commands);
@@ -164,9 +163,8 @@ public class IncrementPuller extends AbstractPuller implements ScheduledTaskJob,
         if (ListenerTypeEnum.isLog(listenerType)) {
             final String connectorType = connectorConfig.getConnectorType();
             AbstractExtractor extractor = listener.getExtractor(connectorType, AbstractExtractor.class);
-            PrimaryKeyMappingStrategy strategy = PrimaryKeyMappingEnum.getPrimaryKeyMappingStrategy(connectorType);
 
-            setExtractorConfig(extractor, connectorConfig, listenerConfig, meta.getMap(), new LogListener(mapping, list, strategy, extractor));
+            setExtractorConfig(extractor, connectorConfig, listenerConfig, meta.getMap(), new LogListener(mapping, list, extractor));
             return extractor;
         }
         return null;
@@ -184,7 +182,6 @@ public class IncrementPuller extends AbstractPuller implements ScheduledTaskJob,
         protected Mapping                   mapping;
         protected String                    metaId;
         protected AtomicBoolean             changed = new AtomicBoolean();
-        protected PrimaryKeyMappingStrategy strategy;
 
         @Override
         public void flushEvent(Map<String, String> map) {
@@ -239,9 +236,8 @@ public class IncrementPuller extends AbstractPuller implements ScheduledTaskJob,
 
         private List<FieldPicker> tablePicker;
 
-        public QuartzListener(Mapping mapping, List<TableGroup> list, PrimaryKeyMappingStrategy strategy) {
+        public QuartzListener(Mapping mapping, List<TableGroup> list) {
             this.mapping = mapping;
-            this.strategy = strategy;
             this.metaId = mapping.getMetaId();
             this.tablePicker = new LinkedList<>();
             list.forEach(t -> tablePicker.add(new FieldPicker(PickerUtil.mergeTableGroupConfig(mapping, t))));
@@ -256,7 +252,7 @@ public class IncrementPuller extends AbstractPuller implements ScheduledTaskJob,
                     rowChangedEvent.getAfter());
 
             // 处理过程有异常向上抛
-            parser.execute(mapping, picker.getTableGroup(), rowChangedEvent, strategy);
+            parser.execute(mapping, picker.getTableGroup(), rowChangedEvent);
 
             // 标记有变更记录
             changed.compareAndSet(false, true);
@@ -290,10 +286,9 @@ public class IncrementPuller extends AbstractPuller implements ScheduledTaskJob,
         private AtomicInteger eventCounter;
         private static final int MAX_LOG_CACHE_SIZE = 128;
 
-        public LogListener(Mapping mapping, List<TableGroup> list, PrimaryKeyMappingStrategy strategy, Extractor extractor) {
+        public LogListener(Mapping mapping, List<TableGroup> list, Extractor extractor) {
             this.mapping = mapping;
             this.metaId = mapping.getMetaId();
-            this.strategy = strategy;
             this.extractor = extractor;
             this.tablePicker = new LinkedHashMap<>();
             this.eventCounter = new AtomicInteger();
@@ -321,7 +316,7 @@ public class IncrementPuller extends AbstractPuller implements ScheduledTaskJob,
                         rowChangedEvent.setBefore(before);
                         rowChangedEvent.setAfter(after);
                         rowChangedEvent.setPk(picker.getPk());
-                        parser.execute(mapping, picker.getTableGroup(), rowChangedEvent, strategy);
+                        parser.execute(mapping, picker.getTableGroup(), rowChangedEvent);
                     }
                 });
                 // 标记有变更记录

+ 1 - 2
dbsyncer-parser/src/main/java/org/dbsyncer/parser/Parser.java

@@ -143,7 +143,6 @@ public interface Parser {
      * @param mapping
      * @param tableGroup
      * @param rowChangedEvent
-     * @param strategy
      */
-    void execute(Mapping mapping, TableGroup tableGroup, RowChangedEvent rowChangedEvent, PrimaryKeyMappingStrategy strategy);
+    void execute(Mapping mapping, TableGroup tableGroup, RowChangedEvent rowChangedEvent);
 }

+ 2 - 4
dbsyncer-parser/src/main/java/org/dbsyncer/parser/ParserFactory.java

@@ -19,7 +19,6 @@ import org.dbsyncer.parser.enums.ConvertEnum;
 import org.dbsyncer.parser.enums.ParserEnum;
 import org.dbsyncer.parser.flush.FlushService;
 import org.dbsyncer.parser.model.*;
-import org.dbsyncer.parser.strategy.PrimaryKeyMappingStrategy;
 import org.dbsyncer.parser.util.ConvertUtil;
 import org.dbsyncer.parser.util.PickerUtil;
 import org.dbsyncer.plugin.PluginFactory;
@@ -242,7 +241,7 @@ public class ParserFactory implements Parser {
     }
 
     @Override
-    public void execute(Mapping mapping, TableGroup tableGroup, RowChangedEvent rowChangedEvent, PrimaryKeyMappingStrategy strategy) {
+    public void execute(Mapping mapping, TableGroup tableGroup, RowChangedEvent rowChangedEvent) {
         logger.info("解析数据=> tableName:{}, event:{}, before:{}, after:{}, rowId:{}", rowChangedEvent.getTableName(), rowChangedEvent.getEvent(),
                 rowChangedEvent.getBefore(), rowChangedEvent.getAfter(), rowChangedEvent.getRowId());
         final String metaId = mapping.getMetaId();
@@ -257,9 +256,8 @@ public class ParserFactory implements Parser {
         Map<String, Object> data = StringUtils.equals(ConnectorConstant.OPERTION_DELETE, event) ? rowChangedEvent.getBefore() : rowChangedEvent.getAfter();
         PickerUtil.pickData(picker, data);
 
-        // 2、主键映射策略,Oracle需要替换主键为rowId
+        // 2、获取目标源字段
         Map target = picker.getTarget();
-        strategy.handle(target, rowChangedEvent);
 
         // 3、参数转换
         ConvertUtil.convert(tableGroup.getConvert(), target);

+ 1 - 1
dbsyncer-parser/src/main/java/org/dbsyncer/parser/util/PickerUtil.java

@@ -108,7 +108,7 @@ public abstract class PickerUtil {
             sField = sFields.get(k);
             tField = tFields.get(k);
             if (null != sField && null != tField) {
-                v = source.get(sField.getName());
+                v = source.get(sField.isUnmodifiabled() ? sField.getLabelName() : sField.getName());
                 target.put(tField.getName(), v);
             }
         }

+ 10 - 16
dbsyncer-storage/src/main/java/org/dbsyncer/storage/support/MysqlStorageServiceImpl.java

@@ -297,14 +297,8 @@ public class MysqlStorageServiceImpl extends AbstractStorageService {
             jdbcTemplate.execute(ddl);
         }
 
-        List<String> fieldNames = new ArrayList<>();
-        List<String> labelNames = new ArrayList<>();
-        executor.getFieldPairs().forEach(p -> {
-            fieldNames.add(p.columnName);
-            labelNames.add(p.labelName);
-        });
-        final SqlBuilderConfig config = new SqlBuilderConfig(connector, table, ConfigConstant.CONFIG_MODEL_ID, fieldNames, labelNames, "",
-                "");
+        List<Field> fields = executor.getFieldPairs().stream().map(p -> new Field(p.columnName, p.labelName)).collect(Collectors.toList());
+        final SqlBuilderConfig config = new SqlBuilderConfig(connector, table, ConfigConstant.CONFIG_MODEL_ID, fields, "","");
 
         String query = SqlBuilderEnum.QUERY.getSqlBuilder().buildQuerySql(config);
         String insert = SqlBuilderEnum.INSERT.getSqlBuilder().buildSql(config);
@@ -365,17 +359,17 @@ public class MysqlStorageServiceImpl extends AbstractStorageService {
     }
 
     class FieldPair {
-        String labelName;
         String columnName;
+        String labelName;
 
-        public FieldPair(String labelName) {
-            this.labelName = labelName;
-            this.columnName = labelName;
+        public FieldPair(String columnName) {
+            this.columnName = columnName;
+            this.labelName = columnName;
         }
 
-        public FieldPair(String labelName, String columnName) {
-            this.labelName = labelName;
+        public FieldPair(String columnName, String labelName) {
             this.columnName = columnName;
+            this.labelName = labelName;
         }
     }
 
@@ -389,8 +383,8 @@ public class MysqlStorageServiceImpl extends AbstractStorageService {
             fieldPairMap.putIfAbsent(ConfigConstant.CONFIG_MODEL_ID, new FieldPair(ConfigConstant.CONFIG_MODEL_ID));
             fieldPairMap.putIfAbsent(ConfigConstant.CONFIG_MODEL_NAME, new FieldPair(ConfigConstant.CONFIG_MODEL_NAME));
             fieldPairMap.putIfAbsent(ConfigConstant.CONFIG_MODEL_TYPE, new FieldPair(ConfigConstant.CONFIG_MODEL_TYPE));
-            fieldPairMap.putIfAbsent(ConfigConstant.CONFIG_MODEL_CREATE_TIME, new FieldPair(ConfigConstant.CONFIG_MODEL_CREATE_TIME, TABLE_CREATE_TIME));
-            fieldPairMap.putIfAbsent(ConfigConstant.CONFIG_MODEL_UPDATE_TIME, new FieldPair(ConfigConstant.CONFIG_MODEL_UPDATE_TIME, TABLE_UPDATE_TIME));
+            fieldPairMap.putIfAbsent(ConfigConstant.CONFIG_MODEL_CREATE_TIME, new FieldPair(TABLE_CREATE_TIME, ConfigConstant.CONFIG_MODEL_CREATE_TIME));
+            fieldPairMap.putIfAbsent(ConfigConstant.CONFIG_MODEL_UPDATE_TIME, new FieldPair(TABLE_UPDATE_TIME, ConfigConstant.CONFIG_MODEL_UPDATE_TIME));
             fieldPairMap.putIfAbsent(ConfigConstant.CONFIG_MODEL_JSON, new FieldPair(ConfigConstant.CONFIG_MODEL_JSON));
             fieldPairMap.putIfAbsent(ConfigConstant.DATA_SUCCESS, new FieldPair(ConfigConstant.DATA_SUCCESS));
             fieldPairMap.putIfAbsent(ConfigConstant.DATA_EVENT, new FieldPair(ConfigConstant.DATA_EVENT));