浏览代码

add scheduled

AE86 5 年之前
父节点
当前提交
278231f7cd

+ 9 - 0
dbsyncer-listener/src/main/java/org/dbsyncer/listener/extractor/MysqlExtractor.java

@@ -13,6 +13,7 @@ import org.dbsyncer.listener.mysql.common.glossary.Column;
 import org.dbsyncer.listener.mysql.common.glossary.Pair;
 import org.dbsyncer.listener.mysql.common.glossary.Row;
 import org.dbsyncer.listener.mysql.common.glossary.column.StringColumn;
+import org.dbsyncer.listener.quartz.ScheduledTaskJob;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.util.Assert;
@@ -202,4 +203,12 @@ public class MysqlExtractor extends DefaultExtractor {
 
     }
 
+    final class MysqlQuartzListener implements ScheduledTaskJob {
+
+        @Override
+        public void run() {
+
+        }
+    }
+
 }

+ 52 - 0
dbsyncer-listener/src/main/java/org/dbsyncer/listener/quartz/ScheduledTask.java

@@ -0,0 +1,52 @@
+package org.dbsyncer.listener.quartz;
+
+/**
+ * @version 1.0.0
+ * @Author AE86
+ * @Date 2020-05-24 22:01
+ */
+public class ScheduledTask {
+
+    /**
+     * 任务key值 唯一
+     */
+    private String key;
+    /**
+     * 任务表达式
+     */
+    private String cron;
+    /**
+     * 定时执行job
+     */
+    private ScheduledTaskJob job;
+
+    public ScheduledTask(String key, String cron, ScheduledTaskJob job) {
+        this.key = key;
+        this.cron = cron;
+        this.job = job;
+    }
+
+    public String getKey() {
+        return key;
+    }
+
+    public void setKey(String key) {
+        this.key = key;
+    }
+
+    public String getCron() {
+        return cron;
+    }
+
+    public void setCron(String cron) {
+        this.cron = cron;
+    }
+
+    public ScheduledTaskJob getJob() {
+        return job;
+    }
+
+    public void setJob(ScheduledTaskJob job) {
+        this.job = job;
+    }
+}

+ 10 - 0
dbsyncer-listener/src/main/java/org/dbsyncer/listener/quartz/ScheduledTaskJob.java

@@ -0,0 +1,10 @@
+package org.dbsyncer.listener.quartz;
+
+/**
+ * @version 1.0.0
+ * @Author AE86
+ * @Date 2020-05-24 22:18
+ */
+public interface ScheduledTaskJob extends Runnable {
+
+}

+ 9 - 0
dbsyncer-listener/src/main/java/org/dbsyncer/listener/quartz/ScheduledTaskService.java

@@ -0,0 +1,9 @@
+package org.dbsyncer.listener.quartz;
+
+public interface ScheduledTaskService {
+
+    void start(ScheduledTask task);
+
+    void stop(String taskKey);
+
+}

+ 76 - 0
dbsyncer-listener/src/main/java/org/dbsyncer/listener/quartz/ScheduledTaskServiceImpl.java

@@ -0,0 +1,76 @@
+package org.dbsyncer.listener.quartz;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
+import org.springframework.scheduling.support.CronTrigger;
+import org.springframework.stereotype.Component;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ScheduledFuture;
+
+/**
+ * @version 1.0.0
+ * @Author AE86
+ * @Date 2020-05-24 22:06
+ */
+@Component
+public class ScheduledTaskServiceImpl implements ScheduledTaskService {
+
+    private final Logger logger = LoggerFactory.getLogger(getClass());
+
+    /**
+     * 定时任务线程池
+     */
+    @Autowired
+    private ThreadPoolTaskScheduler taskScheduler;
+
+    /**
+     * 存放已经启动的任务map
+     */
+    private Map<String, ScheduledFuture> map = new ConcurrentHashMap<>();
+
+    /**
+     * 根据任务key 启动任务
+     */
+    @Override
+    public void start(ScheduledTask task) {
+        logger.info(">>>>>> 启动任务 {} 开始 >>>>>>", task);
+        //校验任务key是否已经启动
+        String key = task.getKey();
+        if (this.isStart(key)) {
+            logger.info(">>>>>> 当前任务已经启动,无需重复启动!");
+        }
+        // 定时表达式
+        String taskCron = task.getCron();
+        // delegate
+        ScheduledTaskJob job = task.getJob();
+        //获取需要定时调度的接口
+        map.putIfAbsent(key, taskScheduler.schedule(job,
+                (triggerContext) -> new CronTrigger(taskCron).nextExecutionTime(triggerContext)
+        ));
+    }
+
+    /**
+     * 根据 key 停止任务
+     */
+    @Override
+    public void stop(String taskKey) {
+        logger.info(">>>>>> 进入停止任务 {}  >>>>>>", taskKey);
+        ScheduledFuture job = map.get(taskKey);
+        if (null != job) {
+            job.cancel(true);
+        }
+    }
+
+    /**
+     * 任务是否已经启动
+     */
+    private boolean isStart(String key) {
+        final ScheduledFuture job = map.get(key);
+        return null != job && job.isCancelled();
+    }
+
+}

+ 0 - 352
dbsyncer-listener/src/main/java/org/dbsyncer/listener/quartz/TaskQuartzHandle.java

@@ -1,352 +0,0 @@
-package org.dbsyncer.listener.quartz;/*
-package org.dbsyncer.connector.quartz;
-
-import org.apache.commons.lang.StringUtils;
-import org.dbsyncer.common.constant.ConnectorConstant;
-import org.dbsyncer.common.constant.MappingConstant;
-import org.dbsyncer.common.entity.DatabaseConfig;
-import org.dbsyncer.common.entity.Mapping;
-import org.dbsyncer.common.entity.MappingTask;
-import org.dbsyncer.connector.ConnectorFactory;
-import org.dbsyncer.connector.database.Database;
-import org.json.JSONArray;
-import org.json.JSONException;
-import org.json.JSONObject;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.jdbc.core.JdbcTemplate;
-
-import java.sql.Timestamp;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.concurrent.ConcurrentHashMap;
-
-public final class TaskQuartzHandle {
-
-    private final Logger logger = LoggerFactory.getLogger(getClass());
-
-    // 正在运行的job
-    private static Map<String, Boolean> running = new ConcurrentHashMap<String, Boolean>();
-
-    */
-/**
- * <ul>
- * <dt>即将面临的问题如下:</dt>
- * <dl>
- * <li>1.如果上一次任务未处理完成,而当前又产生了新的任务,此时多个任务可能会在同一时间执行相同工作,会造成重复数据.</li>
- * <li>2.如果上一次任务未处理完成,而当前又产生了新的任务,长时间下去,可能会造成任务堆积,影响系统性能.</li>
- * </dl>
- * </ul>
- * <ul>
- * <dt>相应的解决方法:</dt>
- * <dl>
- * <li>1.创建集合,存放执行的任务</li>
- * <li>2.从集合里检查上一次任务存在,继续执行.</li>
- * <li>3.从集合里检查上一次任务不存在,拒绝当前任务.</li>
- * </dl>
- * </ul>
- *
- * @param taskId
- * <p>
- * 组装数据格式
- * @param msg
- * @param eventType
- * @param beforeArr
- * @param afterArr
- * @throws JSONException
- * <p>
- * 解析操作事件,用","分割成map集合
- * @param event
- * @param filter
- * <p>
- * 根据增量策略过滤数据
- * @param inc  增量策略
- * @param rows 数据
- * @return 过滤数据
- * <p>
- * 刷新最后记录点
- * @param task
- * @param inc
- * @param scnPos
- * <p>
- * 组装数据格式
- * @param msg
- * @param eventType
- * @param beforeArr
- * @param afterArr
- * @throws JSONException
- * <p>
- * 解析操作事件,用","分割成map集合
- * @param event
- * @param filter
- * <p>
- * 根据增量策略过滤数据
- * @param inc  增量策略
- * @param rows 数据
- * @return 过滤数据
- * <p>
- * 刷新最后记录点
- * @param task
- * @param inc
- * @param scnPos
- *//*
-
-    public void handle(String taskId) {
-        // 1.检查是否存在
-        Boolean flg = running.get(taskId);
-        if (null != flg && flg) {
-            logger.error("the task ID \"" + taskId + "\" is running.");
-            return;
-        }
-        // 2.添加驱动job
-        running.put(taskId, true);
-        try {
-            // 3.获取驱动配置
-//            MappingTask task = data.getMapping(taskId);
-//            // 4.校验是否驱动配置为空
-//            if (null == task) {
-//                running.remove(taskId);
-//                logger.error("the task ID \"" + taskId + "\" can not be null.");
-//                return;
-//            }
-//            // 5.提交任务
-//            this.submit(task);
-        } catch (Exception e) {
-            logger.error(e.getClass() + " >> " + e.getLocalizedMessage());
-        } finally {
-            // 6.执行结束,移除正在执行的job
-            running.remove(taskId);
-        }
-    }
-
-    private void submit(MappingTask task) {
-        Mapping mapping = task.getSourceMapping();
-        Database connector = ConnectorFactory.getInstance().getConnector(mapping.getConnector(), Database.class);
-
-        // 1.建立连接
-        JdbcTemplate jdbcTemplate = null;
-        try {
-            DatabaseConfig config = (DatabaseConfig) task.getSourceMapping().getConfig();
-            jdbcTemplate = connector.getJdbcTemplate(config);
-            // 2.解析最新的数据
-            JSONArray msg = this.pull(jdbcTemplate, task);
-
-            // 3.将增量消息发送给manager处理
-            if (null != msg && 0 < msg.length()) {
-                JSONObject t = new JSONObject();
-                t.put("taskId", task.getId());
-                t.put("msg", msg);
-//                manager.handle(t);
-            }
-        } catch (Exception e) {
-            logger.error(e.getClass() + " >> " + e.getLocalizedMessage());
-        } finally {
-            if (null != connector) {
-                // 断开连接
-                connector.close(jdbcTemplate);
-            }
-        }
-    }
-
-    private JSONArray pull(JdbcTemplate jdbcTemplate, MappingTask task) {
-        // 1. 获取执行命令
-        Map<String, String> executeCommond = task.getSourceMapping().getExecuteCommond();
-
-        // 2. 获取最近记录点
-        // 获取最近时间 SELECT MAX(LASTDATE) FROM ASD_TEST;
-        String queryMax = executeCommond.get(ConnectorConstant.OPERTION_QUERY_QUARTZ_MAX);
-        Timestamp scnPos = this.getScnPos(jdbcTemplate, queryMax);
-        // 没有扫描到数据直接返回
-        if (null == scnPos) {
-            return null;
-        }
-
-        // 3.读取最近记录点
-        Map<String, Map<String, String>> po = task.getPolicy();
-        Map<String, String> inc = po.get(MappingConstant.POLICY_INCREMENT);
-        String lastScnPosStr = inc.get("scnPos");
-        String sql = executeCommond.get(ConnectorConstant.OPERTION_QUERY_QUARTZ);
-        Object[] args = null;
-        // 4.没有记录,证明是首次读取数据,查询>=lastScnPosStr的所有数据
-        if (StringUtils.isBlank(lastScnPosStr) || StringUtils.equals("null", StringUtils.trim(lastScnPosStr))) {
-            sql += executeCommond.get(ConnectorConstant.OPERTION_QUERY_QUARTZ_ALL);
-            args = new Object[]{scnPos};
-        } else {
-            // 5. 防止时间戳格式不正确
-            Timestamp lastScnPos = null;
-            try {
-                lastScnPos = Timestamp.valueOf(lastScnPosStr);
-            } catch (Exception e1) {
-                // 如果时间戳格式不正确,则设置为空,下一次读取重新读取最新的记录点
-                this.refreshScnPos(task, inc, null);
-                return null;
-            }
-
-            // 6. 如果最近记录时间<=上次记录点,证明没有新增数据,直接结束.
-            if (null == lastScnPos || scnPos.getTime() <= lastScnPos.getTime()) {
-                return null;
-            }
-            sql += executeCommond.get(ConnectorConstant.OPERTION_QUERY_QUARTZ_RANGE);
-            args = new Object[]{lastScnPos, scnPos};
-        }
-
-        // 7.刷新最后记录点
-        this.refreshScnPos(task, inc, scnPos.toString());
-
-        // 8.获取增量语句  SELECT ASD_TEST.* FROM ASD_TEST LASTDATE > ? AND LASTDATE <= ?
-        List<Map<String, Object>> rows = jdbcTemplate.queryForList(sql, args);
-        if (null == rows || rows.isEmpty()) {
-            return null;
-        }
-
-        // 9. 解析增量数据格式
-        return this.filterRowsByPolicyIncrement(inc, rows);
-    }
-
-    */
-/**
- * 组装数据格式
- *
- * @param msg
- * @param eventType
- * @param beforeArr
- * @param afterArr
- * @throws JSONException
- *//*
-
-    private void putRow(JSONArray msg, String eventType, JSONArray beforeArr, JSONArray afterArr) throws JSONException {
-        JSONObject row = new JSONObject();
-        row.put("eventType", eventType);
-        row.put("before", beforeArr);
-        row.put("after", afterArr);
-        msg.put(row);
-    }
-
-    */
-/**
- * 解析操作事件,用","分割成map集合
- *
- * @param event
- * @param filter
- *//*
-
-    private void splitEvent(String event, Map<String, Boolean> filter) {
-        if (StringUtils.isBlank(event)) {
-            return;
-        }
-        String[] arr = event.split(",");
-        if (null == arr) {
-            return;
-        }
-        int len = arr.length;
-        String e;
-        for (int i = 0; i < len; i++) {
-            e = arr[i];
-            filter.put(e, true);
-        }
-    }
-
-    */
-/**
- * 根据增量策略过滤数据
- *
- * @param inc  增量策略
- * @param rows 数据
- * @return 过滤数据
- *//*
-
-    private JSONArray filterRowsByPolicyIncrement(Map<String, String> inc, List<Map<String, Object>> rows) {
-        // 用于区分事件的字段名称
-        String eventFiled = inc.get("quartzEvent");
-        // 事件:新增、修改和删除
-        String quartzEventInsert = inc.get("quartzEventInsert");
-        String quartzEventUpdate = inc.get("quartzEventUpdate");
-        String quartzEventDelete = inc.get("quartzEventDelete");
-
-        // 由于新增、修改和删除过滤事件可能有多个组合
-        Map<String, Boolean> iFilter = new HashMap<String, Boolean>();
-        Map<String, Boolean> uFilter = new HashMap<String, Boolean>();
-        Map<String, Boolean> dFilter = new HashMap<String, Boolean>();
-        this.splitEvent(quartzEventInsert, iFilter);
-        this.splitEvent(quartzEventUpdate, uFilter);
-        this.splitEvent(quartzEventDelete, dFilter);
-
-        JSONArray msg = null;
-        JSONArray beforeArr = null;
-        JSONArray afterArr = null;
-        for (Map<String, Object> col : rows) {
-            // 创建返回过滤数据对象
-            if (null == msg) {
-                msg = new JSONArray();
-            }
-            try {
-                // 1.解析增量数据事件
-                String event = String.valueOf(col.get(eventFiled));
-                if (null != uFilter.get(event)) {
-                    // 1.1修改数据
-                    afterArr = this.parseColumn(col);
-                    // 组装数据格式
-                    this.putRow(msg, ConnectorConstant.OPERTION_UPDATE, new JSONArray(), afterArr);
-                } else if (null != iFilter.get(event)) {
-                    // 1.2新增数据
-                    afterArr = this.parseColumn(col);
-                    // 组装数据格式
-                    this.putRow(msg, ConnectorConstant.OPERTION_INSERT, new JSONArray(), afterArr);
-                } else if (null != dFilter.get(event)) {
-                    // 1.3删除数据
-                    beforeArr = this.parseColumn(col);
-                    // 组装数据格式
-                    this.putRow(msg, ConnectorConstant.OPERTION_DELETE, beforeArr, new JSONArray());
-                }
-            } catch (JSONException e) {
-                logger.error(e.getClass() + " >> " + e.getLocalizedMessage());
-            }
-        }
-
-        iFilter = null;
-        uFilter = null;
-        dFilter = null;
-        return msg;
-    }
-
-    */
-/**
- * 刷新最后记录点
- *
- * @param task
- * @param inc
- * @param scnPos
- *//*
-
-    private synchronized void refreshScnPos(MappingTask task, Map<String, String> inc, String scnPos) {
-        // 刷新最后记录点
-        inc.put("scnPos", scnPos);
-//        data.saveMapping(task.getId(), task);
-    }
-
-    // 转换行数据格式为JSONArray
-    private JSONArray parseColumn(Map<String, Object> col) throws JSONException {
-        JSONArray row = new JSONArray();
-        JSONObject attr = null;
-        Object value = null;
-        for (Entry<String, Object> obj : col.entrySet()) {
-            attr = new JSONObject();
-            attr.put("name", obj.getKey());
-            // 防止为null
-            value = obj.getValue();
-            value = null == value ? "" : value;
-            attr.put("value", value);
-            row.put(attr);
-        }
-        return row;
-    }
-
-    // 获取最近记录点
-    private Timestamp getScnPos(JdbcTemplate jdbcTemplate, String queryMax) {
-        return jdbcTemplate.queryForObject(queryMax, Timestamp.class);
-    }
-
-}
-*/

+ 14 - 0
dbsyncer-web/src/main/java/org/dbsyncer/web/config/TaskPoolConfig.java

@@ -2,7 +2,9 @@ package org.dbsyncer.web.config;
 
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
+import org.springframework.scheduling.TaskScheduler;
 import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
+import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
 
 import java.util.concurrent.Executor;
 import java.util.concurrent.ThreadPoolExecutor;
@@ -47,4 +49,16 @@ public class TaskPoolConfig {
         return executor;
     }
 
+    @Bean
+    public TaskScheduler taskScheduler() {
+        ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();
+        taskScheduler.setPoolSize(5);
+        taskScheduler.setRemoveOnCancelPolicy(true);
+        taskScheduler.setThreadNamePrefix("taskScheduler");
+        taskScheduler.setWaitForTasksToCompleteOnShutdown(true);
+        taskScheduler.setAwaitTerminationSeconds(60);
+        return taskScheduler;
+    }
+
+
 }