Răsfoiți Sursa

!195 修复字段修改后重名顺序不对问题
Merge pull request !195 from life/life_dev_datasource

AE86 1 an în urmă
părinte
comite
416e8a2b1b

+ 28 - 8
dbsyncer-manager/src/main/java/org/dbsyncer/manager/puller/IncrementPuller.java

@@ -236,6 +236,7 @@ public final class IncrementPuller extends AbstractPuller implements Application
         protected void execute(String tableGroupId, ChangedEvent event) {
             bufferActuatorRouter.execute(meta.getId(), tableGroupId, event);
         }
+
     }
 
     final class QuartzConsumer extends AbstractConsumer<ScanChangedEvent> {
@@ -262,23 +263,26 @@ public final class IncrementPuller extends AbstractPuller implements Application
 
     final class LogConsumer extends AbstractConsumer<RowChangedEvent> {
         private Mapping mapping;
+        private List<TableGroup> tableGroups;
         private Map<String, List<FieldPicker>> tablePicker = new LinkedHashMap<>();
 
+        //判断上次是否为ddl,是ddl需要强制刷新下picker
+        private boolean ddlChanged;
+
         public LogConsumer(Meta meta, Mapping mapping, List<TableGroup> tableGroups) {
+            this.tableGroups = tableGroups;
             this.meta = meta;
             this.mapping = mapping;
-            tableGroups.forEach(t -> {
-                final Table table = t.getSourceTable();
-                final String tableName = table.getName();
-                tablePicker.putIfAbsent(tableName, new ArrayList<>());
-                TableGroup group = PickerUtil.mergeTableGroupConfig(mapping, t);
-                tablePicker.get(tableName).add(new FieldPicker(group, group.getFilter(), table.getColumn(), group.getFieldMapping()));
-                bind(group.getId());
-            });
+            addTablePicker(true);
         }
 
         @Override
         public void onChange(RowChangedEvent event) {
+            // 需要强制刷新 fix https://gitee.com/ghi/dbsyncer/issues/I8DJUR
+            if (ddlChanged) {
+                addTablePicker(false);
+                ddlChanged = false;
+            }
             process(event, picker -> {
                 final Map<String, Object> changedRow = picker.getColumns(event.getDataList());
                 if (picker.filter(changedRow)) {
@@ -290,6 +294,7 @@ public final class IncrementPuller extends AbstractPuller implements Application
 
         @Override
         public void onDDLChanged(DDLChangedEvent event) {
+            ddlChanged = true;
             process(event, picker -> execute(picker.getTableGroup().getId(), event));
         }
 
@@ -303,6 +308,21 @@ public final class IncrementPuller extends AbstractPuller implements Application
             }
         }
 
+        private void addTablePicker(boolean bindBufferActuatorRouter) {
+            this.tablePicker.clear();
+            this.tableGroups.forEach(t -> {
+                final Table table = t.getSourceTable();
+                final String tableName = table.getName();
+                tablePicker.putIfAbsent(tableName, new ArrayList<>());
+                TableGroup group = PickerUtil.mergeTableGroupConfig(mapping, t);
+                tablePicker.get(tableName).add(new FieldPicker(group, group.getFilter(), table.getColumn(), group.getFieldMapping()));
+                // 是否注册到路由服务中
+                if (bindBufferActuatorRouter) {
+                    bind(group.getId());
+                }
+            });
+        }
+
     }
 
 }