AE86 пре 5 година
родитељ
комит
c40d40f6d2

+ 2 - 2
dbsyncer-common/src/main/java/org/dbsyncer/common/event/Event.java

@@ -23,12 +23,12 @@ public interface Event {
     /**
      * 定时数据变更事件
      *
-     * @param tableName
+     * @param tableGroupIndex
      * @param event
      * @param before
      * @param after
      */
-    void changedQuartzEvent(String tableName, String event, Map<String, Object> before, Map<String, Object> after);
+    void changedQuartzEvent(int tableGroupIndex, String event, Map<String, Object> before, Map<String, Object> after);
 
     /**
      * 写入增量点事件

+ 3 - 3
dbsyncer-listener/src/main/java/org/dbsyncer/listener/AbstractExtractor.java

@@ -43,9 +43,9 @@ public abstract class AbstractExtractor implements Extractor {
         }
     }
 
-    public void changedQuartzEvent(String tableName, String event, Map<String, Object> before, Map<String, Object> after) {
+    public void changedQuartzEvent(int tableGroupIndex, String event, Map<String, Object> before, Map<String, Object> after) {
         if (!CollectionUtils.isEmpty(watcher)) {
-            watcher.forEach(w -> w.changedQuartzEvent(tableName, event, before, after));
+            watcher.forEach(w -> w.changedQuartzEvent(tableGroupIndex, event, before, after));
         }
     }
 
@@ -73,4 +73,4 @@ public abstract class AbstractExtractor implements Extractor {
         this.map = map;
     }
 
-}
+}

+ 2 - 11
dbsyncer-listener/src/main/java/org/dbsyncer/listener/Listener.java

@@ -1,16 +1,7 @@
 package org.dbsyncer.listener;
 
-import org.dbsyncer.listener.config.ExtractorConfig;
-
 public interface Listener {
 
-    /**
-     * 创建抽取器
-     *
-     * @param config 抽取器配置
-     * @return
-     * @throws IllegalAccessException
-     * @throws InstantiationException
-     */
-    AbstractExtractor getExtractor(ExtractorConfig config) throws IllegalAccessException, InstantiationException;
+    <T> T getExtractor(String type, Class<T> valueType) throws IllegalAccessException, InstantiationException;
+
 }

+ 2 - 42
dbsyncer-listener/src/main/java/org/dbsyncer/listener/ListenerFactory.java

@@ -1,54 +1,14 @@
 package org.dbsyncer.listener;
 
-import org.dbsyncer.connector.ConnectorFactory;
-import org.dbsyncer.connector.config.ConnectorConfig;
-import org.dbsyncer.listener.config.ExtractorConfig;
-import org.dbsyncer.listener.config.ListenerConfig;
 import org.dbsyncer.listener.enums.ListenerEnum;
-import org.dbsyncer.listener.enums.ListenerTypeEnum;
-import org.dbsyncer.listener.quartz.QuartzExtractor;
-import org.dbsyncer.listener.quartz.ScheduledTaskService;
-import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
-import org.springframework.util.Assert;
 
 @Component
 public class ListenerFactory implements Listener {
 
-    @Autowired
-    private ConnectorFactory connectorFactory;
-
-    @Autowired
-    private ScheduledTaskService scheduledTaskService;
-
     @Override
-    public AbstractExtractor getExtractor(ExtractorConfig config) throws IllegalAccessException, InstantiationException {
-        final ListenerConfig listenerConfig = config.getListenerConfig();
-        final ConnectorConfig connectorConfig = config.getConnectorConfig();
-        final String listenerType = listenerConfig.getListenerType();
-        final String connectorType = connectorConfig.getConnectorType();
-
-        AbstractExtractor extractor = getDefaultExtractor(listenerType, connectorType, config);
-        extractor.setConnectorConfig(connectorConfig);
-        extractor.setListenerConfig(listenerConfig);
-        extractor.setMap(config.getMap());
-        return extractor;
-    }
-
-    private AbstractExtractor getDefaultExtractor(String listenerType, String connectorType, ExtractorConfig config) throws IllegalAccessException, InstantiationException {
-        // 默认定时抽取
-        if (ListenerTypeEnum.isTiming(listenerType)) {
-            Class<QuartzExtractor> clazz = (Class<QuartzExtractor>) ListenerEnum.DEFAULT.getClazz();
-            QuartzExtractor extractor = clazz.newInstance();
-            extractor.setConnectorFactory(connectorFactory);
-            extractor.setScheduledTaskService(scheduledTaskService);
-            extractor.setTableCommandConfig(config.getTableCommandConfig());
-            return extractor;
-        }
-
-        // 基于日志抽取
-        Assert.isTrue(ListenerTypeEnum.isLog(listenerType), "未知的同步方式.");
-        Class<AbstractExtractor> clazz = (Class<AbstractExtractor>) ListenerEnum.getExtractor(connectorType);
+    public <T> T getExtractor(String type, Class<T> valueType) throws IllegalAccessException, InstantiationException {
+        Class<T> clazz = (Class<T>) ListenerEnum.getExtractor(type);
         return clazz.newInstance();
     }
 

+ 8 - 8
dbsyncer-listener/src/main/java/org/dbsyncer/listener/config/ExtractorConfig.java

@@ -1,8 +1,8 @@
 package org.dbsyncer.listener.config;
 
+import org.dbsyncer.common.event.Event;
 import org.dbsyncer.connector.config.ConnectorConfig;
 
-import java.util.List;
 import java.util.Map;
 
 /**
@@ -15,7 +15,7 @@ public class ExtractorConfig {
     private ConnectorConfig connectorConfig;
     private ListenerConfig listenerConfig;
     private Map<String, String> map;
-    private List<TableCommandConfig> tableCommandConfig;
+    private Event event;
 
     /**
      * 抽取器配置
@@ -23,13 +23,13 @@ public class ExtractorConfig {
      * @param connectorConfig 连接器配置
      * @param listenerConfig 监听配置
      * @param map 增量元信息
-     * @param tableCommandConfig 映射关系
+     * @param event 监听器
      */
-    public ExtractorConfig(ConnectorConfig connectorConfig, ListenerConfig listenerConfig, Map<String, String> map, List<TableCommandConfig> tableCommandConfig) {
+    public ExtractorConfig(ConnectorConfig connectorConfig, ListenerConfig listenerConfig, Map<String, String> map, Event event) {
         this.connectorConfig = connectorConfig;
         this.listenerConfig = listenerConfig;
         this.map = map;
-        this.tableCommandConfig = tableCommandConfig;
+        this.event = event;
     }
 
     public ConnectorConfig getConnectorConfig() {
@@ -44,7 +44,7 @@ public class ExtractorConfig {
         return map;
     }
 
-    public List<TableCommandConfig> getTableCommandConfig() {
-        return tableCommandConfig;
+    public Event getEvent() {
+        return event;
     }
-}
+}

+ 3 - 0
dbsyncer-listener/src/main/java/org/dbsyncer/listener/config/ListenerConfig.java

@@ -1,5 +1,7 @@
 package org.dbsyncer.listener.config;
 
+import org.dbsyncer.listener.enums.ListenerTypeEnum;
+
 /**
  * @author AE86
  * @version 1.0.0
@@ -9,6 +11,7 @@ public class ListenerConfig {
 
     /**
      * 监听器类型
+     * {@link ListenerTypeEnum}
      */
     private String listenerType;
 

+ 38 - 41
dbsyncer-listener/src/main/java/org/dbsyncer/listener/quartz/QuartzExtractor.java

@@ -6,7 +6,6 @@ import org.dbsyncer.common.util.UUIDUtil;
 import org.dbsyncer.connector.ConnectorFactory;
 import org.dbsyncer.connector.constant.ConnectorConstant;
 import org.dbsyncer.listener.AbstractExtractor;
-import org.dbsyncer.listener.config.TableCommandConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -31,8 +30,8 @@ public class QuartzExtractor extends AbstractExtractor implements ScheduledTaskJ
 
     private ConnectorFactory connectorFactory;
     private ScheduledTaskService scheduledTaskService;
-    private List<TableCommandConfig> tableCommandConfig;
-    private int tableCommandConfigSize;
+    private List<Map<String, String>> commands;
+    private int commandSize;
 
     private int readNum;
     private String eventFieldName;
@@ -52,13 +51,17 @@ public class QuartzExtractor extends AbstractExtractor implements ScheduledTaskJ
 
     @Override
     public void run() {
-        if (running.compareAndSet(false, true)) {
-            // 依次执行同步映射关系
-            for (int i = 0; i < tableCommandConfigSize; i++) {
-                execute(tableCommandConfig.get(i));
+        try {
+            if (running.compareAndSet(false, true)) {
+                // 依次执行同步映射关系
+                for (int i = 0; i < commandSize; i++) {
+                    execute(commands.get(i), i);
+                }
             }
+        } catch (Exception e) {
+            errorEvent(e);
+            logger.error(e.getMessage());
         }
-
     }
 
     @Override
@@ -66,43 +69,38 @@ public class QuartzExtractor extends AbstractExtractor implements ScheduledTaskJ
         scheduledTaskService.stop(key);
     }
 
-    private void execute(TableCommandConfig t) {
-        try {
-            final String table = t.getTable();
-            int pageIndex = 1;
-            for (; ; ) {
-                Result reader = connectorFactory.reader(connectorConfig, t.getCommand(), pageIndex++, readNum);
-                List<Map<String, Object>> data = reader.getData();
-                if (CollectionUtils.isEmpty(data)) {
-                    break;
-                }
-
-                Object event = null;
-                for (Map<String, Object> row : data) {
-                    event = row.get(eventFieldName);
-                    if (update.contains(event)) {
-                        changedQuartzEvent(table, ConnectorConstant.OPERTION_UPDATE, Collections.EMPTY_MAP, row);
-                        continue;
-                    }
-                    if (insert.contains(event)) {
-                        changedQuartzEvent(table, ConnectorConstant.OPERTION_INSERT, Collections.EMPTY_MAP, row);
-                        continue;
-                    }
-                    if (delete.contains(event)) {
-                        changedQuartzEvent(table, ConnectorConstant.OPERTION_DELETE, row, Collections.EMPTY_MAP);
-                        continue;
-                    }
+    private void execute(Map<String, String> command, int index) {
+        int pageIndex = 1;
+        for (; ; ) {
+            Result reader = connectorFactory.reader(connectorConfig, command, pageIndex++, readNum);
+            List<Map<String, Object>> data = reader.getData();
+            if (CollectionUtils.isEmpty(data)) {
+                break;
+            }
 
+            Object event = null;
+            for (Map<String, Object> row : data) {
+                event = row.get(eventFieldName);
+                if (update.contains(event)) {
+                    changedQuartzEvent(index, ConnectorConstant.OPERTION_UPDATE, Collections.EMPTY_MAP, row);
+                    continue;
+                }
+                if (insert.contains(event)) {
+                    changedQuartzEvent(index, ConnectorConstant.OPERTION_INSERT, Collections.EMPTY_MAP, row);
+                    continue;
+                }
+                if (delete.contains(event)) {
+                    changedQuartzEvent(index, ConnectorConstant.OPERTION_DELETE, row, Collections.EMPTY_MAP);
+                    continue;
                 }
+
             }
-        } catch (Exception e) {
-            errorEvent(e);
-            logger.error(e.getMessage());
         }
+
     }
 
     private void init() {
-        tableCommandConfigSize = tableCommandConfig.size();
+        commandSize = commands.size();
 
         readNum = listenerConfig.getReadNum();
         eventFieldName = listenerConfig.getEventFieldName();
@@ -122,8 +120,7 @@ public class QuartzExtractor extends AbstractExtractor implements ScheduledTaskJ
         this.scheduledTaskService = scheduledTaskService;
     }
 
-    public void setTableCommandConfig(List<TableCommandConfig> tableCommandConfig) {
-        this.tableCommandConfig = tableCommandConfig;
+    public void setCommands(List<Map<String, String>> commands) {
+        this.commands = commands;
     }
-
 }

+ 21 - 7
dbsyncer-manager/src/main/java/org/dbsyncer/manager/config/FieldPicker.java

@@ -2,6 +2,8 @@ package org.dbsyncer.manager.config;
 
 import org.dbsyncer.common.util.CollectionUtils;
 import org.dbsyncer.connector.config.Field;
+import org.dbsyncer.connector.config.Filter;
+import org.dbsyncer.parser.model.DataEvent;
 import org.dbsyncer.parser.model.FieldMapping;
 import org.dbsyncer.parser.model.TableGroup;
 import org.springframework.util.Assert;
@@ -12,16 +14,16 @@ import java.util.stream.Collectors;
 public class FieldPicker {
 
     private TableGroup tableGroup;
-    private List<Field> column;
-    private List<FieldMapping> fieldMapping;
     private List<Node> index;
     private int indexSize;
 
-    public FieldPicker(TableGroup tableGroup, List<Field> column, List<FieldMapping> fieldMapping) {
+    public FieldPicker(TableGroup tableGroup, List<Filter> filter) {
         this.tableGroup = tableGroup;
-        this.column = column;
-        this.fieldMapping = fieldMapping;
-        init();
+    }
+
+    public FieldPicker(TableGroup tableGroup, List<Filter> filter, List<Field> column, List<FieldMapping> fieldMapping) {
+        this.tableGroup = tableGroup;
+        init(filter, column, fieldMapping);
     }
 
     public Map<String, Object> getColumns(List<Object> list) {
@@ -42,7 +44,19 @@ public class FieldPicker {
         return tableGroup;
     }
 
-    private void init() {
+    /**
+     * 根据过滤条件过滤
+     *
+     * @param data
+     * @return
+     */
+    public boolean filter(DataEvent data) {
+        // TODO 过滤
+        //Map<String, Object> row = data.getData();
+        return false;
+    }
+
+    private void init(List<Filter> filter, List<Field> column, List<FieldMapping> fieldMapping) {
         // column  => [1, 86, 0, 中文, 2020-05-15T12:17:22.000+0800, 备注信息]
         Assert.notEmpty(column, "读取字段不能为空.");
         Assert.notEmpty(fieldMapping, "映射关系不能为空.");

+ 155 - 55
dbsyncer-manager/src/main/java/org/dbsyncer/manager/puller/impl/IncrementPuller.java

@@ -3,13 +3,15 @@ package org.dbsyncer.manager.puller.impl;
 import org.dbsyncer.common.event.Event;
 import org.dbsyncer.common.util.CollectionUtils;
 import org.dbsyncer.common.util.UUIDUtil;
+import org.dbsyncer.connector.ConnectorFactory;
 import org.dbsyncer.connector.config.ConnectorConfig;
 import org.dbsyncer.connector.config.Table;
 import org.dbsyncer.listener.AbstractExtractor;
 import org.dbsyncer.listener.Listener;
 import org.dbsyncer.listener.config.ExtractorConfig;
 import org.dbsyncer.listener.config.ListenerConfig;
-import org.dbsyncer.listener.config.TableCommandConfig;
+import org.dbsyncer.listener.enums.ListenerTypeEnum;
+import org.dbsyncer.listener.quartz.QuartzExtractor;
 import org.dbsyncer.listener.quartz.ScheduledTaskJob;
 import org.dbsyncer.listener.quartz.ScheduledTaskService;
 import org.dbsyncer.manager.Manager;
@@ -27,10 +29,7 @@ import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
 import org.springframework.util.Assert;
 
-import java.util.ArrayList;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.stream.Collectors;
@@ -62,6 +61,9 @@ public class IncrementPuller extends AbstractPuller implements ScheduledTaskJob,
     @Autowired
     private ScheduledTaskService scheduledTaskService;
 
+    @Autowired
+    private ConnectorFactory connectorFactory;
+
     private String key;
 
     private Map<String, AbstractExtractor> map = new ConcurrentHashMap<>();
@@ -77,21 +79,13 @@ public class IncrementPuller extends AbstractPuller implements ScheduledTaskJob,
             Assert.notEmpty(list, "映射关系不能为空.");
             Meta meta = manager.getMeta(metaId);
             Assert.notNull(meta, "Meta不能为空.");
-            ConnectorConfig connectorConfig = connector.getConfig();
-            ListenerConfig listenerConfig = mapping.getListener();
-            List<TableCommandConfig> tableCommandConfig = list.stream().map(t ->
-                    new TableCommandConfig(t.getSourceTable().getName(), t.getCommand())
-            ).collect(Collectors.toList());
-
-            AbstractExtractor extractor = listener.getExtractor(new ExtractorConfig(connectorConfig, listenerConfig, meta.getMap(), tableCommandConfig));
+            AbstractExtractor extractor = getExtractor(mapping, connector, list, meta);
             Assert.notNull(extractor, "未知的监听配置.");
+
             long now = System.currentTimeMillis();
             meta.setBeginTime(now);
             meta.setEndTime(now);
             manager.editMeta(meta);
-
-            // 监听数据变更事件
-            extractor.addListener(new DefaultListener(mapping, list));
             map.putIfAbsent(metaId, extractor);
 
             // 执行任务
@@ -132,55 +126,59 @@ public class IncrementPuller extends AbstractPuller implements ScheduledTaskJob,
         scheduledTaskService.stop(key);
     }
 
-    final class DefaultListener implements Event {
+    private AbstractExtractor getExtractor(Mapping mapping, Connector connector, List<TableGroup> list, Meta meta)
+            throws InstantiationException, IllegalAccessException {
+        ConnectorConfig connectorConfig = connector.getConfig();
+        ListenerConfig listenerConfig = mapping.getListener();
 
-        private Mapping mapping;
-        private String metaId;
-        private Map<String, List<FieldPicker>> tablePicker;
-        private AtomicBoolean changed = new AtomicBoolean();
+        // timing/log
+        final String listenerType = listenerConfig.getListenerType();
 
-        public DefaultListener(Mapping mapping, List<TableGroup> list) {
-            this.mapping = mapping;
-            this.metaId = mapping.getMetaId();
-            this.tablePicker = new LinkedHashMap<>();
-            list.forEach(t -> {
-                final Table table = t.getSourceTable();
-                final String tableName = table.getName();
-                tablePicker.putIfAbsent(tableName, new ArrayList<>());
-                tablePicker.get(tableName).add(new FieldPicker(t, table.getColumn(), t.getFieldMapping()));
-            });
+        // 默认定时抽取
+        if (ListenerTypeEnum.isTiming(listenerType)) {
+            QuartzExtractor extractor = listener.getExtractor(listenerType, QuartzExtractor.class);
+            List<Map<String, String>> commands = list.stream().map(t -> t.getCommand()).collect(Collectors.toList());
+
+            ExtractorConfig config = new ExtractorConfig(connectorConfig, listenerConfig, meta.getMap(), new QuartzListener(mapping, list));
+            setExtractorConfig(extractor, config);
+            extractor.setConnectorFactory(connectorFactory);
+            extractor.setScheduledTaskService(scheduledTaskService);
+            extractor.setCommands(commands);
+            return extractor;
         }
 
-        @Override
-        public void changedLogEvent(String tableName, String event, List<Object> before, List<Object> after) {
-            logger.info("监听数据=> tableName:{}, event:{}, before:{}, after:{}", tableName, event, before, after);
+        // 基于日志抽取
+        if (ListenerTypeEnum.isLog(listenerType)) {
+            final String connectorType = connectorConfig.getConnectorType();
+            AbstractExtractor extractor = listener.getExtractor(connectorType, AbstractExtractor.class);
 
-            // 处理过程有异常向上抛
-            List<FieldPicker> pickers = tablePicker.get(tableName);
-            if (!CollectionUtils.isEmpty(pickers)) {
-                pickers.parallelStream().forEach(p -> {
-                    DataEvent data = new DataEvent(event, p.getColumns(before), p.getColumns(after));
-                    parser.execute(mapping, p.getTableGroup(), data);
-                });
-            }
-
-            // 标记有变更记录
-            changed.compareAndSet(false, true);
+            ExtractorConfig config = new ExtractorConfig(connectorConfig, listenerConfig, meta.getMap(), new LogListener(mapping, list));
+            setExtractorConfig(extractor, config);
+            return extractor;
         }
+        return null;
+    }
+
+    private void setExtractorConfig(AbstractExtractor extractor, ExtractorConfig config) {
+        extractor.setConnectorConfig(config.getConnectorConfig());
+        extractor.setListenerConfig(config.getListenerConfig());
+        extractor.setMap(config.getMap());
+        extractor.addListener(config.getEvent());
+    }
+
+    abstract class AbstractListener implements Event {
+        protected Mapping mapping;
+        protected String metaId;
+        protected AtomicBoolean changed = new AtomicBoolean();
 
         @Override
-        public void changedQuartzEvent(String tableName, String event, Map<String, Object> before, Map<String, Object> after) {
-            // 处理过程有异常向上抛
-            List<FieldPicker> pickers = tablePicker.get(tableName);
-            if (!CollectionUtils.isEmpty(pickers)) {
-                pickers.parallelStream().forEach(p -> {
-                    DataEvent data = new DataEvent(event, before, after);
-                    parser.execute(mapping, p.getTableGroup(), data);
-                });
-            }
+        public void changedLogEvent(String tableName, String event, List<Object> before, List<Object> after) {
+            // nothing to do
+        }
 
-            // 标记有变更记录
-            changed.compareAndSet(false, true);
+        @Override
+        public void changedQuartzEvent(int tableGroupIndex, String event, Map<String, Object> before, Map<String, Object> after) {
+            // nothing to do
         }
 
         @Override
@@ -203,4 +201,106 @@ public class IncrementPuller extends AbstractPuller implements ScheduledTaskJob,
 
     }
 
+    /**
+     * </p>定时模式
+     * <ol>
+     * <li>获取映射关系增量数据</li>
+     * <li>根据过滤条件筛选</li>
+     * </ol>
+     * </p>同步关系:
+     * </p>数据源表 >> 目标源表
+     * <ul>
+     * <li>A >> B</li>
+     * <li>A >> C</li>
+     * <li>E >> F</li>
+     * </ul>
+     * </p>PS:
+     * <ol>
+     * <li>依次执行同步关系A >> B 然后 A >> C ...</li>
+     * </ol>
+     */
+    final class QuartzListener extends AbstractListener {
+
+        private List<FieldPicker> tablePicker;
+
+        public QuartzListener(Mapping mapping, List<TableGroup> list) {
+            this.mapping = mapping;
+            this.metaId = mapping.getMetaId();
+            this.tablePicker = new LinkedList<>();
+            list.forEach(t -> tablePicker.add(new FieldPicker(t, t.getFilter())));
+        }
+
+        @Override
+        public void changedQuartzEvent(int tableGroupIndex, String event, Map<String, Object> before, Map<String, Object> after) {
+            final FieldPicker picker = tablePicker.get(tableGroupIndex);
+            logger.info("监听数据=> tableName:{}, event:{}, before:{}, after:{}", picker.getTableGroup().getSourceTable().getName(), event, before, after);
+
+            // 处理过程有异常向上抛
+            DataEvent data = new DataEvent(event, before, after);
+            if(picker.filter(data)){
+                parser.execute(mapping, picker.getTableGroup(), data);
+            }
+
+            // 标记有变更记录
+            changed.compareAndSet(false, true);
+        }
+    }
+
+    /**
+     * </p>日志模式
+     * <ol>
+     * <li>监听表增量数据</li>
+     * <li>根据过滤条件筛选</li>
+     * </ol>
+     * </p>同步关系:
+     * </p>数据源表 >> 目标源表
+     * <ul>
+     * <li>A >> B</li>
+     * <li>A >> C</li>
+     * <li>E >> F</li>
+     * </ul>
+     * </p>PS:
+     * <ol>
+     * <li>为减少开销而选择复用监听器实例, 启动时只需创建一个数据源连接器.</li>
+     * <li>关系A >> B和A >> C会复用A监听的数据, A监听到增量数据,会发送给B和C.</li>
+     * <li>该模式下,会监听表所有字段.</li>
+     * </ol>
+     */
+    final class LogListener extends AbstractListener {
+
+        private Map<String, List<FieldPicker>> tablePicker;
+
+        public LogListener(Mapping mapping, List<TableGroup> list) {
+            this.mapping = mapping;
+            this.metaId = mapping.getMetaId();
+            this.tablePicker = new LinkedHashMap<>();
+            list.forEach(t -> {
+                final Table table = t.getSourceTable();
+                final String tableName = table.getName();
+                tablePicker.putIfAbsent(tableName, new ArrayList<>());
+                tablePicker.get(tableName).add(new FieldPicker(t, t.getFilter(), table.getColumn(), t.getFieldMapping()));
+            });
+        }
+
+        @Override
+        public void changedLogEvent(String tableName, String event, List<Object> before, List<Object> after) {
+            logger.info("监听数据=> tableName:{}, event:{}, before:{}, after:{}", tableName, event, before, after);
+
+            // 处理过程有异常向上抛
+            List<FieldPicker> pickers = tablePicker.get(tableName);
+            if (!CollectionUtils.isEmpty(pickers)) {
+                pickers.parallelStream().forEach(picker -> {
+                    DataEvent data = new DataEvent(event, picker.getColumns(before), picker.getColumns(after));
+                    if(picker.filter(data)){
+                        parser.execute(mapping, picker.getTableGroup(), data);
+                    }
+                });
+            }
+
+            // 标记有变更记录
+            changed.compareAndSet(false, true);
+        }
+
+    }
+
 }