|
@@ -41,7 +41,8 @@ public abstract class AbstractQuartzListener extends AbstractListener implements
|
|
|
private final int READ_NUM = 1000;
|
|
|
private List<TableGroupQuartzCommand> commands;
|
|
|
private String eventFieldName;
|
|
|
- private boolean forceUpdate;
|
|
|
+ private boolean customEvent;
|
|
|
+ private String event;
|
|
|
private Set<String> update;
|
|
|
private Set<String> insert;
|
|
|
private Set<String> delete;
|
|
@@ -61,7 +62,15 @@ public abstract class AbstractQuartzListener extends AbstractListener implements
|
|
|
@Override
|
|
|
public void start() {
|
|
|
eventFieldName = listenerConfig.getEventFieldName();
|
|
|
- forceUpdate = StringUtil.isBlank(listenerConfig.getEventFieldName());
|
|
|
+ if (StringUtil.isBlank(eventFieldName)) {
|
|
|
+ if (listenerConfig.isEnableUpdate()) {
|
|
|
+ event = ConnectorConstant.OPERTION_UPDATE;
|
|
|
+ customEvent = true;
|
|
|
+ } else if (listenerConfig.isEnableInsert()) {
|
|
|
+ event = ConnectorConstant.OPERTION_INSERT;
|
|
|
+ customEvent = true;
|
|
|
+ }
|
|
|
+ }
|
|
|
update = Stream.of(listenerConfig.getUpdate().split(",")).collect(Collectors.toSet());
|
|
|
insert = Stream.of(listenerConfig.getInsert().split(",")).collect(Collectors.toSet());
|
|
|
delete = Stream.of(listenerConfig.getDelete().split(",")).collect(Collectors.toSet());
|
|
@@ -127,8 +136,8 @@ public abstract class AbstractQuartzListener extends AbstractListener implements
|
|
|
}
|
|
|
|
|
|
for (Map<String, Object> row : data) {
|
|
|
- if (forceUpdate) {
|
|
|
- trySendEvent(new ScanChangedEvent(index, ConnectorConstant.OPERTION_UPDATE, row));
|
|
|
+ if (customEvent) {
|
|
|
+ trySendEvent(new ScanChangedEvent(index, event, row));
|
|
|
continue;
|
|
|
}
|
|
|
|