浏览代码

fix closed

AE86 3 年之前
父节点
当前提交
d3dca81fac

+ 1 - 3
README.md

@@ -1,6 +1,6 @@
 <div>
     <h3>介绍</h3>
-    <p>DBSyncer是一款开源的数据同步软件,提供Mysql、Oracle、SqlServer、Redis、SQL结果集等场景,支持自定义同步转换业务。</p>
+    <p>DBSyncer是一款开源的数据同步软件,提供Mysql、Oracle、SqlServer、SQL结果集等场景,支持自定义同步转换业务。</p>
     <p>特点</p>
     <ol>
         <li>组合驱动,自定义库同步到库组合,关系型数据库与非关系型之间组合,任意搭配表同步映射关系</li>
@@ -161,8 +161,6 @@
                 </td>
                 <td>
                     <p>要求2008版本以上, 启动代理服务(Agent服务), 连接账号具有 sysadmin 固定服务器角色或 db_owner 固定数据库角色的成员身份。对于所有其他用户,具有源表SELECT 权限;如果已定义捕获实例的访问控制角色,则还要求具有该数据库角色的成员身份。</p>
-                    <p>EXEC sys.sp_cdc_enable_db</p>
-                    <p>EXEC sys.sp_cdc_enable_table @source_schema = N'dbo', @source_name = N'#', @role_name = NULL, @supports_net_changes = 0</p>
                 </td>
                 <td>SQL Server 2008提供了内建的方法变更数据捕获(Change Data Capture 即CDC)以实现异步跟踪用户表的数据修改</td>
             </tr>

+ 3 - 5
dbsyncer-connector/src/main/java/org/dbsyncer/connector/ConnectorFactory.java

@@ -38,12 +38,10 @@ public class ConnectorFactory implements DisposableBean {
         String type = config.getConnectorType();
         Connector connector = getConnector(type);
         String cacheKey = connector.getConnectorMapperCacheKey(config);
-        ConnectorMapper mapper = connectorCache.get(cacheKey);
-        if (null == mapper) {
-            mapper = connector.connect(config);
-            connectorCache.putIfAbsent(mapper.getCacheKey(), mapper);
+        if (!connectorCache.containsKey(cacheKey)) {
+            connectorCache.putIfAbsent(cacheKey, connector.connect(config));
         }
-        return mapper;
+        return connectorCache.get(cacheKey);
     }
 
     /**

+ 5 - 5
dbsyncer-connector/src/main/java/org/dbsyncer/connector/database/AbstractDatabaseConnector.java

@@ -51,12 +51,12 @@ public abstract class AbstractDatabaseConnector implements Database {
 
     @Override
     public boolean isAlive(ConnectorMapper connectorMapper) {
-        try {
-            JdbcTemplate jdbcTemplate = (JdbcTemplate) connectorMapper.getConnection();
-            return null != jdbcTemplate && !jdbcTemplate.getDataSource().getConnection().isClosed();
-        } catch (SQLException e) {
-            throw new ConnectorException(e.getMessage(), e.getCause());
+        JdbcTemplate jdbcTemplate = (JdbcTemplate) connectorMapper.getConnection();
+        if(null != jdbcTemplate){
+            Integer count = jdbcTemplate.queryForObject("select 1", Integer.class);
+            return count > 0;
         }
+        return false;
     }
 
     @Override

+ 12 - 0
dbsyncer-connector/src/main/java/org/dbsyncer/connector/oracle/OracleConnector.java

@@ -1,12 +1,24 @@
 package org.dbsyncer.connector.oracle;
 
+import org.dbsyncer.connector.ConnectorMapper;
 import org.dbsyncer.connector.config.DatabaseConfig;
 import org.dbsyncer.connector.config.PageSqlConfig;
 import org.dbsyncer.connector.constant.DatabaseConstant;
 import org.dbsyncer.connector.database.AbstractDatabaseConnector;
+import org.springframework.jdbc.core.JdbcTemplate;
 
 public final class OracleConnector extends AbstractDatabaseConnector {
 
+    @Override
+    public boolean isAlive(ConnectorMapper connectorMapper) {
+        JdbcTemplate jdbcTemplate = (JdbcTemplate) connectorMapper.getConnection();
+        if(null != jdbcTemplate){
+            Integer count = jdbcTemplate.queryForObject("select 1 from dual", Integer.class);
+            return count > 0;
+        }
+        return false;
+    }
+
     @Override
     protected String getTablesSql(DatabaseConfig config) {
         return String.format("SELECT TABLE_NAME FROM ALL_TABLES WHERE OWNER='%s'", config.getUsername()).toUpperCase();

+ 11 - 0
dbsyncer-connector/src/main/java/org/dbsyncer/connector/sql/DQLOracleConnector.java

@@ -4,12 +4,23 @@ import org.dbsyncer.connector.ConnectorMapper;
 import org.dbsyncer.connector.config.*;
 import org.dbsyncer.connector.constant.DatabaseConstant;
 import org.dbsyncer.connector.database.AbstractDatabaseConnector;
+import org.springframework.jdbc.core.JdbcTemplate;
 
 import java.util.List;
 import java.util.Map;
 
 public final class DQLOracleConnector extends AbstractDatabaseConnector {
 
+    @Override
+    public boolean isAlive(ConnectorMapper connectorMapper) {
+        JdbcTemplate jdbcTemplate = (JdbcTemplate) connectorMapper.getConnection();
+        if(null != jdbcTemplate){
+            Integer count = jdbcTemplate.queryForObject("select 1 from dual", Integer.class);
+            return count > 0;
+        }
+        return false;
+    }
+
     @Override
     protected String getTablesSql(DatabaseConfig config) {
         return String.format("SELECT TABLE_NAME FROM ALL_TABLES WHERE OWNER='%s'", config.getUsername()).toUpperCase();

+ 0 - 4
dbsyncer-connector/src/main/java/org/dbsyncer/connector/util/DatabaseUtil.java

@@ -36,10 +36,6 @@ public abstract class DatabaseUtil {
         dataSource.setUrl(config.getUrl());
         dataSource.setUsername(config.getUsername());
         dataSource.setPassword(config.getPassword());
-        // 是否自动回收超时连接
-        dataSource.setRemoveAbandoned(true);
-        // 超时时间(以秒数为单位)
-        dataSource.setRemoveAbandonedTimeout(60);
         return new JdbcTemplate(dataSource);
     }
 

+ 48 - 0
dbsyncer-listener/src/main/java/org/dbsyncer/listener/enums/TableOperationEnum.java

@@ -0,0 +1,48 @@
+package org.dbsyncer.listener.enums;
+
+public enum TableOperationEnum {
+
+    /**
+     * 插入
+     */
+    INSERT(2),
+    /**
+     * 更新(旧值)
+     */
+    UPDATE_BEFORE(3),
+    /**
+     * 更新(新值)
+     */
+    UPDATE_AFTER(4),
+    /**
+     * 删除
+     */
+    DELETE(1);
+
+    private final int code;
+
+    TableOperationEnum(int code) {
+        this.code = code;
+    }
+
+    public static boolean isInsert(int code) {
+        return INSERT.getCode() == code;
+    }
+
+    public static boolean isUpdateBefore(int code) {
+        return UPDATE_BEFORE.getCode() == code;
+    }
+
+    public static boolean isUpdateAfter(int code) {
+        return UPDATE_AFTER.getCode() == code;
+    }
+
+    public static boolean isDelete(int code) {
+        return DELETE.getCode() == code;
+    }
+
+    public int getCode() {
+        return code;
+    }
+
+}

+ 0 - 1
dbsyncer-listener/src/main/java/org/dbsyncer/listener/quartz/ScheduledTaskServiceImpl.java

@@ -44,7 +44,6 @@ public class ScheduledTaskServiceImpl implements ScheduledTaskService {
     public void stop(String key) {
         ScheduledFuture job = map.get(key);
         if (null != job) {
-            logger.info(">>>>>> 进入停止任务 {}  >>>>>>", key);
             job.cancel(true);
             map.remove(key);
         }

+ 32 - 72
dbsyncer-listener/src/main/java/org/dbsyncer/listener/sqlserver/SqlServerExtractor.java

@@ -9,6 +9,7 @@ import org.dbsyncer.connector.config.DatabaseConfig;
 import org.dbsyncer.connector.constant.ConnectorConstant;
 import org.dbsyncer.listener.AbstractExtractor;
 import org.dbsyncer.listener.ListenerException;
+import org.dbsyncer.listener.enums.TableOperationEnum;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.jdbc.core.JdbcTemplate;
@@ -46,12 +47,13 @@ public class SqlServerExtractor extends AbstractExtractor {
     private static final String GET_ALL_CHANGES_FOR_TABLE = "SELECT * FROM cdc.[fn_cdc_get_all_changes_#](?, ?, N'all update old') order by [__$start_lsn] ASC, [__$seqval] ASC, [__$operation] ASC";
 
     private static final String LSN_POSITION = "position";
-    private static final long DEFAULT_POLL_INTERVAL_MILLIS = 36000;
+    private static final long DEFAULT_POLL_INTERVAL_MILLIS = 3000;
     private static final int PREPARED_STATEMENT_CACHE_CAPACITY = 500;
     private static final int OFFSET_COLUMNS = 4;
     private final Map<String, PreparedStatement> preparedStatementCache = new ConcurrentHashMap<>(PREPARED_STATEMENT_CACHE_CAPACITY);
     private final Lock connectLock = new ReentrantLock();
     private volatile boolean connected;
+    private volatile boolean connectionClosed;
     private static Set<String> tables;
     private static Set<SqlServerChangeTable> changeTables;
     private Connection connection;
@@ -119,10 +121,13 @@ public class SqlServerExtractor extends AbstractExtractor {
 
     private void connect() throws SQLException {
         DatabaseConfig cfg = (DatabaseConfig) connectorConfig;
-        final ConnectorMapper connectorMapper = connectorFactory.connect(cfg);
-        JdbcTemplate jdbcTemplate = (JdbcTemplate) connectorMapper.getConnection();
-        connection = jdbcTemplate.getDataSource().getConnection();
-        serverName = cfg.getUrl();
+        if(connectorFactory.isAlive(cfg)){
+            final ConnectorMapper connectorMapper = connectorFactory.connect(cfg);
+            JdbcTemplate jdbcTemplate = (JdbcTemplate) connectorMapper.getConnection();
+            connection = jdbcTemplate.getDataSource().getConnection();
+            serverName = cfg.getUrl();
+            connectionClosed = false;
+        }
     }
 
     private void readLastLsn() {
@@ -238,7 +243,7 @@ public class SqlServerExtractor extends AbstractExtractor {
                 while (rs.next()) {
                     // skip update before
                     final int operation = rs.getInt(3);
-                    if (TableOperation.isUpdateBefore(operation)) {
+                    if (TableOperationEnum.isUpdateBefore(operation)) {
                         continue;
                     }
                     row = new ArrayList<>(columnCount - OFFSET_COLUMNS);
@@ -259,17 +264,17 @@ public class SqlServerExtractor extends AbstractExtractor {
     private void parseEvent(List<CDCEvent> list) {
         for (CDCEvent event : list) {
             int code = event.getCode();
-            if (TableOperation.isUpdateAfter(code)) {
+            if (TableOperationEnum.isUpdateAfter(code)) {
                 asynSendRowChangedEvent(new RowChangedEvent(event.getTableName(), ConnectorConstant.OPERTION_UPDATE, Collections.EMPTY_LIST, event.getRow()));
                 continue;
             }
 
-            if (TableOperation.isInsert(code)) {
+            if (TableOperationEnum.isInsert(code)) {
                 asynSendRowChangedEvent(new RowChangedEvent(event.getTableName(), ConnectorConstant.OPERTION_INSERT, Collections.EMPTY_LIST, event.getRow()));
                 continue;
             }
 
-            if (TableOperation.isDelete(code)) {
+            if (TableOperationEnum.isDelete(code)) {
                 asynSendRowChangedEvent(new RowChangedEvent(event.getTableName(), ConnectorConstant.OPERTION_DELETE, event.getRow(), Collections.EMPTY_LIST));
             }
         }
@@ -306,7 +311,16 @@ public class SqlServerExtractor extends AbstractExtractor {
         ResultSet rs = null;
         T apply = null;
         try {
-            final PreparedStatement ps = createPreparedStatement(sql);
+            if(connectionClosed){
+                connect();
+                return null;
+            }
+            PreparedStatement ps = createPreparedStatement(sql);
+            if(ps.getConnection().isClosed() || ps.isClosed()){
+                preparedStatementCache.clear();
+                connectionClosed = true;
+                return null;
+            }
             if (null != statementPreparer) {
                 statementPreparer.accept(ps);
             }
@@ -323,66 +337,13 @@ public class SqlServerExtractor extends AbstractExtractor {
     }
 
     private PreparedStatement createPreparedStatement(String preparedQueryString) {
-        try {
-            if(connection.isClosed()){
-                connect();
+        return preparedStatementCache.computeIfAbsent(preparedQueryString, query -> {
+            try {
+                return connection.prepareStatement(query);
+            } catch (SQLException e) {
+                throw new ListenerException(e);
             }
-            return preparedStatementCache.computeIfAbsent(preparedQueryString, query -> {
-                try {
-                    return connection.prepareStatement(query);
-                } catch (SQLException e) {
-                    throw new ListenerException(e);
-                }
-            });
-        } catch (SQLException e) {
-            logger.error(e.getMessage());
-            throw new ListenerException(e.getCause());
-        }
-    }
-
-    enum TableOperation {
-        /**
-         * 插入
-         */
-        INSERT(2),
-        /**
-         * 更新(旧值)
-         */
-        UPDATE_BEFORE(3),
-        /**
-         * 更新(新值)
-         */
-        UPDATE_AFTER(4),
-        /**
-         * 删除
-         */
-        DELETE(1);
-
-        private final int code;
-
-        TableOperation(int code) {
-            this.code = code;
-        }
-
-        public static boolean isInsert(int code) {
-            return INSERT.getCode() == code;
-        }
-
-        public static boolean isUpdateBefore(int code) {
-            return UPDATE_BEFORE.getCode() == code;
-        }
-
-        public static boolean isUpdateAfter(int code) {
-            return UPDATE_AFTER.getCode() == code;
-        }
-
-        public static boolean isDelete(int code) {
-            return DELETE.getCode() == code;
-        }
-
-        public int getCode() {
-            return code;
-        }
+        });
     }
 
     final class Worker extends Thread {
@@ -392,11 +353,10 @@ public class SqlServerExtractor extends AbstractExtractor {
             while (!isInterrupted() && connected) {
                 try {
                     Lsn stopLsn = queryAndMap(GET_MAX_LSN, rs -> new Lsn(rs.getBytes(1)));
-                    if (null == stopLsn || !stopLsn.isAvailable()) {
-                        TimeUnit.MICROSECONDS.sleep(DEFAULT_POLL_INTERVAL_MILLIS);
+                    if (null == stopLsn) {
                         continue;
                     }
-                    if (stopLsn.compareTo(lastLsn) <= 0) {
+                    if (!stopLsn.isAvailable() || stopLsn.compareTo(lastLsn) <= 0) {
                         TimeUnit.MICROSECONDS.sleep(DEFAULT_POLL_INTERVAL_MILLIS);
                         continue;
                     }

+ 0 - 4
dbsyncer-manager/src/main/java/org/dbsyncer/manager/ManagerFactory.java

@@ -26,8 +26,6 @@ import org.dbsyncer.storage.constant.ConfigConstant;
 import org.dbsyncer.storage.enums.StorageDataStatusEnum;
 import org.dbsyncer.storage.enums.StorageEnum;
 import org.dbsyncer.storage.query.Query;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.context.ApplicationListener;
 import org.springframework.stereotype.Component;
@@ -45,8 +43,6 @@ import java.util.Map;
 @Component
 public class ManagerFactory implements Manager, ApplicationListener<ClosedEvent> {
 
-    private final Logger logger = LoggerFactory.getLogger(getClass());
-
     @Autowired
     private Parser parser;
 

+ 4 - 1
dbsyncer-manager/src/main/java/org/dbsyncer/manager/puller/impl/IncrementPuller.java

@@ -86,7 +86,9 @@ public class IncrementPuller extends AbstractPuller implements ScheduledTaskJob,
     @PostConstruct
     private void init() {
         key = UUIDUtil.getUUID();
-        scheduledTaskService.start(key, "*/10 * * * * ?", this);
+        String cron = "*/10 * * * * ?";
+        scheduledTaskService.start(key, cron, this);
+        logger.info("[{}], Started persistence task {}", cron, key);
     }
 
     @Override
@@ -139,6 +141,7 @@ public class IncrementPuller extends AbstractPuller implements ScheduledTaskJob,
     @Override
     public void destroy() {
         scheduledTaskService.stop(key);
+        logger.info("Stopped persistence task {}", key);
     }
 
     private AbstractExtractor getExtractor(Mapping mapping, Connector connector, List<TableGroup> list, Meta meta)