AE86 il y a 2 ans
Parent
commit
ca0ed45378

+ 29 - 29
dbsyncer-connector/src/main/java/org/dbsyncer/connector/AbstractConnector.java

@@ -19,40 +19,40 @@ public abstract class AbstractConnector {
 
     private final Logger logger = LoggerFactory.getLogger(getClass());
 
-    protected static final Map<Integer, ValueMapper> valueMappers = new LinkedHashMap<>();
+    protected final Map<Integer, ValueMapper> VALUE_MAPPERS = new LinkedHashMap<>();
 
-    static {
+    public AbstractConnector() {
         // 常用类型
-        valueMappers.putIfAbsent(Types.VARCHAR, new VarcharValueMapper());
-        valueMappers.putIfAbsent(Types.INTEGER, new IntegerValueMapper());
-        valueMappers.putIfAbsent(Types.BIGINT, new BigintValueMapper());
-        valueMappers.putIfAbsent(Types.TIMESTAMP, new TimestampValueMapper());
-        valueMappers.putIfAbsent(Types.DATE, new DateValueMapper());
+        VALUE_MAPPERS.putIfAbsent(Types.VARCHAR, new VarcharValueMapper());
+        VALUE_MAPPERS.putIfAbsent(Types.INTEGER, new IntegerValueMapper());
+        VALUE_MAPPERS.putIfAbsent(Types.BIGINT, new BigintValueMapper());
+        VALUE_MAPPERS.putIfAbsent(Types.TIMESTAMP, new TimestampValueMapper());
+        VALUE_MAPPERS.putIfAbsent(Types.DATE, new DateValueMapper());
 
         // 较少使用
-        valueMappers.putIfAbsent(Types.CHAR, new CharValueMapper());
-        valueMappers.putIfAbsent(Types.NCHAR, new NCharValueMapper());
-        valueMappers.putIfAbsent(Types.NVARCHAR, new NVarcharValueMapper());
-        valueMappers.putIfAbsent(Types.LONGVARCHAR, new LongVarcharValueMapper());
-        valueMappers.putIfAbsent(Types.NUMERIC, new NumberValueMapper());
-        valueMappers.putIfAbsent(Types.BINARY, new BinaryValueMapper());
+        VALUE_MAPPERS.putIfAbsent(Types.CHAR, new CharValueMapper());
+        VALUE_MAPPERS.putIfAbsent(Types.NCHAR, new NCharValueMapper());
+        VALUE_MAPPERS.putIfAbsent(Types.NVARCHAR, new NVarcharValueMapper());
+        VALUE_MAPPERS.putIfAbsent(Types.LONGVARCHAR, new LongVarcharValueMapper());
+        VALUE_MAPPERS.putIfAbsent(Types.NUMERIC, new NumberValueMapper());
+        VALUE_MAPPERS.putIfAbsent(Types.BINARY, new BinaryValueMapper());
 
         // 很少使用
-        valueMappers.putIfAbsent(Types.SMALLINT, new SmallintValueMapper());
-        valueMappers.putIfAbsent(Types.TINYINT, new TinyintValueMapper());
-        valueMappers.putIfAbsent(Types.TIME, new TimeValueMapper());
-        valueMappers.putIfAbsent(Types.DECIMAL, new DecimalValueMapper());
-        valueMappers.putIfAbsent(Types.DOUBLE, new DoubleValueMapper());
-        valueMappers.putIfAbsent(Types.FLOAT, new FloatValueMapper());
-        valueMappers.putIfAbsent(Types.BIT, new BitValueMapper());
-        valueMappers.putIfAbsent(Types.BLOB, new BlobValueMapper());
-        valueMappers.putIfAbsent(Types.CLOB, new ClobValueMapper());
-        valueMappers.putIfAbsent(Types.NCLOB, new NClobValueMapper());
-        valueMappers.putIfAbsent(Types.ROWID, new RowIdValueMapper());
-        valueMappers.putIfAbsent(Types.REAL, new RealValueMapper());
-        valueMappers.putIfAbsent(Types.VARBINARY, new VarBinaryValueMapper());
-        valueMappers.putIfAbsent(Types.LONGVARBINARY, new LongVarBinaryValueMapper());
-        valueMappers.putIfAbsent(Types.OTHER, new OtherValueMapper());
+        VALUE_MAPPERS.putIfAbsent(Types.SMALLINT, new SmallintValueMapper());
+        VALUE_MAPPERS.putIfAbsent(Types.TINYINT, new TinyintValueMapper());
+        VALUE_MAPPERS.putIfAbsent(Types.TIME, new TimeValueMapper());
+        VALUE_MAPPERS.putIfAbsent(Types.DECIMAL, new DecimalValueMapper());
+        VALUE_MAPPERS.putIfAbsent(Types.DOUBLE, new DoubleValueMapper());
+        VALUE_MAPPERS.putIfAbsent(Types.FLOAT, new FloatValueMapper());
+        VALUE_MAPPERS.putIfAbsent(Types.BIT, new BitValueMapper());
+        VALUE_MAPPERS.putIfAbsent(Types.BLOB, new BlobValueMapper());
+        VALUE_MAPPERS.putIfAbsent(Types.CLOB, new ClobValueMapper());
+        VALUE_MAPPERS.putIfAbsent(Types.NCLOB, new NClobValueMapper());
+        VALUE_MAPPERS.putIfAbsent(Types.ROWID, new RowIdValueMapper());
+        VALUE_MAPPERS.putIfAbsent(Types.REAL, new RealValueMapper());
+        VALUE_MAPPERS.putIfAbsent(Types.VARBINARY, new VarBinaryValueMapper());
+        VALUE_MAPPERS.putIfAbsent(Types.LONGVARBINARY, new LongVarBinaryValueMapper());
+        VALUE_MAPPERS.putIfAbsent(Types.OTHER, new OtherValueMapper());
     }
 
     /**
@@ -74,7 +74,7 @@ public abstract class AbstractConnector {
                     continue;
                 }
                 // 根据字段类型转换值
-                final ValueMapper valueMapper = valueMappers.get(f.getType());
+                final ValueMapper valueMapper = VALUE_MAPPERS.get(f.getType());
                 if (null != valueMapper) {
                     // 当数据类型不同时,转换值类型
                     try {

+ 1 - 1
dbsyncer-connector/src/main/java/org/dbsyncer/connector/AbstractValueMapper.java

@@ -40,7 +40,7 @@ public abstract class AbstractValueMapper<T> implements ValueMapper {
      * @param val
      * @return
      */
-    protected Object getDefaultVal(Object val) throws Exception {
+    protected Object getDefaultVal(Object val) {
         return val;
     }
 

+ 6 - 0
dbsyncer-connector/src/main/java/org/dbsyncer/connector/oracle/OracleConnector.java

@@ -10,8 +10,14 @@ import org.dbsyncer.connector.enums.TableTypeEnum;
 import org.dbsyncer.connector.model.PageSql;
 import org.dbsyncer.connector.model.Table;
 
+import java.sql.Types;
+
 public final class OracleConnector extends AbstractDatabaseConnector {
 
+    public OracleConnector() {
+        VALUE_MAPPERS.put(Types.OTHER, new OracleOtherValueMapper());
+    }
+
     @Override
     public String getPageSql(PageSql config) {
         return DatabaseConstant.ORACLE_PAGE_SQL_START + config.getQuerySql() + DatabaseConstant.ORACLE_PAGE_SQL_END;

+ 50 - 0
dbsyncer-connector/src/main/java/org/dbsyncer/connector/oracle/OracleOtherValueMapper.java

@@ -0,0 +1,50 @@
+package org.dbsyncer.connector.oracle;
+
+import com.microsoft.sqlserver.jdbc.Geometry;
+import oracle.jdbc.OracleConnection;
+import oracle.spatial.geometry.JGeometry;
+import org.dbsyncer.common.spi.ConnectorMapper;
+import org.dbsyncer.connector.AbstractValueMapper;
+import org.dbsyncer.connector.ConnectorException;
+import org.dbsyncer.connector.database.ds.SimpleConnection;
+
+import java.sql.Connection;
+import java.sql.Struct;
+
+/**
+ * JDBC索引{@link java.sql.Types 1111}, JDBC类型java.sql.Struct,支持的数据库类型:
+ * <ol>
+ * <li>用户定义的对象</li>
+ * <li>VARCHAR2</li>
+ * </ol>
+ *
+ * @author AE86
+ * @version 1.0.0
+ * @date 2022/12/22 22:59
+ */
+public class OracleOtherValueMapper extends AbstractValueMapper<Struct> {
+
+    @Override
+    protected boolean skipConvert(Object val) {
+        return val instanceof java.sql.Struct || val instanceof String;
+    }
+
+    @Override
+    protected Struct convert(ConnectorMapper connectorMapper, Object val) throws Exception {
+        // SqlServer Geometry
+        if (val instanceof byte[]) {
+            Object connection = connectorMapper.getConnection();
+            if (connection instanceof Connection) {
+                SimpleConnection simpleConnection = (SimpleConnection) connection;
+                OracleConnection conn = simpleConnection.unwrap(OracleConnection.class);
+                // TODO 兼容Oracle STRUCT 字节数组
+                Geometry geometry = Geometry.deserialize((byte[]) val);
+                Double x = geometry.getX();
+                Double y = geometry.getY();
+                JGeometry jGeometry = new JGeometry(x, y, 0);
+                return JGeometry.store(jGeometry, conn);
+            }
+        }
+        throw new ConnectorException(String.format("%s can not find type [%s], val [%s]", getClass().getSimpleName(), val.getClass(), val));
+    }
+}

+ 2 - 2
dbsyncer-connector/src/main/java/org/dbsyncer/connector/postgresql/PostgreSQLConnector.java

@@ -13,8 +13,8 @@ import java.sql.Types;
 
 public final class PostgreSQLConnector extends AbstractDatabaseConnector {
 
-    static {
-        valueMappers.put(Types.OTHER, new PostgreSQLOtherValueMapper());
+    public PostgreSQLConnector() {
+        VALUE_MAPPERS.put(Types.OTHER, new PostgreSQLOtherValueMapper());
     }
 
     @Override

+ 6 - 10
dbsyncer-connector/src/main/java/org/dbsyncer/connector/postgresql/PostgreSQLOtherValueMapper.java

@@ -29,7 +29,7 @@ import org.postgis.binary.BinaryWriter;
  * @version 1.0.0
  * @date 2022/12/22 22:59
  */
-public class PostgreSQLOtherValueMapper extends AbstractValueMapper<Object> {
+public class PostgreSQLOtherValueMapper extends AbstractValueMapper<byte[]> {
 
     @Override
     protected boolean skipConvert(Object val) {
@@ -37,16 +37,12 @@ public class PostgreSQLOtherValueMapper extends AbstractValueMapper<Object> {
     }
 
     @Override
-    protected Object convert(ConnectorMapper connectorMapper, Object val) {
+    protected byte[] convert(ConnectorMapper connectorMapper, Object val) {
         if (val instanceof String) {
-            try {
-                BinaryParser parser = new BinaryParser();
-                Geometry geo = parser.parse((String) val);
-                BinaryWriter bw = new BinaryWriter();
-                return bw.writeBinary(geo);
-            } catch (Exception ex) {
-                return val;
-            }
+            BinaryParser parser = new BinaryParser();
+            Geometry geo = parser.parse((String) val);
+            BinaryWriter bw = new BinaryWriter();
+            return bw.writeBinary(geo);
         }
         throw new ConnectorException(String.format("%s can not find type [%s], val [%s]", getClass().getSimpleName(), val.getClass(), val));
     }

+ 2 - 29
dbsyncer-connector/src/main/java/org/dbsyncer/connector/schema/OtherValueMapper.java

@@ -1,45 +1,18 @@
 package org.dbsyncer.connector.schema;
 
-import com.microsoft.sqlserver.jdbc.Geometry;
-import oracle.jdbc.OracleConnection;
-import oracle.spatial.geometry.JGeometry;
 import org.dbsyncer.common.spi.ConnectorMapper;
 import org.dbsyncer.connector.AbstractValueMapper;
 import org.dbsyncer.connector.ConnectorException;
-import org.dbsyncer.connector.database.ds.SimpleConnection;
-
-import java.sql.Connection;
-import java.sql.Struct;
 
 /**
  * @author AE86
  * @version 1.0.0
  * @date 2022/9/16 16:54
  */
-public class OtherValueMapper extends AbstractValueMapper<Struct> {
-
-    @Override
-    protected boolean skipConvert(Object val) {
-        return val instanceof oracle.sql.STRUCT || val instanceof String;
-    }
+public class OtherValueMapper extends AbstractValueMapper<Object> {
 
     @Override
-    protected Struct convert(ConnectorMapper connectorMapper, Object val) throws Exception {
-        // SqlServer Geometry
-        if (val instanceof byte[]) {
-            Object connection = connectorMapper.getConnection();
-            if (connection instanceof Connection) {
-                SimpleConnection simpleConnection = (SimpleConnection) connection;
-                if (simpleConnection instanceof OracleConnection) {
-                    OracleConnection conn = simpleConnection.unwrap(OracleConnection.class);
-                    Geometry geometry = Geometry.deserialize((byte[]) val);
-                    Double x = geometry.getX();
-                    Double y = geometry.getY();
-                    JGeometry jGeometry = new JGeometry(x, y, 0);
-                    return JGeometry.store(jGeometry, conn);
-                }
-            }
-        }
+    protected Object convert(ConnectorMapper connectorMapper, Object val) {
         throw new ConnectorException(String.format("%s can not find type [%s], val [%s]", getClass().getSimpleName(), val.getClass(), val));
     }
 }

+ 13 - 1
dbsyncer-storage/src/main/java/org/dbsyncer/storage/util/BinlogMessageUtil.java

@@ -3,8 +3,10 @@ package org.dbsyncer.storage.util;
 import com.google.protobuf.ByteString;
 import oracle.sql.BLOB;
 import oracle.sql.CLOB;
+import oracle.sql.STRUCT;
 import oracle.sql.TIMESTAMP;
 import org.apache.commons.io.IOUtils;
+import org.dbsyncer.storage.StorageException;
 import org.dbsyncer.storage.binlog.BinlogColumnValue;
 import org.dbsyncer.storage.binlog.proto.BinlogMap;
 import org.slf4j.Logger;
@@ -165,6 +167,8 @@ public abstract class BinlogMessageUtil {
                 return ByteString.copyFrom(getBytes((BLOB) v));
             case "oracle.sql.CLOB":
                 return ByteString.copyFrom(getBytes((CLOB) v));
+            case "oracle.sql.STRUCT":
+                return ByteString.copyFrom(getBytes((STRUCT) v));
             default:
                 logger.error("Unsupported serialize value type:{}", type);
                 return null;
@@ -221,10 +225,10 @@ public abstract class BinlogMessageUtil {
             case Types.BINARY:
             case Types.VARBINARY:
             case Types.LONGVARBINARY:
-                // 二进制对象
             case Types.NCLOB:
             case Types.CLOB:
             case Types.BLOB:
+            case Types.OTHER:
                 return value.asByteArray();
 
             // 暂不支持
@@ -266,4 +270,12 @@ public abstract class BinlogMessageUtil {
         return new byte[0];
     }
 
+    private static byte[] getBytes(STRUCT v) {
+        try {
+            return v.toBytes();
+        } catch (SQLException e) {
+            logger.error(e.getMessage());
+            throw new StorageException(e);
+        }
+    }
 }