Sfoglia il codice sorgente

支持ES深度分页 https://gitee.com/ghi/dbsyncer/issues/I7BILF

AE86 1 anno fa
parent
commit
01cc570ca2

+ 26 - 17
dbsyncer-connector/src/main/java/org/dbsyncer/connector/AbstractConnector.java

@@ -6,14 +6,37 @@ import org.dbsyncer.common.util.StringUtil;
 import org.dbsyncer.connector.config.WriterBatchConfig;
 import org.dbsyncer.connector.constant.ConnectorConstant;
 import org.dbsyncer.connector.model.Field;
-import org.dbsyncer.connector.schema.*;
+import org.dbsyncer.connector.schema.BigintValueMapper;
+import org.dbsyncer.connector.schema.BinaryValueMapper;
+import org.dbsyncer.connector.schema.BitValueMapper;
+import org.dbsyncer.connector.schema.BlobValueMapper;
+import org.dbsyncer.connector.schema.CharValueMapper;
+import org.dbsyncer.connector.schema.ClobValueMapper;
+import org.dbsyncer.connector.schema.DateValueMapper;
+import org.dbsyncer.connector.schema.DecimalValueMapper;
+import org.dbsyncer.connector.schema.DoubleValueMapper;
+import org.dbsyncer.connector.schema.FloatValueMapper;
+import org.dbsyncer.connector.schema.IntegerValueMapper;
+import org.dbsyncer.connector.schema.LongVarBinaryValueMapper;
+import org.dbsyncer.connector.schema.LongVarcharValueMapper;
+import org.dbsyncer.connector.schema.NCharValueMapper;
+import org.dbsyncer.connector.schema.NClobValueMapper;
+import org.dbsyncer.connector.schema.NVarcharValueMapper;
+import org.dbsyncer.connector.schema.NumberValueMapper;
+import org.dbsyncer.connector.schema.OtherValueMapper;
+import org.dbsyncer.connector.schema.RealValueMapper;
+import org.dbsyncer.connector.schema.RowIdValueMapper;
+import org.dbsyncer.connector.schema.SmallintValueMapper;
+import org.dbsyncer.connector.schema.TimeValueMapper;
+import org.dbsyncer.connector.schema.TimestampValueMapper;
+import org.dbsyncer.connector.schema.TinyintValueMapper;
+import org.dbsyncer.connector.schema.VarBinaryValueMapper;
+import org.dbsyncer.connector.schema.VarcharValueMapper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.sql.Types;
-import java.util.ArrayList;
 import java.util.LinkedHashMap;
-import java.util.List;
 import java.util.Map;
 
 public abstract class AbstractConnector {
@@ -89,20 +112,6 @@ public abstract class AbstractConnector {
         }
     }
 
-    protected List<Field> getPrimaryKeys(List<Field> fields) {
-        List<Field> list = new ArrayList<>();
-
-        for (Field f : fields) {
-            if (f.isPk()) {
-                list.add(f);
-            }
-        }
-        if (CollectionUtils.isEmpty(list)) {
-            throw new ConnectorException("主键为空");
-        }
-        return list;
-    }
-
     protected boolean isUpdate(String event) {
         return StringUtil.equals(ConnectorConstant.OPERTION_UPDATE, event);
     }

+ 1 - 0
dbsyncer-connector/src/main/java/org/dbsyncer/connector/config/ESConfig.java

@@ -33,6 +33,7 @@ public class ESConfig extends AbstractConnectorConfig {
     /**
      * 类型(相当于表), 6.x 每个索引对应一个type;7.x版本不再引入type概念
      */
+    @Deprecated
     private String type = "_doc";
 
     public String getUrl() {

+ 9 - 1
dbsyncer-connector/src/main/java/org/dbsyncer/connector/config/ReaderConfig.java

@@ -1,10 +1,13 @@
 package org.dbsyncer.connector.config;
 
+import org.dbsyncer.connector.model.Table;
+
 import java.util.List;
 import java.util.Map;
 
 public class ReaderConfig {
 
+    private Table table;
     private Map<String, String> command;
     private List<Object> args;
     private boolean supportedCursor;
@@ -12,7 +15,8 @@ public class ReaderConfig {
     private int pageIndex;
     private int pageSize;
 
-    public ReaderConfig(Map<String,String> command, List<Object> args, boolean supportedCursor, Object[] cursors, int pageIndex, int pageSize) {
+    public ReaderConfig(Table table, Map<String,String> command, List<Object> args, boolean supportedCursor, Object[] cursors, int pageIndex, int pageSize) {
+        this.table = table;
         this.command = command;
         this.args = args;
         this.supportedCursor = supportedCursor;
@@ -21,6 +25,10 @@ public class ReaderConfig {
         this.pageSize = pageSize;
     }
 
+    public Table getTable() {
+        return table;
+    }
+
     public Map<String, String> getCommand() {
         return command;
     }

+ 1 - 1
dbsyncer-connector/src/main/java/org/dbsyncer/connector/database/AbstractDatabaseConnector.java

@@ -157,7 +157,7 @@ public abstract class AbstractDatabaseConnector extends AbstractConnector implem
             throw new ConnectorException("writer data can not be empty.");
         }
         List<Field> fields = new ArrayList<>(config.getFields());
-        List<Field> pkFields = getPrimaryKeys(config.getFields());
+        List<Field> pkFields = PrimaryKeyUtil.findConfigPrimaryKeys(config);
         // Update / Delete
         if (!isInsert(event)) {
             if (isDelete(event)) {

+ 18 - 10
dbsyncer-connector/src/main/java/org/dbsyncer/connector/es/ESConnector.java

@@ -21,6 +21,7 @@ import org.dbsyncer.connector.model.Filter;
 import org.dbsyncer.connector.model.MetaInfo;
 import org.dbsyncer.connector.model.Table;
 import org.dbsyncer.connector.util.ESUtil;
+import org.dbsyncer.connector.util.PrimaryKeyUtil;
 import org.elasticsearch.action.bulk.BulkRequest;
 import org.elasticsearch.action.bulk.BulkResponse;
 import org.elasticsearch.action.delete.DeleteRequest;
@@ -41,6 +42,7 @@ import org.elasticsearch.rest.RestStatus;
 import org.elasticsearch.search.SearchHit;
 import org.elasticsearch.search.SearchHits;
 import org.elasticsearch.search.builder.SearchSourceBuilder;
+import org.elasticsearch.search.sort.SortOrder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -161,6 +163,15 @@ public final class ESConnector extends AbstractConnector implements Connector<ES
         builder.from((config.getPageIndex() - 1) * config.getPageSize());
         builder.size(config.getPageSize());
         builder.timeout(TimeValue.timeValueMillis(10));
+        List<String> primaryKeys = PrimaryKeyUtil.findTablePrimaryKeys(config.getTable());
+        if (!CollectionUtils.isEmpty(primaryKeys)) {
+            primaryKeys.forEach(pk -> builder.sort(pk, SortOrder.ASC));
+            // 深度分页
+            if (!CollectionUtils.isEmpty(config.getCursors())) {
+                builder.from(0);
+                builder.searchAfter(config.getCursors());
+            }
+        }
 
         try {
             SearchRequest rq = new SearchRequest(new String[] {cfg.getIndex()}, builder);
@@ -188,12 +199,12 @@ public final class ESConnector extends AbstractConnector implements Connector<ES
 
         final Result result = new Result();
         final ESConfig cfg = connectorMapper.getConfig();
-        final List<Field> pkFields = getPrimaryKeys(config.getFields());
+        final List<Field> pkFields = PrimaryKeyUtil.findConfigPrimaryKeys(config);
         try {
             BulkRequest request = new BulkRequest();
             // 默认取第一个主键
             final String pk = pkFields.get(0).getName();
-            data.forEach(row -> addRequest(request, cfg.getIndex(), cfg.getType(), config.getEvent(), String.valueOf(row.get(pk)), row));
+            data.forEach(row -> addRequest(request, cfg.getIndex(), config.getEvent(), String.valueOf(row.get(pk)), row));
 
             BulkResponse response = connectorMapper.getConnection().bulk(request, RequestOptions.DEFAULT);
             RestStatus restStatus = response.status();
@@ -231,10 +242,7 @@ public final class ESConnector extends AbstractConnector implements Connector<ES
 
     @Override
     public Map<String, String> getTargetCommand(CommandConfig commandConfig) {
-        Table table = commandConfig.getTable();
-        if (!CollectionUtils.isEmpty(table.getColumn())) {
-            getPrimaryKeys(table.getColumn());
-        }
+        PrimaryKeyUtil.findTablePrimaryKeys(commandConfig.getTable());
         return Collections.EMPTY_MAP;
     }
 
@@ -293,21 +301,21 @@ public final class ESConnector extends AbstractConnector implements Connector<ES
         }
     }
 
-    private void addRequest(BulkRequest request, String index, String type, String event, String id, Map data) {
+    private void addRequest(BulkRequest request, String index, String event, String id, Map data) {
         if (isUpdate(event)) {
-            UpdateRequest req = new UpdateRequest(index, type, id);
+            UpdateRequest req = new UpdateRequest(index, id);
             req.doc(data, XContentType.JSON);
             request.add(req);
             return;
         }
         if (isInsert(event)) {
-            IndexRequest req = new IndexRequest(index, type, id);
+            IndexRequest req = new IndexRequest(index);
             req.source(data, XContentType.JSON);
             request.add(req);
             return;
         }
         if (isDelete(event)) {
-            request.add(new DeleteRequest(index, type, id));
+            request.add(new DeleteRequest(index, id));
         }
     }
 

+ 3 - 2
dbsyncer-connector/src/main/java/org/dbsyncer/connector/kafka/KafkaConnector.java

@@ -14,6 +14,7 @@ import org.dbsyncer.connector.config.WriterBatchConfig;
 import org.dbsyncer.connector.model.Field;
 import org.dbsyncer.connector.model.MetaInfo;
 import org.dbsyncer.connector.model.Table;
+import org.dbsyncer.connector.util.PrimaryKeyUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -84,11 +85,11 @@ public class KafkaConnector extends AbstractConnector implements Connector<Kafka
 
         Result result = new Result();
         final KafkaConfig cfg = connectorMapper.getConfig();
-        final List<Field> pkNames = getPrimaryKeys(config.getFields());
+        final List<Field> pkFields = PrimaryKeyUtil.findConfigPrimaryKeys(config);
         try {
             String topic = cfg.getTopic();
             // 默认取第一个主键
-            final String pk = pkNames.get(0).getName();
+            final String pk = pkFields.get(0).getName();
             data.forEach(row -> connectorMapper.getConnection().send(topic, String.valueOf(row.get(pk)), row));
             result.addSuccessData(data);
         } catch (Exception e) {

+ 1 - 1
dbsyncer-connector/src/main/java/org/dbsyncer/connector/sql/AbstractDQLConnector.java

@@ -75,7 +75,7 @@ public abstract class AbstractDQLConnector extends AbstractDatabaseConnector {
         String queryFilterSql = getQueryFilterSql(commandConfig.getFilter());
         Table table = commandConfig.getTable();
         Map<String, String> map = new HashMap<>();
-        List<String> primaryKeys = PrimaryKeyUtil.findTablePrimaryKeys(commandConfig.getTable());
+        List<String> primaryKeys = PrimaryKeyUtil.findTablePrimaryKeys(table);
         if (CollectionUtils.isEmpty(primaryKeys)) {
             return map;
         }

+ 24 - 0
dbsyncer-connector/src/main/java/org/dbsyncer/connector/util/PrimaryKeyUtil.java

@@ -4,6 +4,7 @@ import org.dbsyncer.common.util.CollectionUtils;
 import org.dbsyncer.common.util.NumberUtil;
 import org.dbsyncer.common.util.StringUtil;
 import org.dbsyncer.connector.ConnectorException;
+import org.dbsyncer.connector.config.WriterBatchConfig;
 import org.dbsyncer.connector.model.Field;
 import org.dbsyncer.connector.model.Table;
 
@@ -46,6 +47,29 @@ public abstract class PrimaryKeyUtil {
         return Collections.unmodifiableList(primaryKeys);
     }
 
+    /**
+     * 返回主键集合
+     *
+     * @param config
+     * @return
+     */
+    public static List<Field> findConfigPrimaryKeys(WriterBatchConfig config) {
+        if (null == config) {
+            throw new ConnectorException("The config is null.");
+        }
+
+        List<Field> list = new ArrayList<>();
+        for (Field f : config.getFields()) {
+            if (f.isPk()) {
+                list.add(f);
+            }
+        }
+        if (CollectionUtils.isEmpty(list)) {
+            throw new ConnectorException("主键为空");
+        }
+        return list;
+    }
+
     /**
      * 返回主键字段类型
      *

+ 8 - 6
dbsyncer-listener/src/main/java/org/dbsyncer/listener/quartz/AbstractQuartzExtractor.java

@@ -9,6 +9,7 @@ import org.dbsyncer.common.util.StringUtil;
 import org.dbsyncer.common.util.UUIDUtil;
 import org.dbsyncer.connector.config.ReaderConfig;
 import org.dbsyncer.connector.constant.ConnectorConstant;
+import org.dbsyncer.connector.model.Table;
 import org.dbsyncer.connector.util.PrimaryKeyUtil;
 import org.dbsyncer.listener.AbstractExtractor;
 import org.slf4j.Logger;
@@ -34,7 +35,7 @@ public abstract class AbstractQuartzExtractor extends AbstractExtractor implemen
     private final Logger logger = LoggerFactory.getLogger(getClass());
     private static final String CURSOR = "cursor";
     private static final int READ_NUM = 1000;
-    private List<TableGroupCommand> commands;
+    private List<TableGroupQuartzCommand> commands;
     private String eventFieldName;
     private boolean forceUpdate;
     private Set<String> update;
@@ -95,9 +96,10 @@ public abstract class AbstractQuartzExtractor extends AbstractExtractor implemen
         running = false;
     }
 
-    private void execute(TableGroupCommand tableGroupCommand, int index) {
-        final Map<String, String> command = tableGroupCommand.getCommand();
-        final List<String> primaryKeys = tableGroupCommand.getPrimaryKeys();
+    private void execute(TableGroupQuartzCommand tableGroupQuartzCommand, int index) {
+        final Map<String, String> command = tableGroupQuartzCommand.getCommand();
+        final List<String> primaryKeys = tableGroupQuartzCommand.getPrimaryKeys();
+        final Table table = tableGroupQuartzCommand.getTable();
         boolean supportedCursor = StringUtil.isNotBlank(command.get(ConnectorConstant.OPERTION_QUERY_CURSOR));
 
         // 检查增量点
@@ -107,7 +109,7 @@ public abstract class AbstractQuartzExtractor extends AbstractExtractor implemen
         Object[] cursors = PrimaryKeyUtil.getLastCursors(snapshot.get(index + CURSOR));
 
         while (running) {
-            ReaderConfig readerConfig = new ReaderConfig(point.getCommand(), point.getArgs(), supportedCursor, cursors, pageIndex++, READ_NUM);
+            ReaderConfig readerConfig = new ReaderConfig(table, point.getCommand(), point.getArgs(), supportedCursor, cursors, pageIndex++, READ_NUM);
             Result reader = connectorFactory.reader(connectionMapper, readerConfig);
             List<Map> data = reader.getSuccessData();
             if (CollectionUtils.isEmpty(data)) {
@@ -157,7 +159,7 @@ public abstract class AbstractQuartzExtractor extends AbstractExtractor implemen
 
     }
 
-    public void setCommands(List<TableGroupCommand> commands) {
+    public void setCommands(List<TableGroupQuartzCommand> commands) {
         this.commands = commands;
     }
 

+ 0 - 24
dbsyncer-listener/src/main/java/org/dbsyncer/listener/quartz/TableGroupCommand.java

@@ -1,24 +0,0 @@
-package org.dbsyncer.listener.quartz;
-
-import java.util.List;
-import java.util.Map;
-
-public class TableGroupCommand {
-
-    private List<String> primaryKeys;
-
-    private Map<String, String> command;
-
-    public TableGroupCommand(List<String> primaryKeys, Map<String, String> command) {
-        this.primaryKeys = primaryKeys;
-        this.command = command;
-    }
-
-    public List<String> getPrimaryKeys() {
-        return primaryKeys;
-    }
-
-    public Map<String, String> getCommand() {
-        return command;
-    }
-}

+ 34 - 0
dbsyncer-listener/src/main/java/org/dbsyncer/listener/quartz/TableGroupQuartzCommand.java

@@ -0,0 +1,34 @@
+package org.dbsyncer.listener.quartz;
+
+import org.dbsyncer.connector.model.Table;
+import org.dbsyncer.connector.util.PrimaryKeyUtil;
+
+import java.util.List;
+import java.util.Map;
+
+public class TableGroupQuartzCommand {
+
+    private Table table;
+
+    private List<String> primaryKeys;
+
+    private Map<String, String> command;
+
+    public TableGroupQuartzCommand(Table table, Map<String, String> command) {
+        this.table = table;
+        this.command = command;
+        this.primaryKeys = PrimaryKeyUtil.findTablePrimaryKeys(table);;
+    }
+
+    public Table getTable() {
+        return table;
+    }
+
+    public List<String> getPrimaryKeys() {
+        return primaryKeys;
+    }
+
+    public Map<String, String> getCommand() {
+        return command;
+    }
+}

+ 1 - 1
dbsyncer-manager/src/main/java/org/dbsyncer/manager/puller/FullPuller.java

@@ -64,7 +64,7 @@ public class FullPuller extends AbstractPuller implements ApplicationListener<Fu
                 Task task = map.get(metaId);
                 doTask(task, mapping, list, executor);
             } catch (Exception e) {
-                logger.error(e.getMessage());
+                logger.error(e.getMessage(), e);
                 logService.log(LogType.SystemLog.ERROR, e.getMessage());
             } finally {
                 try {

+ 2 - 7
dbsyncer-manager/src/main/java/org/dbsyncer/manager/puller/IncrementPuller.java

@@ -7,16 +7,14 @@ import org.dbsyncer.common.scheduled.ScheduledTaskJob;
 import org.dbsyncer.common.scheduled.ScheduledTaskService;
 import org.dbsyncer.common.util.CollectionUtils;
 import org.dbsyncer.connector.ConnectorFactory;
-import org.dbsyncer.connector.model.Field;
 import org.dbsyncer.connector.model.Table;
-import org.dbsyncer.connector.util.PrimaryKeyUtil;
 import org.dbsyncer.listener.AbstractExtractor;
 import org.dbsyncer.listener.Extractor;
 import org.dbsyncer.listener.Listener;
 import org.dbsyncer.listener.config.ListenerConfig;
 import org.dbsyncer.listener.enums.ListenerTypeEnum;
 import org.dbsyncer.listener.quartz.AbstractQuartzExtractor;
-import org.dbsyncer.listener.quartz.TableGroupCommand;
+import org.dbsyncer.listener.quartz.TableGroupQuartzCommand;
 import org.dbsyncer.manager.Manager;
 import org.dbsyncer.manager.ManagerException;
 import org.dbsyncer.manager.model.FieldPicker;
@@ -145,10 +143,7 @@ public class IncrementPuller extends AbstractPuller implements ScheduledTaskJob
         // 默认定时抽取
         if (ListenerTypeEnum.isTiming(listenerType)) {
             AbstractQuartzExtractor quartzExtractor = listener.getExtractor(ListenerTypeEnum.TIMING, connectorConfig.getConnectorType(), AbstractQuartzExtractor.class);
-            quartzExtractor.setCommands(list.stream().map(t -> {
-                List<String> primaryKeys = PrimaryKeyUtil.findTablePrimaryKeys(t.getSourceTable());
-                return new TableGroupCommand(primaryKeys, t.getCommand());
-            }).collect(Collectors.toList()));
+            quartzExtractor.setCommands(list.stream().map(t -> new TableGroupQuartzCommand(t.getSourceTable(), t.getCommand())).collect(Collectors.toList()));
             quartzExtractor.register(new QuartzListener(mapping, list));
             extractor = quartzExtractor;
         }

+ 4 - 3
dbsyncer-parser/src/main/java/org/dbsyncer/parser/ParserFactory.java

@@ -245,12 +245,13 @@ public class ParserFactory implements Parser {
         Map<String, String> command = group.getCommand();
         Assert.notEmpty(command, "执行命令不能为空.");
         List<FieldMapping> fieldMapping = group.getFieldMapping();
-        String sTableName = group.getSourceTable().getName();
+        Table sourceTable = group.getSourceTable();
+        String sTableName = sourceTable.getName();
         String tTableName = group.getTargetTable().getName();
         Assert.notEmpty(fieldMapping, String.format("数据源表[%s]同步到目标源表[%s], 映射关系不能为空.", sTableName, tTableName));
         // 获取同步字段
         Picker picker = new Picker(fieldMapping);
-        List<String> primaryKeys = PrimaryKeyUtil.findTablePrimaryKeys(tableGroup.getSourceTable());
+        List<String> primaryKeys = PrimaryKeyUtil.findTablePrimaryKeys(sourceTable);
         boolean supportedCursor = StringUtil.isNotBlank(command.get(ConnectorConstant.OPERTION_QUERY_CURSOR));
         int pageSize = mapping.getReadNum();
         int batchSize = mapping.getBatchNum();
@@ -266,7 +267,7 @@ public class ParserFactory implements Parser {
             }
 
             // 1、获取数据源数据
-            ReaderConfig readerConfig = new ReaderConfig(command, new ArrayList<>(), supportedCursor, task.getCursors(), task.getPageIndex(), pageSize);
+            ReaderConfig readerConfig = new ReaderConfig(sourceTable, command, new ArrayList<>(), supportedCursor, task.getCursors(), task.getPageIndex(), pageSize);
             Result reader = connectorFactory.reader(sConnectorMapper, readerConfig);
             List<Map> source = reader.getSuccessData();
             if (CollectionUtils.isEmpty(source)) {