AE86 5 éve
szülő
commit
30baea02be

+ 16 - 21
dbsyncer-common/src/main/java/org/dbsyncer/common/task/Task.java

@@ -2,34 +2,29 @@ package org.dbsyncer.common.task;
 
 public class Task {
 
-    private TaskCallBack taskCallBack;
+    private StateEnum state;
 
-    // 0: 停止;1: 运行
-    private int state;
-    public static final int STOP = 0;
-    public static final int RUNNING = 1;
-
-    public TaskCallBack getTaskCallBack() {
-        return taskCallBack;
-    }
-
-    public void setTaskCallBack(TaskCallBack taskCallBack) {
-        this.taskCallBack = taskCallBack;
+    public Task() {
+        this.state = StateEnum.RUNNING;
     }
 
-    public int getState() {
-        return state;
-    }
-
-    public void setState(int state) {
-        this.state = state;
+    public void stop() {
+        this.state = StateEnum.STOP;
     }
 
     public boolean isRunning() {
-        return state == RUNNING;
+        return StateEnum.RUNNING == state;
     }
 
-    public void close() {
-        taskCallBack.cancel();
+    public enum StateEnum{
+        /**
+         * 运行
+         */
+        RUNNING,
+        /**
+         * 停止
+         */
+        STOP;
     }
+
 }

+ 0 - 10
dbsyncer-common/src/main/java/org/dbsyncer/common/task/TaskCallBack.java

@@ -1,10 +0,0 @@
-package org.dbsyncer.common.task;
-
-public interface TaskCallBack {
-
-    /**
-     * 关闭任务
-     */
-    void cancel();
-
-}

+ 11 - 10
dbsyncer-manager/src/main/java/org/dbsyncer/manager/ManagerFactory.java

@@ -249,7 +249,17 @@ public class ManagerFactory implements Manager, ApplicationContextAware, Applica
         String metaId = mapping.getMetaId();
         changeMetaState(metaId, MetaEnum.STOPPING);
 
-        extractor.asyncClose(metaId);
+        extractor.close(metaId);
+    }
+
+    @Override
+    public void changeMetaState(String metaId, MetaEnum metaEnum){
+        Meta meta = getMeta(metaId);
+        int code = metaEnum.getCode();
+        if(meta.getState() != code){
+            meta.setState(code);
+            editMeta(meta);
+        }
     }
 
     @Override
@@ -270,13 +280,4 @@ public class ManagerFactory implements Manager, ApplicationContextAware, Applica
         return extractor;
     }
 
-    private void changeMetaState(String metaId, MetaEnum metaEnum){
-        Meta meta = getMeta(metaId);
-        int code = metaEnum.getCode();
-        if(meta.getState() != code){
-            meta.setState(code);
-            editMeta(meta);
-        }
-    }
-
 }

+ 9 - 0
dbsyncer-manager/src/main/java/org/dbsyncer/manager/TaskExecutor.java

@@ -1,5 +1,6 @@
 package org.dbsyncer.manager;
 
+import org.dbsyncer.parser.enums.MetaEnum;
 import org.dbsyncer.parser.model.Mapping;
 
 /**
@@ -25,4 +26,12 @@ public interface TaskExecutor {
      */
     void close(Mapping mapping);
 
+    /**
+     * 切换meta状态
+     *
+     * @param metaId
+     * @param metaEnum
+     */
+    void changeMetaState(String metaId, MetaEnum metaEnum);
+
 }

+ 16 - 0
dbsyncer-manager/src/main/java/org/dbsyncer/manager/extractor/AbstractExtractor.java

@@ -1,5 +1,21 @@
 package org.dbsyncer.manager.extractor;
 
+import org.dbsyncer.common.event.ClosedEvent;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.ApplicationContext;
+
 public abstract class AbstractExtractor implements Extractor {
 
+    private final Logger logger = LoggerFactory.getLogger(getClass());
+
+    @Autowired
+    private ApplicationContext applicationContext;
+
+    protected void publishClosedEvent(String metaId) {
+        applicationContext.publishEvent(new ClosedEvent(applicationContext, metaId));
+        logger.info("结束任务:{}", metaId);
+    }
+
 }

+ 1 - 2
dbsyncer-manager/src/main/java/org/dbsyncer/manager/extractor/Extractor.java

@@ -8,7 +8,6 @@ public interface Extractor {
     @Async("taskExecutor")
     void asyncStart(Mapping mapping);
 
-    @Async("taskExecutor")
-    void asyncClose(String metaId);
+    void close(String metaId);
 
 }

+ 20 - 30
dbsyncer-manager/src/main/java/org/dbsyncer/manager/extractor/FullExtractor.java

@@ -36,55 +36,45 @@ public class FullExtractor extends AbstractExtractor {
     @Autowired
     private Manager manager;
 
-    @Autowired
-    private ApplicationContext applicationContext;
-
     protected Map<String, Task> map = new ConcurrentHashMap<>();
 
     @Override
     public void asyncStart(Mapping mapping) {
         String metaId = mapping.getMetaId();
-        logger.info("启动任务:{}", metaId);
+        map.putIfAbsent(metaId, new Task());
+        Task task = map.get(metaId);
+
         // TODO 获取数据源连接器
         Connector connector = manager.getConnector(mapping.getSourceConnectorId());
         Assert.notNull(connector, "数据源配置不能为空.");
 
-        Task task = new Task();
-        task.setState(Task.RUNNING);
-        task.setTaskCallBack(() -> publishClosedEvent(metaId));
-        map.putIfAbsent(metaId, task);
-
-        run(task);
+        try {
+            logger.info("启动任务:{}", metaId);
+            run(task);
+        } catch (Exception e) {
+            logger.error(e.getMessage());
+        } finally {
+            map.remove(metaId);
+            publishClosedEvent(metaId);
+        }
     }
 
     @Override
-    public void asyncClose(String metaId) {
+    public void close(String metaId) {
         Task task = map.get(metaId);
         if (null != task) {
-            task.setState(Task.STOP);
-            logger.info("关闭中...");
+            task.stop();
         }
     }
 
-    protected void run(Task task) {
-        for(;;){
-            if(task.isRunning()){
-                try {
-                    logger.info("模拟同步休眠5s");
-                    TimeUnit.SECONDS.sleep(5);
-                } catch (InterruptedException e) {
-                    logger.error(e.getMessage());
-                }
-                continue;
+    protected void run(Task task) throws InterruptedException {
+        for (; ; ) {
+            if (!task.isRunning()) {
+                break;
             }
-            task.close();
-            break;
+            logger.info("模拟同步休眠5s");
+            TimeUnit.SECONDS.sleep(5);
         }
     }
 
-    protected void publishClosedEvent(String metaId) {
-        applicationContext.publishEvent(new ClosedEvent(applicationContext, metaId));
-        logger.info("结束任务:{}", metaId);
-    }
-
 }

+ 1 - 1
dbsyncer-manager/src/main/java/org/dbsyncer/manager/extractor/IncrementExtractor.java

@@ -24,7 +24,7 @@ public class IncrementExtractor extends AbstractExtractor {
     }
 
     @Override
-    public void asyncClose(String metaId) {
+    public void close(String metaId) {
 
     }
 }

+ 4 - 1
dbsyncer-manager/src/main/java/org/dbsyncer/manager/template/impl/PreloadTemplate.java

@@ -89,9 +89,12 @@ public final class PreloadTemplate extends AbstractTemplate implements Applicati
         List<Meta> metas = operationTemplate.queryAll(queryConfig);
         if (!CollectionUtils.isEmpty(metas)) {
             metas.forEach(m -> {
-                if (MetaEnum.isRunning(m.getState())) {
+                // 恢复驱动状态
+                if (MetaEnum.RUNNING.getCode() == m.getState()) {
                     Mapping mapping = manager.getMapping(m.getMappingId());
                     manager.start(mapping);
+                }else if(MetaEnum.STOPPING.getCode() == m.getState()){
+                    manager.changeMetaState(m.getId(), MetaEnum.READY);
                 }
             });
         }

+ 3 - 3
dbsyncer-web/src/main/java/org/dbsyncer/web/config/WebAppConfig.java

@@ -190,13 +190,13 @@ public class WebAppConfig extends WebSecurityConfigurerAdapter implements Authen
 
     @Override
     public void sessionCreated(HttpSessionEvent se) {
-        logger.info("创建会话:{}", se.getSession().getId());
+        logger.debug("创建会话:{}", se.getSession().getId());
         int maxInactiveInterval = se.getSession().getMaxInactiveInterval();
-        logger.info(String.valueOf(maxInactiveInterval));
+        logger.debug(String.valueOf(maxInactiveInterval));
     }
 
     @Override
     public void sessionDestroyed(HttpSessionEvent se) {
-        logger.info("销毁会话:{}", se.getSession().getId());
+        logger.debug("销毁会话:{}", se.getSession().getId());
     }
 }

+ 22 - 2
dbsyncer-web/src/main/java/org/dbsyncer/web/controller/TestController.java

@@ -1,5 +1,6 @@
 package org.dbsyncer.web.controller;
 
+import org.dbsyncer.biz.MappingService;
 import org.dbsyncer.web.remote.UserService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -12,6 +13,7 @@ import org.springframework.web.bind.annotation.RequestMapping;
 import org.springframework.web.bind.annotation.ResponseBody;
 
 import javax.servlet.http.HttpServletRequest;
+import java.util.concurrent.CyclicBarrier;
 
 @Controller
 @RequestMapping("/test")
@@ -22,6 +24,9 @@ public class TestController {
     @Autowired
     private UserService userService;
 
+    @Autowired
+    private MappingService mappingService;
+
     @GetMapping("")
     public String index(HttpServletRequest request, ModelMap model) {
         return "test.html";
@@ -32,8 +37,23 @@ public class TestController {
     public Object demo(Model model) {
         logger.info("demo");
 
-        //String res = userService.hello("我是master");
-        //logger.info("slave响应:{}", res);
+        int size = 10;
+        CyclicBarrier barrier = new CyclicBarrier(size);
+        for (int i = 0; i < size; i++) {
+            new Thread(()->{
+                try {
+                    logger.info("线程{}准备就绪", Thread.currentThread().getName());
+                    barrier.await();
+                    logger.info("线程{}执行中", Thread.currentThread().getName());
+                    String start = mappingService.start("704107393226641408");
+                    logger.info(start);
+                } catch (Exception e) {
+                    logger.error(e.getMessage());
+                }
+            }).start();
+        }
+
+
         return "hello";
     }