Browse Source

!108 merge
Merge pull request !108 from AE86/V_1.0.0_RC

AE86 2 years ago
parent
commit
5df74625f5

+ 29 - 0
dbsyncer-common/src/main/java/org/dbsyncer/common/model/AbstractConvertContext.java

@@ -25,6 +25,11 @@ public abstract class AbstractConvertContext implements ConvertContext {
      */
     protected ProxyApplicationContext context;
 
+    /**
+     * 数据源连接实例
+     */
+    protected ConnectorMapper sourceConnectorMapper;
+
     /**
      * 目标源连接实例
      */
@@ -55,6 +60,17 @@ public abstract class AbstractConvertContext implements ConvertContext {
      */
     protected List<Map> targetList;
 
+    public void init(ConnectorMapper sourceConnectorMapper, ConnectorMapper targetConnectorMapper, String sourceTableName, String targetTableName, String event,
+                              List<Map> sourceList, List<Map> targetList) {
+        this.sourceConnectorMapper = sourceConnectorMapper;
+        this.targetConnectorMapper = targetConnectorMapper;
+        this.sourceTableName = sourceTableName;
+        this.targetTableName = targetTableName;
+        this.event = event;
+        this.sourceList = sourceList;
+        this.targetList = targetList;
+    }
+
     public void setContext(ProxyApplicationContext context) {
         this.context = context;
     }
@@ -74,6 +90,11 @@ public abstract class AbstractConvertContext implements ConvertContext {
         return context;
     }
 
+    @Override
+    public ConnectorMapper getSourceConnectorMapper() {
+        return sourceConnectorMapper;
+    }
+
     @Override
     public ConnectorMapper getTargetConnectorMapper() {
         return targetConnectorMapper;
@@ -99,8 +120,16 @@ public abstract class AbstractConvertContext implements ConvertContext {
         return sourceList;
     }
 
+    public void setSourceList(List<Map> sourceList) {
+        this.sourceList = sourceList;
+    }
+
     @Override
     public List<Map> getTargetList() {
         return targetList;
     }
+
+    public void setTargetList(List<Map> targetList) {
+        this.targetList = targetList;
+    }
 }

+ 2 - 12
dbsyncer-common/src/main/java/org/dbsyncer/common/model/FullConvertContext.java

@@ -2,9 +2,6 @@ package org.dbsyncer.common.model;
 
 import org.dbsyncer.common.spi.ConnectorMapper;
 
-import java.util.List;
-import java.util.Map;
-
 /**
  * @author AE86
  * @version 1.0.0
@@ -12,14 +9,7 @@ import java.util.Map;
  */
 public final class FullConvertContext extends AbstractConvertContext {
 
-    public FullConvertContext(ConnectorMapper targetConnectorMapper, String sourceTableName, String targetTableName, String event,
-                              List<Map> sourceList, List<Map> targetList) {
-        this.targetConnectorMapper = targetConnectorMapper;
-        this.sourceTableName = sourceTableName;
-        this.targetTableName = targetTableName;
-        this.event = event;
-        this.sourceList = sourceList;
-        this.targetList = targetList;
+    public FullConvertContext(ConnectorMapper sourceConnectorMapper, ConnectorMapper targetConnectorMapper, String sourceTableName, String targetTableName, String event) {
+        super.init(sourceConnectorMapper, targetConnectorMapper, sourceTableName, targetTableName, event, null, null);
     }
-
 }

+ 2 - 8
dbsyncer-common/src/main/java/org/dbsyncer/common/model/IncrementConvertContext.java

@@ -12,13 +12,7 @@ import java.util.Map;
  */
 public final class IncrementConvertContext extends AbstractConvertContext {
 
-    public IncrementConvertContext(ConnectorMapper targetConnectorMapper, String sourceTableName, String targetTableName, String event,
-                                   List<Map> sourceList, List<Map> targetList) {
-        this.targetConnectorMapper = targetConnectorMapper;
-        this.sourceTableName = sourceTableName;
-        this.targetTableName = targetTableName;
-        this.event = event;
-        this.sourceList = sourceList;
-        this.targetList = targetList;
+    public IncrementConvertContext(ConnectorMapper sourceConnectorMapper, ConnectorMapper targetConnectorMapper, String sourceTableName, String targetTableName, String event, List<Map> sourceList, List<Map> targetList) {
+        super.init(sourceConnectorMapper, targetConnectorMapper, sourceTableName, targetTableName, event, sourceList, targetList);
     }
 }

+ 5 - 0
dbsyncer-common/src/main/java/org/dbsyncer/common/spi/ConvertContext.java

@@ -32,6 +32,11 @@ public interface ConvertContext {
      */
     ProxyApplicationContext getContext();
 
+    /**
+     * 数据源连接实例
+     */
+    ConnectorMapper getSourceConnectorMapper();
+
     /**
      * 目标源连接实例
      */

+ 9 - 0
dbsyncer-connector/src/main/java/org/dbsyncer/connector/model/Filter.java

@@ -31,6 +31,15 @@ public class Filter {
      */
     private String value;
 
+    public Filter() {
+    }
+
+    public Filter(String name, FilterEnum filterEnum, Object value) {
+        this.name = name;
+        this.filter = filterEnum.getName();
+        this.value = String.valueOf(value);
+    }
+
     public String getName() {
         return name;
     }

+ 14 - 12
dbsyncer-parser/src/main/java/org/dbsyncer/parser/ParserFactory.java

@@ -252,9 +252,10 @@ public class ParserFactory implements Parser {
 
         int pageSize = mapping.getReadNum();
         int batchSize = mapping.getBatchNum();
-        ConnectorMapper sConnectorMapper = connectorFactory.connect(sConfig);
-        ConnectorMapper tConnectorMapper = connectorFactory.connect(tConfig);
+        final ConnectorMapper sConnectorMapper = connectorFactory.connect(sConfig);
+        final ConnectorMapper tConnectorMapper = connectorFactory.connect(tConfig);
         final String event = ConnectorConstant.OPERTION_INSERT;
+        final FullConvertContext context = new FullConvertContext(sConnectorMapper, tConnectorMapper, sTableName, tTableName, event);
 
         for (; ; ) {
             if (!task.isRunning()) {
@@ -264,20 +265,21 @@ public class ParserFactory implements Parser {
 
             // 1、获取数据源数据
             Result reader = connectorFactory.reader(sConnectorMapper, new ReaderConfig(command, new ArrayList<>(), task.getCursor(), task.getPageIndex(), pageSize));
-            List<Map> data = reader.getSuccessData();
-            if (CollectionUtils.isEmpty(data)) {
+            List<Map> source = reader.getSuccessData();
+            if (CollectionUtils.isEmpty(source)) {
                 logger.info("完成全量同步任务:{}, [{}] >> [{}]", metaId, sTableName, tTableName);
                 break;
             }
 
             // 2、映射字段
-            List<Map> target = picker.pickData(data);
+            List<Map> target = picker.pickData(source);
 
             // 3、参数转换
             ConvertUtil.convert(group.getConvert(), target);
 
             // 4、插件转换
-            final FullConvertContext context = new FullConvertContext(tConnectorMapper, sTableName, tTableName, event, data, target);
+            context.setSourceList(source);
+            context.setTargetList(target);
             pluginFactory.convert(group.getPlugin(), context);
 
             // 5、写入目标源
@@ -289,13 +291,13 @@ public class ParserFactory implements Parser {
 
             // 7、更新结果
             task.setPageIndex(task.getPageIndex() + 1);
-            task.setCursor(getLastCursor(data, pk));
+            task.setCursor(getLastCursor(source, pk));
             result.setTableGroupId(tableGroup.getId());
             result.setTargetTableGroupName(tTableName);
             flush(task, result);
 
             // 8、判断尾页
-            if (data.size() < pageSize) {
+            if (source.size() < pageSize) {
                 logger.info("完成全量:{}, [{}] >> [{}]", metaId, sTableName, tTableName);
                 break;
             }
@@ -331,7 +333,7 @@ public class ParserFactory implements Parser {
     private Result writeBatch(ConvertContext context, BatchWriter batchWriter, Executor taskExecutor) {
         final Result result = new Result();
         // 终止同步数据到目标源库
-        if(context.isTerminated()){
+        if (context.isTerminated()) {
             result.getSuccessData().addAll(batchWriter.getDataList());
             return result;
         }
@@ -427,12 +429,12 @@ public class ParserFactory implements Parser {
     /**
      * 获取最新游标值
      *
-     * @param data
+     * @param source
      * @param pk
      * @return
      */
-    private Object getLastCursor(List<Map> data, String pk) {
-        return CollectionUtils.isEmpty(data) ? null : data.get(data.size() - 1).get(pk);
+    private Object getLastCursor(List<Map> source, String pk) {
+        return CollectionUtils.isEmpty(source) ? null : source.get(source.size() - 1).get(pk);
     }
 
 }

+ 4 - 3
dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/impl/WriterBufferActuator.java

@@ -96,12 +96,13 @@ public class WriterBufferActuator extends AbstractBufferActuator<WriterRequest,
         ConvertUtil.convert(group.getConvert(), targetDataList);
 
         // 4、插件转换
-        ConnectorMapper targetConnectorMapper = connectorFactory.connect(getConnectorConfig(mapping.getTargetConnectorId()));
-        final IncrementConvertContext context = new IncrementConvertContext(targetConnectorMapper, sourceTableName, targetTableName, event, sourceDataList, targetDataList);
+        final ConnectorMapper sConnectorMapper = connectorFactory.connect(getConnectorConfig(mapping.getSourceConnectorId()));
+        final ConnectorMapper tConnectorMapper = connectorFactory.connect(getConnectorConfig(mapping.getTargetConnectorId()));
+        final IncrementConvertContext context = new IncrementConvertContext(sConnectorMapper, tConnectorMapper, sourceTableName, targetTableName, event, sourceDataList, targetDataList);
         pluginFactory.convert(group.getPlugin(), context);
 
         // 5、批量执行同步
-        Result result = parserFactory.writeBatch(context, new BatchWriter(targetConnectorMapper, group.getCommand(), targetTableName, event,
+        Result result = parserFactory.writeBatch(context, new BatchWriter(tConnectorMapper, group.getCommand(), targetTableName, event,
                 picker.getTargetFields(), targetDataList, bufferActuatorConfig.getWriterBatchCount()));
 
         // 6、持久化同步结果

+ 14 - 1
dbsyncer-storage/src/main/java/org/dbsyncer/storage/binlog/AbstractBinlogService.java

@@ -7,18 +7,24 @@ import org.dbsyncer.common.scheduled.ScheduledTaskJob;
 import org.dbsyncer.common.scheduled.ScheduledTaskService;
 import org.dbsyncer.common.snowflake.SnowflakeIdWorker;
 import org.dbsyncer.common.util.CollectionUtils;
+import org.dbsyncer.connector.enums.FilterEnum;
+import org.dbsyncer.connector.enums.OperationEnum;
+import org.dbsyncer.connector.model.Filter;
 import org.dbsyncer.storage.StorageService;
 import org.dbsyncer.storage.binlog.proto.BinlogMessage;
 import org.dbsyncer.storage.constant.BinlogConstant;
 import org.dbsyncer.storage.constant.ConfigConstant;
 import org.dbsyncer.storage.enums.StorageEnum;
+import org.dbsyncer.storage.query.BooleanQuery;
 import org.dbsyncer.storage.query.Query;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 
 import javax.annotation.PostConstruct;
+import java.sql.Timestamp;
 import java.time.Instant;
+import java.time.LocalDateTime;
 import java.util.*;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.locks.Lock;
@@ -151,7 +157,14 @@ public abstract class AbstractBinlogService<Message> implements BinlogRecorder {
             //  TODO 查询[待处理] 或 [处理中 & 处理超时]
             Query query = new Query();
             query.setType(StorageEnum.BINLOG);
-            query.addFilter(ConfigConstant.BINLOG_STATUS, String.valueOf(BinlogConstant.READY));
+            Filter ready = new Filter(ConfigConstant.BINLOG_STATUS, FilterEnum.EQUAL, BinlogConstant.READY);
+            Filter processing = new Filter(ConfigConstant.BINLOG_STATUS, FilterEnum.EQUAL, BinlogConstant.PROCESSING);
+            long maxProcessingSeconds = Timestamp.valueOf(LocalDateTime.now().minusSeconds(binlogRecorderConfig.getMaxProcessingSeconds())).getTime();
+            Filter processingTimeout = new Filter(ConfigConstant.CONFIG_MODEL_CREATE_TIME, FilterEnum.LT, maxProcessingSeconds);
+            BooleanQuery booleanQuery = new BooleanQuery()
+                    .add(new BooleanQuery().add(ready), OperationEnum.OR)
+                    .add(new BooleanQuery().add(processing).add(processingTimeout));
+            query.setBooleanQuery(booleanQuery);
             query.setPageNum(1);
             query.setPageSize(binlogRecorderConfig.getBatchCount());
             Paging paging = storageService.query(query);

+ 51 - 0
dbsyncer-storage/src/main/java/org/dbsyncer/storage/query/BooleanQuery.java

@@ -0,0 +1,51 @@
+package org.dbsyncer.storage.query;
+
+
+import org.dbsyncer.connector.enums.OperationEnum;
+import org.dbsyncer.connector.model.Filter;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class BooleanQuery {
+
+    private final List<BooleanQuery> clauses = new ArrayList<>();
+
+    private final List<Filter> filters = new ArrayList<>();
+
+    private OperationEnum operationEnum;
+
+    public BooleanQuery add(BooleanQuery booleanQuery) {
+        return add(booleanQuery, OperationEnum.AND);
+    }
+
+    public BooleanQuery add(BooleanQuery booleanQuery, OperationEnum operationEnum) {
+        clauses.add(booleanQuery);
+        booleanQuery.setOperationEnum(operationEnum);
+        return this;
+    }
+
+    public BooleanQuery add(Filter filter) {
+        filter.setOperation(OperationEnum.AND.getName());
+        filters.add(filter);
+        return this;
+    }
+
+    public BooleanQuery or(Filter filter) {
+        filter.setOperation(OperationEnum.OR.getName());
+        filters.add(filter);
+        return this;
+    }
+
+    public List<Filter> getFilters() {
+        return filters;
+    }
+
+    public OperationEnum getOperationEnum() {
+        return operationEnum;
+    }
+
+    public void setOperationEnum(OperationEnum operationEnum) {
+        this.operationEnum = operationEnum;
+    }
+}

+ 10 - 0
dbsyncer-storage/src/main/java/org/dbsyncer/storage/query/Query.java

@@ -21,6 +21,8 @@ public class Query {
 
     private List<Param> params;
 
+    private BooleanQuery booleanQuery;
+
     private int pageNum = 1;
 
     private int pageSize = 20;
@@ -81,6 +83,14 @@ public class Query {
         this.params = params;
     }
 
+    public BooleanQuery getBooleanQuery() {
+        return booleanQuery;
+    }
+
+    public void setBooleanQuery(BooleanQuery booleanQuery) {
+        this.booleanQuery = booleanQuery;
+    }
+
     public int getPageNum() {
         return pageNum;
     }

+ 18 - 0
dbsyncer-web/src/main/resources/public/plugin/plugin.html

@@ -75,6 +75,24 @@ public class MyPlugin implements ConvertService {
     @Override
     public void postProcessAfter(ConvertContext context) {
         // 完成同步后调用该方法
+       logger.info("插件正在处理同步成功的数据,目标源表:{},事件:{},条数:{}", context.getTargetTableName(), context.getEvent(), context.getTargetList().size());
+
+        ConnectorMapper connectorMapper = context.getSourceConnectorMapper();
+
+        // 获取关系型数据库连接, 实现自己的业务逻辑...
+        if (connectorMapper instanceof DatabaseConnectorMapper) {
+            DatabaseConnectorMapper db = (DatabaseConnectorMapper) connectorMapper;
+            // 方式一(推荐):
+            String query = "select * from my_user";
+            db.execute(databaseTemplate -> databaseTemplate.queryForList(query));
+
+            // 方式二:
+            try {
+                SimpleConnection connection = (SimpleConnection) db.getConnection();
+            } catch (Exception e) {
+                e.printStackTrace();
+            }
+        }
     }
 
     /**