Przeglądaj źródła

!228 merge
Merge pull request !228 from AE86/v_2.0

AE86 1 rok temu
rodzic
commit
0b7e7b6e01

+ 1 - 1
dbsyncer-biz/src/main/java/org/dbsyncer/biz/checker/impl/tablegroup/TableGroupChecker.java

@@ -159,7 +159,7 @@ public class TableGroupChecker extends AbstractChecker {
                 }
             });
         }
-        return new Table(tableName, metaInfo.getTableType(), metaInfo.getColumn(), metaInfo.getSql());
+        return new Table(tableName, metaInfo.getTableType(), metaInfo.getColumn(), metaInfo.getSql(), metaInfo.getIndexType());
     }
 
     private void checkRepeatedTable(String mappingId, String sourceTable, String targetTable) {

+ 52 - 31
dbsyncer-connector/dbsyncer-connector-elasticsearch/src/main/java/org/dbsyncer/connector/elasticsearch/ElasticsearchConnector.java

@@ -33,6 +33,7 @@ import org.dbsyncer.sdk.spi.ConnectorService;
 import org.dbsyncer.sdk.util.PrimaryKeyUtil;
 import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.Version;
+import org.elasticsearch.action.admin.indices.alias.get.GetAliasesRequest;
 import org.elasticsearch.action.bulk.BulkRequest;
 import org.elasticsearch.action.bulk.BulkResponse;
 import org.elasticsearch.action.delete.DeleteRequest;
@@ -40,10 +41,13 @@ import org.elasticsearch.action.index.IndexRequest;
 import org.elasticsearch.action.search.SearchRequest;
 import org.elasticsearch.action.search.SearchResponse;
 import org.elasticsearch.action.update.UpdateRequest;
+import org.elasticsearch.client.GetAliasesResponse;
+import org.elasticsearch.client.IndicesClient;
 import org.elasticsearch.client.RequestOptions;
 import org.elasticsearch.client.RestHighLevelClient;
 import org.elasticsearch.client.indices.GetIndexRequest;
 import org.elasticsearch.client.indices.GetIndexResponse;
+import org.elasticsearch.cluster.metadata.AliasMetadata;
 import org.elasticsearch.cluster.metadata.MappingMetadata;
 import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.common.xcontent.XContentHelper;
@@ -63,9 +67,11 @@ import java.sql.Types;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.stream.Collectors;
 
 /**
@@ -81,6 +87,12 @@ public final class ElasticsearchConnector extends AbstractConnector implements C
 
     private final String TYPE = "Elasticsearch";
 
+    public static final String SOURCE_INDEX_NAME = "_source_index";
+
+    public static final String TARGET_INDEX_NAME = "_target_index";
+
+    public static final String INDEX_TYPE = "_doc";
+
     private final Map<String, FilterMapper> filters = new LinkedHashMap<>();
 
     private final ESConfigValidator configValidator = new ESConfigValidator();
@@ -152,22 +164,15 @@ public final class ElasticsearchConnector extends AbstractConnector implements C
     @Override
     public List<Table> getTable(ESConnectorInstance connectorInstance) {
         try {
-            // TODO 获取所有索引和type
-            ESConfig config = connectorInstance.getConfig();
-            GetIndexRequest request = new GetIndexRequest(config.getIndex());
-            GetIndexResponse indexResponse = connectorInstance.getConnection().indices().get(request, RequestOptions.DEFAULT);
-            MappingMetadata mappingMetaData = indexResponse.getMappings().get(config.getIndex());
-            List<Table> tables = new ArrayList<>();
-            // 6.x 版本
-            if (Version.V_7_0_0.after(connectorInstance.getVersion())) {
-                Map<String, Object> sourceMap = XContentHelper.convertToMap(mappingMetaData.source().compressedReference(), true, null).v2();
-                sourceMap.keySet().forEach(tableName -> tables.add(new Table(tableName)));
-                return tables;
+            IndicesClient indices = connectorInstance.getConnection().indices();
+            GetAliasesRequest aliasesRequest = new GetAliasesRequest();
+            GetAliasesResponse indicesAlias = indices.getAlias(aliasesRequest, RequestOptions.DEFAULT);
+            Map<String, Set<AliasMetadata>> aliases = indicesAlias.getAliases();
+            if (!CollectionUtils.isEmpty(aliases)) {
+                // 排除系统索引
+                return aliases.keySet().stream().filter(index -> !StringUtil.startsWith(index, StringUtil.POINT)).map(index -> new Table(index)).collect(Collectors.toList());
             }
-
-            // 7.x 版本以上
-            tables.add(new Table(mappingMetaData.type()));
-            return tables;
+            return Collections.EMPTY_LIST;
         } catch (IOException e) {
             logger.error(e.getMessage());
             throw new ElasticsearchException(e);
@@ -175,18 +180,28 @@ public final class ElasticsearchConnector extends AbstractConnector implements C
     }
 
     @Override
-    public MetaInfo getMetaInfo(ESConnectorInstance connectorInstance, String tableName) {
+    public MetaInfo getMetaInfo(ESConnectorInstance connectorInstance, String index) {
         List<Field> fields = new ArrayList<>();
         try {
-            ESConfig config = connectorInstance.getConfig();
-            GetIndexRequest request = new GetIndexRequest(config.getIndex());
+            GetIndexRequest request = new GetIndexRequest(index);
             GetIndexResponse indexResponse = connectorInstance.getConnection().indices().get(request, RequestOptions.DEFAULT);
-            MappingMetadata mappingMetaData = indexResponse.getMappings().get(config.getIndex());
+            MappingMetadata mappingMetaData = indexResponse.getMappings().get(index);
             // 6.x 版本
             if (Version.V_7_0_0.after(connectorInstance.getVersion())) {
                 Map<String, Object> sourceMap = XContentHelper.convertToMap(mappingMetaData.source().compressedReference(), true, null).v2();
-                parseProperties(fields, (Map) sourceMap.get(tableName));
-                return new MetaInfo().setColumn(fields);
+                if (CollectionUtils.isEmpty(sourceMap)) {
+                    throw new ElasticsearchException("未获取到索引配置");
+                }
+                Iterator<String> iterator = sourceMap.keySet().iterator();
+                String indexType = null;
+                if (iterator.hasNext()) {
+                    indexType = iterator.next();
+                    parseProperties(fields, (Map) sourceMap.get(indexType));
+                }
+                if (StringUtil.isBlank(indexType)) {
+                    throw new ElasticsearchException("索引type为空");
+                }
+                return new MetaInfo().setColumn(fields).setIndexType(indexType);
             }
 
             // 7.x 版本以上
@@ -201,7 +216,6 @@ public final class ElasticsearchConnector extends AbstractConnector implements C
     @Override
     public long getCount(ESConnectorInstance connectorInstance, Map<String, String> command) {
         try {
-            ESConfig config = connectorInstance.getConfig();
             SearchSourceBuilder builder = new SearchSourceBuilder();
             genSearchSourceBuilder(builder, command);
             // 7.x 版本以上
@@ -210,7 +224,7 @@ public final class ElasticsearchConnector extends AbstractConnector implements C
             }
             builder.from(0);
             builder.size(0);
-            SearchRequest request = new SearchRequest(new String[]{config.getIndex()}, builder);
+            SearchRequest request = new SearchRequest(new String[]{command.get(SOURCE_INDEX_NAME)}, builder);
             SearchResponse response = connectorInstance.getConnection().searchWithVersion(request, RequestOptions.DEFAULT);
             return response.getHits().getTotalHits().value;
         } catch (IOException e) {
@@ -221,7 +235,6 @@ public final class ElasticsearchConnector extends AbstractConnector implements C
 
     @Override
     public Result reader(ESConnectorInstance connectorInstance, ReaderConfig config) {
-        ESConfig cfg = connectorInstance.getConfig();
         SearchSourceBuilder builder = new SearchSourceBuilder();
         genSearchSourceBuilder(builder, config.getCommand());
         builder.from((config.getPageIndex() - 1) * config.getPageSize());
@@ -238,7 +251,7 @@ public final class ElasticsearchConnector extends AbstractConnector implements C
         }
 
         try {
-            SearchRequest rq = new SearchRequest(new String[]{cfg.getIndex()}, builder);
+            SearchRequest rq = new SearchRequest(new String[]{config.getCommand().get(SOURCE_INDEX_NAME)}, builder);
             SearchResponse searchResponse = connectorInstance.getConnection().searchWithVersion(rq, RequestOptions.DEFAULT);
             SearchHits hits = searchResponse.getHits();
             SearchHit[] searchHits = hits.getHits();
@@ -262,13 +275,13 @@ public final class ElasticsearchConnector extends AbstractConnector implements C
         }
 
         final Result result = new Result();
-        final ESConfig cfg = connectorInstance.getConfig();
         final List<Field> pkFields = PrimaryKeyUtil.findConfigPrimaryKeyFields(config);
         try {
-            BulkRequest request = new BulkRequest();
-            // 默认取第一个主键
+            final BulkRequest request = new BulkRequest();
             final String pk = pkFields.get(0).getName();
-            data.forEach(row -> addRequest(request, cfg.getIndex(), config.getTableName(), config.getEvent(), String.valueOf(row.get(pk)), row));
+            final String indexName = config.getCommand().get(TARGET_INDEX_NAME);
+            final String indexType = config.getCommand().get(INDEX_TYPE);
+            data.forEach(row -> addRequest(request, indexName, indexType, config.getEvent(), String.valueOf(row.get(pk)), row));
 
             BulkResponse response = connectorInstance.getConnection().bulkWithVersion(request, RequestOptions.DEFAULT);
             RestStatus restStatus = response.status();
@@ -290,6 +303,7 @@ public final class ElasticsearchConnector extends AbstractConnector implements C
         Map<String, String> command = new HashMap<>();
         // 查询字段
         Table table = commandConfig.getTable();
+        command.put(SOURCE_INDEX_NAME, table.getName());
         List<Field> column = table.getColumn();
         if (!CollectionUtils.isEmpty(column)) {
             List<String> fieldNames = column.stream().map(c -> c.getName()).collect(Collectors.toList());
@@ -306,8 +320,12 @@ public final class ElasticsearchConnector extends AbstractConnector implements C
 
     @Override
     public Map<String, String> getTargetCommand(CommandConfig commandConfig) {
-        PrimaryKeyUtil.findTablePrimaryKeys(commandConfig.getTable());
-        return Collections.EMPTY_MAP;
+        Table table = commandConfig.getTable();
+        PrimaryKeyUtil.findTablePrimaryKeys(table);
+        Map<String, String> command = new HashMap<>();
+        command.put(TARGET_INDEX_NAME, table.getName());
+        command.put(INDEX_TYPE, table.getIndexType());
+        return command;
     }
 
     @Override
@@ -319,6 +337,9 @@ public final class ElasticsearchConnector extends AbstractConnector implements C
     }
 
     private void parseProperties(List<Field> fields, Map<String, Object> sourceMap) {
+        if (CollectionUtils.isEmpty(sourceMap)) {
+            throw new ElasticsearchException("未获取到索引字段.");
+        }
         Map<String, Object> properties = (Map<String, Object>) sourceMap.get(ESUtil.PROPERTIES);
         if (CollectionUtils.isEmpty(properties)) {
             throw new ElasticsearchException("查询字段不能为空.");

+ 0 - 13
dbsyncer-connector/dbsyncer-connector-elasticsearch/src/main/java/org/dbsyncer/connector/elasticsearch/config/ESConfig.java

@@ -29,11 +29,6 @@ public class ESConfig extends ConnectorConfig {
      */
     private String password;
 
-    /**
-     * 索引(相当于数据库)
-     */
-    private String index;
-
     public String getUrl() {
         return url;
     }
@@ -58,12 +53,4 @@ public class ESConfig extends ConnectorConfig {
         this.password = password;
     }
 
-    public String getIndex() {
-        return index;
-    }
-
-    public void setIndex(String index) {
-        this.index = index;
-    }
-
 }

+ 0 - 3
dbsyncer-connector/dbsyncer-connector-elasticsearch/src/main/java/org/dbsyncer/connector/elasticsearch/validator/ESConfigValidator.java

@@ -22,16 +22,13 @@ public final class ESConfigValidator implements ConfigValidator<ESConfig> {
     public void modify(ESConfig connectorConfig, Map<String, String> params) {
         String username = params.get("username");
         String password = params.get("password");
-        String index = params.get("index");
         String url = params.get("url");
         Assert.hasText(username, "Username is empty.");
         Assert.hasText(password, "Password is empty.");
-        Assert.hasText(index, "Index is empty.");
         Assert.hasText(url, "Url is empty.");
 
         connectorConfig.setUsername(username);
         connectorConfig.setPassword(password);
-        connectorConfig.setIndex(index);
         connectorConfig.setUrl(url);
     }
 }

+ 1 - 11
dbsyncer-connector/dbsyncer-connector-elasticsearch/src/main/resources/public/connector/addElasticsearch.html

@@ -18,18 +18,8 @@
         <div class="col-sm-4">
             <input class="form-control" name="url" type="text" maxlength="1024" dbsyncer-valid="require" th:value="${connector?.config?.url}?:'http://127.0.0.1:9200'"/>
         </div>
-        <label class="col-sm-2 control-label">index <strong class="driverVerifcateRequired">*</strong></label>
-        <div class="col-sm-4">
-            <input class="form-control" name="index" type="text" maxlength="32" dbsyncer-valid="require" placeholder="test" th:value="${connector?.config?.index}?:'test'"/>
-        </div>
+        <div class="col-sm-6"></div>
     </div>
-
-    <script type="text/javascript">
-        $(function () {
-            // 初始化select插件
-            initSelectIndex($(".select-control"), 1);
-        })
-    </script>
 </div>
 
 </html>

+ 0 - 1
dbsyncer-connector/dbsyncer-connector-mysql/src/main/java/org/dbsyncer/connector/mysql/storage/MySQLStorageService.java

@@ -467,7 +467,6 @@ public class MySQLStorageService extends AbstractStorageService {
                     new Field(ConfigConstant.DATA_TARGET_TABLE_NAME, "VARCHAR", Types.VARCHAR),
                     new Field(ConfigConstant.DATA_EVENT, "VARCHAR", Types.VARCHAR),
                     new Field(ConfigConstant.DATA_ERROR, "LONGVARCHAR", Types.LONGVARCHAR),
-                    new Field(ConfigConstant.BINLOG_STATUS, "INTEGER", Types.INTEGER),
                     new Field(ConfigConstant.BINLOG_DATA, "VARBINARY", Types.BLOB)
             ).map(field -> {
                 field.setLabelName(field.getName());

+ 3 - 3
dbsyncer-parser/src/main/java/org/dbsyncer/parser/impl/ParserComponentImpl.java

@@ -95,8 +95,8 @@ public class ParserComponentImpl implements ParserComponent {
         ConnectorConfig tConnConfig = getConnectorConfig(mapping.getTargetConnectorId());
         Table sourceTable = tableGroup.getSourceTable();
         Table targetTable = tableGroup.getTargetTable();
-        Table sTable = new Table(sourceTable.getName(), sourceTable.getType(), new ArrayList<>(), sourceTable.getSql());
-        Table tTable = new Table(targetTable.getName(), targetTable.getType(), new ArrayList<>(), sourceTable.getSql());
+        Table sTable = sourceTable.clone().setColumn(new ArrayList<>());
+        Table tTable = targetTable.clone().setColumn(new ArrayList<>());
         List<FieldMapping> fieldMapping = tableGroup.getFieldMapping();
         if (!CollectionUtils.isEmpty(fieldMapping)) {
             fieldMapping.forEach(m -> {
@@ -217,7 +217,7 @@ public class ParserComponentImpl implements ParserComponent {
         int total = dataList.size();
         // 单次任务
         if (total <= batchSize) {
-            return connectorFactory.writer(batchWriter.getConnectorInstance(), new WriterBatchConfig(tableName, event, command, fields, dataList,batchWriter.isEnableOnlyUpdate()));
+            return connectorFactory.writer(batchWriter.getConnectorInstance(), new WriterBatchConfig(tableName, event, command, fields, dataList, batchWriter.isEnableOnlyUpdate()));
         }
 
         // 批量任务, 拆分

+ 1 - 1
dbsyncer-sdk/src/main/java/org/dbsyncer/sdk/connector/database/AbstractDQLConnector.java

@@ -35,7 +35,7 @@ public abstract class AbstractDQLConnector extends AbstractDatabaseConnector {
         List<Table> tables = new ArrayList<>();
         if (!CollectionUtils.isEmpty(sqlTables)) {
             sqlTables.forEach(s ->
-                tables.add(new Table(s.getSqlName(), TableTypeEnum.TABLE.getCode(), Collections.EMPTY_LIST, s.getSql()))
+                tables.add(new Table(s.getSqlName(), TableTypeEnum.TABLE.getCode(), Collections.EMPTY_LIST, s.getSql(), null))
             );
         }
         return tables;

+ 0 - 1
dbsyncer-sdk/src/main/java/org/dbsyncer/sdk/constant/ConfigConstant.java

@@ -43,7 +43,6 @@ public class ConfigConstant {
     /**
      * Binlog
      */
-    public static final String BINLOG_STATUS = "status";
     public static final String BINLOG_DATA = "data";
 
 }

+ 14 - 0
dbsyncer-sdk/src/main/java/org/dbsyncer/sdk/model/MetaInfo.java

@@ -28,6 +28,11 @@ public class MetaInfo {
      */
     private String sql;
 
+    /**
+     * 索引类型(ES)
+     */
+    private String indexType;
+
     public String getTableType() {
         return tableType;
     }
@@ -55,6 +60,15 @@ public class MetaInfo {
         return this;
     }
 
+    public String getIndexType() {
+        return indexType;
+    }
+
+    public MetaInfo setIndexType(String indexType) {
+        this.indexType = indexType;
+        return this;
+    }
+
     @Override
     public String toString() {
         return new StringBuilder().append("MetaInfo{").append("tableType=").append(tableType).append(", ").append("column=").append(column).append('}').toString();

+ 23 - 3
dbsyncer-sdk/src/main/java/org/dbsyncer/sdk/model/Table.java

@@ -36,6 +36,11 @@ public class Table {
     // 总数
     private long count;
 
+    /**
+     * 索引类型(ES)
+     */
+    private String indexType;
+
     public Table() {
     }
 
@@ -44,14 +49,15 @@ public class Table {
     }
 
     public Table(String name, String type) {
-        this(name, type, null, null);
+        this(name, type, null, null, null);
     }
 
-    public Table(String name, String type, List<Field> column, String sql) {
+    public Table(String name, String type, List<Field> column, String sql, String indexType) {
         this.name = name;
         this.type = type;
         this.column = column;
         this.sql = sql;
+        this.indexType = indexType;
     }
 
     public String getName() {
@@ -74,8 +80,9 @@ public class Table {
         return column;
     }
 
-    public void setColumn(List<Field> column) {
+    public Table setColumn(List<Field> column) {
         this.column = column;
+        return this;
     }
 
     public String getSql() {
@@ -93,4 +100,17 @@ public class Table {
     public void setCount(long count) {
         this.count = count;
     }
+
+    public String getIndexType() {
+        return indexType;
+    }
+
+    public void setIndexType(String indexType) {
+        this.indexType = indexType;
+    }
+
+    @Override
+    public Table clone() {
+        return new Table(name, type, column, sql, indexType);
+    }
 }