瀏覽代碼

!44 V_1.1.3
Merge pull request !44 from AE86/V_1.0.0

AE86 3 年之前
父節點
當前提交
db15622081
共有 27 個文件被更改,包括 197 次插入40 次删除
  1. 3 3
      README.md
  2. 1 1
      dbsyncer-biz/pom.xml
  3. 1 1
      dbsyncer-cache/pom.xml
  4. 1 1
      dbsyncer-cluster/pom.xml
  5. 1 1
      dbsyncer-common/pom.xml
  6. 1 1
      dbsyncer-connector/pom.xml
  7. 2 7
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/Connector.java
  8. 27 0
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/database/setter/BitSetter.java
  9. 1 0
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/enums/SetterEnum.java
  10. 11 5
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/es/ESConnector.java
  11. 1 1
      dbsyncer-listener/pom.xml
  12. 6 3
      dbsyncer-listener/src/main/java/org/dbsyncer/listener/mysql/BinaryLogRemoteClient.java
  13. 60 0
      dbsyncer-listener/src/main/java/org/dbsyncer/listener/mysql/deserializer/DatetimeV2Deserialize.java
  14. 24 0
      dbsyncer-listener/src/main/java/org/dbsyncer/listener/mysql/deserializer/DeleteDeserializer.java
  15. 24 0
      dbsyncer-listener/src/main/java/org/dbsyncer/listener/mysql/deserializer/UpdateDeserializer.java
  16. 24 0
      dbsyncer-listener/src/main/java/org/dbsyncer/listener/mysql/deserializer/WriteDeserializer.java
  17. 1 1
      dbsyncer-manager/pom.xml
  18. 0 2
      dbsyncer-manager/src/main/java/org/dbsyncer/manager/Manager.java
  19. 0 5
      dbsyncer-manager/src/main/java/org/dbsyncer/manager/ManagerFactory.java
  20. 1 1
      dbsyncer-monitor/pom.xml
  21. 1 1
      dbsyncer-parser/pom.xml
  22. 1 1
      dbsyncer-plugin/pom.xml
  23. 1 1
      dbsyncer-storage/pom.xml
  24. 1 1
      dbsyncer-web/pom.xml
  25. 1 1
      dbsyncer-web/src/main/resources/application.properties
  26. 1 1
      pom.xml
  27. 1 1
      version.cmd

+ 3 - 3
README.md

@@ -15,7 +15,7 @@ DBSyncer是一款开源的数据同步中间件,提供Mysql、Oracle、SqlServ
                 <td>连接器</td>
                 <td>数据源</td>
                 <td>目标源</td>
-                <td>支持版本</td>
+                <td>支持版本(包含以下)</td>
             </tr>
             <tr>
                 <td>Mysql</td>
@@ -49,7 +49,7 @@ DBSyncer是一款开源的数据同步中间件,提供Mysql、Oracle、SqlServ
             </tr>
             <tr>
                 <td>最近计划</td>
-                <td colspan="3">kafka</td>
+                <td colspan="3">kafka(设计中)、Redis</td>
             </tr>
         </tbody>
     </table>
@@ -58,7 +58,7 @@ DBSyncer是一款开源的数据同步中间件,提供Mysql、Oracle、SqlServ
 ## 安装配置
 #### 步骤
 1. 安装[JRE 1.8](https://www.oracle.com/java/technologies/jdk8-downloads.html)(省略详细)
-2. 下载安装包[DBSyncer-1.0.0-Alpha.zip](https://gitee.com/ghi/dbsyncer/releases)(也可手动编译)
+2. 下载安装包[DBSyncer-1.0.0-Beta.zip](https://gitee.com/ghi/dbsyncer/releases)(也可手动编译)
 3. 解压安装包,Window执行bin/startup.bat,Linux执行bin/startup.sh
 4. 打开浏览器访问:http://127.0.0.1:18686
 5. 账号和密码:admin/admin

+ 1 - 1
dbsyncer-biz/pom.xml

@@ -5,7 +5,7 @@
 	<parent>
 		<artifactId>dbsyncer</artifactId>
 		<groupId>org.ghi</groupId>
-		<version>1.1.2-Alpha</version>
+		<version>1.1.3-Beta</version>
 	</parent>
 	<modelVersion>4.0.0</modelVersion>
 	<artifactId>dbsyncer-biz</artifactId>

+ 1 - 1
dbsyncer-cache/pom.xml

@@ -4,7 +4,7 @@
 	<parent>
 		<artifactId>dbsyncer</artifactId>
 		<groupId>org.ghi</groupId>
-		<version>1.1.2-Alpha</version>
+		<version>1.1.3-Beta</version>
 	</parent>
 	<modelVersion>4.0.0</modelVersion>
 	<artifactId>dbsyncer-cache</artifactId>

+ 1 - 1
dbsyncer-cluster/pom.xml

@@ -5,7 +5,7 @@
     <parent>
         <artifactId>dbsyncer</artifactId>
         <groupId>org.ghi</groupId>
-		<version>1.1.2-Alpha</version>
+		<version>1.1.3-Beta</version>
     </parent>
     <modelVersion>4.0.0</modelVersion>
     <artifactId>dbsyncer-cluster</artifactId>

+ 1 - 1
dbsyncer-common/pom.xml

@@ -5,7 +5,7 @@
     <parent>
         <artifactId>dbsyncer</artifactId>
         <groupId>org.ghi</groupId>
-		<version>1.1.2-Alpha</version>
+		<version>1.1.3-Beta</version>
     </parent>
     <modelVersion>4.0.0</modelVersion>
     <artifactId>dbsyncer-common</artifactId>

+ 1 - 1
dbsyncer-connector/pom.xml

@@ -5,7 +5,7 @@
     <parent>
         <artifactId>dbsyncer</artifactId>
         <groupId>org.ghi</groupId>
-		<version>1.1.2-Alpha</version>
+		<version>1.1.3-Beta</version>
     </parent>
     <modelVersion>4.0.0</modelVersion>
     <artifactId>dbsyncer-connector</artifactId>

+ 2 - 7
dbsyncer-connector/src/main/java/org/dbsyncer/connector/Connector.java

@@ -3,7 +3,6 @@ package org.dbsyncer.connector;
 import org.dbsyncer.common.model.Result;
 import org.dbsyncer.connector.config.*;
 
-import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 
@@ -105,9 +104,7 @@ public interface Connector<M, C> {
      * @param commandConfig
      * @return
      */
-    default Map<String, String> getSourceCommand(CommandConfig commandConfig) {
-        return Collections.EMPTY_MAP;
-    }
+    Map<String, String> getSourceCommand(CommandConfig commandConfig);
 
     /**
      * 获取目标源同步参数
@@ -115,7 +112,5 @@ public interface Connector<M, C> {
      * @param commandConfig
      * @return
      */
-    default Map<String, String> getTargetCommand(CommandConfig commandConfig) {
-        return Collections.EMPTY_MAP;
-    }
+    Map<String, String> getTargetCommand(CommandConfig commandConfig);
 }

+ 27 - 0
dbsyncer-connector/src/main/java/org/dbsyncer/connector/database/setter/BitSetter.java

@@ -0,0 +1,27 @@
+package org.dbsyncer.connector.database.setter;
+
+import org.dbsyncer.connector.ConnectorException;
+import org.dbsyncer.connector.database.AbstractSetter;
+
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.util.BitSet;
+
+public class BitSetter extends AbstractSetter<byte[]> {
+
+    @Override
+    protected void set(PreparedStatement ps, int i, byte[] val) throws SQLException {
+        ps.setBytes(i, val);
+    }
+
+    @Override
+    protected void setIfValueTypeNotMatch(PreparedFieldMapper mapper, PreparedStatement ps, int i, int type, Object val) throws SQLException {
+        if (val instanceof BitSet) {
+            BitSet bitSet = (BitSet) val;
+            ps.setBytes(i, bitSet.toByteArray());
+            return;
+        }
+        throw new ConnectorException(String.format("BitSetter can not find type [%s], val [%s]", type, val));
+    }
+
+}

+ 1 - 0
dbsyncer-connector/src/main/java/org/dbsyncer/connector/enums/SetterEnum.java

@@ -37,6 +37,7 @@ public enum SetterEnum {
     DECIMAL(Types.DECIMAL, new DecimalSetter()),
     DOUBLE(Types.DOUBLE, new DoubleSetter()),
     FLOAT(Types.FLOAT, new FloatSetter()),
+    BIT(Types.BIT, new BitSetter()),
     BLOB(Types.BLOB, new BlobSetter()),
     CLOB(Types.CLOB, new ClobSetter()),
     NCLOB(Types.NCLOB, new NClobSetter()),

+ 11 - 5
dbsyncer-connector/src/main/java/org/dbsyncer/connector/es/ESConnector.java

@@ -39,10 +39,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.stream.Collectors;
 
@@ -194,7 +191,7 @@ public final class ESConnector extends AbstractConnector implements Connector<ES
             if (restStatus.getStatus() != RestStatus.OK.getStatus()) {
                 throw new ConnectorException(String.format("error code:%s", restStatus.getStatus()));
             }
-        } catch (IOException e) {
+        } catch (Exception e) {
             // 记录错误数据
             result.getFailData().addAll(data);
             result.getFail().set(data.size());
@@ -263,6 +260,15 @@ public final class ESConnector extends AbstractConnector implements Connector<ES
         return command;
     }
 
+    @Override
+    public Map<String, String> getTargetCommand(CommandConfig commandConfig) {
+        Table table = commandConfig.getTable();
+        if(!CollectionUtils.isEmpty(table.getColumn())){
+            getPrimaryKeyField(table.getColumn());
+        }
+        return Collections.EMPTY_MAP;
+    }
+
     private void genSearchSourceBuilder(SearchSourceBuilder builder, Map<String, String> command) {
         // 查询字段
         String fieldNamesJson = command.get(ConnectorConstant.OPERTION_QUERY);

+ 1 - 1
dbsyncer-listener/pom.xml

@@ -5,7 +5,7 @@
     <parent>
         <artifactId>dbsyncer</artifactId>
         <groupId>org.ghi</groupId>
-		<version>1.1.2-Alpha</version>
+		<version>1.1.3-Beta</version>
     </parent>
     <modelVersion>4.0.0</modelVersion>
     <artifactId>dbsyncer-listener</artifactId>

+ 6 - 3
dbsyncer-listener/src/main/java/org/dbsyncer/listener/mysql/BinaryLogRemoteClient.java

@@ -7,6 +7,9 @@ import com.github.shyiko.mysql.binlog.io.ByteArrayInputStream;
 import com.github.shyiko.mysql.binlog.network.*;
 import com.github.shyiko.mysql.binlog.network.protocol.*;
 import com.github.shyiko.mysql.binlog.network.protocol.command.*;
+import org.dbsyncer.listener.mysql.deserializer.DeleteDeserializer;
+import org.dbsyncer.listener.mysql.deserializer.UpdateDeserializer;
+import org.dbsyncer.listener.mysql.deserializer.WriteDeserializer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -543,9 +546,9 @@ public class BinaryLogRemoteClient implements BinaryLogClient {
         eventDataDeserializers.put(EventType.UPDATE_ROWS, new UpdateRowsEventDataDeserializer(tableMapEventByTableId));
         eventDataDeserializers.put(EventType.WRITE_ROWS, new WriteRowsEventDataDeserializer(tableMapEventByTableId));
         eventDataDeserializers.put(EventType.DELETE_ROWS, new DeleteRowsEventDataDeserializer(tableMapEventByTableId));
-        eventDataDeserializers.put(EventType.EXT_WRITE_ROWS, (new WriteRowsEventDataDeserializer(tableMapEventByTableId)).setMayContainExtraInformation(true));
-        eventDataDeserializers.put(EventType.EXT_UPDATE_ROWS, (new UpdateRowsEventDataDeserializer(tableMapEventByTableId)).setMayContainExtraInformation(true));
-        eventDataDeserializers.put(EventType.EXT_DELETE_ROWS, (new DeleteRowsEventDataDeserializer(tableMapEventByTableId)).setMayContainExtraInformation(true));
+        eventDataDeserializers.put(EventType.EXT_WRITE_ROWS, (new WriteDeserializer(tableMapEventByTableId)).setMayContainExtraInformation(true));
+        eventDataDeserializers.put(EventType.EXT_UPDATE_ROWS, (new UpdateDeserializer(tableMapEventByTableId)).setMayContainExtraInformation(true));
+        eventDataDeserializers.put(EventType.EXT_DELETE_ROWS, (new DeleteDeserializer(tableMapEventByTableId)).setMayContainExtraInformation(true));
         eventDataDeserializers.put(EventType.XID, new XidEventDataDeserializer());
 
         if (simpleEventModel) {

+ 60 - 0
dbsyncer-listener/src/main/java/org/dbsyncer/listener/mysql/deserializer/DatetimeV2Deserialize.java

@@ -0,0 +1,60 @@
+package org.dbsyncer.listener.mysql.deserializer;
+
+import com.github.shyiko.mysql.binlog.io.ByteArrayInputStream;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.sql.Timestamp;
+import java.time.LocalDateTime;
+
+public class DatetimeV2Deserialize {
+
+    public Serializable deserializeDatetimeV2(int meta, ByteArrayInputStream inputStream) throws IOException {
+        long datetime = bigEndianLong(inputStream.read(5), 0, 5);
+        int yearMonth = bitSlice(datetime, 1, 17, 40);
+        int fsp = deserializeFractionalSeconds(meta, inputStream);
+        LocalDateTime time = LocalDateTime.of(
+                yearMonth / 13,
+                yearMonth % 13,
+                bitSlice(datetime, 18, 5, 40),
+                bitSlice(datetime, 23, 5, 40),
+                bitSlice(datetime, 28, 6, 40),
+                bitSlice(datetime, 34, 6, 40),
+                fsp / 1000
+        );
+        return Timestamp.valueOf(time);
+    }
+
+    private long bigEndianLong(byte[] bytes, int offset, int length) {
+        long result = 0;
+        for (int i = offset; i < (offset + length); i++) {
+            byte b = bytes[i];
+            result = (result << 8) | (b >= 0 ? (int) b : (b + 256));
+        }
+        return result;
+    }
+
+    private int deserializeFractionalSeconds(int meta, ByteArrayInputStream inputStream) throws IOException {
+        int length = (meta + 1) / 2;
+        if (length > 0) {
+            int fraction = bigEndianInteger(inputStream.read(length), 0, length);
+            return fraction * (int) Math.pow(100, 3 - length);
+        }
+        return 0;
+    }
+
+    private int bigEndianInteger(byte[] bytes, int offset, int length) {
+        int result = 0;
+        for (int i = offset; i < (offset + length); i++) {
+            byte b = bytes[i];
+            result = (result << 8) | (b >= 0 ? (int) b : (b + 256));
+        }
+        return result;
+    }
+
+    private int bitSlice(long value, int bitOffset, int numberOfBits, int payloadSize) {
+        long result = value >> payloadSize - (bitOffset + numberOfBits);
+        return (int) (result & ((1 << numberOfBits) - 1));
+    }
+
+}

+ 24 - 0
dbsyncer-listener/src/main/java/org/dbsyncer/listener/mysql/deserializer/DeleteDeserializer.java

@@ -0,0 +1,24 @@
+package org.dbsyncer.listener.mysql.deserializer;
+
+import com.github.shyiko.mysql.binlog.event.TableMapEventData;
+import com.github.shyiko.mysql.binlog.event.deserialization.DeleteRowsEventDataDeserializer;
+import com.github.shyiko.mysql.binlog.io.ByteArrayInputStream;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Map;
+
+public class DeleteDeserializer extends DeleteRowsEventDataDeserializer {
+
+    private DatetimeV2Deserialize datetimeV2Deserialize;
+
+    public DeleteDeserializer(Map<Long, TableMapEventData> tableMapEventByTableId) {
+        super(tableMapEventByTableId);
+        datetimeV2Deserialize = new DatetimeV2Deserialize();
+    }
+
+    protected Serializable deserializeDatetimeV2(int meta, ByteArrayInputStream inputStream) throws IOException {
+        return datetimeV2Deserialize.deserializeDatetimeV2(meta, inputStream);
+    }
+
+}

+ 24 - 0
dbsyncer-listener/src/main/java/org/dbsyncer/listener/mysql/deserializer/UpdateDeserializer.java

@@ -0,0 +1,24 @@
+package org.dbsyncer.listener.mysql.deserializer;
+
+import com.github.shyiko.mysql.binlog.event.TableMapEventData;
+import com.github.shyiko.mysql.binlog.event.deserialization.UpdateRowsEventDataDeserializer;
+import com.github.shyiko.mysql.binlog.io.ByteArrayInputStream;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Map;
+
+public class UpdateDeserializer extends UpdateRowsEventDataDeserializer {
+
+    private DatetimeV2Deserialize datetimeV2Deserialize;
+
+    public UpdateDeserializer(Map<Long, TableMapEventData> tableMapEventByTableId) {
+        super(tableMapEventByTableId);
+        datetimeV2Deserialize = new DatetimeV2Deserialize();
+    }
+
+    protected Serializable deserializeDatetimeV2(int meta, ByteArrayInputStream inputStream) throws IOException {
+        return datetimeV2Deserialize.deserializeDatetimeV2(meta, inputStream);
+    }
+
+}

+ 24 - 0
dbsyncer-listener/src/main/java/org/dbsyncer/listener/mysql/deserializer/WriteDeserializer.java

@@ -0,0 +1,24 @@
+package org.dbsyncer.listener.mysql.deserializer;
+
+import com.github.shyiko.mysql.binlog.event.TableMapEventData;
+import com.github.shyiko.mysql.binlog.event.deserialization.WriteRowsEventDataDeserializer;
+import com.github.shyiko.mysql.binlog.io.ByteArrayInputStream;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Map;
+
+public class WriteDeserializer extends WriteRowsEventDataDeserializer {
+
+    private DatetimeV2Deserialize datetimeV2Deserialize;
+
+    public WriteDeserializer(Map<Long, TableMapEventData> tableMapEventByTableId) {
+        super(tableMapEventByTableId);
+        datetimeV2Deserialize = new DatetimeV2Deserialize();
+    }
+
+    protected Serializable deserializeDatetimeV2(int meta, ByteArrayInputStream inputStream) throws IOException {
+        return datetimeV2Deserialize.deserializeDatetimeV2(meta, inputStream);
+    }
+
+}

+ 1 - 1
dbsyncer-manager/pom.xml

@@ -5,7 +5,7 @@
     <parent>
         <artifactId>dbsyncer</artifactId>
         <groupId>org.ghi</groupId>
-		<version>1.1.2-Alpha</version>
+		<version>1.1.3-Beta</version>
     </parent>
     <modelVersion>4.0.0</modelVersion>
     <artifactId>dbsyncer-manager</artifactId>

+ 0 - 2
dbsyncer-manager/src/main/java/org/dbsyncer/manager/Manager.java

@@ -91,8 +91,6 @@ public interface Manager extends Executor {
 
     Config getConfig(String configId);
 
-    void removeConfig(String configId);
-
     List<Config> getConfigAll();
 
     // Data

+ 0 - 5
dbsyncer-manager/src/main/java/org/dbsyncer/manager/ManagerFactory.java

@@ -230,11 +230,6 @@ public class ManagerFactory implements Manager, ApplicationListener<ClosedEvent>
         return operationTemplate.queryObject(Config.class, configId);
     }
 
-    @Override
-    public void removeConfig(String configId) {
-        operationTemplate.remove(new OperationConfig(configId));
-    }
-
     @Override
     public List<Config> getConfigAll() {
         Config config = new Config();

+ 1 - 1
dbsyncer-monitor/pom.xml

@@ -5,7 +5,7 @@
     <parent>
         <artifactId>dbsyncer</artifactId>
         <groupId>org.ghi</groupId>
-		<version>1.1.2-Alpha</version>
+		<version>1.1.3-Beta</version>
     </parent>
     <modelVersion>4.0.0</modelVersion>
     <artifactId>dbsyncer-monitor</artifactId>

+ 1 - 1
dbsyncer-parser/pom.xml

@@ -5,7 +5,7 @@
     <parent>
         <artifactId>dbsyncer</artifactId>
         <groupId>org.ghi</groupId>
-		<version>1.1.2-Alpha</version>
+		<version>1.1.3-Beta</version>
     </parent>
     <modelVersion>4.0.0</modelVersion>
     <artifactId>dbsyncer-parser</artifactId>

+ 1 - 1
dbsyncer-plugin/pom.xml

@@ -5,7 +5,7 @@
     <parent>
         <artifactId>dbsyncer</artifactId>
         <groupId>org.ghi</groupId>
-		<version>1.1.2-Alpha</version>
+		<version>1.1.3-Beta</version>
     </parent>
     <modelVersion>4.0.0</modelVersion>
     <artifactId>dbsyncer-plugin</artifactId>

+ 1 - 1
dbsyncer-storage/pom.xml

@@ -5,7 +5,7 @@
     <parent>
         <artifactId>dbsyncer</artifactId>
         <groupId>org.ghi</groupId>
-		<version>1.1.2-Alpha</version>
+		<version>1.1.3-Beta</version>
     </parent>
     <modelVersion>4.0.0</modelVersion>
     <artifactId>dbsyncer-storage</artifactId>

+ 1 - 1
dbsyncer-web/pom.xml

@@ -5,7 +5,7 @@
     <parent>
         <artifactId>dbsyncer</artifactId>
         <groupId>org.ghi</groupId>
-		<version>1.1.2-Alpha</version>
+		<version>1.1.3-Beta</version>
     </parent>
     <modelVersion>4.0.0</modelVersion>
     <artifactId>dbsyncer-web</artifactId>

+ 1 - 1
dbsyncer-web/src/main/resources/application.properties

@@ -23,7 +23,7 @@ management.endpoints.web.exposure.include=*
 management.endpoint.health.show-details=always
 management.health.elasticsearch.enabled=false
 info.app.name=DBSyncer
-info.app.version=1.1.2-Alpha
+info.app.version=1.1.3-Beta
 info.app.copyright=&copy;2021 ${info.app.name}(${info.app.version})<footer>Designed By <a href='https://gitee.com/ghi/dbsyncer' target='_blank' >AE86</a></footer>
 
 #All < Trace < Debug < Info < Warn < Error < Fatal < OFF

+ 1 - 1
pom.xml

@@ -6,7 +6,7 @@
 
     <groupId>org.ghi</groupId>
     <artifactId>dbsyncer</artifactId>
-	<version>1.1.2-Alpha</version>
+	<version>1.1.3-Beta</version>
     <packaging>pom</packaging>
     <name>dbsyncer</name>
     <url>https://gitee.com/ghi/dbsyncer</url>

+ 1 - 1
version.cmd

@@ -1,6 +1,6 @@
 @echo off
 
-set APP_VERSION=1.1.2-Alpha
+set APP_VERSION=1.1.3-Beta
 
 echo "Clean Project ..."
 call mvn clean -f pom.xml