Browse Source

!179 merge
Merge pull request !179 from moyu/ae86_dev

AE86 1 year ago
parent
commit
fe9c619fbd

+ 1 - 1
dbsyncer-connector/src/main/java/org/dbsyncer/connector/enums/ESFieldTypeEnum.java

@@ -64,7 +64,7 @@ public enum ESFieldTypeEnum {
     /**
      * 弥补object类型不足,格式出现list放object会变为array:"test": [{"a":"b},{}]
      */
-    NESTED("nested", Types.VARCHAR),
+    NESTED("nested", Types.OTHER),
     OBJECT("object", Types.VARCHAR),
     IP("ip", Types.VARCHAR),
     TOKEN_COUNT("token_count", Types.BIGINT),

+ 13 - 0
dbsyncer-connector/src/main/java/org/dbsyncer/connector/es/ESConnector.java

@@ -48,6 +48,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.sql.Types;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -72,6 +73,11 @@ public final class ESConnector extends AbstractConnector implements Connector<ES
         filters.putIfAbsent(FilterEnum.LIKE.getName(), (builder, k, v) -> builder.filter(QueryBuilders.wildcardQuery(k, v)));
     }
 
+    public ESConnector() {
+        VALUE_MAPPERS.put(Types.DATE, new ESDateValueMapper());
+        VALUE_MAPPERS.put(Types.OTHER, new ESOtherValueMapper());
+    }
+
     @Override
     public ConnectorMapper connect(ESConfig config) {
         return new ESConnectorMapper(config);
@@ -266,6 +272,13 @@ public final class ESConnector extends AbstractConnector implements Connector<ES
         properties.forEach((fieldName, c) -> {
             Map fieldDesc = (Map) c;
             String columnType = (String) fieldDesc.get("type");
+            // 如果时间类型做了format, 按字符串类型处理
+            if (StringUtil.equals(ESFieldTypeEnum.DATE.getCode(), columnType)) {
+                if (fieldDesc.containsKey("format")) {
+                    fields.add(new Field(fieldName, columnType, ESFieldTypeEnum.KEYWORD.getType()));
+                    return;
+                }
+            }
             fields.add(new Field(fieldName, columnType, ESFieldTypeEnum.getType(columnType)));
         });
     }

+ 31 - 0
dbsyncer-connector/src/main/java/org/dbsyncer/connector/es/ESDateValueMapper.java

@@ -0,0 +1,31 @@
+package org.dbsyncer.connector.es;
+
+import org.dbsyncer.common.spi.ConnectorMapper;
+import org.dbsyncer.connector.AbstractValueMapper;
+import org.dbsyncer.connector.ConnectorException;
+
+import java.sql.Date;
+import java.sql.Timestamp;
+
+/**
+ * @author moyu
+ * @version 1.0.0
+ * @date 2023/10/12 0:07
+ */
+public class ESDateValueMapper extends AbstractValueMapper<java.util.Date> {
+
+    @Override
+    protected java.util.Date convert(ConnectorMapper connectorMapper, Object val) {
+        if (val instanceof Timestamp) {
+            Timestamp timestamp = (Timestamp) val;
+            return new java.util.Date(timestamp.getTime());
+        }
+
+        if (val instanceof Date) {
+            Date date = (Date) val;
+            return new java.util.Date(date.getTime());
+        }
+
+        throw new ConnectorException(String.format("%s can not find type [%s], val [%s]", getClass().getSimpleName(), val.getClass(), val));
+    }
+}

+ 35 - 0
dbsyncer-connector/src/main/java/org/dbsyncer/connector/es/ESOtherValueMapper.java

@@ -0,0 +1,35 @@
+package org.dbsyncer.connector.es;
+
+import org.dbsyncer.common.spi.ConnectorMapper;
+import org.dbsyncer.common.util.JsonUtil;
+import org.dbsyncer.connector.AbstractValueMapper;
+import org.dbsyncer.connector.ConnectorException;
+import org.postgresql.util.PGobject;
+
+import java.util.Map;
+
+/**
+ * @author moyu
+ * @version 1.0.0
+ * @date 2023/10/12 0:07
+ */
+public class ESOtherValueMapper extends AbstractValueMapper<Map> {
+
+    @Override
+    protected Map convert(ConnectorMapper connectorMapper, Object val) {
+        if (val instanceof String) {
+            return JsonUtil.jsonToObj((String) val, Map.class);
+        }
+
+        if (val instanceof byte[]) {
+            return JsonUtil.jsonToObj(new String((byte[]) val), Map.class);
+        }
+
+        if (val instanceof PGobject) {
+            PGobject pgObject = (PGobject) val;
+            return JsonUtil.jsonToObj(pgObject.getValue(), Map.class);
+        }
+
+        throw new ConnectorException(String.format("%s can not find type [%s], val [%s]", getClass().getSimpleName(), val.getClass(), val));
+    }
+}