AE86 1 year ago
parent
commit
0af3cd0830

+ 1 - 1
dbsyncer-plugin/src/main/java/org/dbsyncer/plugin/impl/DefaultNotifyServiceProvider.java

@@ -10,7 +10,7 @@ import org.dbsyncer.plugin.NotifyService;
  * @version 1.0.0
  * @date 2022/11/13 22:57
  */
-public class DefaultNotifyServiceProvider implements NotifyService {
+public final class DefaultNotifyServiceProvider implements NotifyService {
     @Override
     public void sendMessage(NotifyMessage notifyMessage) {
 

+ 48 - 0
dbsyncer-sdk/src/main/java/org/dbsyncer/sdk/config/CommandConfig.java

@@ -0,0 +1,48 @@
+package org.dbsyncer.sdk.config;
+
+import org.dbsyncer.sdk.model.ConnectorConfig;
+import org.dbsyncer.sdk.model.Filter;
+import org.dbsyncer.sdk.model.Table;
+
+import java.util.List;
+
+/**
+ * 查询同步参数模板
+ *
+ * @author AE86
+ * @version 1.0.0
+ * @date 2019/9/16 23:59
+ */
+public class CommandConfig {
+
+    private String type;
+
+    private Table table;
+
+    private List<Filter> filter;
+
+    private ConnectorConfig connectorConfig;
+
+    public CommandConfig(String type, Table table, ConnectorConfig connectorConfig, List<Filter> filter) {
+        this.type = type;
+        this.table = table;
+        this.filter = filter;
+        this.connectorConfig = connectorConfig;
+    }
+
+    public String getType() {
+        return type;
+    }
+
+    public Table getTable() {
+        return table;
+    }
+
+    public List<Filter> getFilter() {
+        return filter;
+    }
+
+    public ConnectorConfig getConnectorConfig() {
+        return connectorConfig;
+    }
+}

+ 74 - 0
dbsyncer-sdk/src/main/java/org/dbsyncer/sdk/config/DDLConfig.java

@@ -0,0 +1,74 @@
+package org.dbsyncer.sdk.config;
+
+import org.dbsyncer.sdk.enums.DDLOperationEnum;
+import org.dbsyncer.sdk.model.Field;
+
+import java.util.LinkedList;
+import java.util.List;
+
+public class DDLConfig {
+    /**
+     * 执行命令
+     */
+    private String sql;
+
+    private DDLOperationEnum ddlOperationEnum;
+
+    private List<Field> addFields = new LinkedList<>();
+
+    private List<Field> removeFields = new LinkedList<>();
+
+    //记录源表的源字段名称
+    private String sourceColumnName;
+
+    //记录改变后的字段名称
+    private String changedColumnName;
+
+    public String getSql() {
+        return sql;
+    }
+
+    public void setSql(String sql) {
+        this.sql = sql;
+    }
+
+    public List<Field> getAddFields() {
+        return addFields;
+    }
+
+    public void setAddFields(List<Field> addFields) {
+        this.addFields = addFields;
+    }
+
+    public List<Field> getRemoveFields() {
+        return removeFields;
+    }
+
+    public void setRemoveFields(List<Field> removeFields) {
+        this.removeFields = removeFields;
+    }
+
+    public String getSourceColumnName() {
+        return sourceColumnName;
+    }
+
+    public void setSourceColumnName(String sourceColumnName) {
+        this.sourceColumnName = sourceColumnName;
+    }
+
+    public String getChangedColumnName() {
+        return changedColumnName;
+    }
+
+    public void setChangedColumnName(String changedColumnName) {
+        this.changedColumnName = changedColumnName;
+    }
+
+    public DDLOperationEnum getDdlOperationEnum() {
+        return ddlOperationEnum;
+    }
+
+    public void setDdlOperationEnum(DDLOperationEnum ddlOperationEnum) {
+        this.ddlOperationEnum = ddlOperationEnum;
+    }
+}

+ 55 - 0
dbsyncer-sdk/src/main/java/org/dbsyncer/sdk/config/ReaderConfig.java

@@ -0,0 +1,55 @@
+package org.dbsyncer.sdk.config;
+
+import org.dbsyncer.sdk.model.Table;
+
+import java.util.List;
+import java.util.Map;
+
+public class ReaderConfig {
+
+    private Table table;
+    private Map<String, String> command;
+    private List<Object> args;
+    private boolean supportedCursor;
+    private Object[] cursors;
+    private int pageIndex;
+    private int pageSize;
+
+    public ReaderConfig(Table table, Map<String, String> command, List<Object> args, boolean supportedCursor, Object[] cursors, int pageIndex, int pageSize) {
+        this.table = table;
+        this.command = command;
+        this.args = args;
+        this.supportedCursor = supportedCursor;
+        this.cursors = cursors;
+        this.pageIndex = pageIndex;
+        this.pageSize = pageSize;
+    }
+
+    public Table getTable() {
+        return table;
+    }
+
+    public Map<String, String> getCommand() {
+        return command;
+    }
+
+    public List<Object> getArgs() {
+        return args;
+    }
+
+    public Object[] getCursors() {
+        return cursors;
+    }
+
+    public int getPageIndex() {
+        return pageIndex;
+    }
+
+    public int getPageSize() {
+        return pageSize;
+    }
+
+    public boolean isSupportedCursor() {
+        return supportedCursor;
+    }
+}

+ 59 - 0
dbsyncer-sdk/src/main/java/org/dbsyncer/sdk/config/WriterBatchConfig.java

@@ -0,0 +1,59 @@
+package org.dbsyncer.sdk.config;
+
+import org.dbsyncer.sdk.model.Field;
+
+import java.util.List;
+import java.util.Map;
+
+public class WriterBatchConfig {
+
+    /**
+     * 表名
+     */
+    private String tableName;
+    /**
+     * 事件
+     */
+    private String event;
+    /**
+     * 执行命令
+     */
+    private Map<String, String> command;
+    /**
+     * 字段信息
+     */
+    private List<Field> fields;
+    /**
+     * 集合数据
+     */
+    private List<Map> data;
+
+    public WriterBatchConfig(String tableName, String event, Map<String, String> command, List<Field> fields, List<Map> data) {
+        this.tableName = tableName;
+        this.event = event;
+        this.command = command;
+        this.fields = fields;
+        this.data = data;
+    }
+
+    public String getTableName() {
+        return tableName;
+    }
+
+    public String getEvent() {
+        return event;
+    }
+
+    public Map<String, String> getCommand() {
+        return command;
+    }
+
+    public List<Field> getFields() {
+        return fields;
+    }
+
+    public List<Map> getData() {
+        return data;
+    }
+
+}

+ 7 - 0
dbsyncer-sdk/src/main/java/org/dbsyncer/sdk/connector/CompareFilter.java

@@ -0,0 +1,7 @@
+package org.dbsyncer.sdk.connector;
+
+public interface CompareFilter {
+
+    boolean compare(String value, String filterValue);
+
+}

+ 15 - 0
dbsyncer-sdk/src/main/java/org/dbsyncer/sdk/enums/DDLOperationEnum.java

@@ -0,0 +1,15 @@
+package org.dbsyncer.sdk.enums;
+
+/**
+ * 支持同步的DDL命令
+ *
+ * @version 1.0.0
+ * @Author life
+ * @Date 2023-09-24 14:24
+ */
+public enum DDLOperationEnum {
+    ALTER_MODIFY,
+    ALTER_ADD,
+    ALTER_DROP,
+    ALTER_CHANGE;
+}

+ 114 - 0
dbsyncer-sdk/src/main/java/org/dbsyncer/sdk/enums/FilterEnum.java

@@ -0,0 +1,114 @@
+package org.dbsyncer.sdk.enums;
+
+import org.dbsyncer.common.util.NumberUtil;
+import org.dbsyncer.common.util.StringUtil;
+import org.dbsyncer.sdk.SdkException;
+import org.dbsyncer.sdk.connector.CompareFilter;
+
+/**
+ * 运算符表达式类型
+ *
+ * @author AE86
+ * @version 1.0.0
+ * @date 2019/9/26 23:21
+ */
+public enum FilterEnum {
+
+    /**
+     * 等于
+     */
+    EQUAL("=", (value, filterValue) -> StringUtil.equals(value, filterValue)),
+    /**
+     * 不等于
+     */
+    NOT_EQUAL("!=", (value, filterValue) -> !StringUtil.equals(value, filterValue)),
+    /**
+     * 大于
+     */
+    GT(">", (value, filterValue) -> NumberUtil.toLong(value) > NumberUtil.toLong(filterValue)),
+    /**
+     * 小于
+     */
+    LT("<", (value, filterValue) -> NumberUtil.toLong(value) < NumberUtil.toLong(filterValue)),
+    /**
+     * 大于等于
+     */
+    GT_AND_EQUAL(">=", (value, filterValue) -> NumberUtil.toLong(value) >= NumberUtil.toLong(filterValue)),
+    /**
+     * 小于等于
+     */
+    LT_AND_EQUAL("<=", (value, filterValue) -> NumberUtil.toLong(value) <= NumberUtil.toLong(filterValue)),
+    /**
+     * 模糊匹配
+     */
+    LIKE("like", (value, filterValue) -> {
+        boolean startsWith = StringUtil.startsWith(filterValue, "%") || StringUtil.startsWith(filterValue, "*");
+        boolean endsWith = StringUtil.endsWith(filterValue, "%") || StringUtil.endsWith(filterValue, "*");
+        String compareValue = StringUtil.replace(filterValue, "%", "");
+        compareValue = StringUtil.replace(compareValue, "*", "");
+        // 模糊匹配
+        if(startsWith && endsWith){
+            return StringUtil.contains(value, compareValue);
+        }
+        // 前缀匹配
+        if(endsWith){
+            return StringUtil.startsWith(value, compareValue);
+        }
+        // 后缀匹配
+        if(startsWith){
+            return StringUtil.endsWith(value, compareValue);
+        }
+        return false;
+    });
+
+    // 运算符名称
+    private String name;
+    // 比较器
+    private CompareFilter compareFilter;
+
+    FilterEnum(String name, CompareFilter compareFilter) {
+        this.name = name;
+        this.compareFilter = compareFilter;
+    }
+
+    /**
+     * 获取表达式
+     *
+     * @param name
+     * @return
+     * @throws SdkException
+     */
+    public static FilterEnum getFilterEnum(String name) throws SdkException {
+        for (FilterEnum e : FilterEnum.values()) {
+            if (StringUtil.equals(name, e.getName())) {
+                return e;
+            }
+        }
+        throw new SdkException(String.format("FilterEnum name \"%s\" does not exist.", name));
+    }
+
+    /**
+     * 获取比较器
+     *
+     * @param filterName
+     * @return
+     * @throws SdkException
+     */
+    public static CompareFilter getCompareFilter(String filterName) throws SdkException {
+        for (FilterEnum e : FilterEnum.values()) {
+            if (StringUtil.equals(filterName, e.getName())) {
+                return e.getCompareFilter();
+            }
+        }
+        throw new SdkException(String.format("FilterEnum name \"%s\" does not exist.", filterName));
+    }
+
+    public String getName() {
+        return name;
+    }
+
+    public CompareFilter getCompareFilter() {
+        return compareFilter;
+    }
+
+}

+ 40 - 0
dbsyncer-sdk/src/main/java/org/dbsyncer/sdk/enums/OperationEnum.java

@@ -0,0 +1,40 @@
+package org.dbsyncer.sdk.enums;
+
+/**
+ * 条件表达式类型
+ *
+ * @author AE86
+ * @version 1.0.0
+ * @date 2019/9/26 23:21
+ */
+public enum OperationEnum {
+
+    /**
+     * 并且
+     */
+    AND("and"),
+    /**
+     * 或者
+     */
+    OR("or");
+
+    // 描述
+    private String name;
+
+    OperationEnum(String name) {
+        this.name = name;
+    }
+
+    public static boolean isAnd(String name) {
+        return AND.getName().equals(name);
+    }
+
+    public static boolean isOr(String name) {
+        return OR.getName().equals(name);
+    }
+
+    public String getName() {
+        return name;
+    }
+
+}

+ 47 - 0
dbsyncer-sdk/src/main/java/org/dbsyncer/sdk/enums/TableTypeEnum.java

@@ -0,0 +1,47 @@
+package org.dbsyncer.sdk.enums;
+
+/**
+ * 表类型
+ *
+ * @author AE86
+ * @version 1.0.0
+ * @date 2021/08/26 21:13
+ */
+public enum TableTypeEnum {
+
+    /**
+     * 表
+     */
+    TABLE("TABLE"),
+
+    /**
+     * 视图
+     */
+    VIEW("VIEW"),
+
+    /**
+     * 物化视图
+     */
+    MATERIALIZED_VIEW("MATERIALIZED VIEW");
+
+    private String code;
+
+    TableTypeEnum(String code) {
+        this.code = code;
+    }
+
+    /**
+     * 是否视图类型
+     *
+     * @param type
+     * @return
+     */
+    public static boolean isView(String type) {
+        return VIEW.getCode().equals(type);
+    }
+
+    public String getCode() {
+        return code;
+    }
+
+}

+ 123 - 0
dbsyncer-sdk/src/main/java/org/dbsyncer/sdk/model/Field.java

@@ -0,0 +1,123 @@
+package org.dbsyncer.sdk.model;
+
+import org.dbsyncer.common.util.JsonUtil;
+
+/**
+ * 字段属性
+ *
+ * @author AE86
+ * @version 1.0.0
+ * @date 2019/9/30 15:10
+ */
+public class Field {
+
+    /**
+     * 字段名,ID
+     */
+    private String name;
+
+    /**
+     * 类型名,INT
+     */
+    private String typeName;
+
+    /**
+     * 类型编码,4
+     */
+    private int type;
+
+    /**
+     * 主键
+     */
+    private boolean pk;
+
+    /**
+     * 字段别名
+     */
+    private String labelName;
+
+    /**
+     * 是否系统字段
+     */
+    private boolean unmodifiabled;
+
+    public Field() {
+    }
+
+    public Field(String name, String typeName, int type) {
+        this.name = name;
+        this.typeName = typeName;
+        this.type = type;
+    }
+
+    public Field(String name, String typeName, int type, boolean pk) {
+        this.name = name;
+        this.typeName = typeName;
+        this.type = type;
+        this.pk = pk;
+    }
+
+    public Field(String name, String typeName, int type, boolean pk, String labelName, boolean unmodifiabled) {
+        this.name = name;
+        this.typeName = typeName;
+        this.type = type;
+        this.pk = pk;
+        this.labelName = labelName;
+        this.unmodifiabled = unmodifiabled;
+    }
+
+    public String getName() {
+        return name;
+    }
+
+    public void setName(String name) {
+        this.name = name;
+    }
+
+    public String getTypeName() {
+        return typeName;
+    }
+
+    public void setTypeName(String typeName) {
+        this.typeName = typeName;
+    }
+
+    public int getType() {
+        return type;
+    }
+
+    public void setType(int type) {
+        this.type = type;
+    }
+
+    public boolean isPk() {
+        return pk;
+    }
+
+    public void setPk(boolean pk) {
+        this.pk = pk;
+    }
+
+    public String getLabelName() {
+        return labelName;
+    }
+
+    public Field setLabelName(String labelName) {
+        this.labelName = labelName;
+        return this;
+    }
+
+    public boolean isUnmodifiabled() {
+        return unmodifiabled;
+    }
+
+    public Field setUnmodifiabled(boolean unmodifiabled) {
+        this.unmodifiabled = unmodifiabled;
+        return this;
+    }
+
+    @Override
+    public String toString() {
+        return JsonUtil.objToJson(this);
+    }
+}

+ 65 - 0
dbsyncer-sdk/src/main/java/org/dbsyncer/sdk/model/Filter.java

@@ -0,0 +1,65 @@
+package org.dbsyncer.sdk.model;
+
+import org.dbsyncer.sdk.enums.FilterEnum;
+import org.dbsyncer.sdk.enums.OperationEnum;
+
+/**
+ * 字段属性条件
+ * @author AE86
+ * @version 1.0.0
+ * @date 2019/9/30 15:10
+ */
+public class Filter {
+
+    /**
+     * 字段名,ID
+     */
+    private String name;
+
+    /**
+     * @see OperationEnum
+     */
+    private String operation;
+
+    /**
+     * @see FilterEnum
+     */
+    private String filter;
+
+    /**
+     * 值
+     */
+    private String value;
+
+    public String getName() {
+        return name;
+    }
+
+    public void setName(String name) {
+        this.name = name;
+    }
+
+    public String getOperation() {
+        return operation;
+    }
+
+    public void setOperation(String operation) {
+        this.operation = operation;
+    }
+
+    public String getFilter() {
+        return filter;
+    }
+
+    public void setFilter(String filter) {
+        this.filter = filter;
+    }
+
+    public String getValue() {
+        return value;
+    }
+
+    public void setValue(String value) {
+        this.value = value;
+    }
+}

+ 62 - 0
dbsyncer-sdk/src/main/java/org/dbsyncer/sdk/model/MetaInfo.java

@@ -0,0 +1,62 @@
+package org.dbsyncer.sdk.model;
+
+import java.util.List;
+
+/**
+ * 连接器基本信息
+ *
+ * @author AE86
+ * @ClassName: MetaInfo
+ * @Description: 包括字段信息、总条数
+ * @date: 2017年7月20日 下午3:37:59
+ */
+public class MetaInfo {
+
+    /**
+     * 表类型
+     */
+    private String tableType;
+
+    /**
+     * 属性字段
+     * 格式:[{"name":"ID","typeName":"INT","type":"4"},{"name":"NAME","typeName":"VARCHAR","type":"12"}]
+     */
+    private List<Field> column;
+
+    /**
+     * sql
+     */
+    private String sql;
+
+    public String getTableType() {
+        return tableType;
+    }
+
+    public MetaInfo setTableType(String tableType) {
+        this.tableType = tableType;
+        return this;
+    }
+
+    public List<Field> getColumn() {
+        return column;
+    }
+
+    public MetaInfo setColumn(List<Field> column) {
+        this.column = column;
+        return this;
+    }
+
+    public String getSql() {
+        return sql;
+    }
+
+    public MetaInfo setSql(String sql) {
+        this.sql = sql;
+        return this;
+    }
+
+    @Override
+    public String toString() {
+        return new StringBuilder().append("MetaInfo{").append("tableType=").append(tableType).append(", ").append("column=").append(column).append('}').toString();
+    }
+}

+ 95 - 0
dbsyncer-sdk/src/main/java/org/dbsyncer/sdk/model/Table.java

@@ -0,0 +1,95 @@
+package org.dbsyncer.sdk.model;
+
+import org.dbsyncer.sdk.enums.TableTypeEnum;
+
+import java.util.List;
+
+/**
+ * @author AE86
+ * @version 1.0.0
+ * @date 2019/10/15 23:58
+ */
+public class Table {
+
+    /**
+     * 表名
+     */
+    private String name;
+
+    /**
+     * 表类型[TABLE、VIEW、MATERIALIZED VIEW]
+     */
+    private String type;
+
+    /**
+     * 属性字段
+     * 格式:[{"name":"ID","typeName":"INT","type":"4"},{"name":"NAME","typeName":"VARCHAR","type":"12"}]
+     */
+    private List<Field> column;
+
+    /**
+     * sql
+     */
+    private String sql;
+
+    // 总数
+    private long count;
+
+    public Table() {
+    }
+
+    public Table(String name) {
+        this(name, TableTypeEnum.TABLE.getCode());
+    }
+
+    public Table(String name, String type) {
+        this(name, type, null, null);
+    }
+
+    public Table(String name, String type, List<Field> column, String sql) {
+        this.name = name;
+        this.type = type;
+        this.column = column;
+        this.sql = sql;
+    }
+
+    public String getName() {
+        return name;
+    }
+
+    public void setName(String name) {
+        this.name = name;
+    }
+
+    public String getType() {
+        return type;
+    }
+
+    public void setType(String type) {
+        this.type = type;
+    }
+
+    public List<Field> getColumn() {
+        return column;
+    }
+
+    public void setColumn(List<Field> column) {
+        this.column = column;
+    }
+
+    public String getSql() {
+        return sql;
+    }
+
+    public void setSql(String sql) {
+        this.sql = sql;
+    }
+
+    public long getCount() {
+        return count;
+    }
+
+    public void setCount(long count) {
+        this.count = count;
+    }
+}

+ 55 - 0
dbsyncer-sdk/src/main/java/org/dbsyncer/sdk/spi/ConnectorInstance.java

@@ -0,0 +1,55 @@
+package org.dbsyncer.sdk.spi;
+
+/**
+ * 连接器实例,管理连接生命周期
+ *
+ * @param <K> 配置
+ * @param <V> 实例
+ * @author AE86
+ * @version 1.0.0
+ * @date 2022/3/20 23:00
+ */
+public interface ConnectorInstance<K, V> extends Cloneable {
+
+    /**
+     * 获取服务地址
+     *
+     * @return
+     */
+    String getServiceUrl();
+
+    /**
+     * 获取连接配置
+     *
+     * @return
+     */
+    K getConfig();
+
+    /**
+     * 设置
+     *
+     * @param k
+     */
+    void setConfig(K k);
+
+    /**
+     * 获取连接通道实例
+     *
+     * @return
+     * @throws Exception
+     */
+    V getConnection() throws Exception;
+
+    /**
+     * 关闭连接器
+     */
+    void close();
+
+    /**
+     * 浅拷贝连接器
+     *
+     * @return
+     * @throws CloneNotSupportedException
+     */
+    Object clone() throws CloneNotSupportedException;
+}

+ 127 - 0
dbsyncer-sdk/src/main/java/org/dbsyncer/sdk/spi/ConnectorService.java

@@ -0,0 +1,127 @@
+package org.dbsyncer.sdk.spi;
+
+import org.dbsyncer.common.model.Result;
+import org.dbsyncer.sdk.SdkException;
+import org.dbsyncer.sdk.config.CommandConfig;
+import org.dbsyncer.sdk.config.DDLConfig;
+import org.dbsyncer.sdk.config.ReaderConfig;
+import org.dbsyncer.sdk.config.WriterBatchConfig;
+import org.dbsyncer.sdk.model.MetaInfo;
+import org.dbsyncer.sdk.model.Table;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * 连接器基础功能
+ *
+ * @param <I> ConnectorInstance
+ * @param <C> ConnectorConfig
+ * @Version 1.0.0
+ * @Author AE86
+ * @Date 2023-11-19 23:24
+ */
+public interface ConnectorService<I, C> {
+
+    /**
+     * 建立连接
+     *
+     * @param config
+     * @return
+     */
+    ConnectorInstance connect(C config);
+
+    /**
+     * 断开连接
+     *
+     * @param connectorInstance
+     */
+    void disconnect(I connectorInstance);
+
+    /**
+     * 检查连接器是否连接正常
+     *
+     * @param connectorInstance
+     * @return
+     */
+    boolean isAlive(I connectorInstance);
+
+    /**
+     * 获取连接缓存key
+     *
+     * @param config
+     * @return
+     */
+    String getConnectorInstanceCacheKey(C config);
+
+    /**
+     * 获取所有表名
+     *
+     * @param connectorInstance
+     * @return
+     */
+    List<Table> getTable(I connectorInstance);
+
+    /**
+     * 获取表元信息
+     *
+     * @param connectorInstance
+     * @param tableNamePattern
+     * @return
+     */
+    MetaInfo getMetaInfo(I connectorInstance, String tableNamePattern);
+
+    /**
+     * 获取总数
+     *
+     * @param connectorInstance
+     * @param command
+     * @return
+     */
+    long getCount(I connectorInstance, Map<String, String> command);
+
+    /**
+     * 分页获取数据源数据
+     *
+     * @param connectorInstance
+     * @param config
+     * @return
+     */
+    Result reader(I connectorInstance, ReaderConfig config);
+
+    /**
+     * 批量写入目标源数据
+     *
+     * @param connectorInstance
+     * @param config
+     * @return
+     */
+    Result writer(I connectorInstance, WriterBatchConfig config);
+
+    /**
+     * 执行DDL命令
+     *
+     * @param connectorInstance
+     * @param ddlConfig
+     * @return
+     */
+    default Result writerDDL(I connectorInstance, DDLConfig ddlConfig) {
+        throw new SdkException("Unsupported method.");
+    }
+
+    /**
+     * 获取数据源同步参数
+     *
+     * @param commandConfig
+     * @return
+     */
+    Map<String, String> getSourceCommand(CommandConfig commandConfig);
+
+    /**
+     * 获取目标源同步参数
+     *
+     * @param commandConfig
+     * @return
+     */
+    Map<String, String> getTargetCommand(CommandConfig commandConfig);
+}