Răsfoiți Sursa

!46 merge
Merge pull request !46 from AE86/V_1.0.0_Beta

AE86 3 ani în urmă
părinte
comite
b2eac071ab

+ 7 - 0
dbsyncer-connector/src/main/java/org/dbsyncer/connector/database/setter/IntegerSetter.java

@@ -3,6 +3,7 @@ package org.dbsyncer.connector.database.setter;
 import org.dbsyncer.connector.ConnectorException;
 import org.dbsyncer.connector.database.AbstractSetter;
 
+import java.math.BigDecimal;
 import java.math.BigInteger;
 import java.sql.PreparedStatement;
 import java.sql.SQLException;
@@ -29,6 +30,12 @@ public class IntegerSetter extends AbstractSetter<Integer> {
             return;
         }
 
+        if (val instanceof BigDecimal) {
+            BigDecimal bigDecimal = (BigDecimal) val;
+            ps.setInt(i, bigDecimal.intValue());
+            return;
+        }
+
         throw new ConnectorException(String.format("IntegerSetter can not find type [%s], val [%s]", type, val));
     }
 }

+ 58 - 28
dbsyncer-listener/src/main/java/org/dbsyncer/listener/oracle/dcn/DBChangeNotification.java

@@ -12,6 +12,7 @@ import org.dbsyncer.connector.constant.ConnectorConstant;
 import org.dbsyncer.listener.oracle.event.DCNEvent;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.springframework.util.Assert;
 
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
@@ -35,23 +36,24 @@ public class DBChangeNotification {
 
     private final Logger logger = LoggerFactory.getLogger(getClass());
 
-    private static final String QUERY_ROW_DATA_SQL  = "SELECT * FROM \"%s\" WHERE ROWID = '%s'";
-    private static final String QUERY_TABLE_ALL_SQL = "SELECT OBJECT_ID, OBJECT_NAME FROM DBA_OBJECTS WHERE OWNER='%S' AND OBJECT_TYPE = 'TABLE'";
-    private static final String QUERY_TABLE_SQL     = "SELECT 1 FROM \"%s\" WHERE 1=2";
-    private static final String QUERY_CALLBACK_SQL  = "SELECT REGID,CALLBACK FROM USER_CHANGE_NOTIFICATION_REGS";
-    private static final String CALLBACK            = "net8://(ADDRESS=(PROTOCOL=tcp)(HOST=%s)(PORT=%s))?PR=0";
-
-    private String                     username;
-    private String                     password;
-    private String                     url;
-    private OracleConnection           conn;
-    private OracleStatement            statement;
+    private static final String QUERY_ROW_DATA_SQL = "SELECT * FROM \"%s\" WHERE ROWID='%s'";
+    private static final String QUERY_TABLE_ALL_SQL = "SELECT TABLE_NAME FROM USER_TAB_COMMENTS WHERE TABLE_TYPE='TABLE'";
+    private static final String QUERY_TABLE_ID_SQL = "SELECT OBJECT_ID FROM DBA_OBJECTS WHERE OBJECT_TYPE='TABLE' AND OBJECT_NAME='%s'";
+    private static final String QUERY_TABLE_SQL = "SELECT 1 FROM \"%s\" WHERE 1=2";
+    private static final String QUERY_CALLBACK_SQL = "SELECT REGID,CALLBACK FROM USER_CHANGE_NOTIFICATION_REGS";
+    private static final String CALLBACK = "net8://(ADDRESS=(PROTOCOL=tcp)(HOST=%s)(PORT=%s))?PR=0";
+
+    private String username;
+    private String password;
+    private String url;
+    private OracleConnection conn;
+    private OracleStatement statement;
     private DatabaseChangeRegistration dcr;
-    private Map<Integer, String>       tables;
-    private Worker                     worker;
-    private Set<String>                filterTable;
-    private List<RowEventListener>     listeners = new ArrayList<>();
-    private BlockingQueue<DCNEvent>    queue = new LinkedBlockingQueue<> (100);
+    private Map<Integer, String> tables;
+    private Worker worker;
+    private Set<String> filterTable;
+    private List<RowEventListener> listeners = new ArrayList<>();
+    private BlockingQueue<DCNEvent> queue = new LinkedBlockingQueue<>(100);
 
     public DBChangeNotification(String username, String password, String url) {
         this.username = username;
@@ -111,8 +113,16 @@ public class DBChangeNotification {
         }
     }
 
+    public OracleConnection getOracleConnection() {
+        return conn;
+    }
+
+    public void setFilterTable(Set<String> filterTable) {
+        this.filterTable = filterTable;
+    }
+
     public void close() {
-        if(null != worker && !worker.isInterrupted()){
+        if (null != worker && !worker.isInterrupted()) {
             worker.interrupt();
             worker = null;
         }
@@ -164,18 +174,42 @@ public class DBChangeNotification {
 
     private void readTables() {
         tables = new LinkedHashMap<>();
+        List<String> tableList = queryForList(QUERY_TABLE_ALL_SQL, rs -> rs.getString(1));
+        Assert.notEmpty(tableList, "No tables available");
+        tableList.forEach(tableName -> tables.put(queryForObject(String.format(QUERY_TABLE_ID_SQL, tableName), rs -> rs.getInt(1)), tableName));
+    }
+
+    private <T> List<T> queryForList(String sql, ResultSetMapper<T> mapper) {
+        ResultSet rs = null;
+        List<T> list = new ArrayList<>();
+        try {
+            rs = statement.executeQuery(sql);
+            while (rs.next()) {
+                list.add(mapper.apply(rs));
+            }
+        } catch (SQLException e) {
+            logger.error(e.getMessage());
+        } finally {
+            close(rs);
+        }
+        return list;
+    }
+
+    private <T> T queryForObject(String sql, ResultSetMapper<T> mapper) {
         ResultSet rs = null;
+        T apply = null;
         try {
-            String sql = String.format(QUERY_TABLE_ALL_SQL, username);
             rs = statement.executeQuery(sql);
             while (rs.next()) {
-                tables.put(rs.getInt(1), rs.getString(2));
+                apply = mapper.apply(rs);
+                break;
             }
         } catch (SQLException e) {
             logger.error(e.getMessage());
         } finally {
             close(rs);
         }
+        return apply;
     }
 
     private String getHost() {
@@ -194,7 +228,7 @@ public class DBChangeNotification {
             Class clazz = dcr.getClass().getSuperclass();
             Method method = clazz.getDeclaredMethod("getClientTCPPort");
             method.setAccessible(true);
-            obj = method.invoke(dcr, new Object[] {});
+            obj = method.invoke(dcr, new Object[]{});
         } catch (NoSuchMethodException e) {
             logger.error(e.getMessage());
         } catch (IllegalAccessException e) {
@@ -233,12 +267,8 @@ public class DBChangeNotification {
         return (OracleConnection) dr.connect(url, prop);
     }
 
-    public OracleConnection getOracleConnection() {
-        return conn;
-    }
-
-    public void setFilterTable(Set<String> filterTable) {
-        this.filterTable = filterTable;
+    private interface ResultSetMapper<T> {
+        T apply(ResultSet rs) throws SQLException;
     }
 
     final class DCNListener implements DatabaseChangeListener {
@@ -249,7 +279,7 @@ public class DBChangeNotification {
                 RowChangeDescription[] rds = td.getRowChangeDescription();
                 for (RowChangeDescription rd : rds) {
                     String tableName = tables.get(td.getObjectNumber());
-                    if(!filterTable.contains(tableName)){
+                    if (!filterTable.contains(tableName)) {
                         logger.info("Table[{}] {}", tableName, rd.getRowOperation().name());
                         continue;
                     }
@@ -275,7 +305,7 @@ public class DBChangeNotification {
                 try {
                     // 取走BlockingQueue里排在首位的对象,若BlockingQueue为空,阻断进入等待状态直到Blocking有新的对象被加入为止
                     DCNEvent event = queue.take();
-                    if(null != event){
+                    if (null != event) {
                         parseEvent(event);
                     }
                 } catch (InterruptedException e) {

+ 16 - 14
dbsyncer-listener/src/main/java/org/dbsyncer/listener/sqlserver/SqlServerExtractor.java

@@ -14,7 +14,9 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.util.Assert;
 
-import java.sql.*;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
 import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
@@ -47,18 +49,18 @@ public class SqlServerExtractor extends AbstractExtractor {
 
     private static final String LSN_POSITION = "position";
     private static final long DEFAULT_POLL_INTERVAL_MILLIS = 300;
-    private static final int                            PREPARED_STATEMENT_CACHE_CAPACITY = 500;
-    private static final int                            OFFSET_COLUMNS = 4;
-    private final        Map<String, PreparedStatement> preparedStatementCache = new ConcurrentHashMap<>(PREPARED_STATEMENT_CACHE_CAPACITY);
-    private final        Lock                           connectLock = new ReentrantLock();
-    private volatile     boolean                        connected;
-    private volatile     boolean                        connectionClosed;
-    private static       Set<String>                    tables;
-    private static       Set<SqlServerChangeTable>      changeTables;
-    private              DatabaseConnectorMapper        connectorMapper;
-    private              Worker                         worker;
-    private              Lsn                            lastLsn;
-    private              String                         serverName;
+    private static final int PREPARED_STATEMENT_CACHE_CAPACITY = 500;
+    private static final int OFFSET_COLUMNS = 4;
+    private final Map<String, PreparedStatement> preparedStatementCache = new ConcurrentHashMap<>(PREPARED_STATEMENT_CACHE_CAPACITY);
+    private final Lock connectLock = new ReentrantLock();
+    private volatile boolean connected;
+    private volatile boolean connectionClosed;
+    private static Set<String> tables;
+    private static Set<SqlServerChangeTable> changeTables;
+    private DatabaseConnectorMapper connectorMapper;
+    private Worker worker;
+    private Lsn lastLsn;
+    private String serverName;
 
     @Override
     public void start() {
@@ -71,7 +73,7 @@ public class SqlServerExtractor extends AbstractExtractor {
             connected = true;
             connect();
             readTables();
-            Assert.isTrue(!CollectionUtils.isEmpty(tables), "No tables available");
+            Assert.notEmpty(tables, "No tables available");
 
             boolean enabledServerAgent = queryAndMap(IS_SERVER_AGENT_RUNNING, rs -> "Running.".equals(rs.getString(1)));
             Assert.isTrue(enabledServerAgent, "Please ensure that the SQL Server Agent is running");