AE86 5 년 전
부모
커밋
f76701bf5b

+ 3 - 3
dbsyncer-biz/src/main/java/org/dbsyncer/biz/checker/impl/mapping/LogConfigChecker.java

@@ -4,10 +4,10 @@ import org.dbsyncer.biz.checker.MappingConfigChecker;
 import org.dbsyncer.biz.checker.MappingLogConfigChecker;
 import org.dbsyncer.common.util.StringUtil;
 import org.dbsyncer.connector.config.ConnectorConfig;
-import org.dbsyncer.parser.model.ListenerConfig;
-import org.dbsyncer.parser.enums.ListenerEnum;
 import org.dbsyncer.manager.Manager;
+import org.dbsyncer.manager.enums.IncrementEnum;
 import org.dbsyncer.parser.model.Connector;
+import org.dbsyncer.parser.model.ListenerConfig;
 import org.dbsyncer.parser.model.Mapping;
 import org.springframework.beans.BeansException;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -51,7 +51,7 @@ public class LogConfigChecker implements MappingConfigChecker, ApplicationContex
         ListenerConfig listener = mapping.getListener();
         Assert.notNull(listener, "ListenerConfig can not be null.");
 
-        listener.setListenerType(ListenerEnum.LOG.getCode());
+        listener.setListenerType(IncrementEnum.LOG.getType());
     }
 
 }

+ 3 - 7
dbsyncer-biz/src/main/java/org/dbsyncer/biz/checker/impl/mapping/MappingChecker.java

@@ -7,14 +7,10 @@ import org.dbsyncer.biz.checker.MappingConfigChecker;
 import org.dbsyncer.biz.checker.impl.tablegroup.TableGroupChecker;
 import org.dbsyncer.common.util.CollectionUtils;
 import org.dbsyncer.common.util.StringUtil;
-import org.dbsyncer.parser.model.ListenerConfig;
-import org.dbsyncer.parser.enums.ListenerEnum;
 import org.dbsyncer.manager.Manager;
+import org.dbsyncer.manager.enums.IncrementEnum;
 import org.dbsyncer.parser.enums.ModelEnum;
-import org.dbsyncer.parser.model.ConfigModel;
-import org.dbsyncer.parser.model.Mapping;
-import org.dbsyncer.parser.model.Meta;
-import org.dbsyncer.parser.model.TableGroup;
+import org.dbsyncer.parser.model.*;
 import org.dbsyncer.storage.constant.ConfigConstant;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -69,7 +65,7 @@ public class MappingChecker extends AbstractChecker implements ApplicationContex
         mapping.setSourceConnectorId(sourceConnectorId);
         mapping.setTargetConnectorId(targetConnectorId);
         mapping.setModel(ModelEnum.FULL.getCode());
-        mapping.setListener(new ListenerConfig(ListenerEnum.TIMING.getCode()));
+        mapping.setListener(new ListenerConfig(IncrementEnum.TIMING.getType()));
 
         // 修改基本配置
         this.modifyConfigModel(mapping, params);

+ 2 - 2
dbsyncer-biz/src/main/java/org/dbsyncer/biz/checker/impl/mapping/TimingConfigChecker.java

@@ -2,8 +2,8 @@ package org.dbsyncer.biz.checker.impl.mapping;
 
 import org.apache.commons.lang.StringUtils;
 import org.dbsyncer.biz.checker.MappingConfigChecker;
+import org.dbsyncer.manager.enums.IncrementEnum;
 import org.dbsyncer.parser.model.ListenerConfig;
-import org.dbsyncer.parser.enums.ListenerEnum;
 import org.dbsyncer.parser.model.Mapping;
 import org.springframework.stereotype.Component;
 import org.springframework.util.Assert;
@@ -47,7 +47,7 @@ public class TimingConfigChecker implements MappingConfigChecker {
             config.setDelete(delete);
         }
 
-        config.setListenerType(ListenerEnum.TIMING.getCode());
+        config.setListenerType(IncrementEnum.TIMING.getType());
         mapping.setListener(config);
     }
 

+ 9 - 3
dbsyncer-common/src/main/java/org/dbsyncer/common/event/Event.java

@@ -1,5 +1,7 @@
 package org.dbsyncer.common.event;
 
+import java.util.Map;
+
 /**
  * @version 1.0.0
  * @Author AE86
@@ -8,8 +10,12 @@ package org.dbsyncer.common.event;
 public interface Event {
 
     /**
-     * 触发事件
+     * 数据变更事件
+     *
+     * @param event  事件
+     * @param before 变化前
+     * @param after  变化后
      */
-    void changedEvent();
+    void changedEvent(String event, Map<String, Object> before, Map<String, Object> after);
 
-}
+}

+ 25 - 15
dbsyncer-listener/src/main/java/org/dbsyncer/listener/AbstractExtractor.java

@@ -1,8 +1,10 @@
 package org.dbsyncer.listener;
 
 import org.dbsyncer.common.event.Event;
+import org.dbsyncer.common.util.CollectionUtils;
 
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.CopyOnWriteArrayList;
 
 /**
@@ -10,23 +12,31 @@ import java.util.concurrent.CopyOnWriteArrayList;
  * @Author AE86
  * @Date 2020-05-12 20:35
  */
-public abstract class AbstractExtractor implements Extractor {
-
-    private List<Event> watcher = new CopyOnWriteArrayList<>();;
-
-    /**
-     * 订阅事件
-     */
-    public void attach(Event event) {
-        watcher.add(event);
+public abstract class DefaultExtractor implements Extractor {
+
+    private Map<String, String> map;
+    private List<Event>         watcher;
+
+    public void addListener(Event event) {
+        if (null != event) {
+            if (null == watcher) {
+                watcher = new CopyOnWriteArrayList<>();
+            }
+            watcher.add(event);
+        }
     }
 
-    /**
-     * 通知关闭事件
-     */
-    public void notifyEvent() {
-        watcher.forEach(w -> w.changedEvent());
+    public void changedEvent(String event, Map<String, Object> before, Map<String, Object> after) {
+        if (!CollectionUtils.isEmpty(watcher)) {
+            watcher.forEach(w -> w.changedEvent(event, before, after));
+        }
     }
 
+    public Map<String, String> getMap() {
+        return map;
+    }
 
-}
+    public void setMap(Map<String, String> map) {
+        this.map = map;
+    }
+}

+ 25 - 5
dbsyncer-listener/src/main/java/org/dbsyncer/listener/extractor/MysqlExtractor.java

@@ -1,17 +1,37 @@
 package org.dbsyncer.listener.extractor;
 
-import org.dbsyncer.listener.AbstractExtractor;
+import org.dbsyncer.listener.DefaultExtractor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.TimeUnit;
 
 /**
  * @version 1.0.0
  * @Author AE86
  * @Date 2020-05-12 21:14
  */
-public class MysqlExtractor extends AbstractExtractor {
+public class MysqlExtractor extends DefaultExtractor {
+
+    private final Logger logger = LoggerFactory.getLogger(getClass());
+
+    boolean running;
 
     @Override
     public void extract() {
-
+        running = true;
+        for (int i = 0; i < 20; i++) {
+            if(!running){
+                logger.info("中止监听任务");
+                break;
+            }
+            logger.info("模拟监听任务");
+            try {
+                TimeUnit.SECONDS.sleep(3);
+            } catch (InterruptedException e) {
+                e.printStackTrace();
+            }
+        }
     }
 
     @Override
@@ -21,6 +41,6 @@ public class MysqlExtractor extends AbstractExtractor {
 
     @Override
     public void close() {
-
+        running = false;
     }
-}
+}

+ 60 - 0
dbsyncer-manager/src/main/java/org/dbsyncer/manager/enums/IncrementEnum.java

@@ -0,0 +1,60 @@
+package org.dbsyncer.manager.enums;
+
+import org.apache.commons.lang.StringUtils;
+import org.dbsyncer.listener.Extractor;
+import org.dbsyncer.manager.ManagerException;
+import org.dbsyncer.manager.puller.Increment;
+
+/**
+ * @author AE86
+ * @version 1.0.0
+ * @date 2020/04/24 14:19
+ */
+public enum IncrementEnum {
+
+    /**
+     * 日志
+     */
+    LOG("log", new Increment() {
+
+        @Override
+        public void execute(Extractor extractor) {
+            extractor.extract();
+        }
+    }),
+    /**
+     * 定时
+     */
+    TIMING("timing", new Increment() {
+
+        @Override
+        public void execute(Extractor extractor) {
+            extractor.extractTiming();
+        }
+    });
+
+    private String    type;
+    private Increment increment;
+
+    IncrementEnum(String type, Increment increment) {
+        this.type = type;
+        this.increment = increment;
+    }
+
+    public static Increment getIncrement(String type) throws ManagerException {
+        for (IncrementEnum e : IncrementEnum.values()) {
+            if (StringUtils.equals(type, e.getType())) {
+                return e.getIncrement();
+            }
+        }
+        throw new ManagerException(String.format("Increment type \"%s\" does not exist.", type));
+    }
+
+    public String getType() {
+        return type;
+    }
+
+    public Increment getIncrement() {
+        return increment;
+    }
+}

+ 2 - 2
dbsyncer-manager/src/main/java/org/dbsyncer/manager/puller/Increment.java

@@ -4,6 +4,6 @@ import org.dbsyncer.listener.Extractor;
 
 public interface Increment {
 
-    void execute(String mappingId, String metaId, Extractor extractor);
+    void execute(Extractor extractor);
 
-}
+}

+ 63 - 44
dbsyncer-manager/src/main/java/org/dbsyncer/manager/puller/impl/IncrementPuller.java

@@ -1,28 +1,22 @@
 package org.dbsyncer.manager.puller.impl;
 
-import org.dbsyncer.common.event.IncrementRefreshEvent;
-import org.dbsyncer.common.model.Task;
-import org.dbsyncer.common.util.StringUtil;
-import org.dbsyncer.connector.config.ConnectorConfig;
-import org.dbsyncer.listener.Listener;
-import org.dbsyncer.manager.puller.AbstractPuller;
+import org.dbsyncer.common.event.Event;
+import org.dbsyncer.listener.DefaultExtractor;
 import org.dbsyncer.listener.Extractor;
-import org.dbsyncer.parser.model.ListenerConfig;
+import org.dbsyncer.listener.Listener;
 import org.dbsyncer.manager.Manager;
+import org.dbsyncer.manager.enums.IncrementEnum;
+import org.dbsyncer.manager.puller.AbstractPuller;
 import org.dbsyncer.manager.puller.Increment;
-import org.dbsyncer.parser.model.Connector;
-import org.dbsyncer.parser.model.Mapping;
-import org.dbsyncer.parser.model.Meta;
+import org.dbsyncer.parser.Parser;
+import org.dbsyncer.parser.model.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.springframework.beans.BeansException;
 import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.context.ApplicationContext;
-import org.springframework.context.ApplicationContextAware;
-import org.springframework.context.ApplicationListener;
 import org.springframework.stereotype.Component;
 import org.springframework.util.Assert;
 
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 
@@ -34,52 +28,51 @@ import java.util.concurrent.ConcurrentHashMap;
  * @date 2020/04/26 15:28
  */
 @Component
-public class IncrementPuller extends AbstractPuller implements ApplicationContextAware, ApplicationListener<IncrementRefreshEvent> {
+public class IncrementPuller extends AbstractPuller {
 
     private final Logger logger = LoggerFactory.getLogger(getClass());
 
+    @Autowired
+    private Parser parser;
+
     @Autowired
     private Listener listener;
 
     @Autowired
     private Manager manager;
 
-    private Map<String, Extractor> map = new ConcurrentHashMap<>();
-
-    private Map<String, Increment> handle;
-
-    @Override
-    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
-        handle = applicationContext.getBeansOfType(Increment.class);
-    }
+    private Map<String, DefaultExtractor> map = new ConcurrentHashMap<>();
 
     @Override
     public void asyncStart(Mapping mapping) {
         final String mappingId = mapping.getId();
         final String metaId = mapping.getMetaId();
         try {
-            // log/timing
             ListenerConfig listenerConfig = mapping.getListener();
-            String listenerType = listenerConfig.getListenerType();
-            String type = StringUtil.toLowerCaseFirstOne(listenerType).concat("Increment");
-            Increment increment = handle.get(type);
+            // log/timing
+            Increment increment = IncrementEnum.getIncrement(listenerConfig.getListenerType());
             Assert.notNull(increment, "未知的增量同步方式.");
             Connector connector = manager.getConnector(mapping.getSourceConnectorId());
             Assert.notNull(connector, "连接器不能为空.");
-            Extractor extractor = listener.createExtractor(connector.getConfig());
-            Assert.notNull(extractor, "未知的连接器配置.");
+            List<TableGroup> list = manager.getTableGroupAll(mappingId);
+            Assert.notEmpty(list, "映射关系不能为空");
+            Meta meta = manager.getMeta(metaId);
+            Assert.notNull(meta, "Meta不能为空.");
+            DefaultExtractor extractor = (DefaultExtractor) listener.createExtractor(connector.getConfig());
+            Assert.notNull(extractor, "未知的监听配置.");
+
+            // 监听数据变更事件
+            extractor.addListener(new DefaultListener(mapping, list));
+            extractor.setMap(meta.getMap());
             map.putIfAbsent(metaId, extractor);
 
             // 执行任务
-            logger.info("启动任务:{}", metaId);
-            extractor = map.get(metaId);
-            increment.execute(mappingId, metaId, extractor);
+            logger.info("启动成功:{}", metaId);
+            increment.execute(map.get(metaId));
         } catch (Exception e) {
-            logger.error(e.getMessage());
-            map.remove(metaId);
-            publishClosedEvent(metaId);
+            logger.error("任务:{} 运行异常:{}", metaId, e.getMessage());
         } finally {
-            logger.info("启动成功:{}", metaId);
+            finished(metaId);
         }
     }
 
@@ -91,16 +84,42 @@ public class IncrementPuller extends AbstractPuller implements ApplicationContex
         }
     }
 
-    @Override
-    public void onApplicationEvent(IncrementRefreshEvent event) {
-        // 异步监听任务刷新事件
-        flush(event.getTask());
+    /**
+     * TODO 更新待优化,存在性能问题
+     *
+     * @param metaId
+     */
+    private void flush(String metaId) {
+        Meta meta = manager.getMeta(metaId);
+        DefaultExtractor extractor = map.get(metaId);
+        if (null != meta && null != extractor) {
+            meta.setMap(extractor.getMap());
+            manager.editMeta(meta);
+        }
+    }
+
+    private void finished(String metaId) {
+        map.remove(metaId);
+        publishClosedEvent(metaId);
     }
 
-    private void flush(Task task) {
-        Meta meta = manager.getMeta(task.getId());
-        Assert.notNull(meta, "检查meta为空.");
-        manager.editMeta(meta);
+    final class DefaultListener implements Event {
+
+        private Mapping          mapping;
+        private List<TableGroup> list;
+
+        public DefaultListener(Mapping mapping, List<TableGroup> list) {
+            this.mapping = mapping;
+            this.list = list;
+        }
+
+        @Override
+        public void changedEvent(String event, Map<String, Object> before, Map<String, Object> after) {
+            // 处理过程有异常向上抛
+            list.forEach(tableGroup -> parser.execute(mapping, tableGroup));
+            flush(mapping.getMetaId());
+        }
+
     }
 
 }

+ 8 - 0
dbsyncer-parser/src/main/java/org/dbsyncer/parser/Parser.java

@@ -118,4 +118,12 @@ public interface Parser {
      * @param tableGroup
      */
     void execute(Task task, Mapping mapping, TableGroup tableGroup);
+
+    /**
+     * 增量同步
+     *
+     * @param mapping
+     * @param tableGroup
+     */
+    void execute(Mapping mapping, TableGroup tableGroup);
 }

+ 14 - 5
dbsyncer-parser/src/main/java/org/dbsyncer/parser/ParserFactory.java

@@ -224,6 +224,11 @@ public class ParserFactory implements Parser {
         }
     }
 
+    @Override
+    public void execute(Mapping mapping, TableGroup tableGroup) {
+
+    }
+
     /**
      * 更新缓存
      *
@@ -287,7 +292,8 @@ public class ParserFactory implements Parser {
      * @param batchSize
      * @return
      */
-    private Result executeBatch(ConnectorConfig config, Map<String, String> command, List<Field> fields, List<Map<String, Object>> target, int threadSize, int batchSize) {
+    private Result executeBatch(ConnectorConfig config, Map<String, String> command, List<Field> fields, List<Map<String, Object>> target,
+                                int threadSize, int batchSize) {
         // 总数
         int total = target.size();
         // 单次任务
@@ -303,12 +309,13 @@ public class ParserFactory implements Parser {
         Queue<Map<String, Object>> queue = new ConcurrentLinkedQueue<>(target);
 
         // 创建线程池
-        final ThreadPoolTaskExecutor executor = getThreadPoolTaskExecutor(threadSize);
+        final ThreadPoolTaskExecutor executor = getThreadPoolTaskExecutor(threadSize, taskSize - threadSize);
         final Result result = new Result();
         for (; ; ) {
             if (taskSize <= 0) {
                 break;
             }
+            // TODO 优化 CountDownLatch
             final CountDownLatch latch = new CountDownLatch(threadSize);
             for (int i = 0; i < threadSize; i++) {
                 executor.execute(() -> {
@@ -333,11 +340,13 @@ public class ParserFactory implements Parser {
 
             taskSize -= threadSize;
         }
+
         executor.shutdown();
         return result;
     }
 
-    private Result parallelTask(int batchSize, Queue<Map<String, Object>> queue, ConnectorConfig config, Map<String, String> command, List<Field> fields) {
+    private Result parallelTask(int batchSize, Queue<Map<String, Object>> queue, ConnectorConfig config, Map<String, String> command,
+                                List<Field> fields) {
         List<Map<String, Object>> data = new ArrayList<>();
         for (int j = 0; j < batchSize; j++) {
             Map<String, Object> poll = queue.poll();
@@ -349,11 +358,11 @@ public class ParserFactory implements Parser {
         return connectorFactory.writer(config, command, fields, data);
     }
 
-    private ThreadPoolTaskExecutor getThreadPoolTaskExecutor(int threadSize) {
+    private ThreadPoolTaskExecutor getThreadPoolTaskExecutor(int threadSize, int queueCapacity) {
         ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
         executor.setCorePoolSize(threadSize);
         executor.setMaxPoolSize(threadSize);
-        executor.setQueueCapacity(0);
+        executor.setQueueCapacity(queueCapacity);
         executor.setKeepAliveSeconds(30);
         executor.setAwaitTerminationSeconds(30);
         executor.setThreadNamePrefix("ParserExecutor");

+ 0 - 4
dbsyncer-parser/src/main/java/org/dbsyncer/parser/model/ListenerConfig.java

@@ -1,8 +1,5 @@
 package org.dbsyncer.parser.model;
 
-
-import org.dbsyncer.parser.enums.ListenerEnum;
-
 /**
  * @author AE86
  * @version 1.0.0
@@ -12,7 +9,6 @@ public class ListenerConfig {
 
     /**
      * 监听器类型
-     * @see ListenerEnum
      */
     private String listenerType;