浏览代码

支持获取位点信息

穿云 3 月之前
父节点
当前提交
37cb88e990

+ 8 - 0
dbsyncer-biz/src/main/java/org/dbsyncer/biz/ConnectorService.java

@@ -78,4 +78,12 @@ public interface ConnectorService {
      * @return
      */
     boolean isAlive(String id);
+
+    /**
+     * 获取位点信息
+     *
+     * @param params
+     * @return
+     */
+    Object getPosition(String params);
 }

+ 8 - 0
dbsyncer-biz/src/main/java/org/dbsyncer/biz/impl/ConnectorServiceImpl.java

@@ -16,6 +16,7 @@ import org.dbsyncer.parser.ProfileComponent;
 import org.dbsyncer.parser.model.ConfigModel;
 import org.dbsyncer.parser.model.Connector;
 import org.dbsyncer.parser.model.Mapping;
+import org.dbsyncer.sdk.connector.ConnectorInstance;
 import org.dbsyncer.sdk.constant.ConfigConstant;
 import org.dbsyncer.sdk.model.ConnectorConfig;
 import org.slf4j.Logger;
@@ -164,6 +165,13 @@ public class ConnectorServiceImpl extends BaseServiceImpl implements ConnectorSe
         return health.containsKey(id) && health.get(id);
     }
 
+    @Override
+    public Object getPosition(String id) {
+        Connector connector = getConnector(id);
+        ConnectorInstance connectorInstance = connectorFactory.connect(connector.getConfig());
+        return connectorFactory.getPosition(connectorInstance);
+    }
+
     private boolean isAlive(ConnectorConfig config) {
         try {
             return connectorFactory.isAlive(config);

+ 5 - 0
dbsyncer-connector/dbsyncer-connector-base/src/main/java/org/dbsyncer/connector/base/ConnectorFactory.java

@@ -145,6 +145,11 @@ public class ConnectorFactory implements DisposableBean {
         return getConnectorService(connectorInstance.getConfig()).getMetaInfo(connectorInstance, tableName);
     }
 
+    public Object getPosition(ConnectorInstance connectorInstance) {
+        Assert.notNull(connectorInstance, "ConnectorInstance can not be null.");
+        return getConnectorService(connectorInstance.getConfig()).getPosition(connectorInstance);
+    }
+
     /**
      * 获取连接器同步参数
      *

+ 18 - 0
dbsyncer-connector/dbsyncer-connector-sqlserver/src/main/java/org/dbsyncer/connector/sqlserver/SqlServerConnector.java

@@ -5,6 +5,7 @@ package org.dbsyncer.connector.sqlserver;
 
 import org.dbsyncer.common.util.CollectionUtils;
 import org.dbsyncer.common.util.StringUtil;
+import org.dbsyncer.connector.sqlserver.cdc.Lsn;
 import org.dbsyncer.connector.sqlserver.cdc.SqlServerListener;
 import org.dbsyncer.connector.sqlserver.validator.SqlServerConfigValidator;
 import org.dbsyncer.sdk.config.CommandConfig;
@@ -147,6 +148,23 @@ public final class SqlServerConnector extends AbstractDatabaseConnector {
         return targetCommand;
     }
 
+    @Override
+    public Object getPosition(DatabaseConnectorInstance connectorInstance) {
+        String sql = "SELECT * from cdc.lsn_time_mapping order by tran_begin_time desc";
+        List<Map<String, Object>> result = connectorInstance.execute(databaseTemplate -> databaseTemplate.queryForList(sql));
+        if (!CollectionUtils.isEmpty(result)) {
+            List<Object> list = new ArrayList<>();
+            result.forEach(r -> {
+                r.computeIfPresent("start_lsn", (k, lsn)-> new Lsn((byte[]) lsn).toString());
+                r.computeIfPresent("tran_begin_lsn", (k, lsn)-> new Lsn((byte[]) lsn).toString());
+                r.computeIfPresent("tran_id", (k, lsn)-> new Lsn((byte[]) lsn).toString());
+                list.add(r);
+            });
+            return list;
+        }
+        return result;
+    }
+
     private String convertKey(String key) {
         return new StringBuilder("[").append(key).append("]").toString();
     }

+ 10 - 0
dbsyncer-sdk/src/main/java/org/dbsyncer/sdk/spi/ConnectorService.java

@@ -4,6 +4,7 @@
 package org.dbsyncer.sdk.spi;
 
 import org.dbsyncer.common.model.Result;
+import org.dbsyncer.common.util.StringUtil;
 import org.dbsyncer.sdk.SdkException;
 import org.dbsyncer.sdk.config.CommandConfig;
 import org.dbsyncer.sdk.config.DDLConfig;
@@ -180,4 +181,13 @@ public interface ConnectorService<I extends ConnectorInstance, C extends Connect
         return null;
     }
 
+    /**
+     * 获取指定时间的位点信息
+     *
+     * @param connectorInstance
+     * @return
+     */
+    default Object getPosition(I connectorInstance) {
+        return StringUtil.EMPTY;
+    }
 }

+ 1 - 1
dbsyncer-web/src/main/java/org/dbsyncer/web/Version.java

@@ -16,7 +16,7 @@ public class Version {
     private final long version;
 
     public static final Version V_2_0_5 = new Version(20_00_05_2025_00_00_00L);
-    public static final Version CURRENT = new Version(20_00_05_2025_02_18_01L);
+    public static final Version CURRENT = new Version(20_00_05_2025_02_19_00L);
 
     public Version(long version) {
         this.version = version;

+ 11 - 0
dbsyncer-web/src/main/java/org/dbsyncer/web/controller/index/ConnectorController.java

@@ -90,4 +90,15 @@ public class ConnectorController extends BaseController {
         }
     }
 
+    @GetMapping(value = "/getPosition")
+    @ResponseBody
+    public RestResult getPosition(@RequestParam(value = "id") String id) {
+        try {
+            return RestResult.restSuccess(connectorService.getPosition(id));
+        } catch (Exception e) {
+            logger.error(e.getLocalizedMessage(), e);
+            return RestResult.restFail(e.getMessage());
+        }
+    }
+
 }