Browse Source

!104 merge
Merge pull request !104 from AE86/V_1.0.0_RC

AE86 2 years ago
parent
commit
a9983c70af

+ 0 - 1
dbsyncer-biz/src/main/java/org/dbsyncer/biz/checker/impl/connector/AbstractDataBaseConfigChecker.java

@@ -23,7 +23,6 @@ public abstract class AbstractDataBaseConfigChecker implements ConnectorConfigCh
         Assert.hasText(username, "Username is empty.");
         Assert.hasText(password, "Password is empty.");
         Assert.hasText(url, "Url is empty.");
-        Assert.hasText(driverClassName, "DriverClassName is empty.");
 
         connectorConfig.setUsername(username);
         connectorConfig.setPassword(password);

+ 0 - 6
dbsyncer-biz/src/main/java/org/dbsyncer/biz/checker/impl/connector/ElasticsearchConfigChecker.java

@@ -20,23 +20,17 @@ public class ElasticsearchConfigChecker implements ConnectorConfigChecker<ESConf
         String username = params.get("username");
         String password = params.get("password");
         String index = params.get("index");
-        String type = params.get("type");
         String url = params.get("url");
-        String schema = params.get("schema");
         String primaryKey = params.get("primaryKey");
         Assert.hasText(username, "Username is empty.");
         Assert.hasText(password, "Password is empty.");
         Assert.hasText(index, "Index is empty.");
-        Assert.hasText(type, "Type is empty.");
         Assert.hasText(url, "Url is empty.");
-        Assert.hasText(schema, "Schema is empty.");
 
         connectorConfig.setUsername(username);
         connectorConfig.setPassword(password);
         connectorConfig.setIndex(index);
-        connectorConfig.setType(type);
         connectorConfig.setUrl(url);
-        connectorConfig.setSchema(schema);
         connectorConfig.setPrimaryKey(primaryKey);
     }
 }

+ 3 - 16
dbsyncer-connector/src/main/java/org/dbsyncer/connector/config/ESConfig.java

@@ -11,15 +11,10 @@ import org.dbsyncer.common.model.AbstractConnectorConfig;
 public class ESConfig extends AbstractConnectorConfig {
 
     /**
-     * 集群地址192.168.1.100:9200,192.168.1.200:9200
+     * 集群地址, http(s)-9200, tcp-9300 http://192.168.1.100:9200,http://192.168.1.200:9200
      */
     private String url;
 
-    /**
-     * 协议9200(http),9300(tcp)
-     */
-    private String schema;
-
     /**
      * 帐号
      */
@@ -36,9 +31,9 @@ public class ESConfig extends AbstractConnectorConfig {
     private String index;
 
     /**
-     * 类型(相当于表)
+     * 类型(相当于表), 6.x 每个索引对应一个type;7.x版本不再引入type概念
      */
-    private String type;
+    private String type = "_doc";
 
     /**
      * 主键
@@ -53,14 +48,6 @@ public class ESConfig extends AbstractConnectorConfig {
         this.url = url;
     }
 
-    public String getSchema() {
-        return schema;
-    }
-
-    public void setSchema(String schema) {
-        this.schema = schema;
-    }
-
     public String getUsername() {
         return username;
     }

+ 4 - 4
dbsyncer-connector/src/main/java/org/dbsyncer/connector/enums/FilterEnum.java

@@ -25,19 +25,19 @@ public enum FilterEnum {
     /**
      * 大于
      */
-    GT(">", (value, filterValue) -> NumberUtil.toInt(value) > NumberUtil.toInt(filterValue)),
+    GT(">", (value, filterValue) -> NumberUtil.toLong(value) > NumberUtil.toLong(filterValue)),
     /**
      * 小于
      */
-    LT("<", (value, filterValue) -> NumberUtil.toInt(value) < NumberUtil.toInt(filterValue)),
+    LT("<", (value, filterValue) -> NumberUtil.toLong(value) < NumberUtil.toLong(filterValue)),
     /**
      * 大于等于
      */
-    GT_AND_EQUAL(">=", (value, filterValue) -> NumberUtil.toInt(value) >= NumberUtil.toInt(filterValue)),
+    GT_AND_EQUAL(">=", (value, filterValue) -> NumberUtil.toLong(value) >= NumberUtil.toLong(filterValue)),
     /**
      * 小于等于
      */
-    LT_AND_EQUAL("<=", (value, filterValue) -> NumberUtil.toInt(value) <= NumberUtil.toInt(filterValue)),
+    LT_AND_EQUAL("<=", (value, filterValue) -> NumberUtil.toLong(value) <= NumberUtil.toLong(filterValue)),
     /**
      * 模糊匹配
      */

+ 1 - 1
dbsyncer-connector/src/main/java/org/dbsyncer/connector/es/ESConnector.java

@@ -87,7 +87,7 @@ public final class ESConnector extends AbstractConnector implements Connector<ES
 
     @Override
     public String getConnectorMapperCacheKey(ESConfig config) {
-        return String.format("%s-%s-%s-%s-%s", config.getConnectorType(), config.getUrl(), config.getIndex(), config.getType(), config.getUsername());
+        return String.format("%s-%s-%s-%", config.getConnectorType(), config.getUrl(), config.getIndex(), config.getUsername());
     }
 
     @Override

+ 19 - 25
dbsyncer-connector/src/main/java/org/dbsyncer/connector/util/ESUtil.java

@@ -1,18 +1,23 @@
 package org.dbsyncer.connector.util;
 
-import org.dbsyncer.common.util.StringUtil;
 import org.apache.http.HttpHost;
 import org.apache.http.auth.AuthScope;
-import org.apache.http.auth.Credentials;
 import org.apache.http.auth.UsernamePasswordCredentials;
 import org.apache.http.client.CredentialsProvider;
+import org.apache.http.conn.ssl.NoopHostnameVerifier;
+import org.apache.http.conn.ssl.TrustAllStrategy;
 import org.apache.http.impl.client.BasicCredentialsProvider;
+import org.apache.http.nio.conn.ssl.SSLIOSessionStrategy;
+import org.apache.http.ssl.SSLContextBuilder;
+import org.dbsyncer.common.util.StringUtil;
 import org.dbsyncer.connector.ConnectorException;
 import org.dbsyncer.connector.config.ESConfig;
+import org.elasticsearch.client.RequestOptions;
 import org.elasticsearch.client.RestClient;
 import org.elasticsearch.client.RestClientBuilder;
 import org.elasticsearch.client.RestHighLevelClient;
 
+import javax.net.ssl.SSLContext;
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.Objects;
@@ -20,21 +25,27 @@ import java.util.Objects;
 public abstract class ESUtil {
 
     public static final String PROPERTIES = "properties";
-    private static final int ADDRESS_LENGTH = 2;
 
     private ESUtil() {
     }
 
     public static RestHighLevelClient getConnection(ESConfig config) {
         String[] ipAddress = StringUtil.split(config.getUrl(), ",");
-        HttpHost[] hosts = Arrays.stream(ipAddress).map(node -> makeHttpHost(node, config.getSchema())).filter(Objects::nonNull).toArray(
+        HttpHost[] hosts = Arrays.stream(ipAddress).map(node -> HttpHost.create(node)).filter(Objects::nonNull).toArray(
                 HttpHost[]::new);
         RestClientBuilder builder = RestClient.builder(hosts);
         CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
-        Credentials credentials = new UsernamePasswordCredentials(config.getUsername(), config.getPassword());
-        credentialsProvider.setCredentials(AuthScope.ANY, credentials);
-        builder.setHttpClientConfigCallback(httpClientBuilder -> httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider));
-        return new RestHighLevelClient(builder);
+        credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(config.getUsername(), config.getPassword()));
+        try {
+            SSLContext sslContext = new SSLContextBuilder().loadTrustMaterial(new TrustAllStrategy()).build();
+            SSLIOSessionStrategy sessionStrategy = new SSLIOSessionStrategy(sslContext, NoopHostnameVerifier.INSTANCE);
+            builder.setHttpClientConfigCallback(httpClientBuilder -> httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider).setSSLStrategy(sessionStrategy));
+            final RestHighLevelClient client = new RestHighLevelClient(builder);
+            client.ping(RequestOptions.DEFAULT);
+            return client;
+        } catch (Exception e) {
+            throw new ConnectorException(String.format("Failed to connect to ElasticSearch on %s, %s", config.getUrl(), e.getMessage()));
+        }
     }
 
     public static void close(RestHighLevelClient client) {
@@ -47,21 +58,4 @@ public abstract class ESUtil {
         }
     }
 
-    /**
-     * 根据配置创建HttpHost
-     *
-     * @param address
-     * @param scheme
-     * @return
-     */
-    private static HttpHost makeHttpHost(String address, String scheme) {
-        String[] arr = StringUtil.split(address, ":");
-        if (arr.length == ADDRESS_LENGTH) {
-            String ip = arr[0];
-            int port = Integer.parseInt(arr[1]);
-            return new HttpHost(ip, port, scheme);
-        } else {
-            return null;
-        }
-    }
 }

+ 1 - 2
dbsyncer-listener/src/main/test/ESClientTest.java

@@ -48,8 +48,7 @@ public class ESClientTest {
     @Before
     public void init() {
         ESConfig config = new ESConfig();
-        config.setUrl("127.0.0.1:9200");
-        config.setSchema("http");
+        config.setUrl("https://127.0.0.1:9200");
         config.setUsername("ae86");
         config.setPassword("123456");
         client = ESUtil.getConnection(config);

+ 33 - 5
dbsyncer-manager/src/main/java/org/dbsyncer/manager/model/FieldPicker.java

@@ -1,6 +1,7 @@
 package org.dbsyncer.manager.model;
 
 import org.dbsyncer.common.util.CollectionUtils;
+import org.dbsyncer.common.util.DateFormatUtil;
 import org.dbsyncer.common.util.StringUtil;
 import org.dbsyncer.connector.CompareFilter;
 import org.dbsyncer.connector.enums.FilterEnum;
@@ -11,6 +12,8 @@ import org.dbsyncer.parser.model.FieldMapping;
 import org.dbsyncer.parser.model.TableGroup;
 import org.springframework.util.Assert;
 
+import java.sql.Date;
+import java.sql.Timestamp;
 import java.util.*;
 import java.util.stream.Collectors;
 
@@ -60,15 +63,13 @@ public class FieldPicker {
         }
         // where (id > 1 and id < 100) or (id = 100 or id =101)
         // 或 关系(成立任意条件)
-        CompareFilter filter = null;
         Object value = null;
         for (Filter f : or) {
             value = row.get(f.getName());
             if (null == value) {
                 continue;
             }
-            filter = FilterEnum.getCompareFilter(f.getFilter());
-            if (filter.compare(String.valueOf(value), f.getValue())) {
+            if (compareValueWithFilter(f, value)) {
                 return true;
             }
         }
@@ -80,8 +81,7 @@ public class FieldPicker {
             if (null == value) {
                 continue;
             }
-            filter = FilterEnum.getCompareFilter(f.getFilter());
-            if (!filter.compare(String.valueOf(value), f.getValue())) {
+            if (!compareValueWithFilter(f, value)) {
                 return false;
             }
             pass = true;
@@ -90,6 +90,34 @@ public class FieldPicker {
         return pass;
     }
 
+    /**
+     * 比较值是否满足过滤条件
+     *
+     * @param filter        过滤器
+     * @param comparedValue 比较值
+     * @return
+     */
+    private boolean compareValueWithFilter(Filter filter, Object comparedValue) {
+        CompareFilter compareFilter = FilterEnum.getCompareFilter(filter.getFilter());
+        if (null == filter) {
+            return false;
+        }
+
+        // 支持时间比较
+        if (comparedValue instanceof Timestamp) {
+            Timestamp comparedTimestamp = (Timestamp) comparedValue;
+            Timestamp filterTimestamp = DateFormatUtil.stringToTimestamp(filter.getValue());
+            return compareFilter.compare(String.valueOf(comparedTimestamp.getTime()), String.valueOf(filterTimestamp.getTime()));
+        }
+        if (comparedValue instanceof Date) {
+            Date comparedDate = (Date) comparedValue;
+            Date filterDate = DateFormatUtil.stringToDate(filter.getValue());
+            return compareFilter.compare(String.valueOf(comparedDate.getTime()), String.valueOf(filterDate.getTime()));
+        }
+
+        return compareFilter.compare(String.valueOf(comparedValue), filter.getValue());
+    }
+
     private void init(List<Filter> filter, List<Field> column, List<FieldMapping> fieldMapping) {
         // column  => [1, 86, 0, 中文, 2020-05-15T12:17:22.000+0800, 备注信息]
         Assert.notEmpty(column, "读取字段不能为空.");

+ 17 - 11
dbsyncer-storage/src/main/java/org/dbsyncer/storage/support/MysqlStorageServiceImpl.java

@@ -273,20 +273,26 @@ public class MysqlStorageServiceImpl extends AbstractStorageService {
     private void initUpgradeSql() {
         // show tables where Tables_in_dbsyncer like "dbsyncer_data%"
         String sql = String.format(SHOW_DATA_TABLE, database, PREFIX_TABLE.concat(StorageEnum.DATA.getType()).concat("%"));
-        Map<String, String> tables = null;
+        List<String> tables = null;
         try {
-            tables = connectorMapper.execute(databaseTemplate -> databaseTemplate.queryForMap(sql));
+            tables = connectorMapper.execute(databaseTemplate -> databaseTemplate.queryForList(sql, String.class));
         } catch (EmptyResultDataAccessException e) {
             // 没有可更新的表
         }
         if (CollectionUtils.isEmpty(tables)) {
             return;
         }
-        tables.values().forEach(table -> {
+        final String queryColumnCount = "SELECT count(*) FROM information_schema.columns WHERE table_name = '%s' and column_name = 'TABLE_GROUP_ID'";
+        tables.forEach(table -> {
             try {
-                String ddl = readSql(UPGRADE_SQL, true, table);
-                executeSql(ddl);
-                logger.info(ddl);
+                String query = String.format(queryColumnCount, table);
+                // 是否已升级
+                int count = connectorMapper.execute(databaseTemplate -> databaseTemplate.queryForObject(query, Integer.class));
+                if (count == 0) {
+                    String ddl = readSql(UPGRADE_SQL, true, table);
+                    executeSql(ddl);
+                    logger.info(ddl);
+                }
             } catch (Exception e) {
                 if (e.getCause() instanceof SQLSyntaxErrorException) {
                     SQLSyntaxErrorException ex = (SQLSyntaxErrorException) e.getCause();
@@ -404,11 +410,11 @@ public class MysqlStorageServiceImpl extends AbstractStorageService {
         if (!CollectionUtils.isEmpty(list) && query.isEnableHighLightSearch()) {
             List<Param> highLight = query.getParams().stream().filter(p -> p.isHighlighter()).collect(Collectors.toList());
             list.forEach(row ->
-                highLight.forEach(p -> {
-                    String text = String.valueOf(row.get(p.getKey()));
-                    String replacement = new StringBuilder("<span style='color:red'>").append(p.getValue()).append("</span>").toString();
-                    row.put(p.getKey(), StringUtil.replace(text, p.getValue(), replacement));
-                })
+                    highLight.forEach(p -> {
+                        String text = String.valueOf(row.get(p.getKey()));
+                        String replacement = new StringBuilder("<span style='color:red'>").append(p.getValue()).append("</span>").toString();
+                        row.put(p.getKey(), StringUtil.replace(text, p.getValue(), replacement));
+                    })
             );
         }
     }

+ 0 - 9
dbsyncer-web/src/main/resources/public/connector/addDqlMysql.html

@@ -48,15 +48,6 @@
                       th:text="${connector?.config?.url}?:'jdbc:mysql://127.0.0.1:3306/test?rewriteBatchedStatements=true&useUnicode=true&amp;characterEncoding=UTF8&amp;serverTimezone=Asia/Shanghai&amp;useSSL=false&amp;verifyServerCertificate=false&amp;autoReconnect=true&amp;failOverReadOnly=false'"></textarea>
         </div>
     </div>
-    <div class="form-group">
-        <label class="col-sm-2 control-label">驱动 </label>
-        <div class="col-sm-10">
-            <select class="form-control select-control" name="driverClassName">
-                <option value="com.mysql.jdbc.Driver" th:selected="${connector?.config?.driverClassName eq 'com.mysql.jdbc.Driver'}">com.mysql.jdbc.Driver</option>
-                <option value="com.mysql.cj.jdbc.Driver" th:selected="${connector?.config?.driverClassName eq 'com.mysql.cj.jdbc.Driver'}">com.mysql.cj.jdbc.Driver</option>
-            </select>
-        </div>
-    </div>
 
     <script type="text/javascript">
         $(function () {

+ 2 - 15
dbsyncer-web/src/main/resources/public/connector/addElasticsearch.html

@@ -18,28 +18,15 @@
         <div class="col-sm-4">
             <input class="form-control" name="index" type="text" maxlength="32" dbsyncer-valid="require" placeholder="test" th:value="${connector?.config?.index}?:'test'"/>
         </div>
-        <label class="col-sm-2 control-label">type <strong class="driverVerifcateRequired">*</strong></label>
-        <div class="col-sm-4 ">
-            <input class="form-control" name="type" type="text" maxlength="32" dbsyncer-valid="require" placeholder="_doc" th:value="${connector?.config?.type}?:'_doc'"/>
-        </div>
-    </div>
-    <div class="form-group">
         <label class="col-sm-2 control-label">主键 <i class="fa fa-question-circle fa_gray" aria-hidden="true" title="如果使用的表没有主键,可以自定义主键(大小写必须一致,添加表映射关系时,主键不能为空)。"></i><strong class="driverVerifcateRequired">*</strong></label>
         <div class="col-sm-4">
             <input class="form-control" name="primaryKey" type="text" maxlength="32" dbsyncer-valid="require" placeholder="id" th:value="${connector?.config?.primaryKey}?:'id'"/>
         </div>
-        <label class="col-sm-2 control-label">地址 <i class="fa fa-question-circle fa_gray" aria-hidden="true" title="多个使用英文逗号,例如:192.168.1.100:9200,192.168.1.200:9200"></i> <strong class="driverVerifcateRequired">*</strong></label>
-        <div class="col-sm-4">
-            <textarea id="sql" name="url" class="form-control dbsyncer_textarea_resize_none" maxlength="1024" dbsyncer-valid="require" rows="2" th:text="${connector?.config?.url}?:'127.0.0.1:9200'"></textarea>
-        </div>
     </div>
     <div class="form-group">
-        <label class="col-sm-2 control-label">协议 </label>
+        <label class="col-sm-2 control-label">地址 <i class="fa fa-question-circle fa_gray" aria-hidden="true" title="多个使用英文逗号,例如:http://192.168.1.100:9200,http://192.168.1.200:9200"></i> <strong class="driverVerifcateRequired">*</strong></label>
         <div class="col-sm-4">
-            <select class="form-control select-control" name="schema">
-                <option value="http" th:selected="${connector?.config?.schema eq 'http'}">http</option>
-                <option value="tcp" th:selected="${connector?.config?.schema eq 'tcp'}">tcp</option>
-            </select>
+            <textarea id="sql" name="url" class="form-control dbsyncer_textarea_resize_none" maxlength="1024" dbsyncer-valid="require" rows="2" th:text="${connector?.config?.url}?:'http://127.0.0.1:9200'"></textarea>
         </div>
         <div class="col-sm-6"></div>
     </div>

+ 0 - 15
dbsyncer-web/src/main/resources/public/connector/addMysql.html

@@ -22,21 +22,6 @@
                       th:text="${connector?.config?.url} ?: 'jdbc:mysql://127.0.0.1:3306/test?rewriteBatchedStatements=true&useUnicode=true&amp;characterEncoding=UTF8&amp;serverTimezone=Asia/Shanghai&amp;useSSL=false&amp;verifyServerCertificate=false&amp;autoReconnect=true&amp;failOverReadOnly=false'"></textarea>
         </div>
     </div>
-    <div class="form-group">
-        <label class="col-sm-2 control-label">驱动 </label>
-        <div class="col-sm-10">
-            <select class="form-control select-control" name="driverClassName">
-                <option value="com.mysql.jdbc.Driver"
-                        th:selected="${connector?.config?.driverClassName eq 'com.mysql.jdbc.Driver'}">
-                    com.mysql.jdbc.Driver
-                </option>
-                <option value="com.mysql.cj.jdbc.Driver"
-                        th:selected="${connector?.config?.driverClassName eq 'com.mysql.cj.jdbc.Driver'}">
-                    com.mysql.cj.jdbc.Driver
-                </option>
-            </select>
-        </div>
-    </div>
 
     <script type="text/javascript">
         $(function () {

+ 1 - 1
dbsyncer-web/src/main/resources/public/index/index.html

@@ -72,8 +72,8 @@
                                         <div class="dropdown">
                                             <a data-toggle="dropdown" href="javascript:;"><span class="well-sign-operation"><i class="fa fa-gears fa-1x"></i></span></a>
                                             <ul class="dropdown-menu" role="menu" aria-labelledby="dLabel">
-                                                <li class="remove" th:url="'/connector/remove?id='+${c?.id}"><a href="javascript:;"><i class="fa fa-trash well-sign-red"></i>&nbsp;&nbsp;删除</a></li>
                                                 <li class="copy" th:url="'/connector/copy?id='+${c?.id}"><a href="javascript:;"><i class="fa fa-copy"></i>&nbsp;复制</a></li>
+                                                <li class="remove" th:url="'/connector/remove?id='+${c?.id}"><a href="javascript:;"><i class="fa fa-trash well-sign-red"></i> 删除</a></li>
                                             </ul>
                                         </div>
                                     </div>