Explorar el Código

Merge remote-tracking branch 'origin/V_1.0.0_Beta' into yjwang

yjwang hace 3 años
padre
commit
7a4892feb4
Se han modificado 22 ficheros con 370 adiciones y 255 borrados
  1. 8 0
      dbsyncer-biz/src/main/java/org/dbsyncer/biz/ConfigService.java
  2. 1 1
      dbsyncer-biz/src/main/java/org/dbsyncer/biz/checker/AbstractChecker.java
  3. 11 0
      dbsyncer-biz/src/main/java/org/dbsyncer/biz/impl/ConfigServiceImpl.java
  4. 98 99
      dbsyncer-common/src/main/java/org/dbsyncer/common/config/CacheConfig.java
  5. 37 9
      dbsyncer-common/src/main/java/org/dbsyncer/common/config/ThreadPoolConfig.java
  6. 52 54
      dbsyncer-common/src/main/java/org/dbsyncer/common/snowflake/SnowflakeIdWorker.java
  7. 1 1
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/database/ds/SimpleDataSource.java
  8. 2 0
      dbsyncer-listener/src/main/java/org/dbsyncer/listener/file/FileExtractor.java
  9. 14 1
      dbsyncer-listener/src/main/java/org/dbsyncer/listener/oracle/dcn/DBChangeNotification.java
  10. 1 1
      dbsyncer-manager/src/main/java/org/dbsyncer/manager/puller/IncrementPuller.java
  11. 1 1
      dbsyncer-manager/src/main/java/org/dbsyncer/manager/template/AbstractTemplate.java
  12. 6 7
      dbsyncer-monitor/src/main/java/org/dbsyncer/monitor/MonitorFactory.java
  13. 5 2
      dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/impl/FlushServiceImpl.java
  14. 1 1
      dbsyncer-storage/src/main/resources/dbsyncer_data.sql
  15. 0 50
      dbsyncer-web/src/main/java/org/dbsyncer/web/config/ScheduleConfig.java
  16. 59 0
      dbsyncer-web/src/main/java/org/dbsyncer/web/controller/config/ConfigController.java
  17. 7 7
      dbsyncer-web/src/main/java/org/dbsyncer/web/controller/plugin/PluginController.java
  18. 5 10
      dbsyncer-web/src/main/resources/application.properties
  19. 43 0
      dbsyncer-web/src/main/resources/public/config/config.html
  20. 3 1
      dbsyncer-web/src/main/resources/public/connector/addDqlMysql.html
  21. 9 4
      dbsyncer-web/src/main/resources/public/nav.html
  22. 6 6
      dbsyncer-web/src/main/resources/public/plugin/plugin.html

+ 8 - 0
dbsyncer-biz/src/main/java/org/dbsyncer/biz/ConfigService.java

@@ -1,6 +1,7 @@
 package org.dbsyncer.biz;
 
 import org.dbsyncer.biz.vo.ConfigVo;
+import org.dbsyncer.parser.model.ConfigModel;
 
 import java.util.List;
 import java.util.Map;
@@ -40,4 +41,11 @@ public interface ConfigService {
      */
     List<ConfigVo> queryConfig();
 
+    /**
+     * 获取所有配置
+     *
+     * @return
+     */
+    List<ConfigModel> getConfigModelAll();
+
 }

+ 1 - 1
dbsyncer-biz/src/main/java/org/dbsyncer/biz/checker/AbstractChecker.java

@@ -10,7 +10,7 @@ import org.dbsyncer.parser.model.AbstractConfigModel;
 import org.dbsyncer.parser.model.ConfigModel;
 import org.dbsyncer.parser.model.Convert;
 import org.dbsyncer.plugin.config.Plugin;
-import org.dbsyncer.storage.SnowflakeIdWorker;
+import org.dbsyncer.common.snowflake.SnowflakeIdWorker;
 import org.dbsyncer.storage.constant.ConfigConstant;
 import org.json.JSONArray;
 import org.json.JSONException;

+ 11 - 0
dbsyncer-biz/src/main/java/org/dbsyncer/biz/impl/ConfigServiceImpl.java

@@ -12,6 +12,7 @@ import org.springframework.beans.BeanUtils;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -66,6 +67,16 @@ public class ConfigServiceImpl implements ConfigService {
         return list;
     }
 
+    @Override
+    public List<ConfigModel> getConfigModelAll() {
+        List<ConfigModel> list = new ArrayList<>();
+        manager.getConfigAll().forEach(config -> list.add(config));
+        manager.getConnectorAll().forEach(config -> list.add(config));
+        manager.getMappingAll().forEach(config -> list.add(config));
+        manager.getMetaAll().forEach(config -> list.add(config));
+        return list;
+    }
+
     private ConfigVo convertConfig2Vo(Config config) {
         ConfigVo configVo = new ConfigVo();
         BeanUtils.copyProperties(config, configVo);

+ 98 - 99
dbsyncer-web/src/main/java/org/dbsyncer/web/config/CacheConfiguration.java → dbsyncer-common/src/main/java/org/dbsyncer/common/config/CacheConfig.java

@@ -1,100 +1,99 @@
-package org.dbsyncer.web.config;
-
-import com.github.benmanes.caffeine.cache.Caffeine;
-import com.github.benmanes.caffeine.cache.Ticker;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.boot.context.properties.ConfigurationProperties;
-import org.springframework.cache.CacheManager;
-import org.springframework.cache.caffeine.CaffeineCache;
-import org.springframework.cache.interceptor.KeyGenerator;
-import org.springframework.cache.support.SimpleCacheManager;
-import org.springframework.context.annotation.Bean;
-import org.springframework.context.annotation.Configuration;
-
-import java.lang.reflect.Method;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
-import java.util.StringJoiner;
-import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
-
-/**
- * @author AE86
- * @version 1.0.0
- * @date 2020/04/23 11:30
- */
-@Configuration
-@ConfigurationProperties(prefix = "dbsyncer.web")
-public class CacheConfiguration {
-
-    private final Logger logger = LoggerFactory.getLogger(getClass());
-
-    private Map<String, CacheConfig> cache;
-
-    @Bean
-    public KeyGenerator cacheKeyGenerator() {
-        return (target, method, params) -> {
-            String className = method.getDeclaringClass().getSimpleName();
-            String methodName = method.getName();
-            String paramHash = String.valueOf(Arrays.toString(params).hashCode());
-            String cacheKey = new StringJoiner("_").add(className).add(methodName).add(paramHash).toString();
-            logger.debug("generate cache key : {}", cacheKey);
-            return cacheKey;
-        };
-    }
-
-    @Bean
-    public Ticker ticker() {
-        return Ticker.systemTicker();
-    }
-
-    @Bean
-    public CacheManager cacheManager(Ticker ticker) {
-        SimpleCacheManager manager = new SimpleCacheManager();
-        if (cache != null) {
-            List<CaffeineCache> caches = cache.entrySet()
-                    .stream()
-                    .map(entry -> buildCache(entry.getKey(), entry.getValue(), ticker))
-                    .collect(Collectors.toList());
-            manager.setCaches(caches);
-        }
-        return manager;
-    }
-
-    private CaffeineCache buildCache(String key, CacheConfig config, Ticker ticker) {
-        logger.info("Cache key {} specified timeout of {} seconds, max of {}", key, config.getTimeout(), config.getMax());
-        final Caffeine<Object, Object> caffeineBuilder = Caffeine.newBuilder()
-                .expireAfterWrite(config.getTimeout(), TimeUnit.SECONDS)
-                .maximumSize(config.getMax())
-                .ticker(ticker);
-        return new CaffeineCache(key, caffeineBuilder.build());
-    }
-
-    static class CacheConfig {
-        private Integer timeout;
-        private Integer max = 200;
-
-        public Integer getTimeout() {
-            return timeout;
-        }
-
-        public void setTimeout(Integer timeout) {
-            this.timeout = timeout;
-        }
-
-        public Integer getMax() {
-            return max;
-        }
-
-        public void setMax(Integer max) {
-            this.max = max;
-        }
-    }
-
-    public void setCache(Map<String, CacheConfig> cache) {
-        this.cache = cache;
-    }
-
+package org.dbsyncer.common.config;
+
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.github.benmanes.caffeine.cache.Ticker;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.boot.context.properties.ConfigurationProperties;
+import org.springframework.cache.CacheManager;
+import org.springframework.cache.caffeine.CaffeineCache;
+import org.springframework.cache.interceptor.KeyGenerator;
+import org.springframework.cache.support.SimpleCacheManager;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.StringJoiner;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+/**
+ * @author AE86
+ * @version 1.0.0
+ * @date 2020/04/23 11:30
+ */
+@Configuration
+@ConfigurationProperties(prefix = "dbsyncer.web")
+public class CacheConfig {
+
+    private final Logger logger = LoggerFactory.getLogger(getClass());
+
+    private Map<String, CacheProperties> cache;
+
+    @Bean
+    public KeyGenerator cacheKeyGenerator() {
+        return (target, method, params) -> {
+            String className = method.getDeclaringClass().getSimpleName();
+            String methodName = method.getName();
+            String paramHash = String.valueOf(Arrays.toString(params).hashCode());
+            String cacheKey = new StringJoiner("_").add(className).add(methodName).add(paramHash).toString();
+            logger.debug("generate cache key : {}", cacheKey);
+            return cacheKey;
+        };
+    }
+
+    @Bean
+    public Ticker ticker() {
+        return Ticker.systemTicker();
+    }
+
+    @Bean
+    public CacheManager cacheManager(Ticker ticker) {
+        SimpleCacheManager manager = new SimpleCacheManager();
+        if (cache != null) {
+            List<CaffeineCache> caches = cache.entrySet()
+                    .stream()
+                    .map(entry -> buildCache(entry.getKey(), entry.getValue(), ticker))
+                    .collect(Collectors.toList());
+            manager.setCaches(caches);
+        }
+        return manager;
+    }
+
+    private CaffeineCache buildCache(String key, CacheProperties config, Ticker ticker) {
+        logger.info("Cache key {} specified timeout of {} seconds, max of {}", key, config.getTimeout(), config.getMax());
+        final Caffeine<Object, Object> caffeineBuilder = Caffeine.newBuilder()
+                .expireAfterWrite(config.getTimeout(), TimeUnit.SECONDS)
+                .maximumSize(config.getMax())
+                .ticker(ticker);
+        return new CaffeineCache(key, caffeineBuilder.build());
+    }
+
+    static class CacheProperties {
+        private Integer timeout;
+        private Integer max = 200;
+
+        public Integer getTimeout() {
+            return timeout;
+        }
+
+        public void setTimeout(Integer timeout) {
+            this.timeout = timeout;
+        }
+
+        public Integer getMax() {
+            return max;
+        }
+
+        public void setMax(Integer max) {
+            this.max = max;
+        }
+    }
+
+    public void setCache(Map<String, CacheProperties> cache) {
+        this.cache = cache;
+    }
+
 }

+ 37 - 9
dbsyncer-web/src/main/java/org/dbsyncer/web/config/ThreadPoolConfig.java → dbsyncer-common/src/main/java/org/dbsyncer/common/config/ThreadPoolConfig.java

@@ -1,6 +1,6 @@
-package org.dbsyncer.web.config;
+package org.dbsyncer.common.config;
 
-import org.springframework.beans.factory.annotation.Value;
+import org.springframework.boot.context.properties.ConfigurationProperties;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
 import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
@@ -14,19 +14,23 @@ import java.util.concurrent.RejectedExecutionHandler;
  * @date 2020-04-26 23:40
  */
 @Configuration
+@ConfigurationProperties(prefix = "dbsyncer.web.thread.pool")
 public class ThreadPoolConfig {
 
     /**
-     * 工作线程池队列容量
+     * 工作线程
      */
-    @Value(value = "${dbsyncer.web.thread.pool.queue.capacity}")
-    private int queueCapacity;
+    private int coreSize = Runtime.getRuntime().availableProcessors() * 2;
 
     /**
-     * 工作线程数
+     * 最大工作线程数
+     */
+    private int maxSize = 64;
+
+    /**
+     * 工作线任务队列
      */
-    @Value(value = "${dbsyncer.web.thread.pool.core.size}")
-    private int coreSize;
+    private int queueCapacity = 1000;
 
     @Bean("taskExecutor")
     public Executor taskExecutor() {
@@ -37,7 +41,7 @@ public class ThreadPoolConfig {
         executor.setCorePoolSize(coreSize);
         //最大线程数128:线程池最大的线程数,只有在缓冲队列满了之后才会申请超过核心线程数的线程
         //maxPoolSize 当系统负载大道最大值时,核心线程数已无法按时处理完所有任务,这是就需要增加线程.每秒200个任务需要20个线程,那么当每秒1000个任务时,则需要(1000-queueCapacity)*(20/200),即60个线程,可将maxPoolSize设置为60;
-        executor.setMaxPoolSize(128);
+        executor.setMaxPoolSize(maxSize);
         //缓冲队列:用来缓冲执行任务的队列
         executor.setQueueCapacity(queueCapacity);
         //允许线程的空闲时间30秒:当超过了核心线程出之外的线程在空闲时间到达之后会被销毁
@@ -70,4 +74,28 @@ public class ThreadPoolConfig {
         };
     }
 
+    public int getQueueCapacity() {
+        return queueCapacity;
+    }
+
+    public void setQueueCapacity(int queueCapacity) {
+        this.queueCapacity = queueCapacity;
+    }
+
+    public int getCoreSize() {
+        return coreSize;
+    }
+
+    public void setCoreSize(int coreSize) {
+        this.coreSize = coreSize;
+    }
+
+    public int getMaxSize() {
+        return maxSize;
+    }
+
+    public void setMaxSize(int maxSize) {
+        this.maxSize = maxSize;
+    }
+
 }

+ 52 - 54
dbsyncer-storage/src/main/java/org/dbsyncer/storage/SnowflakeIdWorker.java → dbsyncer-common/src/main/java/org/dbsyncer/common/snowflake/SnowflakeIdWorker.java

@@ -1,36 +1,49 @@
-package org.dbsyncer.storage;
+package org.dbsyncer.common.snowflake;
 
-import org.springframework.beans.factory.annotation.Value;
+import org.dbsyncer.common.CommonException;
+import org.springframework.boot.context.properties.ConfigurationProperties;
 import org.springframework.stereotype.Component;
 
 import java.time.Instant;
 
 @Component
+@ConfigurationProperties(prefix = "dbsyncer.common.worker")
 public class SnowflakeIdWorker {
+
     /**
-     * 开始时间截 (2015-01-01)
+     * 工作机器ID(0~31)
      */
-    private final long twepoch = 1420041600000L;
+    private long id = 1L;
 
     /**
-     * 机器id所占的位
+     * 数据中心ID(0~31)
      */
-    private final long workerIdBits = 5L;
+    private long dataCenterId = 1L;
 
     /**
-     * 数据标识id所占的位数
+     * 毫秒内序列(0~4095)
+     */
+    private long sequence = 0L;
+
+    /**
+     * 上次生成ID的时间截
+     */
+    private long lastTimestamp = -1L;
+
+    /**
+     * 开始时间截 (2015-01-01)
      */
-    private final long datacenterIdBits = 5L;
+    private final long twepoch = 1420041600000L;
 
     /**
-     * 支持的最大机器id,结果是31 (这个移位算法可以很快的计算出几位二进制数所能表示的最大十进制数)
+     * 机器id所占的位数
      */
-    private final long maxWorkerId = -1L ^ (-1L << workerIdBits);
+    private final long workerIdBits = 5L;
 
     /**
-     * 支持的最大数据标识id,结果是31
+     * 数据标识id所占的位数
      */
-    private final long maxDatacenterId = -1L ^ (-1L << datacenterIdBits);
+    private final long dataCenterIdBits = 5L;
 
     /**
      * 序列在id中占的位数
@@ -45,44 +58,23 @@ public class SnowflakeIdWorker {
     /**
      * 数据标识id向左移17位(12+5)
      */
-    private final long datacenterIdShift = sequenceBits + workerIdBits;
+    private final long dataCenterIdShift = sequenceBits + workerIdBits;
 
     /**
      * 时间截向左移22位(5+5+12)
      */
-    private final long timestampLeftShift = sequenceBits + workerIdBits + datacenterIdBits;
+    private final long timestampLeftShift = sequenceBits + workerIdBits + dataCenterIdBits;
 
     /**
      * 生成序列的掩码,这里为4095 (0b111111111111=0xfff=4095)
      */
     private final long sequenceMask = -1L ^ (-1L << sequenceBits);
 
-    /**
-     * 工作机器ID(0~31)
-     */
-    @Value(value = "${dbsyncer.storage.id}")
-    private long workerId;
-
-    /**
-     * 数据中心ID(0~31)
-     */
-    private long datacenterId;
-
-    /**
-     * 毫秒内序列(0~4095)
-     */
-    private long sequence = 0L;
-
-    /**
-     * 上次生成ID的时间截
-     */
-    private long lastTimestamp = -1L;
-
     public SnowflakeId revert(Long id) {
         long workerId = id >> workerIdShift & ~(-1L << workerIdBits);
-        long datacenterId = id >> datacenterIdShift & ~(-1L << datacenterIdBits);
+        long dataCenterId = id >> dataCenterIdShift & ~(-1L << dataCenterIdBits);
         long timestamp = new Long(id >> timestampLeftShift) + twepoch;
-        return new SnowflakeId(workerId, datacenterId, timestamp);
+        return new SnowflakeId(workerId, dataCenterId, timestamp);
     }
 
     /**
@@ -95,8 +87,7 @@ public class SnowflakeIdWorker {
 
         //如果当前时间小于上一次ID生成的时间戳,说明系统时钟回退过这个时候应当抛出异常
         if (timestamp < lastTimestamp) {
-            throw new StorageException(
-                    String.format("Clock moved backwards.  Refusing to generate id for %d milliseconds", lastTimestamp - timestamp));
+            throw new CommonException(String.format("Clock moved backwards.  Refusing to generate id for %d milliseconds", lastTimestamp - timestamp));
         }
 
         //如果是同一时间生成的,则进行毫秒内序列
@@ -118,8 +109,8 @@ public class SnowflakeIdWorker {
 
         //移位并通过或运算拼到一起组成64位的ID
         return ((timestamp - twepoch) << timestampLeftShift) //
-                | (datacenterId << datacenterIdShift) //
-                | (workerId << workerIdShift) //
+                | (dataCenterId << dataCenterIdShift) //
+                | (id << workerIdShift) //
                 | sequence;
     }
 
@@ -146,23 +137,30 @@ public class SnowflakeIdWorker {
         return Instant.now().toEpochMilli();
     }
 
-//    public static void main(String[] args) {
-//        SnowflakeIdWorker idWorker = new SnowflakeIdWorker();
-//        for (int i = 0; i < 1000; i++) {
-//            long id = idWorker.nextId();
-//            System.out.println(Long.toBinaryString(id));
-//            System.out.println(id);
-//        }
-//    }
+    public long getId() {
+        return id;
+    }
+
+    public void setId(long id) {
+        this.id = id;
+    }
+
+    public long getDataCenterId() {
+        return dataCenterId;
+    }
+
+    public void setDataCenterId(long dataCenterId) {
+        this.dataCenterId = dataCenterId;
+    }
 
     private class SnowflakeId {
         private long workerId;
-        private long datacenterId;
+        private long dataCenterId;
         private long timestamp;
 
-        public SnowflakeId(long workerId, long datacenterId, long timestamp) {
+        public SnowflakeId(long workerId, long dataCenterId, long timestamp) {
             this.workerId = workerId;
-            this.datacenterId = datacenterId;
+            this.dataCenterId = dataCenterId;
             this.timestamp = timestamp;
         }
 
@@ -170,8 +168,8 @@ public class SnowflakeIdWorker {
             return workerId;
         }
 
-        public long getDatacenterId() {
-            return datacenterId;
+        public long getDataCenterId() {
+            return dataCenterId;
         }
 
         public long getTimestamp() {

+ 1 - 1
dbsyncer-connector/src/main/java/org/dbsyncer/connector/database/ds/SimpleDataSource.java

@@ -14,7 +14,7 @@ import java.util.logging.Logger;
 
 public class SimpleDataSource implements DataSource, AutoCloseable {
 
-    private final BlockingQueue<SimpleConnection> pool = new LinkedBlockingQueue<>(2000);
+    private final BlockingQueue<SimpleConnection> pool = new LinkedBlockingQueue<>(500);
     private long lifeTime = 60 * 1000;
     private String driverClassName;
     private String url;

+ 2 - 0
dbsyncer-listener/src/main/java/org/dbsyncer/listener/file/FileExtractor.java

@@ -209,6 +209,8 @@ public class FileExtractor extends AbstractExtractor {
                     for (WatchEvent<?> event : watchEvents) {
                         parseEvent(event.context().toString());
                     }
+                } catch (ClosedWatchServiceException e) {
+                    break;
                 } catch (Exception e) {
                     logger.error(e.getMessage());
                 } finally {

+ 14 - 1
dbsyncer-listener/src/main/java/org/dbsyncer/listener/oracle/dcn/DBChangeNotification.java

@@ -23,6 +23,8 @@ import java.sql.SQLException;
 import java.util.*;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
 
 /**
  * 授予登录账号监听事件权限
@@ -56,6 +58,8 @@ public class DBChangeNotification {
     private Set<String> filterTable;
     private List<RowEventListener> listeners = new ArrayList<>();
     private BlockingQueue<DCNEvent> queue = new LinkedBlockingQueue<>(100);
+    private final Lock connectLock = new ReentrantLock();
+    private volatile boolean connected;
 
     public DBChangeNotification(String username, String password, String url) {
         this.username = username;
@@ -69,7 +73,13 @@ public class DBChangeNotification {
 
     public void start() throws SQLException {
         try {
+            connectLock.lock();
+            if (connected) {
+                logger.error("DBChangeNotification is already started");
+                return;
+            }
             conn = connect();
+            connected = true;
             statement = (OracleStatement) conn.createStatement();
             readTables();
 
@@ -112,6 +122,8 @@ public class DBChangeNotification {
             // to interrupt the thread otherwise it will be hanging around.
             close();
             throw ex;
+        } finally {
+            connectLock.unlock();
         }
     }
 
@@ -124,6 +136,7 @@ public class DBChangeNotification {
     }
 
     public void close() {
+        connected = false;
         if (null != worker && !worker.isInterrupted()) {
             worker.interrupt();
             worker = null;
@@ -323,7 +336,7 @@ public class DBChangeNotification {
 
         @Override
         public void run() {
-            while (!isInterrupted()) {
+            while (!isInterrupted() && connected) {
                 try {
                     // 取走BlockingQueue里排在首位的对象,若BlockingQueue为空,阻断进入等待状态直到Blocking有新的对象被加入为止
                     DCNEvent event = queue.take();

+ 1 - 1
dbsyncer-manager/src/main/java/org/dbsyncer/manager/puller/IncrementPuller.java

@@ -82,7 +82,7 @@ public class IncrementPuller extends AbstractPuller implements ScheduledTaskJob
 
     @PostConstruct
     private void init() {
-        scheduledTaskService.start(10000, this);
+        scheduledTaskService.start(3000, this);
     }
 
     @Override

+ 1 - 1
dbsyncer-manager/src/main/java/org/dbsyncer/manager/template/AbstractTemplate.java

@@ -23,4 +23,4 @@ public abstract class AbstractTemplate {
         return null != strategy ? strategy : GroupStrategyEnum.DEFAULT;
     }
 
-}
+}

+ 6 - 7
dbsyncer-monitor/src/main/java/org/dbsyncer/monitor/MonitorFactory.java

@@ -1,5 +1,6 @@
 package org.dbsyncer.monitor;
 
+import org.dbsyncer.common.config.ThreadPoolConfig;
 import org.dbsyncer.common.model.Paging;
 import org.dbsyncer.common.util.CollectionUtils;
 import org.dbsyncer.common.util.StringUtil;
@@ -17,7 +18,7 @@ import org.dbsyncer.storage.constant.ConfigConstant;
 import org.dbsyncer.storage.enums.StorageDataStatusEnum;
 import org.dbsyncer.storage.query.Query;
 import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.beans.factory.annotation.Value;
+import org.springframework.beans.factory.annotation.Qualifier;
 import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
 import org.springframework.stereotype.Component;
 
@@ -40,14 +41,12 @@ public class MonitorFactory implements Monitor {
     @Autowired
     private Manager manager;
 
+    @Qualifier("taskExecutor")
     @Autowired
     private Executor taskExecutor;
 
-    /**
-     * 工作线程池队列容量
-     */
-    @Value(value = "${dbsyncer.web.thread.pool.queue.capacity}")
-    private int queueCapacity;
+    @Autowired
+    private ThreadPoolConfig threadPoolConfig;
 
     @Override
     public Mapping getMapping(String mappingId) {
@@ -134,7 +133,7 @@ public class MonitorFactory implements Monitor {
         ThreadPoolExecutor pool = threadTask.getThreadPoolExecutor();
         BlockingQueue<Runnable> queue = pool.getQueue();
         report.setQueueUp(queue.size());
-        report.setQueueCapacity(queueCapacity);
+        report.setQueueCapacity(threadPoolConfig.getQueueCapacity());
         return report;
     }
 

+ 5 - 2
dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/impl/FlushServiceImpl.java

@@ -2,10 +2,11 @@ package org.dbsyncer.parser.flush.impl;
 
 import com.alibaba.fastjson.JSONException;
 import org.dbsyncer.common.util.JsonUtil;
+import org.dbsyncer.common.util.StringUtil;
 import org.dbsyncer.parser.flush.BufferActuator;
 import org.dbsyncer.parser.flush.FlushService;
 import org.dbsyncer.parser.flush.model.StorageRequest;
-import org.dbsyncer.storage.SnowflakeIdWorker;
+import org.dbsyncer.common.snowflake.SnowflakeIdWorker;
 import org.dbsyncer.storage.StorageService;
 import org.dbsyncer.storage.constant.ConfigConstant;
 import org.dbsyncer.storage.enums.StorageDataStatusEnum;
@@ -35,6 +36,8 @@ public class FlushServiceImpl implements FlushService {
 
     private final Logger logger = LoggerFactory.getLogger(getClass());
 
+    private static final int MAX_ERROR_LENGTH = 1000;
+
     @Autowired
     private StorageService storageService;
 
@@ -62,7 +65,7 @@ public class FlushServiceImpl implements FlushService {
             params.put(ConfigConstant.CONFIG_MODEL_ID, String.valueOf(snowflakeIdWorker.nextId()));
             params.put(ConfigConstant.DATA_SUCCESS, success ? StorageDataStatusEnum.SUCCESS.getValue() : StorageDataStatusEnum.FAIL.getValue());
             params.put(ConfigConstant.DATA_EVENT, event);
-            params.put(ConfigConstant.DATA_ERROR, error);
+            params.put(ConfigConstant.DATA_ERROR, StringUtil.substring(error, 0, MAX_ERROR_LENGTH));
             try {
                 params.put(ConfigConstant.CONFIG_MODEL_JSON, JsonUtil.objToJson(r));
             } catch (JSONException e) {

+ 1 - 1
dbsyncer-storage/src/main/resources/dbsyncer_data.sql

@@ -1,7 +1,7 @@
 CREATE TABLE `dbsyncer_data` (
   `ID` varchar(64) CHARACTER SET utf8 COLLATE utf8_bin NOT NULL COMMENT '唯一ID',
   `SUCCESS` int(1) NOT NULL COMMENT '成功1/失败0',
-  `EVENT` varchar(20) CHARACTER SET utf8 COLLATE utf8_bin NOT NULL COMMENT '事件',
+  `EVENT` varchar(10) CHARACTER SET utf8 COLLATE utf8_bin NOT NULL COMMENT '事件',
   `ERROR` mediumtext CHARACTER SET utf8 COLLATE utf8_bin NULL COMMENT '异常信息',
   `CREATE_TIME` bigint(0) NOT NULL COMMENT '创建时间',
   `JSON` mediumtext CHARACTER SET utf8 COLLATE utf8_bin NOT NULL COMMENT '同步数据',

+ 0 - 50
dbsyncer-web/src/main/java/org/dbsyncer/web/config/ScheduleConfig.java

@@ -1,50 +0,0 @@
-package org.dbsyncer.web.config;
-
-import org.springframework.context.annotation.Bean;
-import org.springframework.context.annotation.Configuration;
-import org.springframework.scheduling.annotation.SchedulingConfigurer;
-import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
-import org.springframework.scheduling.config.ScheduledTaskRegistrar;
-
-import java.util.concurrent.RejectedExecutionHandler;
-
-/**
- * @author yjwang
- * @date 2022/4/29 10:27
- */
-@Configuration
-public class ScheduleConfig implements SchedulingConfigurer {
-
-    @Override
-    public void configureTasks(ScheduledTaskRegistrar scheduledTaskRegistrar) {
-        scheduledTaskRegistrar.setTaskScheduler(taskScheduler());
-    }
-
-    @Bean(name = "taskScheduler", destroyMethod = "shutdown")
-    public ThreadPoolTaskScheduler taskScheduler() {
-        int poolSize = Runtime.getRuntime().availableProcessors() * 2;
-        ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
-        //核心线程池大小
-        scheduler.setPoolSize(poolSize);
-        //线程名字前缀
-        scheduler.setThreadNamePrefix("taskScheduler-");
-        //设置线程池关闭的时候等待所有任务都完成再继续销毁其他的Bean
-        scheduler.setWaitForTasksToCompleteOnShutdown(true);
-        //设置线程池中任务的等待时间,如果超过这个时候还没有销毁就强制销毁,以确保应用最后能够被关闭,而不是阻塞住
-        scheduler.setAwaitTerminationSeconds(60);
-        // 线程池满,拒绝策略
-        scheduler.setRejectedExecutionHandler(rejectedExecutionHandler());
-
-        return scheduler;
-    }
-
-    public RejectedExecutionHandler rejectedExecutionHandler() {
-        return (r, executor) -> {
-            try {
-                executor.getQueue().put(r);
-            } catch (InterruptedException e) {
-                e.printStackTrace();
-            }
-        };
-    }
-}

+ 59 - 0
dbsyncer-web/src/main/java/org/dbsyncer/web/controller/config/ConfigController.java

@@ -0,0 +1,59 @@
+package org.dbsyncer.web.controller.config;
+
+import org.dbsyncer.biz.ConfigService;
+import org.dbsyncer.biz.vo.RestResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Controller;
+import org.springframework.ui.ModelMap;
+import org.springframework.web.bind.annotation.GetMapping;
+import org.springframework.web.bind.annotation.PostMapping;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.ResponseBody;
+
+import javax.servlet.http.HttpServletResponse;
+
+@Controller
+@RequestMapping("/config")
+public class ConfigController {
+
+    private final Logger logger = LoggerFactory.getLogger(getClass());
+
+    @Autowired
+    private ConfigService configService;
+
+    @RequestMapping("")
+    public String index(ModelMap model) {
+        model.put("config", configService.getConfigModelAll());
+        return "config/config";
+    }
+
+    @PostMapping(value = "/getAll")
+    @ResponseBody
+    public RestResult getAll() {
+        try {
+            return RestResult.restSuccess("ok");
+        } catch (Exception e) {
+            logger.error(e.getLocalizedMessage(), e.getClass());
+            return RestResult.restFail(e.getMessage());
+        }
+    }
+
+    @PostMapping(value = "/upload")
+    @ResponseBody
+    public RestResult upload() {
+        try {
+            return RestResult.restSuccess("ok");
+        } catch (Exception e) {
+            logger.error(e.getLocalizedMessage(), e.getClass());
+            return RestResult.restFail(e.getMessage());
+        }
+    }
+
+    @GetMapping("/download")
+    public void download(HttpServletResponse response) {
+
+    }
+
+}

+ 7 - 7
dbsyncer-web/src/main/java/org/dbsyncer/web/controller/upload/UploadController.java → dbsyncer-web/src/main/java/org/dbsyncer/web/controller/plugin/PluginController.java

@@ -1,4 +1,4 @@
-package org.dbsyncer.web.controller.upload;
+package org.dbsyncer.web.controller.plugin;
 
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.io.IOUtils;
@@ -21,11 +21,14 @@ import javax.servlet.http.HttpServletResponse;
 import java.io.*;
 
 @Controller
-@RequestMapping("/upload")
-public class UploadController {
+@RequestMapping("/plugin")
+public class PluginController {
 
     private final Logger logger = LoggerFactory.getLogger(getClass());
 
+    @Autowired
+    private PluginService pluginService;
+
     /**
      * 版本号
      */
@@ -36,12 +39,9 @@ public class UploadController {
     public String index(ModelMap model) {
         model.put("plugins", pluginService.getPluginAll());
         model.put("version", version);
-        return "upload/upload";
+        return "plugin/plugin";
     }
 
-    @Autowired
-    private PluginService pluginService;
-
     @PostMapping(value = "/upload")
     @ResponseBody
     public RestResult upload(MultipartFile[] files) {

+ 5 - 10
dbsyncer-web/src/main/resources/application.properties

@@ -1,24 +1,19 @@
 server.ip=127.0.0.1
 server.port=18686
-
 #web
 dbsyncer.web.login.username=admin
 dbsyncer.web.login.password=0DPiKuNIrrVmD8IUCuw1hQxNqZc=
-dbsyncer.web.thread.pool.core.size=10
-dbsyncer.web.thread.pool.queue.capacity=2000
 server.servlet.session.timeout=1800
 server.servlet.context-path=/
-
-#storage
-dbsyncer.storage.id=1
+#dbsyncer.common.worker.id=1
+#dbsyncer.web.thread.pool.coreSize=8
+#dbsyncer.web.thread.pool.maxSize=64
+#dbsyncer.web.thread.pool.queueCapacity=1000
 #dbsyncer.storage.support.mysql.enabled=true
 #dbsyncer.storage.support.mysql.config.url=jdbc:mysql://127.0.0.1:3306/dbsyncer?rewriteBatchedStatements=true&seUnicode=true&characterEncoding=UTF8&serverTimezone=Asia/Shanghai&useSSL=false&verifyServerCertificate=false
 #dbsyncer.storage.support.mysql.config.username=root
 #dbsyncer.storage.support.mysql.config.password=123
-
-#parser
 #dbsyncer.parser.flush.full.enabled=true
-
 #monitor
 management.endpoints.web.base-path=/app
 management.endpoints.web.exposure.include=*
@@ -26,7 +21,7 @@ management.endpoint.health.show-details=always
 management.health.elasticsearch.enabled=false
 info.app.name=DBSyncer
 info.app.version=1.1.8-Beta
-info.app.copyright=&copy;2021 ${info.app.name}(${info.app.version})<footer>Designed By <a href='https://gitee.com/ghi/dbsyncer' target='_blank' >AE86</a></footer>
+info.app.copyright=&copy;2022 ${info.app.name}(${info.app.version})<footer>Designed By <a href='https://gitee.com/ghi/dbsyncer' target='_blank' >AE86</a></footer>
 
 #All < Trace < Debug < Info < Warn < Error < Fatal < OFF
 logging.level.root=info

+ 43 - 0
dbsyncer-web/src/main/resources/public/config/config.html

@@ -0,0 +1,43 @@
+<!DOCTYPE html>
+<html xmlns="http://www.w3.org/1999/xhtml"
+      xmlns:th="http://www.thymeleaf.org" lang="zh-CN">
+
+<div class="container">
+    <form id="configEditForm" class="form-horizontal" role="form">
+        <div class="row text-center">
+            <div class="page-header">
+                <h3>配置管理</h3>
+            </div>
+        </div>
+
+        <!-- 操作 -->
+        <div class="row">
+            <div class="col-md-12">
+                <table class="table table-hover">
+                    <caption>配置列表([[${config?.size()} ?: 0]])</caption>
+                    <thead>
+                    <tr>
+                        <th>ID</th>
+                        <th>类型</th>
+                        <th>名称</th>
+                        <th>创建时间</th>
+                        <th>修改时间</th>
+                    </tr>
+                    </thead>
+                    <tbody id="pluginList">
+                    <tr th:id="${c?.name}" th:each="c,state : ${config}">
+                        <td th:text="${c?.id}"/>
+                        <td th:text="${c?.type}"/>
+                        <td th:text="${c?.name}"/>
+                        <td th:text="${#dates.format(c?.createTime, 'yyyy-MM-dd HH:mm:ss')}"/>
+                        <td th:text="${#dates.format(c?.updateTime, 'yyyy-MM-dd HH:mm:ss')}"/>
+                    </tr>
+                    </tbody>
+                </table>
+            </div>
+        </div>
+    </form>
+</div>
+
+<script th:src="@{/js/pwd/index.js}"></script>
+</html>

+ 3 - 1
dbsyncer-web/src/main/resources/public/connector/addDqlMysql.html

@@ -31,7 +31,9 @@
     <div class="form-group">
         <label class="col-sm-2 control-label">URL <strong class="driverVerifcateRequired">*</strong></label>
         <div class="col-sm-10">
-            <textarea name="url" class="form-control dbsyncer_textarea_resize_none" maxlength="128" dbsyncer-valid="require" rows="5" th:text="${connector?.config?.url}?:'jdbc:mysql://127.0.0.1:3306/test?rewriteBatchedStatements=true&useUnicode=true&amp;characterEncoding=UTF8&amp;serverTimezone=Asia/Shanghai&amp;useSSL=false&amp;verifyServerCertificate=false&amp;autoReconnect=true&amp;failOverReadOnly=false'"></textarea>
+            <textarea name="url" class="form-control dbsyncer_textarea_resize_none" maxlength="1024"
+                      dbsyncer-valid="require" rows="5"
+                      th:text="${connector?.config?.url}?:'jdbc:mysql://127.0.0.1:3306/test?rewriteBatchedStatements=true&useUnicode=true&amp;characterEncoding=UTF8&amp;serverTimezone=Asia/Shanghai&amp;useSSL=false&amp;verifyServerCertificate=false&amp;autoReconnect=true&amp;failOverReadOnly=false'"></textarea>
         </div>
     </div>
     <div class="form-group">

+ 9 - 4
dbsyncer-web/src/main/resources/public/nav.html

@@ -9,14 +9,19 @@
         </div>
         <div>
             <ul id="menu" class="nav navbar-nav">
-                <li class="active"><a href="javascript:void(0);" url="/index"><span class="fa fa-tachometer"></span>驱动管理</a></li>
-                <li><a href="javascript:void(0);" url="/monitor" ><span class="fa fa-line-chart"></span>监控</a></li>
-                <li><a href="javascript:void(0);" url="/upload" ><span class="fa fa-cloud-upload" aria-hidden="true"></span>上传插件</a></li>
+                <li class="active"><a href="javascript:void(0);" url="/index"><span
+                        class="fa fa-tachometer"></span>驱动</a>
+                </li>
+                <li><a href="javascript:void(0);" url="/monitor"><span class="fa fa-line-chart"></span>监控</a></li>
+                <li><a href="javascript:void(0);" url="/plugin"><span class="fa fa-puzzle-piece"
+                                                                      aria-hidden="true"></span>插件</a></li>
                 <li class="dropdown">
-                    <a href="javascript:void(0);" class="dropdown-toggle" data-toggle="dropdown">参数<b class="caret"></b></a>
+                    <a href="javascript:void(0);" class="dropdown-toggle" data-toggle="dropdown">配置<b class="caret"></b></a>
                     <ul class="dropdown-menu">
                         <li><a href="javascript:void(0);" url="/system"><span class="fa fa-cog"></span>系统参数</a></li>
                         <li><a href="javascript:void(0);" url="/pwd"><span class="fa fa-lock"></span>修改密码</a></li>
+                        <li><a href="javascript:void(0);" url="/config"><span class="fa fa-file"
+                                                                              aria-hidden="true"></span>配置管理</a></li>
                     </ul>
                 </li>
             </ul>

+ 6 - 6
dbsyncer-web/src/main/resources/public/upload/upload.html → dbsyncer-web/src/main/resources/public/plugin/plugin.html

@@ -136,24 +136,24 @@ public class MyPlugin implements ConvertService{
     $("#filePlugin").fileinput({
         theme: 'fas',
         language: 'zh',
-        uploadUrl: $basePath + '/upload/upload',
+        uploadUrl: $basePath + '/plugin/plugin',
         enctype: 'multipart/form-data',
-        removeFromPreviewOnError:true, //当选择的文件不符合规则时,例如不是指定后缀文件、大小超出配置等,选择的文件不会出现在预览框中,只会显示错误信息
+        removeFromPreviewOnError: true, //当选择的文件不符合规则时,例如不是指定后缀文件、大小超出配置等,选择的文件不会出现在预览框中,只会显示错误信息
         allowedFileExtensions: ['jar'],
         minFileCount: 0, //每次多次上载允许的最小文件数。如果设置为0,则表示文件数是可选的
         maxFileCount: 5, //表示允许同时上传的最大文件个数 如果设置为0,则表示允许的文件数不受限制
         showPreview: true,
-        showUpload:true,//不展示上传按钮
-        validateInitialCount:true,//是否在验证minFileCount和包含初始预览文件计数(服务器上载文件)maxFileCount
+        showUpload: true,//不展示上传按钮
+        validateInitialCount: true,//是否在验证minFileCount和包含初始预览文件计数(服务器上载文件)maxFileCount
     }).on("fileuploaded", function(event, data, previewId, index) {
         if (!data.response.success) {
             bootGrowl(data.response.resultValue, "danger");
         }
-        doLoader("/upload");
+        doLoader("/plugin");
     });
 
     function downLoad(){
-        window.open($basePath + "/upload/download");
+        window.open($basePath + "/plugin/download");
     }
 </script>
 </html>