1
0
AE86 2 жил өмнө
parent
commit
ce7760de48

+ 14 - 1
dbsyncer-parser/src/main/java/org/dbsyncer/parser/ParserFactory.java

@@ -5,6 +5,7 @@ import org.dbsyncer.common.event.RowChangedEvent;
 import org.dbsyncer.common.model.Result;
 import org.dbsyncer.common.util.CollectionUtils;
 import org.dbsyncer.common.util.JsonUtil;
+import org.dbsyncer.common.util.StringUtil;
 import org.dbsyncer.connector.ConnectorFactory;
 import org.dbsyncer.connector.ConnectorMapper;
 import org.dbsyncer.connector.config.CommandConfig;
@@ -300,7 +301,19 @@ public class ParserFactory implements Parser {
     public void execute(Mapping mapping, TableGroup tableGroup, RowChangedEvent event) {
         logger.debug("Table[{}] {}, before:{}, after:{}", event.getSourceTableName(), event.getEvent(), event.getBefore(), event.getAfter());
 
-        parserStrategy.execute(mapping, tableGroup, event);
+        // 1、获取映射字段
+        final Map<String, Object> data = StringUtil.equals(ConnectorConstant.OPERTION_DELETE, event.getEvent()) ? event.getBefore() : event.getAfter();
+        final Picker picker = new Picker(tableGroup.getFieldMapping());
+        final Map target = picker.pickData(data);
+
+        // 2、参数转换
+        ConvertUtil.convert(tableGroup.getConvert(), target);
+
+        // 3、插件转换
+        pluginFactory.convert(tableGroup.getPlugin(), event.getEvent(), data, target);
+
+        // 4、处理数据
+        parserStrategy.execute(tableGroup.getId(), event.getEvent(), target);
     }
 
     /**

+ 1 - 1
dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/AbstractBufferActuator.java

@@ -38,7 +38,7 @@ public abstract class AbstractBufferActuator<Request, Response> implements Buffe
 
     private static final double BUFFER_THRESHOLD = 0.8;
 
-    private static final int MAX_BATCH_COUNT = 1000;
+    private static final int MAX_BATCH_COUNT = 3000;
 
     private static final int PERIOD = 300;
 

+ 15 - 16
dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/impl/WriterBufferActuator.java

@@ -7,17 +7,12 @@ import org.dbsyncer.connector.ConnectorMapper;
 import org.dbsyncer.connector.config.ConnectorConfig;
 import org.dbsyncer.parser.ParserFactory;
 import org.dbsyncer.parser.flush.AbstractBufferActuator;
+import org.dbsyncer.parser.model.*;
 import org.dbsyncer.parser.strategy.FlushStrategy;
-import org.dbsyncer.parser.model.WriterRequest;
-import org.dbsyncer.parser.model.WriterResponse;
-import org.dbsyncer.parser.model.BatchWriter;
-import org.dbsyncer.parser.model.Connector;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
 import org.springframework.util.Assert;
 
-import java.util.Collections;
-
 /**
  * @author AE86
  * @version 1.0.0
@@ -51,22 +46,26 @@ public class WriterBufferActuator extends AbstractBufferActuator<WriterRequest,
         if (response.isMerged()) {
             return;
         }
-        response.setMetaId(request.getMetaId());
-        response.setTargetConnectorId(request.getTargetConnectorId());
-        response.setSourceTableName(request.getSourceTableName());
-        response.setTargetTableName(request.getTargetTableName());
+        response.setTableGroupId(request.getTableGroupId());
         response.setEvent(request.getEvent());
-        response.setFields(Collections.unmodifiableList(request.getFields()));
-        response.setCommand(request.getCommand());
         response.setMerged(true);
     }
 
     @Override
     protected void pull(WriterResponse response) {
-        ConnectorMapper targetConnectorMapper = connectorFactory.connect(getConnectorConfig(response.getTargetConnectorId()));
-        Result result = parserFactory.writeBatch(new BatchWriter(targetConnectorMapper, response.getCommand(), response.getTargetTableName(), response.getEvent(),
-                response.getFields(), response.getDataList(), BATCH_SIZE));
-        flushStrategy.flushIncrementData(response.getMetaId(), result, response.getEvent());
+        // 1、获取配置信息
+        final TableGroup tableGroup = cacheService.get(response.getTableGroupId(), TableGroup.class);
+        final Mapping mapping = cacheService.get(tableGroup.getMappingId(), Mapping.class);
+        final String targetTableName = tableGroup.getTargetTable().getName();
+        final Picker picker = new Picker(tableGroup.getFieldMapping());
+
+        // 2、批量执行同步
+        ConnectorMapper targetConnectorMapper = connectorFactory.connect(getConnectorConfig(mapping.getTargetConnectorId()));
+        Result result = parserFactory.writeBatch(new BatchWriter(targetConnectorMapper, tableGroup.getCommand(), targetTableName, response.getEvent(),
+                picker.getTargetFields(), response.getDataList(), BATCH_SIZE));
+
+        // 3、持久化同步结果
+        flushStrategy.flushIncrementData(mapping.getMetaId(), result, response.getEvent());
     }
 
     /**

+ 5 - 60
dbsyncer-parser/src/main/java/org/dbsyncer/parser/model/AbstractWriter.java

@@ -1,10 +1,5 @@
 package org.dbsyncer.parser.model;
 
-import org.dbsyncer.connector.model.Field;
-
-import java.util.List;
-import java.util.Map;
-
 /**
  * @author AE86
  * @version 1.0.0
@@ -12,66 +7,16 @@ import java.util.Map;
  */
 public abstract class AbstractWriter {
 
-    private String metaId;
-
-    private String targetConnectorId;
-
-    private String sourceTableName;
-
-    private String targetTableName;
-
-    private List<Field> fields;
-
-    private Map<String, String> command;
+    private String tableGroupId;
 
     private String event;
 
-    public String getMetaId() {
-        return metaId;
-    }
-
-    public void setMetaId(String metaId) {
-        this.metaId = metaId;
-    }
-
-    public String getTargetConnectorId() {
-        return targetConnectorId;
-    }
-
-    public void setTargetConnectorId(String targetConnectorId) {
-        this.targetConnectorId = targetConnectorId;
-    }
-
-    public String getSourceTableName() {
-        return sourceTableName;
-    }
-
-    public void setSourceTableName(String sourceTableName) {
-        this.sourceTableName = sourceTableName;
-    }
-
-    public String getTargetTableName() {
-        return targetTableName;
-    }
-
-    public void setTargetTableName(String targetTableName) {
-        this.targetTableName = targetTableName;
-    }
-
-    public List<Field> getFields() {
-        return fields;
-    }
-
-    public void setFields(List<Field> fields) {
-        this.fields = fields;
-    }
-
-    public Map<String, String> getCommand() {
-        return command;
+    public String getTableGroupId() {
+        return tableGroupId;
     }
 
-    public void setCommand(Map<String, String> command) {
-        this.command = command;
+    public void setTableGroupId(String tableGroupId) {
+        this.tableGroupId = tableGroupId;
     }
 
     public String getEvent() {

+ 2 - 16
dbsyncer-parser/src/main/java/org/dbsyncer/parser/model/WriterRequest.java

@@ -1,9 +1,7 @@
 package org.dbsyncer.parser.model;
 
-import org.dbsyncer.connector.model.Field;
 import org.dbsyncer.parser.flush.BufferRequest;
 
-import java.util.List;
 import java.util.Map;
 
 /**
@@ -13,26 +11,14 @@ import java.util.Map;
  */
 public class WriterRequest extends AbstractWriter implements BufferRequest {
 
-    private String tableGroupId;
-
     private Map row;
 
-    public WriterRequest(String tableGroupId, Map row, String metaId, String targetConnectorId, String sourceTableName, String targetTableName, String event, List<Field> fields, Map<String, String> command) {
-        setMetaId(metaId);
-        setTargetConnectorId(targetConnectorId);
-        setSourceTableName(sourceTableName);
-        setTargetTableName(targetTableName);
+    public WriterRequest(String tableGroupId, String event, Map row) {
+        setTableGroupId(tableGroupId);
         setEvent(event);
-        setFields(fields);
-        setCommand(command);
-        this.tableGroupId = tableGroupId;
         this.row = row;
     }
 
-    public String getTableGroupId() {
-        return tableGroupId;
-    }
-
     public Map getRow() {
         return row;
     }

+ 2 - 4
dbsyncer-parser/src/main/java/org/dbsyncer/parser/strategy/ParserStrategy.java

@@ -1,11 +1,9 @@
 package org.dbsyncer.parser.strategy;
 
-import org.dbsyncer.common.event.RowChangedEvent;
-import org.dbsyncer.parser.model.Mapping;
-import org.dbsyncer.parser.model.TableGroup;
+import java.util.Map;
 
 public interface ParserStrategy {
 
-    void execute(Mapping mapping, TableGroup tableGroup, RowChangedEvent event);
+    void execute(String tableGroupId, String event, Map<String, Object> data);
 
 }

+ 4 - 23
dbsyncer-parser/src/main/java/org/dbsyncer/parser/strategy/impl/DisableWriterBufferActuatorStrategy.java

@@ -34,18 +34,12 @@ public final class DisableWriterBufferActuatorStrategy extends AbstractBinlogRec
     @Autowired
     private BufferActuator writerBufferActuator;
 
-    @Autowired
-    private PluginFactory pluginFactory;
-
     @Autowired
     private CacheService cacheService;
 
     @Override
-    public void execute(Mapping mapping, TableGroup tableGroup, RowChangedEvent event) {
+    public void execute(String tableGroupId, String event, Map<String, Object> data) {
         try {
-            EventEnum eventEnum = EventEnum.valueOf(event.getEvent());
-            Map<String, Object> data = StringUtil.equals(ConnectorConstant.OPERTION_DELETE, eventEnum.name()) ? event.getBefore() : event.getAfter();
-
             BinlogMap.Builder dataBuilder = BinlogMap.newBuilder();
             data.forEach((k, v) -> {
                 if (null != v) {
@@ -57,8 +51,8 @@ public final class DisableWriterBufferActuatorStrategy extends AbstractBinlogRec
             });
 
             BinlogMessage builder = BinlogMessage.newBuilder()
-                    .setTableGroupId(tableGroup.getId())
-                    .setEvent(eventEnum)
+                    .setTableGroupId(tableGroupId)
+                    .setEvent(EventEnum.valueOf(event))
                     .setData(dataBuilder.build())
                     .build();
             flush(builder);
@@ -91,10 +85,6 @@ public final class DisableWriterBufferActuatorStrategy extends AbstractBinlogRec
         // 1、获取配置信息
         final String tableGroupId = message.getTableGroupId();
         final TableGroup tableGroup = cacheService.get(tableGroupId, TableGroup.class);
-        final Mapping mapping = cacheService.get(tableGroup.getMappingId(), Mapping.class);
-        final String event = message.getEvent().name();
-        final String sourceTableName = tableGroup.getSourceTable().getName();
-        final String targetTableName = tableGroup.getTargetTable().getName();
 
         // 2、反序列数据
         final Picker picker = new Picker(tableGroup.getFieldMapping());
@@ -106,16 +96,7 @@ public final class DisableWriterBufferActuatorStrategy extends AbstractBinlogRec
             }
         });
 
-        // 3、获取目标源数据集合
-        Map target = picker.pickData(data);
-
-        // 4、参数转换
-        ConvertUtil.convert(tableGroup.getConvert(), target);
-
-        // 5、插件转换
-        pluginFactory.convert(tableGroup.getPlugin(), event, data, target);
-
-        return new WriterRequest(tableGroupId, target, mapping.getMetaId(), mapping.getTargetConnectorId(), sourceTableName, targetTableName, event, picker.getTargetFields(), tableGroup.getCommand());
+        return new WriterRequest(message.getTableGroupId(), message.getEvent().name(), data);
     }
 
 }

+ 2 - 26
dbsyncer-parser/src/main/java/org/dbsyncer/parser/strategy/impl/EnableWriterBufferActuatorStrategy.java

@@ -1,16 +1,8 @@
 package org.dbsyncer.parser.strategy.impl;
 
-import org.dbsyncer.common.event.RowChangedEvent;
-import org.dbsyncer.common.util.StringUtil;
-import org.dbsyncer.connector.constant.ConnectorConstant;
 import org.dbsyncer.parser.flush.BufferActuator;
-import org.dbsyncer.parser.model.Mapping;
-import org.dbsyncer.parser.model.Picker;
-import org.dbsyncer.parser.model.TableGroup;
 import org.dbsyncer.parser.model.WriterRequest;
 import org.dbsyncer.parser.strategy.ParserStrategy;
-import org.dbsyncer.parser.util.ConvertUtil;
-import org.dbsyncer.plugin.PluginFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
 import org.springframework.stereotype.Component;
@@ -21,28 +13,12 @@ import java.util.Map;
 @ConditionalOnProperty(value = "dbsyncer.parser.writer.buffer.actuator.enabled", havingValue = "true")
 public final class EnableWriterBufferActuatorStrategy implements ParserStrategy {
 
-    @Autowired
-    private PluginFactory pluginFactory;
-
     @Autowired
     private BufferActuator writerBufferActuator;
 
     @Override
-    public void execute(Mapping mapping, TableGroup tableGroup, RowChangedEvent event) {
-        // 1、获取映射字段
-        final String eventName = event.getEvent();
-        Map<String, Object> data = StringUtil.equals(ConnectorConstant.OPERTION_DELETE, eventName) ? event.getBefore() : event.getAfter();
-        Picker picker = new Picker(tableGroup.getFieldMapping());
-        Map target = picker.pickData(data);
-
-        // 2、参数转换
-        ConvertUtil.convert(tableGroup.getConvert(), target);
-
-        // 3、插件转换
-        pluginFactory.convert(tableGroup.getPlugin(), eventName, data, target);
-
-        // 4、写入缓冲执行器
-        writerBufferActuator.offer(new WriterRequest(tableGroup.getId(), target, mapping.getMetaId(), mapping.getTargetConnectorId(), event.getSourceTableName(), event.getTargetTableName(), eventName, picker.getTargetFields(), tableGroup.getCommand()));
+    public void execute(String tableGroupId, String event, Map<String, Object> data) {
+        writerBufferActuator.offer(new WriterRequest(tableGroupId, event, data));
     }
 
 }