AE86 3 years ago
parent
commit
57140fc37d

+ 27 - 0
dbsyncer-connector/src/main/java/org/dbsyncer/connector/database/setter/BitSetter.java

@@ -0,0 +1,27 @@
+package org.dbsyncer.connector.database.setter;
+
+import org.dbsyncer.connector.ConnectorException;
+import org.dbsyncer.connector.database.AbstractSetter;
+
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.util.BitSet;
+
+public class BitSetter extends AbstractSetter<byte[]> {
+
+    @Override
+    protected void set(PreparedStatement ps, int i, byte[] val) throws SQLException {
+        ps.setBytes(i, val);
+    }
+
+    @Override
+    protected void setIfValueTypeNotMatch(PreparedFieldMapper mapper, PreparedStatement ps, int i, int type, Object val) throws SQLException {
+        if (val instanceof BitSet) {
+            BitSet bitSet = (BitSet) val;
+            ps.setBytes(i, bitSet.toByteArray());
+            return;
+        }
+        throw new ConnectorException(String.format("BitSetter can not find type [%s], val [%s]", type, val));
+    }
+
+}

+ 1 - 0
dbsyncer-connector/src/main/java/org/dbsyncer/connector/enums/SetterEnum.java

@@ -37,6 +37,7 @@ public enum SetterEnum {
     DECIMAL(Types.DECIMAL, new DecimalSetter()),
     DOUBLE(Types.DOUBLE, new DoubleSetter()),
     FLOAT(Types.FLOAT, new FloatSetter()),
+    BIT(Types.BIT, new BitSetter()),
     BLOB(Types.BLOB, new BlobSetter()),
     CLOB(Types.CLOB, new ClobSetter()),
     NCLOB(Types.NCLOB, new NClobSetter()),

+ 60 - 0
dbsyncer-listener/src/main/java/org/dbsyncer/listener/mysql/deserializer/DatetimeV2Deserialize.java

@@ -0,0 +1,60 @@
+package org.dbsyncer.listener.mysql.deserializer;
+
+import com.github.shyiko.mysql.binlog.io.ByteArrayInputStream;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.sql.Timestamp;
+import java.time.LocalDateTime;
+
+public class DatetimeV2Deserialize {
+
+    public Serializable deserializeDatetimeV2(int meta, ByteArrayInputStream inputStream) throws IOException {
+        long datetime = bigEndianLong(inputStream.read(5), 0, 5);
+        int yearMonth = bitSlice(datetime, 1, 17, 40);
+        int fsp = deserializeFractionalSeconds(meta, inputStream);
+        LocalDateTime time = LocalDateTime.of(
+                yearMonth / 13,
+                yearMonth % 13,
+                bitSlice(datetime, 18, 5, 40),
+                bitSlice(datetime, 23, 5, 40),
+                bitSlice(datetime, 28, 6, 40),
+                bitSlice(datetime, 34, 6, 40),
+                fsp / 1000
+        );
+        return Timestamp.valueOf(time);
+    }
+
+    private long bigEndianLong(byte[] bytes, int offset, int length) {
+        long result = 0;
+        for (int i = offset; i < (offset + length); i++) {
+            byte b = bytes[i];
+            result = (result << 8) | (b >= 0 ? (int) b : (b + 256));
+        }
+        return result;
+    }
+
+    private int deserializeFractionalSeconds(int meta, ByteArrayInputStream inputStream) throws IOException {
+        int length = (meta + 1) / 2;
+        if (length > 0) {
+            int fraction = bigEndianInteger(inputStream.read(length), 0, length);
+            return fraction * (int) Math.pow(100, 3 - length);
+        }
+        return 0;
+    }
+
+    private int bigEndianInteger(byte[] bytes, int offset, int length) {
+        int result = 0;
+        for (int i = offset; i < (offset + length); i++) {
+            byte b = bytes[i];
+            result = (result << 8) | (b >= 0 ? (int) b : (b + 256));
+        }
+        return result;
+    }
+
+    private int bitSlice(long value, int bitOffset, int numberOfBits, int payloadSize) {
+        long result = value >> payloadSize - (bitOffset + numberOfBits);
+        return (int) (result & ((1 << numberOfBits) - 1));
+    }
+
+}

+ 24 - 0
dbsyncer-listener/src/main/java/org/dbsyncer/listener/mysql/deserializer/DeleteDeserializer.java

@@ -0,0 +1,24 @@
+package org.dbsyncer.listener.mysql.deserializer;
+
+import com.github.shyiko.mysql.binlog.event.TableMapEventData;
+import com.github.shyiko.mysql.binlog.event.deserialization.UpdateRowsEventDataDeserializer;
+import com.github.shyiko.mysql.binlog.io.ByteArrayInputStream;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Map;
+
+public class DeleteDeserializer extends UpdateRowsEventDataDeserializer {
+
+    private DatetimeV2Deserialize datetimeV2Deserialize;
+
+    public DeleteDeserializer(Map<Long, TableMapEventData> tableMapEventByTableId) {
+        super(tableMapEventByTableId);
+        datetimeV2Deserialize = new DatetimeV2Deserialize();
+    }
+
+    protected Serializable deserializeDatetimeV2(int meta, ByteArrayInputStream inputStream) throws IOException {
+        return datetimeV2Deserialize.deserializeDatetimeV2(meta, inputStream);
+    }
+
+}

+ 24 - 0
dbsyncer-listener/src/main/java/org/dbsyncer/listener/mysql/deserializer/UpdateDeserializer.java

@@ -0,0 +1,24 @@
+package org.dbsyncer.listener.mysql.deserializer;
+
+import com.github.shyiko.mysql.binlog.event.TableMapEventData;
+import com.github.shyiko.mysql.binlog.event.deserialization.UpdateRowsEventDataDeserializer;
+import com.github.shyiko.mysql.binlog.io.ByteArrayInputStream;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Map;
+
+public class UpdateDeserializer extends UpdateRowsEventDataDeserializer {
+
+    private DatetimeV2Deserialize datetimeV2Deserialize;
+
+    public UpdateDeserializer(Map<Long, TableMapEventData> tableMapEventByTableId) {
+        super(tableMapEventByTableId);
+        datetimeV2Deserialize = new DatetimeV2Deserialize();
+    }
+
+    protected Serializable deserializeDatetimeV2(int meta, ByteArrayInputStream inputStream) throws IOException {
+        return datetimeV2Deserialize.deserializeDatetimeV2(meta, inputStream);
+    }
+
+}

+ 24 - 0
dbsyncer-listener/src/main/java/org/dbsyncer/listener/mysql/deserializer/WriteDeserializer.java

@@ -0,0 +1,24 @@
+package org.dbsyncer.listener.mysql.deserializer;
+
+import com.github.shyiko.mysql.binlog.event.TableMapEventData;
+import com.github.shyiko.mysql.binlog.event.deserialization.UpdateRowsEventDataDeserializer;
+import com.github.shyiko.mysql.binlog.io.ByteArrayInputStream;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Map;
+
+public class WriteDeserializer extends UpdateRowsEventDataDeserializer {
+
+    private DatetimeV2Deserialize datetimeV2Deserialize;
+
+    public WriteDeserializer(Map<Long, TableMapEventData> tableMapEventByTableId) {
+        super(tableMapEventByTableId);
+        datetimeV2Deserialize = new DatetimeV2Deserialize();
+    }
+
+    protected Serializable deserializeDatetimeV2(int meta, ByteArrayInputStream inputStream) throws IOException {
+        return datetimeV2Deserialize.deserializeDatetimeV2(meta, inputStream);
+    }
+
+}

+ 0 - 2
dbsyncer-manager/src/main/java/org/dbsyncer/manager/Manager.java

@@ -91,8 +91,6 @@ public interface Manager extends Executor {
 
     Config getConfig(String configId);
 
-    void removeConfig(String configId);
-
     List<Config> getConfigAll();
 
     // Data

+ 0 - 5
dbsyncer-manager/src/main/java/org/dbsyncer/manager/ManagerFactory.java

@@ -230,11 +230,6 @@ public class ManagerFactory implements Manager, ApplicationListener<ClosedEvent>
         return operationTemplate.queryObject(Config.class, configId);
     }
 
-    @Override
-    public void removeConfig(String configId) {
-        operationTemplate.remove(new OperationConfig(configId));
-    }
-
     @Override
     public List<Config> getConfigAll() {
         Config config = new Config();