|
@@ -32,7 +32,7 @@ public abstract class AbstractDatabaseListener extends AbstractListener<Database
|
|
|
* 自定义SQL,支持1对多
|
|
|
* <p>MY_USER > [用户表1, 用户表2]
|
|
|
*/
|
|
|
- private Map<String, List<DqlMapper>> dqlMap = new ConcurrentHashMap<>();
|
|
|
+ private final Map<String, List<DqlMapper>> dqlMap = new ConcurrentHashMap<>();
|
|
|
|
|
|
/**
|
|
|
* 发送增量事件
|
|
@@ -57,7 +57,6 @@ public abstract class AbstractDatabaseListener extends AbstractListener<Database
|
|
|
return;
|
|
|
}
|
|
|
|
|
|
- RowChangedEvent changedEvent = (RowChangedEvent) event;
|
|
|
boolean processed = false;
|
|
|
for (DqlMapper dqlMapper : dqlMappers) {
|
|
|
if (!processed) {
|
|
@@ -65,7 +64,7 @@ public abstract class AbstractDatabaseListener extends AbstractListener<Database
|
|
|
case ConnectorConstant.OPERTION_UPDATE:
|
|
|
case ConnectorConstant.OPERTION_INSERT:
|
|
|
try {
|
|
|
- queryDqlData(dqlMapper, changedEvent.getChangedRow());
|
|
|
+ queryDqlData(dqlMapper, event.getChangedRow());
|
|
|
} catch (Exception e) {
|
|
|
return;
|
|
|
}
|
|
@@ -75,8 +74,8 @@ public abstract class AbstractDatabaseListener extends AbstractListener<Database
|
|
|
}
|
|
|
processed = true;
|
|
|
}
|
|
|
- changedEvent.setSourceTableName(dqlMapper.sqlName);
|
|
|
- changeEvent(changedEvent);
|
|
|
+ event.setSourceTableName(dqlMapper.sqlName);
|
|
|
+ changeEvent(event);
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -126,10 +125,6 @@ public abstract class AbstractDatabaseListener extends AbstractListener<Database
|
|
|
|
|
|
/**
|
|
|
* 获取主表主键索引
|
|
|
- *
|
|
|
- * @param column
|
|
|
- * @param primaryKeys
|
|
|
- * @return
|
|
|
*/
|
|
|
protected Integer[] getPrimaryKeyIndexArray(List<Field> column, List<String> primaryKeys) {
|
|
|
List<Integer> indexList = new ArrayList<>();
|
|
@@ -162,7 +157,7 @@ public abstract class AbstractDatabaseListener extends AbstractListener<Database
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- final class DqlMapper {
|
|
|
+ static final class DqlMapper {
|
|
|
DatabaseConnectorInstance instance;
|
|
|
String sqlName;
|
|
|
String sql;
|