|
@@ -198,6 +198,9 @@ public final class IncrementPuller extends AbstractPuller implements Application
|
|
|
abstract class AbstractConsumer<E extends ChangedEvent> implements Watcher {
|
|
|
protected Meta meta;
|
|
|
|
|
|
+ //判断上次是否为ddl,是ddl需要强制刷新下picker
|
|
|
+ private boolean ddl;
|
|
|
+
|
|
|
public abstract void onChange(E e);
|
|
|
|
|
|
public void onDDLChanged(DDLChangedEvent event) {
|
|
@@ -236,6 +239,14 @@ public final class IncrementPuller extends AbstractPuller implements Application
|
|
|
protected void execute(String tableGroupId, ChangedEvent event) {
|
|
|
bufferActuatorRouter.execute(meta.getId(), tableGroupId, event);
|
|
|
}
|
|
|
+
|
|
|
+ public boolean isDdl() {
|
|
|
+ return ddl;
|
|
|
+ }
|
|
|
+
|
|
|
+ public void setDdl(boolean ddl) {
|
|
|
+ this.ddl = ddl;
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
final class QuartzConsumer extends AbstractConsumer<ScanChangedEvent> {
|
|
@@ -262,9 +273,12 @@ public final class IncrementPuller extends AbstractPuller implements Application
|
|
|
|
|
|
final class LogConsumer extends AbstractConsumer<RowChangedEvent> {
|
|
|
private Mapping mapping;
|
|
|
+
|
|
|
+ private List<TableGroup> tableGroups;
|
|
|
private Map<String, List<FieldPicker>> tablePicker = new LinkedHashMap<>();
|
|
|
|
|
|
public LogConsumer(Meta meta, Mapping mapping, List<TableGroup> tableGroups) {
|
|
|
+ this.tableGroups = tableGroups;
|
|
|
this.meta = meta;
|
|
|
this.mapping = mapping;
|
|
|
tableGroups.forEach(t -> {
|
|
@@ -279,6 +293,18 @@ public final class IncrementPuller extends AbstractPuller implements Application
|
|
|
|
|
|
@Override
|
|
|
public void onChange(RowChangedEvent event) {
|
|
|
+ if (isDdl()){//需要强制刷新 fix https://gitee.com/ghi/dbsyncer/issues/I8DJUR
|
|
|
+ this.tablePicker.clear();
|
|
|
+ this.tableGroups.forEach(t -> {
|
|
|
+ final Table table = t.getSourceTable();
|
|
|
+ final String tableName = table.getName();
|
|
|
+ tablePicker.putIfAbsent(tableName, new ArrayList<>());
|
|
|
+ TableGroup group = PickerUtil.mergeTableGroupConfig(mapping, t);
|
|
|
+ tablePicker.get(tableName).add(new FieldPicker(group, group.getFilter(), table.getColumn(), group.getFieldMapping()));
|
|
|
+ bind(group.getId());
|
|
|
+ });
|
|
|
+ setDdl(false);
|
|
|
+ }
|
|
|
process(event, picker -> {
|
|
|
final Map<String, Object> changedRow = picker.getColumns(event.getDataList());
|
|
|
if (picker.filter(changedRow)) {
|
|
@@ -290,6 +316,7 @@ public final class IncrementPuller extends AbstractPuller implements Application
|
|
|
|
|
|
@Override
|
|
|
public void onDDLChanged(DDLChangedEvent event) {
|
|
|
+ setDdl(true);
|
|
|
process(event, picker -> execute(picker.getTableGroup().getId(), event));
|
|
|
}
|
|
|
|