Bladeren bron

rm not used

AE86 3 jaren geleden
bovenliggende
commit
f6766b1bba
21 gewijzigde bestanden met toevoegingen van 183 en 192 verwijderingen
  1. 16 7
      dbsyncer-common/src/main/java/org/dbsyncer/common/event/RowChangedEvent.java
  2. 1 1
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/config/WriterBatchConfig.java
  3. 0 5
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/config/WriterConfig.java
  4. 0 5
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/config/WriterSingleConfig.java
  5. 3 4
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/database/AbstractDatabaseConnector.java
  6. 1 1
      dbsyncer-listener/src/main/test/DBChangeNotificationTest.java
  7. 8 3
      dbsyncer-manager/src/main/java/org/dbsyncer/manager/puller/IncrementPuller.java
  8. 7 7
      dbsyncer-parser/src/main/java/org/dbsyncer/parser/ParserFactory.java
  9. 3 5
      dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/AbstractBufferActuator.java
  10. 1 3
      dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/BufferActuator.java
  11. 10 0
      dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/BufferRequest.java
  12. 3 3
      dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/BufferResponse.java
  13. 3 3
      dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/impl/FlushServiceImpl.java
  14. 9 9
      dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/impl/StorageBufferActuator.java
  15. 6 5
      dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/impl/WriterBufferActuator.java
  16. 0 10
      dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/model/AbstractRequest.java
  17. 84 0
      dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/model/AbstractWriter.java
  18. 4 2
      dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/model/StorageRequest.java
  19. 3 1
      dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/model/StorageResponse.java
  20. 10 45
      dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/model/WriterRequest.java
  21. 11 73
      dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/model/WriterResponse.java

+ 16 - 7
dbsyncer-common/src/main/java/org/dbsyncer/common/event/RowChangedEvent.java

@@ -18,7 +18,8 @@ import java.util.Map;
 public class RowChangedEvent {
 public class RowChangedEvent {
 
 
     private int tableGroupIndex;
     private int tableGroupIndex;
-    private String tableName;
+    private String sourceTableName;
+    private String targetTableName;
     private String event;
     private String event;
     private List<Object> beforeData;
     private List<Object> beforeData;
     private List<Object> afterData;
     private List<Object> afterData;
@@ -32,8 +33,8 @@ public class RowChangedEvent {
         this.after = after;
         this.after = after;
     }
     }
 
 
-    public RowChangedEvent(String tableName, String event, List<Object> beforeData, List<Object> afterData) {
-        this.tableName = tableName;
+    public RowChangedEvent(String sourceTableName, String event, List<Object> beforeData, List<Object> afterData) {
+        this.sourceTableName = sourceTableName;
         this.event = event;
         this.event = event;
         this.beforeData = beforeData;
         this.beforeData = beforeData;
         this.afterData = afterData;
         this.afterData = afterData;
@@ -43,12 +44,20 @@ public class RowChangedEvent {
         return tableGroupIndex;
         return tableGroupIndex;
     }
     }
 
 
-    public void setTableName(String tableName) {
-        this.tableName = tableName;
+    public String getSourceTableName() {
+        return sourceTableName;
     }
     }
 
 
-    public String getTableName() {
-        return tableName;
+    public void setSourceTableName(String sourceTableName) {
+        this.sourceTableName = sourceTableName;
+    }
+
+    public String getTargetTableName() {
+        return targetTableName;
+    }
+
+    public void setTargetTableName(String targetTableName) {
+        this.targetTableName = targetTableName;
     }
     }
 
 
     public String getEvent() {
     public String getEvent() {

+ 1 - 1
dbsyncer-connector/src/main/java/org/dbsyncer/connector/config/WriterBatchConfig.java

@@ -3,7 +3,7 @@ package org.dbsyncer.connector.config;
 import java.util.List;
 import java.util.List;
 import java.util.Map;
 import java.util.Map;
 
 
-public class WriterBatchConfig extends WriterConfig {
+public class WriterBatchConfig {
 
 
     /**
     /**
      * 表名
      * 表名

+ 0 - 5
dbsyncer-connector/src/main/java/org/dbsyncer/connector/config/WriterConfig.java

@@ -1,5 +0,0 @@
-package org.dbsyncer.connector.config;
-
-public class WriterConfig {
-
-}

+ 0 - 5
dbsyncer-connector/src/main/java/org/dbsyncer/connector/config/WriterSingleConfig.java

@@ -1,5 +0,0 @@
-package org.dbsyncer.connector.config;
-
-public class WriterSingleConfig {
-
-}

+ 3 - 4
dbsyncer-connector/src/main/java/org/dbsyncer/connector/database/AbstractDatabaseConnector.java

@@ -148,9 +148,7 @@ public abstract class AbstractDatabaseConnector extends AbstractConnector
                 })
                 })
             );
             );
         } catch (Exception e) {
         } catch (Exception e) {
-            result.getError().append("SQL:").append(executeSql).append(System.lineSeparator())
-                    .append("ERROR:").append(e.getMessage()).append(System.lineSeparator());
-            result.addFailData(data);
+            data.forEach(row -> forceUpdate(result, connectorMapper, config, pkField, row));
         }
         }
 
 
         if (null != execute) {
         if (null != execute) {
@@ -227,7 +225,8 @@ public abstract class AbstractDatabaseConnector extends AbstractConnector
         if (!config.isForceUpdate()) {
         if (!config.isForceUpdate()) {
             result.getFailData().add(row);
             result.getFailData().add(row);
             result.getError().append("SQL:").append(config.getCommand().get(event)).append(System.lineSeparator())
             result.getError().append("SQL:").append(config.getCommand().get(event)).append(System.lineSeparator())
-                    .append("DATA:").append(row).append(System.lineSeparator());
+                    .append("DATA:").append(row).append(System.lineSeparator())
+                    .append("ERROR:").append("Row data does not exist.").append(System.lineSeparator());
             return;
             return;
         }
         }
 
 

+ 1 - 1
dbsyncer-listener/src/main/test/DBChangeNotificationTest.java

@@ -28,7 +28,7 @@ public class DBChangeNotificationTest {
 
 
         final DBChangeNotification dcn = new DBChangeNotification(username, password, url);
         final DBChangeNotification dcn = new DBChangeNotification(username, password, url);
         dcn.addRowEventListener((e) ->
         dcn.addRowEventListener((e) ->
-            logger.info("{}触发{}, before:{}, after:{}", e.getTableName(), e.getEvent(), e.getBeforeData(), e.getAfterData())
+            logger.info("{}触发{}, before:{}, after:{}", e.getSourceTableName(), e.getEvent(), e.getBeforeData(), e.getAfterData())
         );
         );
         dcn.start();
         dcn.start();
 
 

+ 8 - 3
dbsyncer-manager/src/main/java/org/dbsyncer/manager/puller/IncrementPuller.java

@@ -31,6 +31,7 @@ import org.dbsyncer.parser.util.PickerUtil;
 import org.slf4j.Logger;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Qualifier;
 import org.springframework.stereotype.Component;
 import org.springframework.stereotype.Component;
 import org.springframework.util.Assert;
 import org.springframework.util.Assert;
 
 
@@ -73,6 +74,7 @@ public class IncrementPuller extends AbstractPuller implements ScheduledTaskJob
     @Autowired
     @Autowired
     private ConnectorFactory connectorFactory;
     private ConnectorFactory connectorFactory;
 
 
+    @Qualifier("taskExecutor")
     @Autowired
     @Autowired
     private Executor taskExecutor;
     private Executor taskExecutor;
 
 
@@ -240,10 +242,12 @@ public class IncrementPuller extends AbstractPuller implements ScheduledTaskJob
         @Override
         @Override
         public void changedEvent(RowChangedEvent rowChangedEvent) {
         public void changedEvent(RowChangedEvent rowChangedEvent) {
             final FieldPicker picker = tablePicker.get(rowChangedEvent.getTableGroupIndex());
             final FieldPicker picker = tablePicker.get(rowChangedEvent.getTableGroupIndex());
-            rowChangedEvent.setTableName(picker.getTableGroup().getSourceTable().getName());
+            TableGroup tableGroup = picker.getTableGroup();
+            rowChangedEvent.setSourceTableName(tableGroup.getSourceTable().getName());
+            rowChangedEvent.setTargetTableName(tableGroup.getTargetTable().getName());
 
 
             // 处理过程有异常向上抛
             // 处理过程有异常向上抛
-            parser.execute(mapping, picker.getTableGroup(), rowChangedEvent);
+            parser.execute(mapping, tableGroup, rowChangedEvent);
 
 
             // 标记有变更记录
             // 标记有变更记录
             changed.compareAndSet(false, true);
             changed.compareAndSet(false, true);
@@ -296,7 +300,7 @@ public class IncrementPuller extends AbstractPuller implements ScheduledTaskJob
         @Override
         @Override
         public void changedEvent(RowChangedEvent rowChangedEvent) {
         public void changedEvent(RowChangedEvent rowChangedEvent) {
             // 处理过程有异常向上抛
             // 处理过程有异常向上抛
-            List<FieldPicker> pickers = tablePicker.get(rowChangedEvent.getTableName());
+            List<FieldPicker> pickers = tablePicker.get(rowChangedEvent.getSourceTableName());
             if (!CollectionUtils.isEmpty(pickers)) {
             if (!CollectionUtils.isEmpty(pickers)) {
                 pickers.parallelStream().forEach(picker -> {
                 pickers.parallelStream().forEach(picker -> {
                     final Map<String, Object> before = picker.getColumns(rowChangedEvent.getBeforeData());
                     final Map<String, Object> before = picker.getColumns(rowChangedEvent.getBeforeData());
@@ -304,6 +308,7 @@ public class IncrementPuller extends AbstractPuller implements ScheduledTaskJob
                     if (picker.filter(StringUtil.equals(ConnectorConstant.OPERTION_DELETE, rowChangedEvent.getEvent()) ? before : after)) {
                     if (picker.filter(StringUtil.equals(ConnectorConstant.OPERTION_DELETE, rowChangedEvent.getEvent()) ? before : after)) {
                         rowChangedEvent.setBefore(before);
                         rowChangedEvent.setBefore(before);
                         rowChangedEvent.setAfter(after);
                         rowChangedEvent.setAfter(after);
+                        rowChangedEvent.setTargetTableName(picker.getTableGroup().getTargetTable().getName());
                         parser.execute(mapping, picker.getTableGroup(), rowChangedEvent);
                         parser.execute(mapping, picker.getTableGroup(), rowChangedEvent);
                     }
                     }
                 });
                 });

+ 7 - 7
dbsyncer-parser/src/main/java/org/dbsyncer/parser/ParserFactory.java

@@ -287,13 +287,13 @@ public class ParserFactory implements Parser {
     }
     }
 
 
     @Override
     @Override
-    public void execute(Mapping mapping, TableGroup tableGroup, RowChangedEvent rowChangedEvent) {
-        logger.info("Table[{}] {}, before:{}, after:{}", rowChangedEvent.getTableName(), rowChangedEvent.getEvent(),
-                rowChangedEvent.getBefore(), rowChangedEvent.getAfter());
+    public void execute(Mapping mapping, TableGroup tableGroup, RowChangedEvent event) {
+        logger.info("Table[{}] {}, before:{}, after:{}", event.getSourceTableName(), event.getEvent(),
+                event.getBefore(), event.getAfter());
 
 
         // 1、获取映射字段
         // 1、获取映射字段
-        final String event = rowChangedEvent.getEvent();
-        Map<String, Object> data = StringUtil.equals(ConnectorConstant.OPERTION_DELETE, event) ? rowChangedEvent.getBefore() : rowChangedEvent.getAfter();
+        final String eventName = event.getEvent();
+        Map<String, Object> data = StringUtil.equals(ConnectorConstant.OPERTION_DELETE, eventName) ? event.getBefore() : event.getAfter();
         Picker picker = new Picker(tableGroup.getFieldMapping(), data);
         Picker picker = new Picker(tableGroup.getFieldMapping(), data);
         Map target = picker.getTargetMap();
         Map target = picker.getTargetMap();
 
 
@@ -301,10 +301,10 @@ public class ParserFactory implements Parser {
         ConvertUtil.convert(tableGroup.getConvert(), target);
         ConvertUtil.convert(tableGroup.getConvert(), target);
 
 
         // 3、插件转换
         // 3、插件转换
-        pluginFactory.convert(tableGroup.getPlugin(), event, data, target);
+        pluginFactory.convert(tableGroup.getPlugin(), eventName, data, target);
 
 
         // 4、写入缓冲执行器
         // 4、写入缓冲执行器
-        writerBufferActuator.offer(new WriterRequest(mapping.getMetaId(), mapping.getTargetConnectorId(), tableGroup.getId(), rowChangedEvent.getTableName(), event, picker.getTargetFields(), tableGroup.getCommand(), target));
+        writerBufferActuator.offer(new WriterRequest(tableGroup.getId(), target, mapping.getMetaId(), mapping.getTargetConnectorId(), event.getSourceTableName(), event.getTargetTableName(), eventName, picker.getTargetFields(), tableGroup.getCommand()));
     }
     }
 
 
     /**
     /**

+ 3 - 5
dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/AbstractBufferActuator.java

@@ -2,8 +2,6 @@ package org.dbsyncer.parser.flush;
 
 
 import org.dbsyncer.common.scheduled.ScheduledTaskJob;
 import org.dbsyncer.common.scheduled.ScheduledTaskJob;
 import org.dbsyncer.common.scheduled.ScheduledTaskService;
 import org.dbsyncer.common.scheduled.ScheduledTaskService;
-import org.dbsyncer.parser.flush.model.AbstractRequest;
-import org.dbsyncer.parser.flush.model.AbstractResponse;
 import org.slf4j.Logger;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -57,7 +55,7 @@ public abstract class AbstractBufferActuator<Request, Response> implements Buffe
      *
      *
      * @return
      * @return
      */
      */
-    protected abstract AbstractResponse getValue();
+    protected abstract BufferResponse getValue();
 
 
     /**
     /**
      * 生成分区key
      * 生成分区key
@@ -83,7 +81,7 @@ public abstract class AbstractBufferActuator<Request, Response> implements Buffe
     protected abstract void pull(Response response);
     protected abstract void pull(Response response);
 
 
     @Override
     @Override
-    public void offer(AbstractRequest request) {
+    public void offer(BufferRequest request) {
         if (running) {
         if (running) {
             temp.offer((Request) request);
             temp.offer((Request) request);
             return;
             return;
@@ -120,7 +118,7 @@ public abstract class AbstractBufferActuator<Request, Response> implements Buffe
     private void flush(Queue<Request> queue) {
     private void flush(Queue<Request> queue) {
         if (!queue.isEmpty()) {
         if (!queue.isEmpty()) {
             AtomicLong batchCounter = new AtomicLong();
             AtomicLong batchCounter = new AtomicLong();
-            final Map<String, AbstractResponse> map = new LinkedHashMap<>();
+            final Map<String, BufferResponse> map = new LinkedHashMap<>();
             while (!queue.isEmpty() && batchCounter.get() < MAX_BATCH_COUNT) {
             while (!queue.isEmpty() && batchCounter.get() < MAX_BATCH_COUNT) {
                 Request poll = queue.poll();
                 Request poll = queue.poll();
                 String key = getPartitionKey(poll);
                 String key = getPartitionKey(poll);

+ 1 - 3
dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/BufferActuator.java

@@ -1,7 +1,5 @@
 package org.dbsyncer.parser.flush;
 package org.dbsyncer.parser.flush;
 
 
-import org.dbsyncer.parser.flush.model.AbstractRequest;
-
 /**
 /**
  * @author AE86
  * @author AE86
  * @version 1.0.0
  * @version 1.0.0
@@ -9,6 +7,6 @@ import org.dbsyncer.parser.flush.model.AbstractRequest;
  */
  */
 public interface BufferActuator {
 public interface BufferActuator {
 
 
-    void offer(AbstractRequest request);
+    void offer(BufferRequest request);
 
 
 }
 }

+ 10 - 0
dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/BufferRequest.java

@@ -0,0 +1,10 @@
+package org.dbsyncer.parser.flush;
+
+/**
+ * @author AE86
+ * @version 1.0.0
+ * @date 2022/3/27 16:57
+ */
+public interface BufferRequest {
+
+}

+ 3 - 3
dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/model/AbstractResponse.java → dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/BufferResponse.java

@@ -1,17 +1,17 @@
-package org.dbsyncer.parser.flush.model;
+package org.dbsyncer.parser.flush;
 
 
 /**
 /**
  * @author AE86
  * @author AE86
  * @version 1.0.0
  * @version 1.0.0
  * @date 2022/3/27 18:11
  * @date 2022/3/27 18:11
  */
  */
-public abstract class AbstractResponse {
+public interface BufferResponse {
 
 
     /**
     /**
      * 获取批处理数
      * 获取批处理数
      *
      *
      * @return
      * @return
      */
      */
-    public abstract int getTaskSize();
+    int getTaskSize();
 
 
 }
 }

+ 3 - 3
dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/impl/FlushServiceImpl.java

@@ -4,7 +4,7 @@ import com.alibaba.fastjson.JSONException;
 import org.dbsyncer.common.util.JsonUtil;
 import org.dbsyncer.common.util.JsonUtil;
 import org.dbsyncer.parser.flush.BufferActuator;
 import org.dbsyncer.parser.flush.BufferActuator;
 import org.dbsyncer.parser.flush.FlushService;
 import org.dbsyncer.parser.flush.FlushService;
-import org.dbsyncer.parser.flush.model.FlushRequest;
+import org.dbsyncer.parser.flush.model.StorageRequest;
 import org.dbsyncer.storage.SnowflakeIdWorker;
 import org.dbsyncer.storage.SnowflakeIdWorker;
 import org.dbsyncer.storage.StorageService;
 import org.dbsyncer.storage.StorageService;
 import org.dbsyncer.storage.constant.ConfigConstant;
 import org.dbsyncer.storage.constant.ConfigConstant;
@@ -43,7 +43,7 @@ public class FlushServiceImpl implements FlushService {
     private SnowflakeIdWorker snowflakeIdWorker;
     private SnowflakeIdWorker snowflakeIdWorker;
 
 
     @Autowired
     @Autowired
-    private BufferActuator flushBufferActuator;
+    private BufferActuator storageBufferActuator;
 
 
     @Override
     @Override
     public void asyncWrite(String type, String error) {
     public void asyncWrite(String type, String error) {
@@ -76,7 +76,7 @@ public class FlushServiceImpl implements FlushService {
             return params;
             return params;
         }).collect(Collectors.toList());
         }).collect(Collectors.toList());
 
 
-        flushBufferActuator.offer(new FlushRequest(metaId, list));
+        storageBufferActuator.offer(new StorageRequest(metaId, list));
     }
     }
 
 
 }
 }

+ 9 - 9
dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/impl/FlushBufferActuator.java → dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/impl/StorageBufferActuator.java

@@ -1,9 +1,9 @@
 package org.dbsyncer.parser.flush.impl;
 package org.dbsyncer.parser.flush.impl;
 
 
 import org.dbsyncer.parser.flush.AbstractBufferActuator;
 import org.dbsyncer.parser.flush.AbstractBufferActuator;
-import org.dbsyncer.parser.flush.model.AbstractResponse;
-import org.dbsyncer.parser.flush.model.FlushRequest;
-import org.dbsyncer.parser.flush.model.FlushResponse;
+import org.dbsyncer.parser.flush.BufferResponse;
+import org.dbsyncer.parser.flush.model.StorageRequest;
+import org.dbsyncer.parser.flush.model.StorageResponse;
 import org.dbsyncer.storage.StorageService;
 import org.dbsyncer.storage.StorageService;
 import org.dbsyncer.storage.enums.StorageEnum;
 import org.dbsyncer.storage.enums.StorageEnum;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -15,7 +15,7 @@ import org.springframework.stereotype.Component;
  * @date 2022/3/27 16:50
  * @date 2022/3/27 16:50
  */
  */
 @Component
 @Component
-public class FlushBufferActuator extends AbstractBufferActuator<FlushRequest, FlushResponse> {
+public class StorageBufferActuator extends AbstractBufferActuator<StorageRequest, StorageResponse> {
 
 
     @Autowired
     @Autowired
     private StorageService storageService;
     private StorageService storageService;
@@ -26,23 +26,23 @@ public class FlushBufferActuator extends AbstractBufferActuator<FlushRequest, Fl
     }
     }
 
 
     @Override
     @Override
-    protected AbstractResponse getValue() {
-        return new FlushResponse();
+    protected BufferResponse getValue() {
+        return new StorageResponse();
     }
     }
 
 
     @Override
     @Override
-    protected String getPartitionKey(FlushRequest bufferTask) {
+    protected String getPartitionKey(StorageRequest bufferTask) {
         return bufferTask.getMetaId();
         return bufferTask.getMetaId();
     }
     }
 
 
     @Override
     @Override
-    protected void partition(FlushRequest request, FlushResponse response) {
+    protected void partition(StorageRequest request, StorageResponse response) {
         response.setMetaId(request.getMetaId());
         response.setMetaId(request.getMetaId());
         response.getDataList().addAll(request.getList());
         response.getDataList().addAll(request.getList());
     }
     }
 
 
     @Override
     @Override
-    protected void pull(FlushResponse response) {
+    protected void pull(StorageResponse response) {
         storageService.addData(StorageEnum.DATA, response.getMetaId(), response.getDataList());
         storageService.addData(StorageEnum.DATA, response.getMetaId(), response.getDataList());
     }
     }
 }
 }

+ 6 - 5
dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/impl/WriterBufferActuator.java

@@ -7,8 +7,8 @@ import org.dbsyncer.connector.ConnectorMapper;
 import org.dbsyncer.connector.config.ConnectorConfig;
 import org.dbsyncer.connector.config.ConnectorConfig;
 import org.dbsyncer.parser.ParserFactory;
 import org.dbsyncer.parser.ParserFactory;
 import org.dbsyncer.parser.flush.AbstractBufferActuator;
 import org.dbsyncer.parser.flush.AbstractBufferActuator;
+import org.dbsyncer.parser.flush.BufferResponse;
 import org.dbsyncer.parser.flush.FlushStrategy;
 import org.dbsyncer.parser.flush.FlushStrategy;
-import org.dbsyncer.parser.flush.model.AbstractResponse;
 import org.dbsyncer.parser.flush.model.WriterRequest;
 import org.dbsyncer.parser.flush.model.WriterRequest;
 import org.dbsyncer.parser.flush.model.WriterResponse;
 import org.dbsyncer.parser.flush.model.WriterResponse;
 import org.dbsyncer.parser.model.BatchWriter;
 import org.dbsyncer.parser.model.BatchWriter;
@@ -47,7 +47,7 @@ public class WriterBufferActuator extends AbstractBufferActuator<WriterRequest,
     }
     }
 
 
     @Override
     @Override
-    protected AbstractResponse getValue() {
+    protected BufferResponse getValue() {
         return new WriterResponse();
         return new WriterResponse();
     }
     }
 
 
@@ -64,17 +64,18 @@ public class WriterBufferActuator extends AbstractBufferActuator<WriterRequest,
         }
         }
         response.setMetaId(request.getMetaId());
         response.setMetaId(request.getMetaId());
         response.setTargetConnectorId(request.getTargetConnectorId());
         response.setTargetConnectorId(request.getTargetConnectorId());
-        response.setCommand(request.getCommand());
-        response.setTableName(request.getTableName());
+        response.setSourceTableName(request.getSourceTableName());
+        response.setTargetTableName(request.getTargetTableName());
         response.setEvent(request.getEvent());
         response.setEvent(request.getEvent());
         response.setFields(Collections.unmodifiableList(request.getFields()));
         response.setFields(Collections.unmodifiableList(request.getFields()));
+        response.setCommand(request.getCommand());
         response.setMerged(true);
         response.setMerged(true);
     }
     }
 
 
     @Override
     @Override
     protected void pull(WriterResponse response) {
     protected void pull(WriterResponse response) {
         ConnectorMapper targetConnectorMapper = connectorFactory.connect(getConnectorConfig(response.getTargetConnectorId()));
         ConnectorMapper targetConnectorMapper = connectorFactory.connect(getConnectorConfig(response.getTargetConnectorId()));
-        Result result = parserFactory.writeBatch(new BatchWriter(targetConnectorMapper, response.getCommand(), response.getTableName(), response.getEvent(),
+        Result result = parserFactory.writeBatch(new BatchWriter(targetConnectorMapper, response.getCommand(), response.getTargetTableName(), response.getEvent(),
                 response.getFields(), response.getDataList(), BATCH_SIZE, true));
                 response.getFields(), response.getDataList(), BATCH_SIZE, true));
         flushStrategy.flushIncrementData(response.getMetaId(), result, response.getEvent());
         flushStrategy.flushIncrementData(response.getMetaId(), result, response.getEvent());
     }
     }

+ 0 - 10
dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/model/AbstractRequest.java

@@ -1,10 +0,0 @@
-package org.dbsyncer.parser.flush.model;
-
-/**
- * @author AE86
- * @version 1.0.0
- * @date 2022/3/27 16:57
- */
-public abstract class AbstractRequest {
-
-}

+ 84 - 0
dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/model/AbstractWriter.java

@@ -0,0 +1,84 @@
+package org.dbsyncer.parser.flush.model;
+
+import org.dbsyncer.connector.config.Field;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * @author AE86
+ * @version 1.0.0
+ * @date 2022/4/4 23:02
+ */
+public abstract class AbstractWriter {
+
+    private String metaId;
+
+    private String targetConnectorId;
+
+    private String sourceTableName;
+
+    private String targetTableName;
+
+    private List<Field> fields;
+
+    private Map<String, String> command;
+
+    private String event;
+
+    public String getMetaId() {
+        return metaId;
+    }
+
+    public void setMetaId(String metaId) {
+        this.metaId = metaId;
+    }
+
+    public String getTargetConnectorId() {
+        return targetConnectorId;
+    }
+
+    public void setTargetConnectorId(String targetConnectorId) {
+        this.targetConnectorId = targetConnectorId;
+    }
+
+    public String getSourceTableName() {
+        return sourceTableName;
+    }
+
+    public void setSourceTableName(String sourceTableName) {
+        this.sourceTableName = sourceTableName;
+    }
+
+    public String getTargetTableName() {
+        return targetTableName;
+    }
+
+    public void setTargetTableName(String targetTableName) {
+        this.targetTableName = targetTableName;
+    }
+
+    public List<Field> getFields() {
+        return fields;
+    }
+
+    public void setFields(List<Field> fields) {
+        this.fields = fields;
+    }
+
+    public Map<String, String> getCommand() {
+        return command;
+    }
+
+    public void setCommand(Map<String, String> command) {
+        this.command = command;
+    }
+
+    public String getEvent() {
+        return event;
+    }
+
+    public void setEvent(String event) {
+        this.event = event;
+    }
+}

+ 4 - 2
dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/model/FlushRequest.java → dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/model/StorageRequest.java

@@ -1,5 +1,7 @@
 package org.dbsyncer.parser.flush.model;
 package org.dbsyncer.parser.flush.model;
 
 
+import org.dbsyncer.parser.flush.BufferRequest;
+
 import java.util.List;
 import java.util.List;
 import java.util.Map;
 import java.util.Map;
 
 
@@ -8,13 +10,13 @@ import java.util.Map;
  * @version 1.0.0
  * @version 1.0.0
  * @date 2022/3/27 16:57
  * @date 2022/3/27 16:57
  */
  */
-public class FlushRequest extends AbstractRequest {
+public class StorageRequest implements BufferRequest {
 
 
     private String metaId;
     private String metaId;
 
 
     private List<Map> list;
     private List<Map> list;
 
 
-    public FlushRequest(String metaId, List<Map> list) {
+    public StorageRequest(String metaId, List<Map> list) {
         this.metaId = metaId;
         this.metaId = metaId;
         this.list = list;
         this.list = list;
     }
     }

+ 3 - 1
dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/model/FlushResponse.java → dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/model/StorageResponse.java

@@ -1,5 +1,7 @@
 package org.dbsyncer.parser.flush.model;
 package org.dbsyncer.parser.flush.model;
 
 
+import org.dbsyncer.parser.flush.BufferResponse;
+
 import java.util.LinkedList;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.List;
 import java.util.Map;
 import java.util.Map;
@@ -9,7 +11,7 @@ import java.util.Map;
  * @version 1.0.0
  * @version 1.0.0
  * @date 2022/3/27 16:57
  * @date 2022/3/27 16:57
  */
  */
-public class FlushResponse extends AbstractResponse {
+public class StorageResponse implements BufferResponse {
 
 
     private String metaId;
     private String metaId;
     private List<Map> dataList = new LinkedList<>();
     private List<Map> dataList = new LinkedList<>();

+ 10 - 45
dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/model/WriterRequest.java

@@ -1,6 +1,7 @@
 package org.dbsyncer.parser.flush.model;
 package org.dbsyncer.parser.flush.model;
 
 
 import org.dbsyncer.connector.config.Field;
 import org.dbsyncer.connector.config.Field;
+import org.dbsyncer.parser.flush.BufferRequest;
 
 
 import java.util.List;
 import java.util.List;
 import java.util.Map;
 import java.util.Map;
@@ -10,64 +11,28 @@ import java.util.Map;
  * @version 1.0.0
  * @version 1.0.0
  * @date 2022/3/27 16:57
  * @date 2022/3/27 16:57
  */
  */
-public class WriterRequest extends AbstractRequest {
-
-    private String metaId;
-
-    private String targetConnectorId;
+public class WriterRequest extends AbstractWriter implements BufferRequest {
 
 
     private String tableGroupId;
     private String tableGroupId;
 
 
-    private String tableName;
-
-    private String event;
-
-    private List<Field> fields;
-
-    private Map<String, String> command;
-
     private Map row;
     private Map row;
 
 
-    public WriterRequest(String metaId, String targetConnectorId, String tableGroupId, String tableName, String event, List<Field> fields,
-                         Map<String, String> command, Map row) {
-        this.metaId = metaId;
-        this.targetConnectorId = targetConnectorId;
+    public WriterRequest(String tableGroupId, Map row, String metaId, String targetConnectorId, String sourceTableName, String targetTableName, String event, List<Field> fields, Map<String, String> command) {
+        setMetaId(metaId);
+        setTargetConnectorId(targetConnectorId);
+        setSourceTableName(sourceTableName);
+        setTargetTableName(targetTableName);
+        setEvent(event);
+        setFields(fields);
+        setCommand(command);
         this.tableGroupId = tableGroupId;
         this.tableGroupId = tableGroupId;
-        this.tableName = tableName;
-        this.event = event;
-        this.fields = fields;
-        this.command = command;
         this.row = row;
         this.row = row;
     }
     }
 
 
-    public String getMetaId() {
-        return metaId;
-    }
-
-    public String getTargetConnectorId() {
-        return targetConnectorId;
-    }
-
     public String getTableGroupId() {
     public String getTableGroupId() {
         return tableGroupId;
         return tableGroupId;
     }
     }
 
 
-    public String getTableName() {
-        return tableName;
-    }
-
-    public String getEvent() {
-        return event;
-    }
-
-    public List<Field> getFields() {
-        return fields;
-    }
-
-    public Map<String, String> getCommand() {
-        return command;
-    }
-
     public Map getRow() {
     public Map getRow() {
         return row;
         return row;
     }
     }

+ 11 - 73
dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/model/WriterResponse.java

@@ -1,7 +1,6 @@
 package org.dbsyncer.parser.flush.model;
 package org.dbsyncer.parser.flush.model;
 
 
-import org.dbsyncer.connector.ConnectorMapper;
-import org.dbsyncer.connector.config.Field;
+import org.dbsyncer.parser.flush.BufferResponse;
 
 
 import java.util.LinkedList;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.List;
@@ -12,78 +11,15 @@ import java.util.Map;
  * @version 1.0.0
  * @version 1.0.0
  * @date 2022/3/27 18:11
  * @date 2022/3/27 18:11
  */
  */
-public class WriterResponse extends AbstractResponse {
-
-    private boolean isMerged;
-
-    private String metaId;
-
-    private String targetConnectorId;
-
-    private String tableName;
-
-    private String event;
-
-    private List<Field> fields;
-
-    private Map<String, String> command;
+public class WriterResponse extends AbstractWriter implements BufferResponse {
 
 
     private List<Map> dataList = new LinkedList<>();
     private List<Map> dataList = new LinkedList<>();
 
 
-    public boolean isMerged() {
-        return isMerged;
-    }
-
-    public void setMerged(boolean merged) {
-        isMerged = merged;
-    }
-
-    public String getMetaId() {
-        return metaId;
-    }
-
-    public void setMetaId(String metaId) {
-        this.metaId = metaId;
-    }
-
-    public String getTableName() {
-        return tableName;
-    }
-
-    public void setTableName(String tableName) {
-        this.tableName = tableName;
-    }
-
-    public String getEvent() {
-        return event;
-    }
-
-    public void setEvent(String event) {
-        this.event = event;
-    }
-
-    public String getTargetConnectorId() {
-        return targetConnectorId;
-    }
-
-    public void setTargetConnectorId(String targetConnectorId) {
-        this.targetConnectorId = targetConnectorId;
-    }
-
-    public List<Field> getFields() {
-        return fields;
-    }
-
-    public void setFields(List<Field> fields) {
-        this.fields = fields;
-    }
-
-    public Map<String, String> getCommand() {
-        return command;
-    }
+    private boolean isMerged;
 
 
-    public void setCommand(Map<String, String> command) {
-        this.command = command;
+    @Override
+    public int getTaskSize() {
+        return dataList.size();
     }
     }
 
 
     public List<Map> getDataList() {
     public List<Map> getDataList() {
@@ -94,9 +30,11 @@ public class WriterResponse extends AbstractResponse {
         this.dataList = dataList;
         this.dataList = dataList;
     }
     }
 
 
-    @Override
-    public int getTaskSize() {
-        return dataList.size();
+    public boolean isMerged() {
+        return isMerged;
     }
     }
 
 
+    public void setMerged(boolean merged) {
+        isMerged = merged;
+    }
 }
 }