|
@@ -12,6 +12,7 @@ import org.dbsyncer.connector.constant.ConnectorConstant;
|
|
|
import org.dbsyncer.listener.oracle.event.DCNEvent;
|
|
|
import org.slf4j.Logger;
|
|
|
import org.slf4j.LoggerFactory;
|
|
|
+import org.springframework.util.Assert;
|
|
|
|
|
|
import java.lang.reflect.InvocationTargetException;
|
|
|
import java.lang.reflect.Method;
|
|
@@ -35,23 +36,24 @@ public class DBChangeNotification {
|
|
|
|
|
|
private final Logger logger = LoggerFactory.getLogger(getClass());
|
|
|
|
|
|
- private static final String QUERY_ROW_DATA_SQL = "SELECT * FROM \"%s\" WHERE ROWID = '%s'";
|
|
|
- private static final String QUERY_TABLE_ALL_SQL = "SELECT OBJECT_ID, OBJECT_NAME FROM DBA_OBJECTS WHERE OWNER='%S' AND OBJECT_TYPE = 'TABLE'";
|
|
|
- private static final String QUERY_TABLE_SQL = "SELECT 1 FROM \"%s\" WHERE 1=2";
|
|
|
- private static final String QUERY_CALLBACK_SQL = "SELECT REGID,CALLBACK FROM USER_CHANGE_NOTIFICATION_REGS";
|
|
|
- private static final String CALLBACK = "net8://(ADDRESS=(PROTOCOL=tcp)(HOST=%s)(PORT=%s))?PR=0";
|
|
|
-
|
|
|
- private String username;
|
|
|
- private String password;
|
|
|
- private String url;
|
|
|
- private OracleConnection conn;
|
|
|
- private OracleStatement statement;
|
|
|
+ private static final String QUERY_ROW_DATA_SQL = "SELECT * FROM \"%s\" WHERE ROWID='%s'";
|
|
|
+ private static final String QUERY_TABLE_ALL_SQL = "SELECT TABLE_NAME FROM USER_TAB_COMMENTS WHERE TABLE_TYPE='TABLE'";
|
|
|
+ private static final String QUERY_TABLE_ID_SQL = "SELECT OBJECT_ID FROM DBA_OBJECTS WHERE OBJECT_TYPE='TABLE' AND OBJECT_NAME='%s'";
|
|
|
+ private static final String QUERY_TABLE_SQL = "SELECT 1 FROM \"%s\" WHERE 1=2";
|
|
|
+ private static final String QUERY_CALLBACK_SQL = "SELECT REGID,CALLBACK FROM USER_CHANGE_NOTIFICATION_REGS";
|
|
|
+ private static final String CALLBACK = "net8://(ADDRESS=(PROTOCOL=tcp)(HOST=%s)(PORT=%s))?PR=0";
|
|
|
+
|
|
|
+ private String username;
|
|
|
+ private String password;
|
|
|
+ private String url;
|
|
|
+ private OracleConnection conn;
|
|
|
+ private OracleStatement statement;
|
|
|
private DatabaseChangeRegistration dcr;
|
|
|
- private Map<Integer, String> tables;
|
|
|
- private Worker worker;
|
|
|
- private Set<String> filterTable;
|
|
|
- private List<RowEventListener> listeners = new ArrayList<>();
|
|
|
- private BlockingQueue<DCNEvent> queue = new LinkedBlockingQueue<> (100);
|
|
|
+ private Map<Integer, String> tables;
|
|
|
+ private Worker worker;
|
|
|
+ private Set<String> filterTable;
|
|
|
+ private List<RowEventListener> listeners = new ArrayList<>();
|
|
|
+ private BlockingQueue<DCNEvent> queue = new LinkedBlockingQueue<>(100);
|
|
|
|
|
|
public DBChangeNotification(String username, String password, String url) {
|
|
|
this.username = username;
|
|
@@ -111,8 +113,16 @@ public class DBChangeNotification {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ public OracleConnection getOracleConnection() {
|
|
|
+ return conn;
|
|
|
+ }
|
|
|
+
|
|
|
+ public void setFilterTable(Set<String> filterTable) {
|
|
|
+ this.filterTable = filterTable;
|
|
|
+ }
|
|
|
+
|
|
|
public void close() {
|
|
|
- if(null != worker && !worker.isInterrupted()){
|
|
|
+ if (null != worker && !worker.isInterrupted()) {
|
|
|
worker.interrupt();
|
|
|
worker = null;
|
|
|
}
|
|
@@ -164,18 +174,42 @@ public class DBChangeNotification {
|
|
|
|
|
|
private void readTables() {
|
|
|
tables = new LinkedHashMap<>();
|
|
|
+ List<String> tableList = queryForList(QUERY_TABLE_ALL_SQL, rs -> rs.getString(1));
|
|
|
+ Assert.notEmpty(tableList, "No tables available");
|
|
|
+ tableList.forEach(tableName -> tables.put(queryForObject(String.format(QUERY_TABLE_ID_SQL, tableName), rs -> rs.getInt(1)), tableName));
|
|
|
+ }
|
|
|
+
|
|
|
+ private <T> List<T> queryForList(String sql, ResultSetMapper<T> mapper) {
|
|
|
+ ResultSet rs = null;
|
|
|
+ List<T> list = new ArrayList<>();
|
|
|
+ try {
|
|
|
+ rs = statement.executeQuery(sql);
|
|
|
+ while (rs.next()) {
|
|
|
+ list.add(mapper.apply(rs));
|
|
|
+ }
|
|
|
+ } catch (SQLException e) {
|
|
|
+ logger.error(e.getMessage());
|
|
|
+ } finally {
|
|
|
+ close(rs);
|
|
|
+ }
|
|
|
+ return list;
|
|
|
+ }
|
|
|
+
|
|
|
+ private <T> T queryForObject(String sql, ResultSetMapper<T> mapper) {
|
|
|
ResultSet rs = null;
|
|
|
+ T apply = null;
|
|
|
try {
|
|
|
- String sql = String.format(QUERY_TABLE_ALL_SQL, username);
|
|
|
rs = statement.executeQuery(sql);
|
|
|
while (rs.next()) {
|
|
|
- tables.put(rs.getInt(1), rs.getString(2));
|
|
|
+ apply = mapper.apply(rs);
|
|
|
+ break;
|
|
|
}
|
|
|
} catch (SQLException e) {
|
|
|
logger.error(e.getMessage());
|
|
|
} finally {
|
|
|
close(rs);
|
|
|
}
|
|
|
+ return apply;
|
|
|
}
|
|
|
|
|
|
private String getHost() {
|
|
@@ -194,7 +228,7 @@ public class DBChangeNotification {
|
|
|
Class clazz = dcr.getClass().getSuperclass();
|
|
|
Method method = clazz.getDeclaredMethod("getClientTCPPort");
|
|
|
method.setAccessible(true);
|
|
|
- obj = method.invoke(dcr, new Object[] {});
|
|
|
+ obj = method.invoke(dcr, new Object[]{});
|
|
|
} catch (NoSuchMethodException e) {
|
|
|
logger.error(e.getMessage());
|
|
|
} catch (IllegalAccessException e) {
|
|
@@ -233,12 +267,8 @@ public class DBChangeNotification {
|
|
|
return (OracleConnection) dr.connect(url, prop);
|
|
|
}
|
|
|
|
|
|
- public OracleConnection getOracleConnection() {
|
|
|
- return conn;
|
|
|
- }
|
|
|
-
|
|
|
- public void setFilterTable(Set<String> filterTable) {
|
|
|
- this.filterTable = filterTable;
|
|
|
+ private interface ResultSetMapper<T> {
|
|
|
+ T apply(ResultSet rs) throws SQLException;
|
|
|
}
|
|
|
|
|
|
final class DCNListener implements DatabaseChangeListener {
|
|
@@ -249,7 +279,7 @@ public class DBChangeNotification {
|
|
|
RowChangeDescription[] rds = td.getRowChangeDescription();
|
|
|
for (RowChangeDescription rd : rds) {
|
|
|
String tableName = tables.get(td.getObjectNumber());
|
|
|
- if(!filterTable.contains(tableName)){
|
|
|
+ if (!filterTable.contains(tableName)) {
|
|
|
logger.info("Table[{}] {}", tableName, rd.getRowOperation().name());
|
|
|
continue;
|
|
|
}
|
|
@@ -275,7 +305,7 @@ public class DBChangeNotification {
|
|
|
try {
|
|
|
// 取走BlockingQueue里排在首位的对象,若BlockingQueue为空,阻断进入等待状态直到Blocking有新的对象被加入为止
|
|
|
DCNEvent event = queue.take();
|
|
|
- if(null != event){
|
|
|
+ if (null != event) {
|
|
|
parseEvent(event);
|
|
|
}
|
|
|
} catch (InterruptedException e) {
|