AE86 1 سال پیش
والد
کامیت
93d834c7cc
1فایلهای تغییر یافته به همراه25 افزوده شده و 32 حذف شده
  1. 25 32
      dbsyncer-manager/src/main/java/org/dbsyncer/manager/puller/IncrementPuller.java

+ 25 - 32
dbsyncer-manager/src/main/java/org/dbsyncer/manager/puller/IncrementPuller.java

@@ -198,9 +198,6 @@ public final class IncrementPuller extends AbstractPuller implements Application
     abstract class AbstractConsumer<E extends ChangedEvent> implements Watcher {
         protected Meta meta;
 
-        //判断上次是否为ddl,是ddl需要强制刷新下picker
-        private boolean ddl;
-
         public abstract void onChange(E e);
 
         public void onDDLChanged(DDLChangedEvent event) {
@@ -240,13 +237,6 @@ public final class IncrementPuller extends AbstractPuller implements Application
             bufferActuatorRouter.execute(meta.getId(), tableGroupId, event);
         }
 
-        public boolean isDdl() {
-            return ddl;
-        }
-
-        public void setDdl(boolean ddl) {
-            this.ddl = ddl;
-        }
     }
 
     final class QuartzConsumer extends AbstractConsumer<ScanChangedEvent> {
@@ -273,37 +263,25 @@ public final class IncrementPuller extends AbstractPuller implements Application
 
     final class LogConsumer extends AbstractConsumer<RowChangedEvent> {
         private Mapping mapping;
-
-        private  List<TableGroup> tableGroups;
+        private List<TableGroup> tableGroups;
         private Map<String, List<FieldPicker>> tablePicker = new LinkedHashMap<>();
 
+        //判断上次是否为ddl,是ddl需要强制刷新下picker
+        private boolean ddlChanged;
+
         public LogConsumer(Meta meta, Mapping mapping, List<TableGroup> tableGroups) {
             this.tableGroups = tableGroups;
             this.meta = meta;
             this.mapping = mapping;
-            tableGroups.forEach(t -> {
-                final Table table = t.getSourceTable();
-                final String tableName = table.getName();
-                tablePicker.putIfAbsent(tableName, new ArrayList<>());
-                TableGroup group = PickerUtil.mergeTableGroupConfig(mapping, t);
-                tablePicker.get(tableName).add(new FieldPicker(group, group.getFilter(), table.getColumn(), group.getFieldMapping()));
-                bind(group.getId());
-            });
+            addTablePicker(true);
         }
 
         @Override
         public void onChange(RowChangedEvent event) {
-            if (isDdl()){//需要强制刷新 fix https://gitee.com/ghi/dbsyncer/issues/I8DJUR
-                this.tablePicker.clear();
-                this.tableGroups.forEach(t -> {
-                    final Table table = t.getSourceTable();
-                    final String tableName = table.getName();
-                    tablePicker.putIfAbsent(tableName, new ArrayList<>());
-                    TableGroup group = PickerUtil.mergeTableGroupConfig(mapping, t);
-                    tablePicker.get(tableName).add(new FieldPicker(group, group.getFilter(), table.getColumn(), group.getFieldMapping()));
-                    bind(group.getId());
-                });
-                setDdl(false);
+            // 需要强制刷新 fix https://gitee.com/ghi/dbsyncer/issues/I8DJUR
+            if (ddlChanged) {
+                addTablePicker(false);
+                ddlChanged = false;
             }
             process(event, picker -> {
                 final Map<String, Object> changedRow = picker.getColumns(event.getDataList());
@@ -316,7 +294,7 @@ public final class IncrementPuller extends AbstractPuller implements Application
 
         @Override
         public void onDDLChanged(DDLChangedEvent event) {
-            setDdl(true);
+            ddlChanged = true;
             process(event, picker -> execute(picker.getTableGroup().getId(), event));
         }
 
@@ -330,6 +308,21 @@ public final class IncrementPuller extends AbstractPuller implements Application
             }
         }
 
+        private void addTablePicker(boolean bindBufferActuatorRouter) {
+            this.tablePicker.clear();
+            this.tableGroups.forEach(t -> {
+                final Table table = t.getSourceTable();
+                final String tableName = table.getName();
+                tablePicker.putIfAbsent(tableName, new ArrayList<>());
+                TableGroup group = PickerUtil.mergeTableGroupConfig(mapping, t);
+                tablePicker.get(tableName).add(new FieldPicker(group, group.getFilter(), table.getColumn(), group.getFieldMapping()));
+                // 是否注册到路由服务中
+                if (bindBufferActuatorRouter) {
+                    bind(group.getId());
+                }
+            });
+        }
+
     }
 
 }