浏览代码

支持oracle clob写es

AE86 3 年之前
父节点
当前提交
ffd4909028

+ 18 - 0
dbsyncer-connector/src/main/java/org/dbsyncer/connector/database/setter/BigintSetter.java

@@ -1,7 +1,10 @@
 package org.dbsyncer.connector.database.setter;
 
+import org.dbsyncer.connector.ConnectorException;
 import org.dbsyncer.connector.database.AbstractSetter;
 
+import java.math.BigDecimal;
+import java.math.BigInteger;
 import java.sql.PreparedStatement;
 import java.sql.SQLException;
 
@@ -12,4 +15,19 @@ public class BigintSetter extends AbstractSetter<Long> {
         ps.setLong(i, val);
     }
 
+    @Override
+    protected void setIfValueTypeNotMatch(PreparedFieldMapper mapper, PreparedStatement ps, int i, int type, Object val)
+            throws SQLException {
+        if (val instanceof BigDecimal) {
+            BigDecimal bitDec = (BigDecimal) val;
+            ps.setLong(i, bitDec.longValue());
+            return;
+        }
+        if (val instanceof BigInteger) {
+            BigInteger bitInt = (BigInteger) val;
+            ps.setLong(i, bitInt.longValue());
+            return;
+        }
+        throw new ConnectorException(String.format("BigintSetter can not find type [%s], val [%s]", type, val));
+    }
 }

+ 24 - 0
dbsyncer-parser/src/main/java/org/dbsyncer/parser/convert/AbstractHandler.java

@@ -1,7 +1,11 @@
 package org.dbsyncer.parser.convert;
 
+import org.apache.commons.io.IOUtils;
 import org.dbsyncer.parser.ParserException;
 
+import java.io.BufferedInputStream;
+import java.io.InputStream;
+
 public abstract class AbstractHandler implements Handler {
 
     /**
@@ -24,4 +28,24 @@ public abstract class AbstractHandler implements Handler {
         }
         return null;
     }
+
+    protected String getString(InputStream in, int length){
+        BufferedInputStream is = null;
+        try {
+            is = new BufferedInputStream(in);
+            byte[] bytes = new byte[length];
+            int len = bytes.length;
+            int offset = 0;
+            int read;
+            while (offset < len && (read = is.read(bytes, offset, len - offset)) >= 0) {
+                offset += read;
+            }
+            return new String(bytes);
+        } catch (Exception e) {
+            throw new ParserException(e.getMessage());
+        } finally {
+            IOUtils.closeQuietly(is);
+        }
+    }
+
 }

+ 30 - 0
dbsyncer-parser/src/main/java/org/dbsyncer/parser/convert/handler/BlobToStringHandler.java

@@ -0,0 +1,30 @@
+package org.dbsyncer.parser.convert.handler;
+
+import org.dbsyncer.parser.ParserException;
+import org.dbsyncer.parser.convert.AbstractHandler;
+
+import java.sql.SQLException;
+
+/**
+ * Blob转String
+ *
+ * @author AE86
+ * @version 1.0.0
+ * @date 2022/1/20 23:04
+ */
+public class BlobToStringHandler extends AbstractHandler {
+
+    @Override
+    public Object convert(String args, Object value) {
+        if (value instanceof oracle.sql.BLOB) {
+            oracle.sql.BLOB blob = (oracle.sql.BLOB) value;
+            try {
+                value = getString(blob.getBinaryStream(), (int) blob.length());
+            } catch (SQLException e) {
+                throw new ParserException(e.getMessage());
+            }
+        }
+        return value;
+    }
+
+}

+ 30 - 0
dbsyncer-parser/src/main/java/org/dbsyncer/parser/convert/handler/ClobToStringHandler.java

@@ -0,0 +1,30 @@
+package org.dbsyncer.parser.convert.handler;
+
+import org.dbsyncer.parser.ParserException;
+import org.dbsyncer.parser.convert.AbstractHandler;
+
+import java.sql.SQLException;
+
+/**
+ * Clob转String
+ *
+ * @author AE86
+ * @version 1.0.0
+ * @date 2022/1/20 23:04
+ */
+public class ClobToStringHandler extends AbstractHandler {
+
+    @Override
+    public Object convert(String args, Object value) {
+        if (value instanceof oracle.sql.CLOB) {
+            oracle.sql.CLOB clob = (oracle.sql.CLOB) value;
+            try {
+                value = getString(clob.getAsciiStream(), (int) clob.length());
+            } catch (SQLException e) {
+                throw new ParserException(e.getMessage());
+            }
+        }
+        return value;
+    }
+
+}

+ 12 - 4
dbsyncer-parser/src/main/java/org/dbsyncer/parser/enums/ConvertEnum.java

@@ -26,10 +26,6 @@ public enum ConvertEnum {
      * 系统日期Date
      */
     SYSTEM_DATE("SYSTEM_DATE", "系统日期", 0, new DateHandler()),
-    /**
-     * Byte[]转String
-     */
-    BYTES_TO_STRING("BYTES_TO_STRING", "Byte[]转String", 0, new BytesToStringHandler()),
     /**
      * Timestamp转Date
      */
@@ -46,6 +42,18 @@ public enum ConvertEnum {
      * Date转中国标准时间
      */
     DATE_TO_CHINESE_STANDARD_TIME("DATE_TO_CHINESE_STANDARD_TIME", "Date转yyyy-MM-dd HH:mm:ss", 0, new DateToChineseStandardTimeHandler()),
+    /**
+     * Byte[]转String
+     */
+    BYTES_TO_STRING("BYTES_TO_STRING", "Byte[]转String", 0, new BytesToStringHandler()),
+    /**
+     * Clob转String
+     */
+    CLOB_TO_STRING("CLOB_TO_STRING", "Clob转String", 0, new ClobToStringHandler()),
+    /**
+     * Blob转String
+     */
+    BLOB_TO_STRING("BLOB_TO_STRING", "Blob转String", 0, new BlobToStringHandler()),
     /**
      * 替换
      */