AE86 2 лет назад
Родитель
Сommit
085a7aadec

+ 4 - 2
dbsyncer-connector/src/main/java/org/dbsyncer/connector/database/setter/BlobSetter.java

@@ -16,13 +16,15 @@ public class BlobSetter extends AbstractSetter<Blob> {
 
     @Override
     protected void setIfValueTypeNotMatch(PreparedFieldMapper mapper, PreparedStatement ps, int i, int type, Object val) throws SQLException {
-        // 存放jpg等文件
         if (val instanceof Blob) {
             Blob blob = (Blob) val;
             ps.setBlob(i, blob);
             return;
         }
+        if (val instanceof byte[]) {
+            ps.setBlob(i, mapper.getBlob((byte[]) val));
+            return;
+        }
         throw new ConnectorException(String.format("BlobSetter can not find type [%s], val [%s]", type, val));
     }
-
 }

+ 8 - 4
dbsyncer-connector/src/main/java/org/dbsyncer/connector/database/setter/ClobSetter.java

@@ -15,10 +15,14 @@ public class ClobSetter extends AbstractSetter<Clob> {
     }
 
     @Override
-    protected void setIfValueTypeNotMatch(PreparedFieldMapper mapper, PreparedStatement ps, int i, int type, Object val) throws SQLException {
-        if(val instanceof Clob) {
-            Clob clob = (Clob) val;
-            ps.setClob(i, clob);
+    protected void setIfValueTypeNotMatch(PreparedFieldMapper mapper, PreparedStatement ps, int i, int type, Object val)
+            throws SQLException {
+        if (val instanceof Clob) {
+            ps.setClob(i, (Clob) val);
+            return;
+        }
+        if (val instanceof byte[]) {
+            ps.setClob(i, mapper.getClob((byte[]) val));
             return;
         }
         throw new ConnectorException(String.format("ClobSetter can not find type [%s], val [%s]", type, val));

+ 1 - 3
dbsyncer-connector/src/main/java/org/dbsyncer/connector/database/setter/NClobSetter.java

@@ -17,9 +17,7 @@ public class NClobSetter extends AbstractSetter<NClob> {
     @Override
     protected void setIfValueTypeNotMatch(PreparedFieldMapper mapper, PreparedStatement ps, int i, int type, Object val) throws SQLException {
         if (val instanceof byte[]) {
-            byte[] bytes = (byte[]) val;
-            NClob nClob = mapper.getNClob(bytes);
-            ps.setNClob(i, nClob);
+            ps.setNClob(i, mapper.getNClob((byte[]) val));
             return;
         }
         throw new ConnectorException(String.format("NClobSetter can not find type [%s], val [%s]", type, val));

+ 30 - 10
dbsyncer-connector/src/main/java/org/dbsyncer/connector/database/setter/PreparedFieldMapper.java

@@ -1,26 +1,46 @@
 package org.dbsyncer.connector.database.setter;
 
 import oracle.jdbc.OracleConnection;
-import oracle.sql.NCLOB;
+import org.dbsyncer.connector.database.ds.SimpleConnection;
 
-import java.sql.Connection;
-import java.sql.NClob;
-import java.sql.SQLException;
+import java.nio.charset.Charset;
+import java.sql.*;
 
 public class PreparedFieldMapper {
 
-    private Connection connection;
+    private SimpleConnection connection;
 
     public PreparedFieldMapper(Connection connection) {
-        this.connection = connection;
+        this.connection = (SimpleConnection) connection;
     }
 
     public NClob getNClob(byte[] bytes) throws SQLException {
-        if (connection instanceof OracleConnection) {
-            OracleConnection conn = (OracleConnection) connection;
-            return new NCLOB(conn, bytes);
+        if (connection.getConnection() instanceof OracleConnection) {
+            OracleConnection conn = (OracleConnection) connection.getConnection();
+            NClob nClob = conn.createNClob();
+            nClob.setString(1, new String(bytes, Charset.defaultCharset()));
+            return nClob;
         }
         return connection.createNClob();
     }
 
-}
+    public Blob getBlob(byte[] bytes) throws SQLException {
+        if (connection.getConnection() instanceof OracleConnection) {
+            OracleConnection conn = (OracleConnection) connection.getConnection();
+            Blob blob = conn.createBlob();
+            blob.setBytes(1, bytes);
+            return blob;
+        }
+        return connection.createBlob();
+    }
+
+    public Clob getClob(byte[] bytes) throws SQLException {
+        if (connection.getConnection() instanceof OracleConnection) {
+            OracleConnection conn = (OracleConnection) connection.getConnection();
+            Clob clob = conn.createClob();
+            clob.setString(1, new String(bytes, Charset.defaultCharset()));
+            return clob;
+        }
+        return connection.createClob();
+    }
+}

+ 45 - 1
dbsyncer-connector/src/main/test/SqlServerConnectionTest.java

@@ -1,9 +1,16 @@
+import oracle.jdbc.OracleConnection;
 import org.dbsyncer.connector.config.DatabaseConfig;
 import org.dbsyncer.connector.database.DatabaseConnectorMapper;
+import org.dbsyncer.connector.database.ds.SimpleConnection;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.springframework.jdbc.core.BatchPreparedStatementSetter;
 
+import java.nio.charset.Charset;
+import java.sql.Clob;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
 import java.time.LocalDateTime;
 import java.util.concurrent.*;
 
@@ -16,6 +23,43 @@ public class SqlServerConnectionTest {
 
     private final Logger logger = LoggerFactory.getLogger(getClass());
 
+    @Test
+    public void testByte() {
+        DatabaseConfig config = new DatabaseConfig();
+        config.setUrl("jdbc:oracle:thin:@127.0.0.1:1521:XE");
+        config.setUsername("ae86");
+        config.setPassword("123");
+        config.setDriverClassName("oracle.jdbc.OracleDriver");
+        final DatabaseConnectorMapper connectorMapper = new DatabaseConnectorMapper(config);
+
+        String executeSql="UPDATE \"my_user\" SET \"name\"=?,\"clo\"=? WHERE \"id\"=?";
+        int[] execute = connectorMapper.execute(databaseTemplate ->
+                databaseTemplate.batchUpdate(executeSql, new BatchPreparedStatementSetter() {
+                    @Override
+                    public void setValues(PreparedStatement ps, int i) {
+                        try {
+                            SimpleConnection connection = (SimpleConnection) databaseTemplate.getConnection();
+                            OracleConnection conn = (OracleConnection) connection.getConnection();
+                            Clob clob = conn.createClob();
+                            clob.setString(1, new String("中文888".getBytes(Charset.defaultCharset())));
+
+                            ps.setString(1, "hello888");
+                            ps.setClob(2, clob);
+                            ps.setInt(3, 2);
+                        } catch (SQLException e) {
+                            e.printStackTrace();
+                        }
+                    }
+
+                    @Override
+                    public int getBatchSize() {
+                        return 1;
+                    }
+                })
+        );
+        logger.info("execute:{}", execute);
+    }
+
     @Test
     public void testConnection() throws InterruptedException {
         DatabaseConfig config = new DatabaseConfig();
@@ -65,4 +109,4 @@ public class SqlServerConnectionTest {
         TimeUnit.SECONDS.sleep(3);
         logger.info("test end");
     }
-}
+}

+ 8 - 12
dbsyncer-storage/src/main/java/org/dbsyncer/storage/binlog/AbstractBinlogRecorder.java

@@ -34,8 +34,9 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.math.BigDecimal;
 import java.nio.ByteBuffer;
-import java.sql.*;
+import java.nio.charset.Charset;
 import java.sql.Date;
+import java.sql.*;
 import java.util.*;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.locks.Lock;
@@ -250,7 +251,6 @@ public abstract class AbstractBinlogRecorder<Message> implements BinlogRecorder,
             case "oracle.sql.CLOB":
                 CLOB clob = (CLOB) v;
                 return ByteString.copyFrom(getBytes(clob));
-
             default:
                 logger.error("Unsupported serialize value type:{}", type);
                 return null;
@@ -417,19 +417,15 @@ public abstract class AbstractBinlogRecorder<Message> implements BinlogRecorder,
     }
 
     private byte[] getBytes(CLOB clob) {
-        InputStream is = null;
-        byte[] b = null;
         try {
-            is = clob.binaryStreamValue();
-            b = new byte[(int) clob.length()];
-            is.read(b);
-            return b;
-        } catch (Exception e) {
+            long length = clob.length();
+            if (length > 0) {
+                return clob.getSubString(1, (int) length).getBytes(Charset.defaultCharset());
+            }
+        } catch (SQLException e) {
             logger.error(e.getMessage());
-        } finally {
-            IOUtils.closeQuietly(is);
         }
-        return b;
+        return null;
     }
 
 }