AE86 5 年 前
コミット
ec2f107de8

+ 19 - 2
dbsyncer-common/src/main/java/org/dbsyncer/common/event/Event.java

@@ -11,14 +11,24 @@ import java.util.Map;
 public interface Event {
 
     /**
-     * 数据变更事件
+     * 日志数据变更事件
      *
      * @param tableName 表名
      * @param event     事件
      * @param before    变化前
      * @param after     变化后
      */
-    void changedEvent(String tableName, String event, List<Object> before, List<Object> after);
+    void changedLogEvent(String tableName, String event, List<Object> before, List<Object> after);
+
+    /**
+     * 定时数据变更事件
+     *
+     * @param tableName
+     * @param event
+     * @param before
+     * @param after
+     */
+    void changedQuartzEvent(String tableName, String event, Map<String, Object> before, Map<String, Object> after);
 
     /**
      * 写入增量点事件
@@ -27,4 +37,11 @@ public interface Event {
      */
     void flushEvent(Map<String, String> map);
 
+    /**
+     * 异常事件
+     *
+     * @param e
+     */
+    void errorEvent(Exception e);
+
 }

+ 14 - 2
dbsyncer-listener/src/main/java/org/dbsyncer/listener/AbstractExtractor.java

@@ -37,9 +37,15 @@ public abstract class AbstractExtractor implements Extractor {
         }
     }
 
-    public void changedEvent(String tableName, String event, List<Object> before, List<Object> after) {
+    public void changedLogEvent(String tableName, String event, List<Object> before, List<Object> after) {
         if (!CollectionUtils.isEmpty(watcher)) {
-            watcher.forEach(w -> w.changedEvent(tableName, event, before, after));
+            watcher.forEach(w -> w.changedLogEvent(tableName, event, before, after));
+        }
+    }
+
+    public void changedQuartzEvent(String tableName, String event, Map<String, Object> before, Map<String, Object> after) {
+        if (!CollectionUtils.isEmpty(watcher)) {
+            watcher.forEach(w -> w.changedQuartzEvent(tableName, event, before, after));
         }
     }
 
@@ -49,6 +55,12 @@ public abstract class AbstractExtractor implements Extractor {
         }
     }
 
+    public void errorEvent(Exception e) {
+        if (!CollectionUtils.isEmpty(watcher)) {
+            watcher.forEach(w -> w.errorEvent(e));
+        }
+    }
+
     public void setConnectorConfig(ConnectorConfig connectorConfig) {
         this.connectorConfig = connectorConfig;
     }

+ 1 - 1
dbsyncer-listener/src/main/java/org/dbsyncer/listener/Listener.java

@@ -12,5 +12,5 @@ public interface Listener {
      * @throws IllegalAccessException
      * @throws InstantiationException
      */
-    AbstractExtractor createExtractor(ExtractorConfig config) throws IllegalAccessException, InstantiationException;
+    AbstractExtractor getExtractor(ExtractorConfig config) throws IllegalAccessException, InstantiationException;
 }

+ 8 - 14
dbsyncer-listener/src/main/java/org/dbsyncer/listener/ListenerFactory.java

@@ -12,9 +12,6 @@ import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
 import org.springframework.util.Assert;
 
-import java.util.List;
-import java.util.Map;
-
 @Component
 public class ListenerFactory implements Listener {
 
@@ -25,30 +22,27 @@ public class ListenerFactory implements Listener {
     private ScheduledTaskService scheduledTaskService;
 
     @Override
-    public AbstractExtractor createExtractor(ExtractorConfig config)
-            throws IllegalAccessException, InstantiationException {
-        ConnectorConfig connectorConfig = config.getConnectorConfig();
-        ListenerConfig listenerConfig = config.getListenerConfig();
-
-        AbstractExtractor extractor = getDefaultExtractor(listenerConfig.getListenerType(), connectorConfig.getConnectorType(),
-                config.getCommands(), config.getTableNames());
+    public AbstractExtractor getExtractor(ExtractorConfig config) throws IllegalAccessException, InstantiationException {
+        final ListenerConfig listenerConfig = config.getListenerConfig();
+        final ConnectorConfig connectorConfig = config.getConnectorConfig();
+        final String listenerType = listenerConfig.getListenerType();
+        final String connectorType = connectorConfig.getConnectorType();
 
+        AbstractExtractor extractor = getDefaultExtractor(listenerType, connectorType, config);
         extractor.setConnectorConfig(connectorConfig);
         extractor.setListenerConfig(listenerConfig);
         extractor.setMap(config.getMap());
         return extractor;
     }
 
-    private AbstractExtractor getDefaultExtractor(String listenerType, String connectorType, List<Map<String, String>> commands, List<String> tableNames)
-            throws IllegalAccessException, InstantiationException {
+    private AbstractExtractor getDefaultExtractor(String listenerType, String connectorType, ExtractorConfig config) throws IllegalAccessException, InstantiationException {
         // 默认定时抽取
         if (ListenerTypeEnum.isTiming(listenerType)) {
             Class<QuartzExtractor> clazz = (Class<QuartzExtractor>) ListenerEnum.DEFAULT.getClazz();
             QuartzExtractor extractor = clazz.newInstance();
             extractor.setConnectorFactory(connectorFactory);
             extractor.setScheduledTaskService(scheduledTaskService);
-            extractor.setCommands(commands);
-            extractor.setTableNames(tableNames);
+            extractor.setTableCommandConfig(config.getTableCommandConfig());
             return extractor;
         }
 

+ 14 - 12
dbsyncer-listener/src/main/java/org/dbsyncer/listener/config/ExtractorConfig.java

@@ -15,15 +15,21 @@ public class ExtractorConfig {
     private ConnectorConfig connectorConfig;
     private ListenerConfig listenerConfig;
     private Map<String, String> map;
-    private List<Map<String, String>> commands;
-    private List<String> tableNames;
-
-    public ExtractorConfig(ConnectorConfig connectorConfig, ListenerConfig listenerConfig, Map<String, String> map, List<Map<String, String>> commands, List<String> tableNames) {
+    private List<TableCommandConfig> tableCommandConfig;
+
+    /**
+     * 抽取器配置
+     *
+     * @param connectorConfig 连接器配置
+     * @param listenerConfig 监听配置
+     * @param map 增量元信息
+     * @param tableCommandConfig 映射关系
+     */
+    public ExtractorConfig(ConnectorConfig connectorConfig, ListenerConfig listenerConfig, Map<String, String> map, List<TableCommandConfig> tableCommandConfig) {
         this.connectorConfig = connectorConfig;
         this.listenerConfig = listenerConfig;
         this.map = map;
-        this.commands = commands;
-        this.tableNames = tableNames;
+        this.tableCommandConfig = tableCommandConfig;
     }
 
     public ConnectorConfig getConnectorConfig() {
@@ -38,11 +44,7 @@ public class ExtractorConfig {
         return map;
     }
 
-    public List<Map<String, String>> getCommands() {
-        return commands;
-    }
-
-    public List<String> getTableNames() {
-        return tableNames;
+    public List<TableCommandConfig> getTableCommandConfig() {
+        return tableCommandConfig;
     }
 }

+ 13 - 0
dbsyncer-listener/src/main/java/org/dbsyncer/listener/config/ListenerConfig.java

@@ -12,6 +12,11 @@ public class ListenerConfig {
      */
     private String listenerType;
 
+    /**
+     * 每次读取数
+     */
+    private int readNum = 200;
+
     // 定时表达式, 格式: [秒] [分] [小时] [日] [月] [周]
     private String cronExpression = "*/30 * * * * ?";
 
@@ -45,6 +50,14 @@ public class ListenerConfig {
         this.listenerType = listenerType;
     }
 
+    public int getReadNum() {
+        return readNum;
+    }
+
+    public void setReadNum(int readNum) {
+        this.readNum = readNum;
+    }
+
     public String getCronExpression() {
         return cronExpression;
     }

+ 29 - 0
dbsyncer-listener/src/main/java/org/dbsyncer/listener/config/TableCommandConfig.java

@@ -0,0 +1,29 @@
+package org.dbsyncer.listener.config;
+
+import java.util.Map;
+
+/**
+ * @version 1.0.0
+ * @Author AE86
+ * @Date 2020-05-25 23:52
+ */
+public class TableCommandConfig {
+
+    private String table;
+
+    private Map<String, String> command;
+
+    public TableCommandConfig(String table, Map<String, String> command) {
+        this.table = table;
+        this.command = command;
+    }
+
+    public String getTable() {
+        return table;
+    }
+
+    public Map<String, String> getCommand() {
+        return command;
+    }
+
+}

+ 3 - 3
dbsyncer-listener/src/main/java/org/dbsyncer/listener/mysql/MysqlExtractor.java

@@ -147,7 +147,7 @@ public class MysqlExtractor extends AbstractExtractor {
                     List<Object> after = new ArrayList<>();
                     addAll(before, p.getBefore().getColumns());
                     addAll(after, p.getAfter().getColumns());
-                    changedEvent(tableName, ConnectorConstant.OPERTION_UPDATE, before, after);
+                    changedLogEvent(tableName, ConnectorConstant.OPERTION_UPDATE, before, after);
                     break;
                 }
                 return;
@@ -160,7 +160,7 @@ public class MysqlExtractor extends AbstractExtractor {
                 for (Row row : rows) {
                     List<Object> after = new ArrayList<>();
                     addAll(after, row.getColumns());
-                    changedEvent(tableName, ConnectorConstant.OPERTION_INSERT, Collections.EMPTY_LIST, after);
+                    changedLogEvent(tableName, ConnectorConstant.OPERTION_INSERT, Collections.EMPTY_LIST, after);
                     break;
                 }
                 return;
@@ -173,7 +173,7 @@ public class MysqlExtractor extends AbstractExtractor {
                 for (Row row : rows) {
                     List<Object> before = new ArrayList<>();
                     addAll(before, row.getColumns());
-                    changedEvent(tableName, ConnectorConstant.OPERTION_DELETE, before, Collections.EMPTY_LIST);
+                    changedLogEvent(tableName, ConnectorConstant.OPERTION_DELETE, before, Collections.EMPTY_LIST);
                     break;
                 }
                 return;

+ 75 - 16
dbsyncer-listener/src/main/java/org/dbsyncer/listener/quartz/QuartzExtractor.java

@@ -1,14 +1,22 @@
 package org.dbsyncer.listener.quartz;
 
+import org.dbsyncer.common.model.Result;
+import org.dbsyncer.common.util.CollectionUtils;
 import org.dbsyncer.common.util.UUIDUtil;
 import org.dbsyncer.connector.ConnectorFactory;
+import org.dbsyncer.connector.constant.ConnectorConstant;
 import org.dbsyncer.listener.AbstractExtractor;
+import org.dbsyncer.listener.config.TableCommandConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 /**
  * 默认定时抽取
@@ -23,28 +31,34 @@ public class QuartzExtractor extends AbstractExtractor implements ScheduledTaskJ
 
     private ConnectorFactory connectorFactory;
     private ScheduledTaskService scheduledTaskService;
-    private List<Map<String, String>> commands;
-    private List<String> tableNames;
+    private List<TableCommandConfig> tableCommandConfig;
+    private int tableCommandConfigSize;
+
+    private int readNum;
+    private String eventFieldName;
+    private Set<String> update;
+    private Set<String> insert;
+    private Set<String> delete;
     private String key;
+    private String cron;
     private AtomicBoolean running = new AtomicBoolean();
 
     @Override
     public void start() {
-        key = UUIDUtil.getUUID();
-        String cron = listenerConfig.getCronExpression();
-        logger.info("启动定时任务:{} >> {}", key, cron);
+        init();
         scheduledTaskService.start(key, cron, this);
+        logger.info("启动定时任务:{} >> {}", key, cron);
     }
 
     @Override
     public void run() {
-        if(running.compareAndSet(false, true)){
-            // TODO 获取tableGroup
-            Map<String, String> command = null;
-            int pageIndex = 1;
-            int pageSize = 20;
-            connectorFactory.reader(connectorConfig, command, pageIndex, pageSize);
+        if (running.compareAndSet(false, true)) {
+            // 依次执行同步映射关系
+            for (int i = 0; i < tableCommandConfigSize; i++) {
+                execute(tableCommandConfig.get(i));
+            }
         }
+
     }
 
     @Override
@@ -52,6 +66,54 @@ public class QuartzExtractor extends AbstractExtractor implements ScheduledTaskJ
         scheduledTaskService.stop(key);
     }
 
+    private void execute(TableCommandConfig t) {
+        try {
+            final String table = t.getTable();
+            int pageIndex = 1;
+            for (; ; ) {
+                Result reader = connectorFactory.reader(connectorConfig, t.getCommand(), pageIndex++, readNum);
+                List<Map<String, Object>> data = reader.getData();
+                if (CollectionUtils.isEmpty(data)) {
+                    break;
+                }
+
+                Object event = null;
+                for (Map<String, Object> row : data) {
+                    event = row.get(eventFieldName);
+                    if (update.contains(event)) {
+                        changedQuartzEvent(table, ConnectorConstant.OPERTION_UPDATE, Collections.EMPTY_MAP, row);
+                        continue;
+                    }
+                    if (insert.contains(event)) {
+                        changedQuartzEvent(table, ConnectorConstant.OPERTION_INSERT, Collections.EMPTY_MAP, row);
+                        continue;
+                    }
+                    if (delete.contains(event)) {
+                        changedQuartzEvent(table, ConnectorConstant.OPERTION_DELETE, row, Collections.EMPTY_MAP);
+                        continue;
+                    }
+
+                }
+            }
+        } catch (Exception e) {
+            errorEvent(e);
+            logger.error(e.getMessage());
+        }
+    }
+
+    private void init() {
+        tableCommandConfigSize = tableCommandConfig.size();
+
+        readNum = listenerConfig.getReadNum();
+        eventFieldName = listenerConfig.getEventFieldName();
+        update = Stream.of(listenerConfig.getUpdate().split(",")).collect(Collectors.toSet());
+        insert = Stream.of(listenerConfig.getInsert().split(",")).collect(Collectors.toSet());
+        delete = Stream.of(listenerConfig.getDelete().split(",")).collect(Collectors.toSet());
+
+        key = UUIDUtil.getUUID();
+        cron = listenerConfig.getCronExpression();
+    }
+
     public void setConnectorFactory(ConnectorFactory connectorFactory) {
         this.connectorFactory = connectorFactory;
     }
@@ -60,11 +122,8 @@ public class QuartzExtractor extends AbstractExtractor implements ScheduledTaskJ
         this.scheduledTaskService = scheduledTaskService;
     }
 
-    public void setCommands(List<Map<String, String>> commands) {
-        this.commands = commands;
+    public void setTableCommandConfig(List<TableCommandConfig> tableCommandConfig) {
+        this.tableCommandConfig = tableCommandConfig;
     }
 
-    public void setTableNames(List<String> tableNames) {
-        this.tableNames = tableNames;
-    }
 }

+ 34 - 7
dbsyncer-manager/src/main/java/org/dbsyncer/manager/puller/impl/IncrementPuller.java

@@ -6,16 +6,18 @@ import org.dbsyncer.common.util.UUIDUtil;
 import org.dbsyncer.connector.config.ConnectorConfig;
 import org.dbsyncer.connector.config.Table;
 import org.dbsyncer.listener.AbstractExtractor;
+import org.dbsyncer.listener.Listener;
 import org.dbsyncer.listener.config.ExtractorConfig;
 import org.dbsyncer.listener.config.ListenerConfig;
-import org.dbsyncer.listener.quartz.QuartzExtractor;
-import org.dbsyncer.listener.Listener;
+import org.dbsyncer.listener.config.TableCommandConfig;
 import org.dbsyncer.listener.quartz.ScheduledTaskJob;
 import org.dbsyncer.listener.quartz.ScheduledTaskService;
 import org.dbsyncer.manager.Manager;
 import org.dbsyncer.manager.config.FieldPicker;
 import org.dbsyncer.manager.puller.AbstractPuller;
 import org.dbsyncer.parser.Parser;
+import org.dbsyncer.parser.logger.LogService;
+import org.dbsyncer.parser.logger.LogType;
 import org.dbsyncer.parser.model.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -54,6 +56,9 @@ public class IncrementPuller extends AbstractPuller implements ScheduledTaskJob,
     @Autowired
     private Manager manager;
 
+    @Autowired
+    private LogService logService;
+
     @Autowired
     private ScheduledTaskService scheduledTaskService;
 
@@ -70,13 +75,15 @@ public class IncrementPuller extends AbstractPuller implements ScheduledTaskJob,
             Assert.notNull(connector, "连接器不能为空.");
             List<TableGroup> list = manager.getTableGroupAll(mappingId);
             Assert.notEmpty(list, "映射关系不能为空.");
-            List<Map<String, String>> commands = list.stream().map(t -> t.getCommand()).collect(Collectors.toList());
-            List<String> tableNames = list.stream().map(t -> t.getSourceTable().getName()).collect(Collectors.toList());
             Meta meta = manager.getMeta(metaId);
             Assert.notNull(meta, "Meta不能为空.");
+            ConnectorConfig connectorConfig = connector.getConfig();
+            ListenerConfig listenerConfig = mapping.getListener();
+            List<TableCommandConfig> tableCommandConfig = list.stream().map(t ->
+                    new TableCommandConfig(t.getSourceTable().getName(), t.getCommand())
+            ).collect(Collectors.toList());
 
-            ExtractorConfig config = new ExtractorConfig(connector.getConfig(), mapping.getListener(), meta.getMap(), commands, tableNames);
-            AbstractExtractor extractor = listener.createExtractor(config);
+            AbstractExtractor extractor = listener.getExtractor(new ExtractorConfig(connectorConfig, listenerConfig, meta.getMap(), tableCommandConfig));
             Assert.notNull(extractor, "未知的监听配置.");
             long now = System.currentTimeMillis();
             meta.setBeginTime(now);
@@ -145,7 +152,7 @@ public class IncrementPuller extends AbstractPuller implements ScheduledTaskJob,
         }
 
         @Override
-        public void changedEvent(String tableName, String event, List<Object> before, List<Object> after) {
+        public void changedLogEvent(String tableName, String event, List<Object> before, List<Object> after) {
             logger.info("监听数据=> tableName:{}, event:{}, before:{}, after:{}", tableName, event, before, after);
 
             // 处理过程有异常向上抛
@@ -161,6 +168,21 @@ public class IncrementPuller extends AbstractPuller implements ScheduledTaskJob,
             changed.compareAndSet(false, true);
         }
 
+        @Override
+        public void changedQuartzEvent(String tableName, String event, Map<String, Object> before, Map<String, Object> after) {
+            // 处理过程有异常向上抛
+            List<FieldPicker> pickers = tablePicker.get(tableName);
+            if (!CollectionUtils.isEmpty(pickers)) {
+                pickers.parallelStream().forEach(p -> {
+                    DataEvent data = new DataEvent(event, before, after);
+                    parser.execute(mapping, p.getTableGroup(), data);
+                });
+            }
+
+            // 标记有变更记录
+            changed.compareAndSet(false, true);
+        }
+
         @Override
         public void flushEvent(Map<String, String> map) {
             // 如果有变更,执行更新
@@ -174,6 +196,11 @@ public class IncrementPuller extends AbstractPuller implements ScheduledTaskJob,
             }
         }
 
+        @Override
+        public void errorEvent(Exception e) {
+            logService.log(LogType.TableGroupLog.INCREMENT_FAILED, e.getMessage());
+        }
+
     }
 
 }

+ 2 - 2
dbsyncer-parser/src/main/java/org/dbsyncer/parser/logger/LogService.java

@@ -7,9 +7,9 @@ package org.dbsyncer.parser.logger;
  */
 public interface LogService {
 
-    void log(String msg);
-
     void log(LogType logType);
 
+    void log(LogType logType, String msg);
+
     void log(LogType logType, String format, Object... args);
 }

+ 4 - 4
dbsyncer-parser/src/main/java/org/dbsyncer/parser/logger/LogServiceImpl.java

@@ -16,13 +16,13 @@ public class LogServiceImpl implements LogService {
     private FlushService flushService;
 
     @Override
-    public void log(String msg) {
-        flushService.asyncWrite(LogType.SystemLog.INFO.getType(), msg);
+    public void log(LogType logType) {
+        flushService.asyncWrite(logType.getType(), logType.getMessage());
     }
 
     @Override
-    public void log(LogType logType) {
-        flushService.asyncWrite(logType.getType(), logType.getMessage());
+    public void log(LogType logType, String msg) {
+        flushService.asyncWrite(logType.getType(), msg);
     }
 
     @Override

+ 2 - 1
dbsyncer-parser/src/main/java/org/dbsyncer/parser/logger/LogType.java

@@ -144,7 +144,8 @@ public interface LogType {
     enum TableGroupLog implements LogType {
         INSERT("40", "新增"),
         UPDATE("41", "修改"),
-        DELETE("42", "删除");
+        DELETE("42", "删除"),
+        INCREMENT_FAILED("43", "增量同步异常");
 
         private String type;
         private String message;