Browse Source

!91 merge
Merge pull request !91 from AE86/V_1.0.0_RC

AE86 2 years ago
parent
commit
306a3edddd

+ 23 - 2
dbsyncer-connector/src/main/java/org/dbsyncer/connector/AbstractValueMapper.java

@@ -22,6 +22,16 @@ public abstract class AbstractValueMapper<T> implements ValueMapper {
      */
     protected abstract T convert(ConnectorMapper connectorMapper, Object val) throws Exception;
 
+    /**
+     * 是否跳过类型转换
+     *
+     * @param val
+     * @return
+     */
+    protected boolean skipConvert(Object val){
+        return false;
+    }
+
     /**
      * 获取默认值
      *
@@ -34,7 +44,18 @@ public abstract class AbstractValueMapper<T> implements ValueMapper {
 
     @Override
     public Object convertValue(ConnectorMapper connectorMapper, Object val) throws Exception {
-        // 当数据类型不同时,返回转换值
-        return null != val && !val.getClass().equals(parameterClazz) ? convert(connectorMapper, val) : getDefaultVal(val);
+        if(null != val){
+            // 是否需要跳过转换
+            if(skipConvert(val)){
+                return val;
+            }
+
+            // 当数据类型不同时,返回转换值
+            if(!val.getClass().equals(parameterClazz)){
+                return convert(connectorMapper, val);
+            }
+        }
+        return getDefaultVal(val);
     }
+
 }

+ 2 - 32
dbsyncer-connector/src/main/java/org/dbsyncer/connector/database/DatabaseValueMapper.java

@@ -6,8 +6,8 @@ import oracle.spatial.geometry.JGeometry;
 import org.dbsyncer.connector.ConnectorException;
 import org.dbsyncer.connector.database.ds.SimpleConnection;
 
-import java.nio.charset.Charset;
-import java.sql.*;
+import java.sql.SQLException;
+import java.sql.Struct;
 
 public class DatabaseValueMapper {
 
@@ -17,36 +17,6 @@ public class DatabaseValueMapper {
         this.connection = connection;
     }
 
-    public NClob getNClob(byte[] bytes) throws SQLException {
-        if (connection.getConnection() instanceof OracleConnection) {
-            OracleConnection conn = (OracleConnection) connection.getConnection();
-            NClob nClob = conn.createNClob();
-            nClob.setString(1, new String(bytes, Charset.defaultCharset()));
-            return nClob;
-        }
-        return connection.createNClob();
-    }
-
-    public Blob getBlob(byte[] bytes) throws SQLException {
-        if (connection.getConnection() instanceof OracleConnection) {
-            OracleConnection conn = (OracleConnection) connection.getConnection();
-            Blob blob = conn.createBlob();
-            blob.setBytes(1, bytes);
-            return blob;
-        }
-        return connection.createBlob();
-    }
-
-    public Clob getClob(byte[] bytes) throws SQLException {
-        if (connection.getConnection() instanceof OracleConnection) {
-            OracleConnection conn = (OracleConnection) connection.getConnection();
-            Clob clob = conn.createClob();
-            clob.setString(1, new String(bytes, Charset.defaultCharset()));
-            return clob;
-        }
-        return connection.createClob();
-    }
-
     public Struct getStruct(byte[] val) throws SQLException {
         if (connection.getConnection() instanceof OracleConnection) {
             OracleConnection conn = connection.unwrap(OracleConnection.class);

+ 5 - 11
dbsyncer-connector/src/main/java/org/dbsyncer/connector/schema/BlobValueMapper.java

@@ -3,11 +3,8 @@ package org.dbsyncer.connector.schema;
 import org.dbsyncer.connector.AbstractValueMapper;
 import org.dbsyncer.connector.ConnectorException;
 import org.dbsyncer.connector.ConnectorMapper;
-import org.dbsyncer.connector.database.DatabaseValueMapper;
-import org.dbsyncer.connector.database.ds.SimpleConnection;
 
 import java.sql.Blob;
-import java.sql.Connection;
 
 /**
  * @author AE86
@@ -17,15 +14,12 @@ import java.sql.Connection;
 public class BlobValueMapper extends AbstractValueMapper<Blob> {
 
     @Override
-    protected Blob convert(ConnectorMapper connectorMapper, Object val) throws Exception {
-        if (val instanceof byte[]) {
-            Object connection = connectorMapper.getConnection();
-            if (connection instanceof Connection) {
-                final DatabaseValueMapper mapper = new DatabaseValueMapper((SimpleConnection) connection);
-                return mapper.getBlob((byte[]) val);
-            }
-        }
+    protected boolean skipConvert(Object val) {
+        return val instanceof oracle.sql.BLOB || val instanceof byte[];
+    }
 
+    @Override
+    protected Blob convert(ConnectorMapper connectorMapper, Object val) {
         throw new ConnectorException(String.format("%s can not find type [%s], val [%s]", getClass().getSimpleName(), val.getClass(), val));
     }
 }

+ 5 - 11
dbsyncer-connector/src/main/java/org/dbsyncer/connector/schema/ClobValueMapper.java

@@ -3,11 +3,8 @@ package org.dbsyncer.connector.schema;
 import org.dbsyncer.connector.AbstractValueMapper;
 import org.dbsyncer.connector.ConnectorException;
 import org.dbsyncer.connector.ConnectorMapper;
-import org.dbsyncer.connector.database.DatabaseValueMapper;
-import org.dbsyncer.connector.database.ds.SimpleConnection;
 
 import java.sql.Clob;
-import java.sql.Connection;
 
 /**
  * @author AE86
@@ -17,15 +14,12 @@ import java.sql.Connection;
 public class ClobValueMapper extends AbstractValueMapper<Clob> {
 
     @Override
-    protected Clob convert(ConnectorMapper connectorMapper, Object val) throws Exception {
-        if (val instanceof byte[]) {
-            Object connection = connectorMapper.getConnection();
-            if (connection instanceof Connection) {
-                final DatabaseValueMapper mapper = new DatabaseValueMapper((SimpleConnection) connection);
-                return mapper.getClob((byte[]) val);
-            }
-        }
+    protected boolean skipConvert(Object val) {
+        return val instanceof oracle.sql.CLOB || val instanceof byte[] || val instanceof String;
+    }
 
+    @Override
+    protected Clob convert(ConnectorMapper connectorMapper, Object val) {
         throw new ConnectorException(String.format("%s can not find type [%s], val [%s]", getClass().getSimpleName(), val.getClass(), val));
     }
 }

+ 6 - 0
dbsyncer-connector/src/main/java/org/dbsyncer/connector/schema/FloatValueMapper.java

@@ -4,6 +4,8 @@ import org.dbsyncer.connector.AbstractValueMapper;
 import org.dbsyncer.connector.ConnectorException;
 import org.dbsyncer.connector.ConnectorMapper;
 
+import java.math.BigDecimal;
+
 /**
  * @author AE86
  * @version 1.0.0
@@ -13,6 +15,10 @@ public class FloatValueMapper extends AbstractValueMapper<Float> {
 
     @Override
     protected Float convert(ConnectorMapper connectorMapper, Object val) {
+        if (val instanceof BigDecimal) {
+            BigDecimal bigDecimal = (BigDecimal) val;
+            return bigDecimal.floatValue();
+        }
         throw new ConnectorException(String.format("%s can not find type [%s], val [%s]", getClass().getSimpleName(), val.getClass(), val));
     }
 }

+ 5 - 11
dbsyncer-connector/src/main/java/org/dbsyncer/connector/schema/NClobValueMapper.java

@@ -3,10 +3,7 @@ package org.dbsyncer.connector.schema;
 import org.dbsyncer.connector.AbstractValueMapper;
 import org.dbsyncer.connector.ConnectorException;
 import org.dbsyncer.connector.ConnectorMapper;
-import org.dbsyncer.connector.database.DatabaseValueMapper;
-import org.dbsyncer.connector.database.ds.SimpleConnection;
 
-import java.sql.Connection;
 import java.sql.NClob;
 
 /**
@@ -17,15 +14,12 @@ import java.sql.NClob;
 public class NClobValueMapper extends AbstractValueMapper<NClob> {
 
     @Override
-    protected NClob convert(ConnectorMapper connectorMapper, Object val) throws Exception {
-        if (val instanceof byte[]) {
-            Object connection = connectorMapper.getConnection();
-            if (connection instanceof Connection) {
-                final DatabaseValueMapper mapper = new DatabaseValueMapper((SimpleConnection) connection);
-                return mapper.getNClob((byte[]) val);
-            }
-        }
+    protected boolean skipConvert(Object val) {
+        return val instanceof oracle.sql.NCLOB || val instanceof byte[];
+    }
 
+    @Override
+    protected NClob convert(ConnectorMapper connectorMapper, Object val) {
         throw new ConnectorException(String.format("%s can not find type [%s], val [%s]", getClass().getSimpleName(), val.getClass(), val));
     }
 }

+ 5 - 3
dbsyncer-connector/src/main/java/org/dbsyncer/connector/schema/OtherValueMapper.java

@@ -16,11 +16,13 @@ import java.sql.Struct;
  */
 public class OtherValueMapper extends AbstractValueMapper<Struct> {
 
+    @Override
+    protected boolean skipConvert(Object val) {
+        return val instanceof oracle.sql.STRUCT || val instanceof String;
+    }
+
     @Override
     protected Struct convert(ConnectorMapper connectorMapper, Object val) throws Exception {
-        if (val instanceof oracle.sql.STRUCT) {
-            return (Struct) val;
-        }
         // SqlServer Geometry
         if (val instanceof byte[]) {
             Object connection = connectorMapper.getConnection();

+ 5 - 0
dbsyncer-connector/src/main/java/org/dbsyncer/connector/schema/TimestampValueMapper.java

@@ -16,6 +16,11 @@ import java.time.LocalDateTime;
  */
 public class TimestampValueMapper extends AbstractValueMapper<Timestamp> {
 
+    @Override
+    protected boolean skipConvert(Object val) {
+        return val instanceof oracle.sql.TIMESTAMP;
+    }
+
     @Override
     protected Timestamp convert(ConnectorMapper connectorMapper, Object val) {
         if (val instanceof Date) {

+ 24 - 10
dbsyncer-listener/src/main/java/org/dbsyncer/listener/oracle/dcn/DBChangeNotification.java

@@ -51,7 +51,6 @@ public class DBChangeNotification {
     private String password;
     private String url;
     private OracleConnection conn;
-    private OracleStatement statement;
     private DatabaseChangeRegistration dcr;
     private Map<Integer, String> tables;
     private Worker worker;
@@ -80,8 +79,8 @@ public class DBChangeNotification {
             }
             conn = connect();
             connected = true;
-            statement = (OracleStatement) conn.createStatement();
-            readTables();
+            OracleStatement statement = (OracleStatement) conn.createStatement();
+            readTables(statement);
 
             Properties prop = new Properties();
             prop.setProperty(OracleConnection.DCN_NOTIFY_ROWIDS, "true");
@@ -117,6 +116,7 @@ public class DBChangeNotification {
                     logger.debug("配置监听表异常:{}, {}", sql, e.getMessage());
                 }
             }
+            close(statement);
         } catch (SQLException ex) {
             // if an exception occurs, we need to close the registration in order
             // to interrupt the thread otherwise it will be hanging around.
@@ -145,7 +145,6 @@ public class DBChangeNotification {
             if (null != conn) {
                 conn.unregisterDatabaseChangeNotification(dcr);
             }
-            close(statement);
             close(conn);
         } catch (SQLException e) {
             logger.error(e.getMessage());
@@ -168,7 +167,7 @@ public class DBChangeNotification {
         OracleStatement os = null;
         ResultSet rs = null;
         try {
-            os = (OracleStatement) conn.createStatement();
+            os = createStatement();
             rs = os.executeQuery(String.format(QUERY_ROW_DATA_SQL, tableName, rowId));
             if (rs.next()) {
                 final int size = rs.getMetaData().getColumnCount();
@@ -187,15 +186,30 @@ public class DBChangeNotification {
         }
     }
 
-    private void readTables() {
+    private OracleStatement createStatement() throws SQLException {
+        try {
+            OracleStatement statement = (OracleStatement) conn.createStatement();
+            Assert.notNull(statement, "Can't create statement, trying to reconnect.");
+            return statement;
+        } catch (Exception e) {
+            connected = false;
+            logger.error(e.getMessage());
+        }
+        conn = connect();
+        connected = true;
+        logger.info("重连成功");
+        return (OracleStatement) conn.createStatement();
+    }
+
+    private void readTables(OracleStatement statement) {
         tables = new LinkedHashMap<>();
-        List<String> tableList = queryForList(QUERY_TABLE_ALL_SQL, rs -> rs.getString(1));
+        List<String> tableList = queryForList(statement, QUERY_TABLE_ALL_SQL, rs -> rs.getString(1));
         Assert.notEmpty(tableList, "No tables available");
         final String owner = username.toUpperCase();
-        tableList.forEach(tableName -> tables.put(queryForObject(String.format(QUERY_TABLE_ID_SQL, tableName, owner), rs -> rs.getInt(1)), tableName));
+        tableList.forEach(tableName -> tables.put(queryForObject(statement, String.format(QUERY_TABLE_ID_SQL, tableName, owner), rs -> rs.getInt(1)), tableName));
     }
 
-    private <T> List<T> queryForList(String sql, ResultSetMapper<T> mapper) {
+    private <T> List<T> queryForList(OracleStatement statement, String sql, ResultSetMapper<T> mapper) {
         ResultSet rs = null;
         List<T> list = new ArrayList<>();
         try {
@@ -211,7 +225,7 @@ public class DBChangeNotification {
         return list;
     }
 
-    private <T> T queryForObject(String sql, ResultSetMapper<T> mapper) {
+    private <T> T queryForObject(OracleStatement statement, String sql, ResultSetMapper<T> mapper) {
         ResultSet rs = null;
         T apply = null;
         try {