AE86 3 years ago
parent
commit
5988abadd8

+ 5 - 13
dbsyncer-common/src/main/java/org/dbsyncer/common/event/RowChangedEvent.java

@@ -24,21 +24,18 @@ public class RowChangedEvent {
     private List<Object> afterData;
     private Map<String, Object> before;
     private Map<String, Object> after;
-    private boolean forceUpdate;
+    private boolean isForceUpdate;
 
     public RowChangedEvent(int tableGroupIndex, String event, Map<String, Object> before, Map<String, Object> after) {
-        this.tableGroupIndex = tableGroupIndex;
-        this.event = event;
-        this.before = before;
-        this.after = after;
+        this(tableGroupIndex, event, before, after, false);
     }
 
-    public RowChangedEvent(int tableGroupIndex, String event, Map<String, Object> before, Map<String, Object> after, boolean forceUpdate) {
+    public RowChangedEvent(int tableGroupIndex, String event, Map<String, Object> before, Map<String, Object> after, boolean isForceUpdate) {
         this.tableGroupIndex = tableGroupIndex;
         this.event = event;
         this.before = before;
         this.after = after;
-        this.forceUpdate = forceUpdate;
+        this.isForceUpdate = isForceUpdate;
     }
 
     public RowChangedEvent(String tableName, String event, List<Object> beforeData, List<Object> afterData) {
@@ -89,12 +86,7 @@ public class RowChangedEvent {
     }
 
     public boolean isForceUpdate() {
-        return forceUpdate;
-    }
-
-    public RowChangedEvent setForceUpdate(boolean forceUpdate) {
-        this.forceUpdate = forceUpdate;
-        return this;
+        return isForceUpdate;
     }
 
     @Override

+ 38 - 6
dbsyncer-connector/src/main/java/org/dbsyncer/connector/config/WriterBatchConfig.java

@@ -5,24 +5,56 @@ import java.util.Map;
 
 public class WriterBatchConfig extends WriterConfig {
 
+    /**
+     * 事件
+     */
+    private String event;
+    /**
+     * 执行命令
+     */
+    private Map<String, String> command;
+    /**
+     * 字段信息
+     */
+    private List<Field> fields;
     /**
      * 集合数据
      */
     private List<Map> data;
+    /**
+     * 强制更新
+     */
+    private boolean isForceUpdate;
 
     public WriterBatchConfig(String event, Map<String, String> command, List<Field> fields, List<Map> data) {
-        setEvent(event);
-        setCommand(command);
-        setFields(fields);
+        this(event, command, fields, data, false);
+    }
+
+    public WriterBatchConfig(String event, Map<String, String> command, List<Field> fields, List<Map> data, boolean isForceUpdate) {
+        this.event = event;
+        this.command = command;
+        this.fields = fields;
         this.data = data;
+        this.isForceUpdate = isForceUpdate;
+    }
+
+    public String getEvent() {
+        return event;
+    }
+
+    public Map<String, String> getCommand() {
+        return command;
+    }
+
+    public List<Field> getFields() {
+        return fields;
     }
 
     public List<Map> getData() {
         return data;
     }
 
-    public WriterBatchConfig setData(List<Map> data) {
-        this.data = data;
-        return this;
+    public boolean isForceUpdate() {
+        return isForceUpdate;
     }
 }

+ 0 - 42
dbsyncer-connector/src/main/java/org/dbsyncer/connector/config/WriterConfig.java

@@ -1,47 +1,5 @@
 package org.dbsyncer.connector.config;
 
-import java.util.List;
-import java.util.Map;
-
 public class WriterConfig {
 
-    /**
-     * 事件
-     */
-    private String event;
-    /**
-     * 执行命令
-     */
-    private Map<String, String> command;
-    /**
-     * 字段信息
-     */
-    private List<Field> fields;
-
-    public String getEvent() {
-        return event;
-    }
-
-    public void setEvent(String event) {
-        this.event = event;
-    }
-
-    public Map<String, String> getCommand() {
-        return command;
-    }
-
-    public WriterConfig setCommand(Map<String, String> command) {
-        this.command = command;
-        return this;
-    }
-
-    public List<Field> getFields() {
-        return fields;
-    }
-
-    public WriterConfig setFields(List<Field> fields) {
-        this.fields = fields;
-        return this;
-    }
-
 }

+ 1 - 67
dbsyncer-connector/src/main/java/org/dbsyncer/connector/config/WriterSingleConfig.java

@@ -1,71 +1,5 @@
 package org.dbsyncer.connector.config;
 
-import java.util.List;
-import java.util.Map;
+public class WriterSingleConfig {
 
-public class WriterSingleConfig extends WriterConfig {
-
-    /**
-     * 行数据
-     */
-    private Map<String, Object> data;
-
-    /**
-     * 表名
-     */
-    private String table;
-
-    /**
-     * 重试标记
-     */
-    private boolean retry;
-
-    /**
-     * 更新失败转插入
-     */
-    private boolean forceUpdate;
-
-    public WriterSingleConfig(List<Field> fields, Map<String, String> command, String event, Map<String, Object> data, String table, boolean forceUpdate) {
-        setEvent(event);
-        setCommand(command);
-        setFields(fields);
-        this.data = data;
-        this.table = table;
-        this.forceUpdate = forceUpdate;
-    }
-
-    public Map<String, Object> getData() {
-        return data;
-    }
-
-    public WriterSingleConfig setData(Map<String, Object> data) {
-        this.data = data;
-        return this;
-    }
-
-    public String getTable() {
-        return table;
-    }
-
-    public WriterSingleConfig setTable(String table) {
-        this.table = table;
-        return this;
-    }
-
-    public boolean isRetry() {
-        return retry;
-    }
-
-    public void setRetry(boolean retry) {
-        this.retry = retry;
-    }
-
-    public boolean isForceUpdate() {
-        return forceUpdate;
-    }
-
-    public WriterSingleConfig setForceUpdate(boolean forceUpdate) {
-        this.forceUpdate = forceUpdate;
-        return this;
-    }
 }

+ 69 - 73
dbsyncer-connector/src/main/java/org/dbsyncer/connector/database/AbstractDatabaseConnector.java

@@ -26,7 +26,8 @@ import java.sql.*;
 import java.util.*;
 import java.util.stream.Collectors;
 
-public abstract class AbstractDatabaseConnector extends AbstractConnector implements Connector<DatabaseConnectorMapper, DatabaseConfig>, Database {
+public abstract class AbstractDatabaseConnector extends AbstractConnector
+        implements Connector<DatabaseConnectorMapper, DatabaseConfig>, Database {
 
     private final Logger logger = LoggerFactory.getLogger(getClass());
 
@@ -71,7 +72,8 @@ public abstract class AbstractDatabaseConnector extends AbstractConnector implem
     @Override
     public MetaInfo getMetaInfo(DatabaseConnectorMapper connectorMapper, String tableName) {
         String quotation = buildSqlWithQuotation();
-        StringBuilder queryMetaSql = new StringBuilder("SELECT * FROM ").append(quotation).append(tableName).append(quotation).append(" WHERE 1 != 1");
+        StringBuilder queryMetaSql = new StringBuilder("SELECT * FROM ").append(quotation).append(tableName).append(quotation).append(
+                " WHERE 1 != 1");
         return connectorMapper.execute(databaseTemplate -> getMetaInfo(databaseTemplate, queryMetaSql.toString(), tableName));
     }
 
@@ -95,7 +97,8 @@ public abstract class AbstractDatabaseConnector extends AbstractConnector implem
         Collections.addAll(config.getArgs(), getPageArgs(config.getPageIndex(), config.getPageSize()));
 
         // 3、执行SQL
-        List<Map<String, Object>> list = connectorMapper.execute(databaseTemplate -> databaseTemplate.queryForList(querySql, config.getArgs().toArray()));
+        List<Map<String, Object>> list = connectorMapper.execute(
+                databaseTemplate -> databaseTemplate.queryForList(querySql, config.getArgs().toArray()));
 
         // 4、返回结果集
         return new Result(list);
@@ -150,16 +153,14 @@ public abstract class AbstractDatabaseConnector extends AbstractConnector implem
         } catch (Exception e) {
             result.getError().append("SQL:").append(executeSql).append(System.lineSeparator())
                     .append("ERROR:").append(e.getMessage()).append(System.lineSeparator());
-            logger.error(result.getError().toString());
+            result.addFailData(data);
         }
 
         if (null != execute) {
             int batchSize = execute.length;
             for (int i = 0; i < batchSize; i++) {
                 if (execute[i] == 0) {
-                    result.getFailData().add(data.get(i));
-                    result.getError().append("SQL:").append(executeSql).append(System.lineSeparator())
-                            .append("DATA:").append(data.get(i)).append(System.lineSeparator());
+                    forceUpdate(result, connectorMapper, config, pkField, data.get(i));
                     continue;
                 }
                 result.getSuccessData().add(data.get(i));
@@ -168,72 +169,6 @@ public abstract class AbstractDatabaseConnector extends AbstractConnector implem
         return result;
     }
 
-    public Result writer(DatabaseConnectorMapper connectorMapper, WriterSingleConfig config) {
-        String event = config.getEvent();
-        List<Field> fields = config.getFields();
-        Map<String, Object> data = config.getData();
-        // 1、获取 SQL
-        String sql = config.getCommand().get(event);
-        Assert.hasText(sql, "执行语句不能为空.");
-        if (CollectionUtils.isEmpty(data) || CollectionUtils.isEmpty(fields)) {
-            logger.error("writer data can not be empty.");
-            throw new ConnectorException("writer data can not be empty.");
-        }
-
-        Field pkField = getPrimaryKeyField(fields);
-        // Update / Delete
-        if (!isInsert(event)) {
-            if (isDelete(event)) {
-                fields.clear();
-            }
-            fields.add(pkField);
-        }
-
-        int size = fields.size();
-        Result result = new Result();
-        int execute = 0;
-        try {
-            // 2、设置参数
-            execute = connectorMapper.execute(databaseTemplate ->
-                    databaseTemplate.update(sql, (ps) ->
-                            batchRowsSetter(databaseTemplate.getConnection(), ps, fields, size, data)
-                    )
-            );
-        } catch (Exception e) {
-            // 记录错误数据
-            if (!config.isForceUpdate()) {
-                result.getFailData().add(data);
-                result.getError().append("SQL:").append(sql).append(System.lineSeparator())
-                        .append("DATA:").append(data).append(System.lineSeparator())
-                        .append("ERROR:").append(e.getMessage()).append(System.lineSeparator());
-                logger.error(result.getError().toString());
-            }
-        }
-
-        if (0 == execute && !config.isRetry() && null != pkField) {
-            // 不存在转insert
-            if (isUpdate(event)) {
-                String queryCount = config.getCommand().get(ConnectorConstant.OPERTION_QUERY_COUNT_EXIST);
-                if (!existRow(connectorMapper, queryCount, data.get(pkField.getName()))) {
-                    fields.remove(fields.size() - 1);
-                    config.setEvent(ConnectorConstant.OPERTION_INSERT);
-                    config.setRetry(true);
-                    logger.warn("{}表执行{}失败, 尝试执行{}, {}", config.getTable(), event, config.getEvent(), data);
-                    return writer(connectorMapper, config);
-                }
-                return result;
-            }
-            // 存在转update
-            if (isInsert(event)) {
-                config.setEvent(ConnectorConstant.OPERTION_UPDATE);
-                config.setRetry(true);
-                return writer(connectorMapper, config);
-            }
-
-        }
-        return result;
-    }
-
     @Override
     public Map<String, String> getSourceCommand(CommandConfig commandConfig) {
         // 获取过滤SQL
@@ -289,6 +224,67 @@ public abstract class AbstractDatabaseConnector extends AbstractConnector implem
         return map;
     }
 
+    private void forceUpdate(Result result, DatabaseConnectorMapper connectorMapper, WriterBatchConfig config, Field pkField,
+                             Map row) {
+        String event = config.getEvent();
+        if (!config.isForceUpdate()) {
+            result.getFailData().add(row);
+            result.getError().append("SQL:").append(config.getCommand().get(event)).append(System.lineSeparator())
+                    .append("DATA:").append(row).append(System.lineSeparator());
+        }
+
+        // 不存在转insert
+        if (isUpdate(event)) {
+            String queryCount = config.getCommand().get(ConnectorConstant.OPERTION_QUERY_COUNT_EXIST);
+            if (!existRow(connectorMapper, queryCount, row.get(pkField.getName()))) {
+                logger.warn("{}表执行{}失败, 尝试执行{}, {}", "", event, ConnectorConstant.OPERTION_INSERT, row);
+                writer(result, connectorMapper, config, pkField, row, ConnectorConstant.OPERTION_INSERT);
+            }
+            return;
+        }
+
+        // 存在转update
+        if (isInsert(config.getEvent())) {
+            logger.warn("{}表执行{}失败, 尝试执行{}, {}", "", event, ConnectorConstant.OPERTION_UPDATE, row);
+            writer(result, connectorMapper, config, pkField, row, ConnectorConstant.OPERTION_UPDATE);
+        }
+    }
+
+    private void writer(Result result, DatabaseConnectorMapper connectorMapper, WriterBatchConfig config, Field pkField, Map row,
+                        String event) {
+        // 1、获取 SQL
+        String sql = config.getCommand().get(event);
+
+        List<Field> fields = new ArrayList<>(config.getFields());
+        // Update / Delete
+        if (!isInsert(event)) {
+            if (isDelete(event)) {
+                fields.clear();
+            }
+            fields.add(pkField);
+        }
+
+        int size = fields.size();
+        try {
+            // 2、设置参数
+            int execute = connectorMapper.execute(databaseTemplate ->
+                    databaseTemplate.update(sql, (ps) ->
+                            batchRowsSetter(databaseTemplate.getConnection(), ps, fields, size, row)
+                    )
+            );
+            if (execute == 0) {
+                throw new ConnectorException(String.format("retry %s error", event));
+            }
+        } catch (Exception e) {
+            // 记录错误数据
+            result.getFailData().add(row);
+            result.getError().append("SQL:").append(sql).append(System.lineSeparator())
+                    .append("DATA:").append(row).append(System.lineSeparator())
+                    .append("ERROR:").append(e.getMessage()).append(System.lineSeparator());
+            logger.error(result.getError().toString());
+        }
+    }
+
     /**
      * 获取DQL表信息
      *

+ 3 - 12
dbsyncer-parser/src/main/java/org/dbsyncer/parser/Parser.java

@@ -2,8 +2,6 @@ package org.dbsyncer.parser;
 
 import org.dbsyncer.common.event.RowChangedEvent;
 import org.dbsyncer.common.model.Result;
-import org.dbsyncer.connector.config.Field;
-import org.dbsyncer.parser.model.Task;
 import org.dbsyncer.connector.ConnectorMapper;
 import org.dbsyncer.connector.config.ConnectorConfig;
 import org.dbsyncer.connector.config.MetaInfo;
@@ -13,9 +11,7 @@ import org.dbsyncer.connector.enums.FilterEnum;
 import org.dbsyncer.connector.enums.OperationEnum;
 import org.dbsyncer.listener.enums.QuartzFilterEnum;
 import org.dbsyncer.parser.enums.ConvertEnum;
-import org.dbsyncer.parser.model.Connector;
-import org.dbsyncer.parser.model.Mapping;
-import org.dbsyncer.parser.model.TableGroup;
+import org.dbsyncer.parser.model.*;
 import org.dbsyncer.storage.enums.StorageDataStatusEnum;
 
 import java.util.List;
@@ -167,14 +163,9 @@ public interface Parser {
     /**
      * 批执行
      *
-     * @param connectorMapper
-     * @param command
-     * @param event
-     * @param fields
-     * @param dataList
-     * @param batchSize
+     * @param batchWriter
      * @return
      */
-    Result writeBatch(ConnectorMapper connectorMapper, Map<String, String> command, String event, List<Field> fields, List<Map> dataList, int batchSize);
+    Result writeBatch(BatchWriter batchWriter);
 
 }

+ 12 - 10
dbsyncer-parser/src/main/java/org/dbsyncer/parser/ParserFactory.java

@@ -276,7 +276,7 @@ public class ParserFactory implements Parser {
             pluginFactory.convert(group.getPlugin(), data, target);
 
             // 5、写入目标源
-            Result writer = writeBatch(tConnectorMapper, command, ConnectorConstant.OPERTION_INSERT, picker.getTargetFields(), target, batchSize);
+            Result writer = writeBatch(new BatchWriter(tConnectorMapper, command, ConnectorConstant.OPERTION_INSERT, picker.getTargetFields(), target, batchSize));
 
             // 6、更新结果
             flush(task, writer);
@@ -306,26 +306,27 @@ public class ParserFactory implements Parser {
         pluginFactory.convert(tableGroup.getPlugin(), event, data, target);
 
         // 4、写入缓冲执行器
-        writerBufferActuator.offer(new WriterRequest(metaId, tableGroup.getId(), event, tConnectorMapper, picker.getTargetFields(), tableGroup.getCommand(), target));
+        writerBufferActuator.offer(new WriterRequest(metaId, tableGroup.getId(), event, tConnectorMapper, picker.getTargetFields(), tableGroup.getCommand(), target, rowChangedEvent.isForceUpdate()));
     }
 
     /**
      * 批量写入
      *
-     * @param connectorMapper
-     * @param command
-     * @param fields
-     * @param dataList
-     * @param batchSize
+     * @param batchWriter
      * @return
      */
     @Override
-    public Result writeBatch(ConnectorMapper connectorMapper, Map<String, String> command, String event, List<Field> fields, List<Map> dataList, int batchSize) {
+    public Result writeBatch(BatchWriter batchWriter) {
+        List<Map> dataList = batchWriter.getDataList();
+        int batchSize = batchWriter.getBatchSize();
+        String event = batchWriter.getEvent();
+        Map<String, String> command = batchWriter.getCommand();
+        List<Field> fields = batchWriter.getFields();
         // 总数
         int total = dataList.size();
         // 单次任务
         if (total <= batchSize) {
-            return connectorFactory.writer(connectorMapper, new WriterBatchConfig(event, command, fields, dataList));
+            return connectorFactory.writer(batchWriter.getConnectorMapper(), new WriterBatchConfig(event, command, fields, dataList, batchWriter.isForceUpdate()));
         }
 
         // 批量任务, 拆分
@@ -348,7 +349,8 @@ public class ParserFactory implements Parser {
 
             taskExecutor.execute(() -> {
                 try {
-                    Result w = connectorFactory.writer(connectorMapper, new WriterBatchConfig(event, command, fields, data));
+                    Result w = connectorFactory.writer(batchWriter.getConnectorMapper(),
+                            new WriterBatchConfig(event, command, fields, data, batchWriter.isForceUpdate()));
                     result.addSuccessData(w.getSuccessData());
                     result.addFailData(w.getFailData());
                     result.getError().append(w.getError());

+ 4 - 2
dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/impl/WriterBufferActuator.java

@@ -7,6 +7,7 @@ import org.dbsyncer.parser.flush.FlushStrategy;
 import org.dbsyncer.parser.flush.model.AbstractResponse;
 import org.dbsyncer.parser.flush.model.WriterRequest;
 import org.dbsyncer.parser.flush.model.WriterResponse;
+import org.dbsyncer.parser.model.BatchWriter;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
 
@@ -54,13 +55,14 @@ public class WriterBufferActuator extends AbstractBufferActuator<WriterRequest,
         response.setConnectorMapper(request.getConnectorMapper());
         response.setFields(Collections.unmodifiableList(request.getFields()));
         response.setCommand(request.getCommand());
+        response.setForceUpdate(request.isForceUpdate());
         response.setMerged(true);
     }
 
     @Override
     protected void pull(WriterResponse response) {
-        Result result = parserFactory.writeBatch(response.getConnectorMapper(), response.getCommand(), response.getEvent(),
-                response.getFields(), response.getDataList(), BATCH_SIZE);
+        Result result = parserFactory.writeBatch(new BatchWriter(response.getConnectorMapper(), response.getCommand(), response.getEvent(),
+                response.getFields(), response.getDataList(), BATCH_SIZE, response.isForceUpdate()));
         flushStrategy.flushIncrementData(response.getMetaId(), result, response.getEvent());
     }
 }

+ 8 - 1
dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/model/WriterRequest.java

@@ -27,7 +27,9 @@ public class WriterRequest extends AbstractRequest {
 
     private Map row;
 
-    public WriterRequest(String metaId, String tableGroupId, String event, ConnectorMapper connectorMapper, List<Field> fields, Map<String, String> command, Map row) {
+    private boolean isForceUpdate;
+
+    public WriterRequest(String metaId, String tableGroupId, String event, ConnectorMapper connectorMapper, List<Field> fields, Map<String, String> command, Map row, boolean isForceUpdate) {
         this.metaId = metaId;
         this.tableGroupId = tableGroupId;
         this.event = event;
@@ -35,6 +37,7 @@ public class WriterRequest extends AbstractRequest {
         this.fields = fields;
         this.command = command;
         this.row = row;
+        this.isForceUpdate = isForceUpdate;
     }
 
     public String getMetaId() {
@@ -64,4 +67,8 @@ public class WriterRequest extends AbstractRequest {
     public Map getRow() {
         return row;
     }
+
+    public boolean isForceUpdate() {
+        return isForceUpdate;
+    }
 }

+ 10 - 0
dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/model/WriterResponse.java

@@ -28,6 +28,8 @@ public class WriterResponse extends AbstractResponse {
 
     private List<Map> dataList = new LinkedList<>();
 
+    private boolean isForceUpdate;
+
     public boolean isMerged() {
         return isMerged;
     }
@@ -84,6 +86,14 @@ public class WriterResponse extends AbstractResponse {
         this.dataList = dataList;
     }
 
+    public boolean isForceUpdate() {
+        return isForceUpdate;
+    }
+
+    public void setForceUpdate(boolean forceUpdate) {
+        isForceUpdate = forceUpdate;
+    }
+
     @Override
     public int getTaskSize() {
         return dataList.size();

+ 62 - 0
dbsyncer-parser/src/main/java/org/dbsyncer/parser/model/BatchWriter.java

@@ -0,0 +1,62 @@
+package org.dbsyncer.parser.model;
+
+import org.dbsyncer.connector.ConnectorMapper;
+import org.dbsyncer.connector.config.Field;
+
+import java.util.List;
+import java.util.Map;
+
+public final class BatchWriter {
+
+    private ConnectorMapper     connectorMapper;
+    private Map<String, String> command;
+    private String              event;
+    private List<Field>         fields;
+    private List<Map>           dataList;
+    private int                 batchSize;
+    private boolean             isForceUpdate;
+
+    public BatchWriter(ConnectorMapper connectorMapper, Map<String, String> command, String event,
+                       List<Field> fields, List<Map> dataList, int batchSize) {
+        this(connectorMapper, command, event, fields, dataList, batchSize, false);
+    }
+
+    public BatchWriter(ConnectorMapper connectorMapper, Map<String, String> command, String event,
+                       List<Field> fields, List<Map> dataList, int batchSize, boolean isForceUpdate) {
+        this.connectorMapper = connectorMapper;
+        this.command = command;
+        this.event = event;
+        this.fields = fields;
+        this.dataList = dataList;
+        this.batchSize = batchSize;
+        this.isForceUpdate = isForceUpdate;
+    }
+
+    public ConnectorMapper getConnectorMapper() {
+        return connectorMapper;
+    }
+
+    public Map<String, String> getCommand() {
+        return command;
+    }
+
+    public String getEvent() {
+        return event;
+    }
+
+    public List<Field> getFields() {
+        return fields;
+    }
+
+    public List<Map> getDataList() {
+        return dataList;
+    }
+
+    public int getBatchSize() {
+        return batchSize;
+    }
+
+    public boolean isForceUpdate() {
+        return isForceUpdate;
+    }
+}