Sfoglia il codice sorgente

Merge remote-tracking branch 'origin/v_2.0' into v_2.0

穿云 6 mesi fa
parent
commit
12ad35c6f4

+ 4 - 0
dbsyncer-common/src/main/java/org/dbsyncer/common/util/DateFormatUtil.java

@@ -162,6 +162,10 @@ public abstract class DateFormatUtil {
 
             // 2022-07-21T05:00:59+0800
             if (s.length() == 24) {
+                // 2024-06-14T16:00:00.000Z
+                if (StringUtil.endsWith(s, "Z")) {
+                    return stringToTimestamp(s, DateTimeFormatter.ISO_OFFSET_DATE_TIME);
+                }
                 return stringToTimestamp(s, TS_TZ_FORMATTER);
             }
 

+ 4 - 1
dbsyncer-connector/dbsyncer-connector-elasticsearch/src/main/java/org/dbsyncer/connector/elasticsearch/ElasticsearchConnector.java

@@ -239,7 +239,7 @@ public final class ElasticsearchConnector extends AbstractConnector implements C
         genSearchSourceBuilder(builder, config.getCommand());
         builder.from((config.getPageIndex() - 1) * config.getPageSize());
         builder.size(config.getPageSize());
-        builder.timeout(TimeValue.timeValueMillis(10));
+        builder.timeout(TimeValue.timeValueSeconds(connectorInstance.getConfig().getTimeoutSeconds()));
         List<String> primaryKeys = PrimaryKeyUtil.findTablePrimaryKeys(config.getTable());
         if (!CollectionUtils.isEmpty(primaryKeys)) {
             primaryKeys.forEach(pk -> builder.sort(pk, SortOrder.ASC));
@@ -259,6 +259,9 @@ public final class ElasticsearchConnector extends AbstractConnector implements C
             for (SearchHit hit : searchHits) {
                 list.add(hit.getSourceAsMap());
             }
+            if (searchResponse.getInternalResponse().timedOut()) {
+                throw new ElasticsearchException("search timeout:"+ searchResponse.getTook().getMillis() +"ms, pageIndex:" + config.getPageIndex() + ", pageSize:" + config.getPageSize());
+            }
             return new Result(list);
         } catch (IOException e) {
             logger.error(e.getMessage());

+ 2 - 0
dbsyncer-connector/dbsyncer-connector-elasticsearch/src/main/java/org/dbsyncer/connector/elasticsearch/cdc/ESQuartzListener.java

@@ -6,6 +6,7 @@ package org.dbsyncer.connector.elasticsearch.cdc;
 import org.dbsyncer.common.util.CollectionUtils;
 import org.dbsyncer.common.util.JsonUtil;
 import org.dbsyncer.common.util.StringUtil;
+import org.dbsyncer.connector.elasticsearch.ElasticsearchConnector;
 import org.dbsyncer.sdk.constant.ConnectorConstant;
 import org.dbsyncer.sdk.enums.QuartzFilterEnum;
 import org.dbsyncer.sdk.listener.AbstractQuartzListener;
@@ -79,6 +80,7 @@ public final class ESQuartzListener extends AbstractQuartzListener {
         }
         point.setCommand(ConnectorConstant.OPERTION_QUERY, command.get(ConnectorConstant.OPERTION_QUERY));
         point.setCommand(ConnectorConstant.OPERTION_QUERY_FILTER, JsonUtil.objToJson(filters));
+        point.setCommand(ElasticsearchConnector.SOURCE_INDEX_NAME, command.get(ElasticsearchConnector.SOURCE_INDEX_NAME));
         return point;
     }
 }

+ 12 - 0
dbsyncer-connector/dbsyncer-connector-elasticsearch/src/main/java/org/dbsyncer/connector/elasticsearch/config/ESConfig.java

@@ -29,6 +29,11 @@ public class ESConfig extends ConnectorConfig {
      */
     private String password;
 
+    /**
+     * 查询请求超时(秒)
+     */
+    private int timeoutSeconds = 10;
+
     public String getUrl() {
         return url;
     }
@@ -53,4 +58,11 @@ public class ESConfig extends ConnectorConfig {
         this.password = password;
     }
 
+    public int getTimeoutSeconds() {
+        return timeoutSeconds;
+    }
+
+    public void setTimeoutSeconds(int timeoutSeconds) {
+        this.timeoutSeconds = timeoutSeconds;
+    }
 }

+ 13 - 0
dbsyncer-connector/dbsyncer-connector-elasticsearch/src/main/java/org/dbsyncer/connector/elasticsearch/schema/ESDateValueMapper.java

@@ -3,6 +3,7 @@
  */
 package org.dbsyncer.connector.elasticsearch.schema;
 
+import org.dbsyncer.common.util.DateFormatUtil;
 import org.dbsyncer.sdk.connector.AbstractValueMapper;
 import org.dbsyncer.sdk.connector.ConnectorInstance;
 import org.elasticsearch.ElasticsearchException;
@@ -35,6 +36,18 @@ public class ESDateValueMapper extends AbstractValueMapper<java.util.Date> {
             return new java.util.Date((Long) val);
         }
 
+        if (val instanceof Integer) {
+            return new java.util.Date(Long.parseLong(val.toString()));
+        }
+
+        if (val instanceof String) {
+            String s = (String) val;
+            Timestamp timestamp = DateFormatUtil.stringToTimestamp(s);
+            if (null != timestamp) {
+                return new java.util.Date(timestamp.getTime());
+            }
+        }
+
         throw new ElasticsearchException(String.format("%s can not find type [%s], val [%s]", getClass().getSimpleName(), val.getClass(), val));
     }
 }

+ 3 - 0
dbsyncer-connector/dbsyncer-connector-elasticsearch/src/main/java/org/dbsyncer/connector/elasticsearch/validator/ESConfigValidator.java

@@ -3,6 +3,7 @@
  */
 package org.dbsyncer.connector.elasticsearch.validator;
 
+import org.dbsyncer.common.util.NumberUtil;
 import org.dbsyncer.connector.elasticsearch.config.ESConfig;
 import org.dbsyncer.sdk.connector.ConfigValidator;
 import org.springframework.util.Assert;
@@ -23,6 +24,7 @@ public final class ESConfigValidator implements ConfigValidator<ESConfig> {
         String username = params.get("username");
         String password = params.get("password");
         String url = params.get("url");
+        String timeoutSeconds = params.get("timeoutSeconds");
         Assert.hasText(username, "Username is empty.");
         Assert.hasText(password, "Password is empty.");
         Assert.hasText(url, "Url is empty.");
@@ -30,5 +32,6 @@ public final class ESConfigValidator implements ConfigValidator<ESConfig> {
         connectorConfig.setUsername(username);
         connectorConfig.setPassword(password);
         connectorConfig.setUrl(url);
+        connectorConfig.setTimeoutSeconds(NumberUtil.toInt(timeoutSeconds));
     }
 }

+ 4 - 1
dbsyncer-connector/dbsyncer-connector-elasticsearch/src/main/resources/public/connector/addElasticsearch.html

@@ -18,7 +18,10 @@
         <div class="col-sm-4">
             <input class="form-control" name="url" type="text" maxlength="1024" dbsyncer-valid="require" th:value="${connector?.config?.url}?:'http://127.0.0.1:9200'"/>
         </div>
-        <div class="col-sm-6"></div>
+        <label class="col-sm-2 control-label">请求超时(秒)</label>
+        <div class="col-sm-4">
+            <input class="form-control" name="timeoutSeconds" type="number" dbsyncer-valid="require" min="1" max="120" th:value="${connector?.config?.timeoutSeconds}?:10"/>
+        </div>
     </div>
 </div>
 

+ 9 - 0
dbsyncer-sdk/src/main/java/org/dbsyncer/sdk/connector/schema/DateValueMapper.java

@@ -1,6 +1,7 @@
 package org.dbsyncer.sdk.connector.schema;
 
 import org.dbsyncer.common.util.DateFormatUtil;
+import org.dbsyncer.common.util.StringUtil;
 import org.dbsyncer.sdk.SdkException;
 import org.dbsyncer.sdk.connector.AbstractValueMapper;
 import org.dbsyncer.sdk.connector.ConnectorInstance;
@@ -15,6 +16,14 @@ import java.sql.Timestamp;
  */
 public class DateValueMapper extends AbstractValueMapper<Date> {
 
+    @Override
+    protected boolean skipConvert(Object val) {
+        if (val instanceof String) {
+            return StringUtil.equals((CharSequence) val, "0000-00-00");
+        }
+        return super.skipConvert(val);
+    }
+
     @Override
     protected Date convert(ConnectorInstance connectorInstance, Object val) {
         if (val instanceof Timestamp) {

+ 3 - 9
dbsyncer-sdk/src/main/java/org/dbsyncer/sdk/connector/schema/FloatValueMapper.java

@@ -4,8 +4,6 @@ import org.dbsyncer.sdk.SdkException;
 import org.dbsyncer.sdk.connector.AbstractValueMapper;
 import org.dbsyncer.sdk.connector.ConnectorInstance;
 
-import java.math.BigDecimal;
-
 /**
  * @author AE86
  * @version 1.0.0
@@ -15,13 +13,9 @@ public class FloatValueMapper extends AbstractValueMapper<Float> {
 
     @Override
     protected Float convert(ConnectorInstance connectorInstance, Object val) {
-        if (val instanceof BigDecimal) {
-            BigDecimal bigDecimal = (BigDecimal) val;
-            return bigDecimal.floatValue();
-        }
-        if (val instanceof Double) {
-            Double dbl = (Double) val;
-            return dbl.floatValue();
+        if (val instanceof Number) {
+            Number number = (Number) val;
+            return number.floatValue();
         }
         if (val instanceof String) {
             String strVal = (String) val;

+ 9 - 0
dbsyncer-sdk/src/main/java/org/dbsyncer/sdk/connector/schema/TimestampValueMapper.java

@@ -2,6 +2,7 @@ package org.dbsyncer.sdk.connector.schema;
 
 import microsoft.sql.DateTimeOffset;
 import org.dbsyncer.common.util.DateFormatUtil;
+import org.dbsyncer.common.util.StringUtil;
 import org.dbsyncer.sdk.SdkException;
 import org.dbsyncer.sdk.connector.AbstractValueMapper;
 import org.dbsyncer.sdk.connector.ConnectorInstance;
@@ -19,6 +20,14 @@ import java.time.OffsetDateTime;
  */
 public class TimestampValueMapper extends AbstractValueMapper<Timestamp> {
 
+    @Override
+    protected boolean skipConvert(Object val) {
+        if (val instanceof String) {
+            return StringUtil.equals((CharSequence) val, "0000-00-00 00:00:00");
+        }
+        return super.skipConvert(val);
+    }
+
     @Override
     protected Timestamp convert(ConnectorInstance connectorInstance, Object val) throws SQLException {
         if (val instanceof Date) {