Kaynağa Gözat

!85 merge
Merge pull request !85 from AE86/V_1.0.0_Beta

AE86 2 yıl önce
ebeveyn
işleme
2d609eb82c

+ 68 - 1
dbsyncer-common/src/main/java/org/dbsyncer/common/util/DateFormatUtil.java

@@ -1,5 +1,8 @@
 package org.dbsyncer.common.util;
 
+import org.dbsyncer.common.CommonException;
+import org.dbsyncer.common.column.Lexer;
+
 import java.sql.Date;
 import java.sql.Timestamp;
 import java.text.DateFormat;
@@ -92,7 +95,32 @@ public abstract class DateFormatUtil {
     }
 
     public static Timestamp stringToTimestamp(String s) {
-        return Timestamp.valueOf(LocalDateTime.from(CHINESE_STANDARD_TIME_FORMATTER.parse(s)));
+        try {
+            // 2020-7-12 00:00:00
+            if (s.length() < 19) {
+                return Timestamp.valueOf(LocalDateTime.from(CHINESE_STANDARD_TIME_FORMATTER.parse(format(s))));
+            }
+
+            // 2020-07-12 00:00:00
+            if (s.length() == 19) {
+                return Timestamp.valueOf(LocalDateTime.from(CHINESE_STANDARD_TIME_FORMATTER.parse(s)));
+            }
+
+            // 2022-07-21T05:35:34.000+0800
+            if (s.length() == 28) {
+                return stringToTimestamp(s, GMT_FORMATTER);
+            }
+
+            // 2022-07-21T05:35:34.000+08:00
+            if (s.length() == 29) {
+                s = s.replaceAll(":[^:]*$", "00");
+                return stringToTimestamp(s, GMT_FORMATTER);
+            }
+
+            throw new CommonException(String.format("Can not parse val[%s] to Timestamp", s));
+        } catch (ParseException e) {
+            throw new CommonException(e);
+        }
     }
 
     public static Timestamp stringToTimestamp(String s, DateFormat formatter) throws ParseException {
@@ -113,4 +141,43 @@ public abstract class DateFormatUtil {
         return OffsetDateTime.from(parsedTimestamp).withOffsetSameInstant(ZoneOffset.UTC);
     }
 
+    private static String format(String s) {
+        StringBuilder buf = new StringBuilder();
+        Lexer lexer = new Lexer(s);
+        char comma = '-';
+        // 年
+        nextToken(lexer, buf, comma);
+        // 月
+        nextToken(lexer, buf, comma);
+        // 日
+        comma = ' ';
+        nextToken(lexer, buf, comma);
+        // 时
+        comma = ':';
+        nextToken(lexer, buf, comma);
+        // 分
+        nextToken(lexer, buf, comma);
+        // 秒
+        nextToken(lexer, buf, comma, false);
+        return buf.toString();
+    }
+
+    private static void nextToken(Lexer lexer, StringBuilder buf, char comma) {
+        nextToken(lexer, buf, comma, true);
+    }
+
+    private static void nextToken(Lexer lexer, StringBuilder buf, char comma, boolean appendComma) {
+        buf.append(fillZero(lexer.nextToken(comma)));
+        if (appendComma) {
+            buf.append(comma);
+        }
+    }
+
+    private static String fillZero(String s) {
+        if (s.length() < 2) {
+            return String.format("%02d", Integer.parseInt(s));
+        }
+        return s;
+    }
+
 }

+ 9 - 0
dbsyncer-connector/src/main/java/org/dbsyncer/connector/schema/DateValueMapper.java

@@ -1,5 +1,6 @@
 package org.dbsyncer.connector.schema;
 
+import org.dbsyncer.common.util.DateFormatUtil;
 import org.dbsyncer.connector.AbstractValueMapper;
 import org.dbsyncer.connector.ConnectorException;
 import org.dbsyncer.connector.ConnectorMapper;
@@ -21,6 +22,14 @@ public class DateValueMapper extends AbstractValueMapper<Date> {
             return Date.valueOf(timestamp.toLocalDateTime().toLocalDate());
         }
 
+        if (val instanceof String) {
+            String s = (String) val;
+            Timestamp timestamp = DateFormatUtil.stringToTimestamp(s);
+            if (null != timestamp) {
+                return Date.valueOf(timestamp.toLocalDateTime().toLocalDate());
+            }
+        }
+
         throw new ConnectorException(String.format("%s can not find type [%s], val [%s]", getClass().getSimpleName(), val.getClass(), val));
     }
 }

+ 14 - 1
dbsyncer-connector/src/main/java/org/dbsyncer/connector/schema/TimestampValueMapper.java

@@ -1,5 +1,6 @@
 package org.dbsyncer.connector.schema;
 
+import org.dbsyncer.common.util.DateFormatUtil;
 import org.dbsyncer.connector.AbstractValueMapper;
 import org.dbsyncer.connector.ConnectorException;
 import org.dbsyncer.connector.ConnectorMapper;
@@ -29,7 +30,19 @@ public class TimestampValueMapper extends AbstractValueMapper<Timestamp> {
 
         if (val instanceof String) {
             String s = (String) val;
-            return Timestamp.valueOf(s);
+            Timestamp timestamp = DateFormatUtil.stringToTimestamp(s);
+            if (null != timestamp) {
+                return timestamp;
+            }
+        }
+
+        if (val instanceof byte[]) {
+            byte[] bytes = (byte[]) val;
+            String s = new String(bytes);
+            Timestamp timestamp = DateFormatUtil.stringToTimestamp(s);
+            if (null != timestamp) {
+                return timestamp;
+            }
         }
 
         throw new ConnectorException(String.format("%s can not find type [%s], val [%s]", getClass().getSimpleName(), val.getClass(), val));

+ 1 - 1
dbsyncer-parser/src/main/java/org/dbsyncer/parser/AbstractWriterBinlog.java

@@ -68,7 +68,7 @@ public abstract class AbstractWriterBinlog extends AbstractBinlogRecorder<Writer
         Map<String, Object> data = new HashMap<>();
         try {
             final Picker picker = new Picker(tableGroup.getFieldMapping());
-            final Map<String, Field> fieldMap = picker.getTargetFieldMap();
+            final Map<String, Field> fieldMap = picker.getSourceFieldMap();
             message.getData().getRowMap().forEach((k, v) -> {
                 if (fieldMap.containsKey(k)) {
                     data.put(k, BinlogMessageUtil.deserializeValue(fieldMap.get(k).getType(), v));

+ 1 - 13
dbsyncer-parser/src/main/java/org/dbsyncer/parser/ParserFactory.java

@@ -295,19 +295,7 @@ public class ParserFactory implements Parser {
     @Override
     public void execute(Mapping mapping, TableGroup tableGroup, RowChangedEvent event) {
         logger.debug("Table[{}] {}, data:{}", event.getSourceTableName(), event.getEvent(), event.getDataMap());
-
-        // 1、获取映射字段
-        final Picker picker = new Picker(tableGroup.getFieldMapping());
-        final Map target = picker.pickData(event.getDataMap());
-
-        // 2、参数转换
-        ConvertUtil.convert(tableGroup.getConvert(), target);
-
-        // 3、插件转换
-        pluginFactory.convert(tableGroup.getPlugin(), event.getEvent(), event.getDataMap(), target);
-
-        // 4、处理数据
-        parserStrategy.execute(tableGroup.getId(), event.getEvent(), target);
+        parserStrategy.execute(tableGroup.getId(), event.getEvent(), event.getDataMap());
     }
 
     /**

+ 2 - 57
dbsyncer-parser/src/main/java/org/dbsyncer/parser/convert/handler/StringToTimestampHandler.java

@@ -1,11 +1,8 @@
 package org.dbsyncer.parser.convert.handler;
 
-import org.dbsyncer.common.column.Lexer;
 import org.dbsyncer.common.util.DateFormatUtil;
 import org.dbsyncer.parser.convert.AbstractHandler;
 
-import java.text.ParseException;
-
 /**
  * 字符串转Timestamp
  *
@@ -16,64 +13,12 @@ import java.text.ParseException;
 public class StringToTimestampHandler extends AbstractHandler {
 
     @Override
-    public Object convert(String args, Object value) throws ParseException {
+    public Object convert(String args, Object value) {
         if (value instanceof String) {
             String s = (String) value;
-            // 2020-7-12 00:00:00
-            if (s.length() < 19) {
-                return DateFormatUtil.stringToTimestamp(format(s));
-            }
-
-            // 2022-07-21T05:35:34.000+0800
-            if (s.length() == 28) {
-                return DateFormatUtil.stringToTimestamp(s, DateFormatUtil.GMT_FORMATTER);
-            }
-
-            // 2022-07-21T05:35:34.000+08:00
-            if (s.length() == 29) {
-                s = s.replaceAll(":[^:]*$", "00");
-                return DateFormatUtil.stringToTimestamp(s, DateFormatUtil.GMT_FORMATTER);
-            }
+            return DateFormatUtil.stringToTimestamp(s);
         }
         return value;
     }
 
-    private String format(String s) {
-        StringBuilder buf = new StringBuilder();
-        Lexer lexer = new Lexer(s);
-        char comma = '-';
-        // 年
-        nextToken(lexer, buf, comma);
-        // 月
-        nextToken(lexer, buf, comma);
-        // 日
-        comma = ' ';
-        nextToken(lexer, buf, comma);
-        // 时
-        comma = ':';
-        nextToken(lexer, buf, comma);
-        // 分
-        nextToken(lexer, buf, comma);
-        // 秒
-        nextToken(lexer, buf, comma, false);
-        return buf.toString();
-    }
-
-    private void nextToken(Lexer lexer, StringBuilder buf, char comma) {
-        nextToken(lexer, buf, comma, true);
-    }
-
-    private void nextToken(Lexer lexer, StringBuilder buf, char comma, boolean appendComma) {
-        buf.append(fillZero(lexer.nextToken(comma)));
-        if (appendComma) {
-            buf.append(comma);
-        }
-    }
-
-    private String fillZero(String s) {
-        if (s.length() < 2) {
-            return String.format("%02d", Integer.parseInt(s));
-        }
-        return s;
-    }
 }

+ 25 - 7
dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/impl/WriterBufferActuator.java

@@ -7,16 +7,20 @@ import org.dbsyncer.common.util.StringUtil;
 import org.dbsyncer.connector.ConnectorFactory;
 import org.dbsyncer.connector.ConnectorMapper;
 import org.dbsyncer.connector.config.ConnectorConfig;
-import org.dbsyncer.connector.constant.ConnectorConstant;
 import org.dbsyncer.parser.ParserFactory;
 import org.dbsyncer.parser.flush.AbstractBufferActuator;
 import org.dbsyncer.parser.model.*;
 import org.dbsyncer.parser.strategy.FlushStrategy;
 import org.dbsyncer.parser.strategy.ParserStrategy;
+import org.dbsyncer.parser.util.ConvertUtil;
+import org.dbsyncer.plugin.PluginFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
 import org.springframework.util.Assert;
 
+import java.util.List;
+import java.util.Map;
+
 /**
  * @author AE86
  * @version 1.0.0
@@ -31,6 +35,9 @@ public class WriterBufferActuator extends AbstractBufferActuator<WriterRequest,
     @Autowired
     private ParserFactory parserFactory;
 
+    @Autowired
+    private PluginFactory pluginFactory;
+
     @Autowired
     private FlushStrategy flushStrategy;
 
@@ -68,17 +75,28 @@ public class WriterBufferActuator extends AbstractBufferActuator<WriterRequest,
         final TableGroup tableGroup = cacheService.get(response.getTableGroupId(), TableGroup.class);
         final Mapping mapping = cacheService.get(tableGroup.getMappingId(), Mapping.class);
         final String targetTableName = tableGroup.getTargetTable().getName();
+        final String event = response.getEvent();
+        final List<Map> sourceDataList = response.getDataList();
+
+        // 2、映射字段
         final Picker picker = new Picker(tableGroup.getFieldMapping());
+        List<Map> targetDataList = picker.pickData(sourceDataList);
+
+        // 3、参数转换
+        ConvertUtil.convert(tableGroup.getConvert(), targetDataList);
+
+        // 4、插件转换
+        pluginFactory.convert(tableGroup.getPlugin(), event, sourceDataList, targetDataList);
 
-        // 2、批量执行同步
+        // 5、批量执行同步
         ConnectorMapper targetConnectorMapper = connectorFactory.connect(getConnectorConfig(mapping.getTargetConnectorId()));
-        Result result = parserFactory.writeBatch(new BatchWriter(targetConnectorMapper, tableGroup.getCommand(), targetTableName, response.getEvent(),
-                picker.getTargetFields(), response.getDataList(), bufferActuatorConfig.getWriterBatchCount()));
+        BatchWriter batchWriter = new BatchWriter(targetConnectorMapper, tableGroup.getCommand(), targetTableName, event, picker.getTargetFields(), targetDataList, bufferActuatorConfig.getWriterBatchCount());
+        Result result = parserFactory.writeBatch(batchWriter);
 
-        // 3、持久化同步结果
-        flushStrategy.flushIncrementData(mapping.getMetaId(), result, response.getEvent());
+        // 6、持久化同步结果
+        flushStrategy.flushIncrementData(mapping.getMetaId(), result, event);
 
-        // 4、消息处理完成
+        // 7、完成处理
         parserStrategy.complete(response.getMessageIds());
     }
 

+ 6 - 10
dbsyncer-parser/src/main/java/org/dbsyncer/parser/model/Picker.java

@@ -42,14 +42,6 @@ public class Picker {
         return targetMapList;
     }
 
-    public Map pickData(Map<String, Object> data) {
-        Map targetMap = new HashMap<>();
-        if (!CollectionUtils.isEmpty(data)) {
-            exchange(sourceFields.size(), sourceFields, targetFields, data, targetMap);
-        }
-        return targetMap;
-    }
-
     private void exchange(int sFieldSize, List<Field> sFields, List<Field> tFields, Map<String, Object> source,
                           Map<String, Object> target) {
         Field sField = null;
@@ -77,11 +69,15 @@ public class Picker {
         return primaryKey;
     }
 
+    public List<Field> getSourceFields() {
+        return sourceFields.stream().filter(f -> null != f).collect(Collectors.toList());
+    }
+
     public List<Field> getTargetFields() {
         return targetFields.stream().filter(f -> null != f).collect(Collectors.toList());
     }
 
-    public Map<String, Field> getTargetFieldMap() {
-        return getTargetFields().stream().collect(Collectors.toMap(Field::getName, f -> f, (k1, k2) -> k1));
+    public Map<String, Field> getSourceFieldMap() {
+        return getSourceFields().stream().collect(Collectors.toMap(Field::getName, f -> f, (k1, k2) -> k1));
     }
 }

+ 10 - 4
dbsyncer-plugin/src/main/java/org/dbsyncer/plugin/PluginFactory.java

@@ -96,15 +96,21 @@ public class PluginFactory {
         return Collections.unmodifiableList(plugins);
     }
 
-    public void convert(Plugin plugin, List<Map> source, List<Map> target) {
+    public void convert(Plugin plugin, List<Map> sourceList, List<Map> targetList) {
         if (null != plugin && service.containsKey(plugin.getClassName())) {
-            service.get(plugin.getClassName()).convert(new FullConvertContext(applicationContextProxy, source, target));
+            service.get(plugin.getClassName()).convert(new FullConvertContext(applicationContextProxy, sourceList, targetList));
         }
     }
 
-    public void convert(Plugin plugin, String event, Map<String, Object> source, Map<String, Object> target) {
+    public void convert(Plugin plugin, String event, List<Map> sourceList, List<Map> targetList) {
         if (null != plugin && service.containsKey(plugin.getClassName())) {
-            service.get(plugin.getClassName()).convert(new IncrementConvertContext(applicationContextProxy, event, source, target));
+            ConvertService convertService = service.get(plugin.getClassName());
+            int size = sourceList.size();
+            if(size == targetList.size()){
+                for (int i = 0; i < size; i++) {
+                    convertService.convert(new IncrementConvertContext(applicationContextProxy, event, sourceList.get(i), targetList.get(i)));
+                }
+            }
         }
     }