浏览代码

add proto

AE86 2 年之前
父节点
当前提交
a3e539dc9d

+ 27 - 1
dbsyncer-parser/src/main/java/org/dbsyncer/parser/strategy/impl/DisableWriterBufferActuatorStrategy.java

@@ -7,9 +7,12 @@ import org.dbsyncer.common.util.StringUtil;
 import org.dbsyncer.connector.constant.ConnectorConstant;
 import org.dbsyncer.parser.flush.BufferActuator;
 import org.dbsyncer.parser.model.Mapping;
+import org.dbsyncer.parser.model.Picker;
 import org.dbsyncer.parser.model.TableGroup;
 import org.dbsyncer.parser.model.WriterRequest;
 import org.dbsyncer.parser.strategy.ParserStrategy;
+import org.dbsyncer.parser.util.ConvertUtil;
+import org.dbsyncer.plugin.PluginFactory;
 import org.dbsyncer.storage.binlog.AbstractBinlogRecorder;
 import org.dbsyncer.storage.binlog.proto.BinlogMessage;
 import org.dbsyncer.storage.binlog.proto.Data;
@@ -18,6 +21,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 
+import java.util.HashMap;
 import java.util.Map;
 import java.util.Queue;
 
@@ -28,6 +32,9 @@ public final class DisableWriterBufferActuatorStrategy extends AbstractBinlogRec
     @Autowired
     private BufferActuator writerBufferActuator;
 
+    @Autowired
+    private PluginFactory pluginFactory;
+
     @Autowired
     private CacheService cacheService;
 
@@ -62,6 +69,25 @@ public final class DisableWriterBufferActuatorStrategy extends AbstractBinlogRec
 
     @Override
     protected WriterRequest deserialize(BinlogMessage message) {
-        return null;
+        String tableGroupId = message.getTableGroupId();
+        TableGroup tableGroup = cacheService.get(tableGroupId, TableGroup.class);
+        Mapping mapping = cacheService.get(tableGroup.getMappingId(), Mapping.class);
+
+        // 1、获取映射字段
+        String event = message.getEvent().name();
+        String sourceTableName = tableGroup.getSourceTable().getName();
+        String targetTableName = tableGroup.getTargetTable().getName();
+
+        Map<String, Object> data = new HashMap<>();
+        Picker picker = new Picker(tableGroup.getFieldMapping());
+        Map target = picker.pickData(data);
+
+        // 2、参数转换
+        ConvertUtil.convert(tableGroup.getConvert(), target);
+
+        // 3、插件转换
+        pluginFactory.convert(tableGroup.getPlugin(), event, data, target);
+
+        return new WriterRequest(tableGroupId, target, mapping.getMetaId(), mapping.getTargetConnectorId(), sourceTableName, targetTableName, event, picker.getTargetFields(), tableGroup.getCommand());
     }
 }

+ 1 - 2
dbsyncer-storage/src/main/java/org/dbsyncer/storage/binlog/AbstractBinlogRecorder.java

@@ -109,8 +109,7 @@ public abstract class AbstractBinlogRecorder<Message> implements BinlogRecorder,
         byte[] line;
         AtomicInteger batchCounter = new AtomicInteger();
         while (batchCounter.get() < MAX_BATCH_COUNT && null != (line = context.readLine())) {
-            deserialize(BinlogMessage.parseFrom(line));
-            // getQueue().offer(deserialize(message));
+            getQueue().offer(deserialize(BinlogMessage.parseFrom(line)));
             batchCounter.getAndAdd(1);
         }
 

+ 2 - 2
dbsyncer-storage/src/main/proto/BinlogMessageProto.proto

@@ -8,7 +8,7 @@ option optimize_for = SPEED;
 message BinlogMessage {
     string table_group_id = 1;
     EventEnum event = 2;
-    repeated Data data = 3;
+    Map data = 3;
 }
 
 enum EventEnum {
@@ -17,6 +17,6 @@ enum EventEnum {
     DELETE = 2;
 }
 
-message Data {
+message Map {
     map<string, bytes> row = 1;
 }