|
@@ -150,8 +150,9 @@ public class IncrementPuller extends AbstractPuller implements ScheduledTaskJob,
|
|
|
if (ListenerTypeEnum.isTiming(listenerType)) {
|
|
|
QuartzExtractor extractor = listener.getExtractor(listenerType, QuartzExtractor.class);
|
|
|
List<Map<String, String>> commands = list.stream().map(t -> t.getCommand()).collect(Collectors.toList());
|
|
|
+ PrimaryKeyMappingStrategy strategy = PrimaryKeyMappingEnum.DEFAULT.getPrimaryKeyMappingStrategy();
|
|
|
|
|
|
- setExtractorConfig(extractor, connectorConfig, listenerConfig, meta.getMap(), new QuartzListener(mapping, list));
|
|
|
+ setExtractorConfig(extractor, connectorConfig, listenerConfig, meta.getMap(), new QuartzListener(mapping, list, strategy));
|
|
|
extractor.setConnectorFactory(connectorFactory);
|
|
|
extractor.setScheduledTaskService(scheduledTaskService);
|
|
|
extractor.setCommands(commands);
|
|
@@ -179,9 +180,10 @@ public class IncrementPuller extends AbstractPuller implements ScheduledTaskJob,
|
|
|
}
|
|
|
|
|
|
abstract class AbstractListener implements Event {
|
|
|
- protected Mapping mapping;
|
|
|
- protected String metaId;
|
|
|
- protected AtomicBoolean changed = new AtomicBoolean();
|
|
|
+ protected Mapping mapping;
|
|
|
+ protected String metaId;
|
|
|
+ protected AtomicBoolean changed = new AtomicBoolean();
|
|
|
+ protected PrimaryKeyMappingStrategy strategy;
|
|
|
|
|
|
@Override
|
|
|
public void flushEvent(Map<String, String> map) {
|
|
@@ -228,8 +230,9 @@ public class IncrementPuller extends AbstractPuller implements ScheduledTaskJob,
|
|
|
|
|
|
private List<FieldPicker> tablePicker;
|
|
|
|
|
|
- public QuartzListener(Mapping mapping, List<TableGroup> list) {
|
|
|
+ public QuartzListener(Mapping mapping, List<TableGroup> list, PrimaryKeyMappingStrategy strategy) {
|
|
|
this.mapping = mapping;
|
|
|
+ this.strategy = strategy;
|
|
|
this.metaId = mapping.getMetaId();
|
|
|
this.tablePicker = new LinkedList<>();
|
|
|
list.forEach(t -> tablePicker.add(new FieldPicker(PickerUtil.mergeTableGroupConfig(mapping, t))));
|
|
@@ -244,7 +247,7 @@ public class IncrementPuller extends AbstractPuller implements ScheduledTaskJob,
|
|
|
rowChangedEvent.getAfter());
|
|
|
|
|
|
// 处理过程有异常向上抛
|
|
|
- parser.execute(mapping, picker.getTableGroup(), rowChangedEvent, null);
|
|
|
+ parser.execute(mapping, picker.getTableGroup(), rowChangedEvent, strategy);
|
|
|
|
|
|
// 标记有变更记录
|
|
|
changed.compareAndSet(false, true);
|
|
@@ -275,8 +278,6 @@ public class IncrementPuller extends AbstractPuller implements ScheduledTaskJob,
|
|
|
|
|
|
private Map<String, List<FieldPicker>> tablePicker;
|
|
|
|
|
|
- private PrimaryKeyMappingStrategy strategy;
|
|
|
-
|
|
|
public LogListener(Mapping mapping, List<TableGroup> list, PrimaryKeyMappingStrategy strategy) {
|
|
|
this.mapping = mapping;
|
|
|
this.metaId = mapping.getMetaId();
|