Selaa lähdekoodia

!62 merge
Merge pull request !62 from AE86/V_1.0.0_Beta

AE86 3 vuotta sitten
vanhempi
säilyke
86b9ffe41d

+ 98 - 99
dbsyncer-web/src/main/java/org/dbsyncer/web/config/CacheConfiguration.java → dbsyncer-common/src/main/java/org/dbsyncer/common/config/CacheConfig.java

@@ -1,100 +1,99 @@
-package org.dbsyncer.web.config;
-
-import com.github.benmanes.caffeine.cache.Caffeine;
-import com.github.benmanes.caffeine.cache.Ticker;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.boot.context.properties.ConfigurationProperties;
-import org.springframework.cache.CacheManager;
-import org.springframework.cache.caffeine.CaffeineCache;
-import org.springframework.cache.interceptor.KeyGenerator;
-import org.springframework.cache.support.SimpleCacheManager;
-import org.springframework.context.annotation.Bean;
-import org.springframework.context.annotation.Configuration;
-
-import java.lang.reflect.Method;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
-import java.util.StringJoiner;
-import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
-
-/**
- * @author AE86
- * @version 1.0.0
- * @date 2020/04/23 11:30
- */
-@Configuration
-@ConfigurationProperties(prefix = "dbsyncer.web")
-public class CacheConfiguration {
-
-    private final Logger logger = LoggerFactory.getLogger(getClass());
-
-    private Map<String, CacheConfig> cache;
-
-    @Bean
-    public KeyGenerator cacheKeyGenerator() {
-        return (target, method, params) -> {
-            String className = method.getDeclaringClass().getSimpleName();
-            String methodName = method.getName();
-            String paramHash = String.valueOf(Arrays.toString(params).hashCode());
-            String cacheKey = new StringJoiner("_").add(className).add(methodName).add(paramHash).toString();
-            logger.debug("generate cache key : {}", cacheKey);
-            return cacheKey;
-        };
-    }
-
-    @Bean
-    public Ticker ticker() {
-        return Ticker.systemTicker();
-    }
-
-    @Bean
-    public CacheManager cacheManager(Ticker ticker) {
-        SimpleCacheManager manager = new SimpleCacheManager();
-        if (cache != null) {
-            List<CaffeineCache> caches = cache.entrySet()
-                    .stream()
-                    .map(entry -> buildCache(entry.getKey(), entry.getValue(), ticker))
-                    .collect(Collectors.toList());
-            manager.setCaches(caches);
-        }
-        return manager;
-    }
-
-    private CaffeineCache buildCache(String key, CacheConfig config, Ticker ticker) {
-        logger.info("Cache key {} specified timeout of {} seconds, max of {}", key, config.getTimeout(), config.getMax());
-        final Caffeine<Object, Object> caffeineBuilder = Caffeine.newBuilder()
-                .expireAfterWrite(config.getTimeout(), TimeUnit.SECONDS)
-                .maximumSize(config.getMax())
-                .ticker(ticker);
-        return new CaffeineCache(key, caffeineBuilder.build());
-    }
-
-    static class CacheConfig {
-        private Integer timeout;
-        private Integer max = 200;
-
-        public Integer getTimeout() {
-            return timeout;
-        }
-
-        public void setTimeout(Integer timeout) {
-            this.timeout = timeout;
-        }
-
-        public Integer getMax() {
-            return max;
-        }
-
-        public void setMax(Integer max) {
-            this.max = max;
-        }
-    }
-
-    public void setCache(Map<String, CacheConfig> cache) {
-        this.cache = cache;
-    }
-
+package org.dbsyncer.common.config;
+
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.github.benmanes.caffeine.cache.Ticker;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.boot.context.properties.ConfigurationProperties;
+import org.springframework.cache.CacheManager;
+import org.springframework.cache.caffeine.CaffeineCache;
+import org.springframework.cache.interceptor.KeyGenerator;
+import org.springframework.cache.support.SimpleCacheManager;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.StringJoiner;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+/**
+ * @author AE86
+ * @version 1.0.0
+ * @date 2020/04/23 11:30
+ */
+@Configuration
+@ConfigurationProperties(prefix = "dbsyncer.web")
+public class CacheConfig {
+
+    private final Logger logger = LoggerFactory.getLogger(getClass());
+
+    private Map<String, CacheProperties> cache;
+
+    @Bean
+    public KeyGenerator cacheKeyGenerator() {
+        return (target, method, params) -> {
+            String className = method.getDeclaringClass().getSimpleName();
+            String methodName = method.getName();
+            String paramHash = String.valueOf(Arrays.toString(params).hashCode());
+            String cacheKey = new StringJoiner("_").add(className).add(methodName).add(paramHash).toString();
+            logger.debug("generate cache key : {}", cacheKey);
+            return cacheKey;
+        };
+    }
+
+    @Bean
+    public Ticker ticker() {
+        return Ticker.systemTicker();
+    }
+
+    @Bean
+    public CacheManager cacheManager(Ticker ticker) {
+        SimpleCacheManager manager = new SimpleCacheManager();
+        if (cache != null) {
+            List<CaffeineCache> caches = cache.entrySet()
+                    .stream()
+                    .map(entry -> buildCache(entry.getKey(), entry.getValue(), ticker))
+                    .collect(Collectors.toList());
+            manager.setCaches(caches);
+        }
+        return manager;
+    }
+
+    private CaffeineCache buildCache(String key, CacheProperties config, Ticker ticker) {
+        logger.info("Cache key {} specified timeout of {} seconds, max of {}", key, config.getTimeout(), config.getMax());
+        final Caffeine<Object, Object> caffeineBuilder = Caffeine.newBuilder()
+                .expireAfterWrite(config.getTimeout(), TimeUnit.SECONDS)
+                .maximumSize(config.getMax())
+                .ticker(ticker);
+        return new CaffeineCache(key, caffeineBuilder.build());
+    }
+
+    static class CacheProperties {
+        private Integer timeout;
+        private Integer max = 200;
+
+        public Integer getTimeout() {
+            return timeout;
+        }
+
+        public void setTimeout(Integer timeout) {
+            this.timeout = timeout;
+        }
+
+        public Integer getMax() {
+            return max;
+        }
+
+        public void setMax(Integer max) {
+            this.max = max;
+        }
+    }
+
+    public void setCache(Map<String, CacheProperties> cache) {
+        this.cache = cache;
+    }
+
 }

+ 37 - 9
dbsyncer-web/src/main/java/org/dbsyncer/web/config/ThreadPoolConfig.java → dbsyncer-common/src/main/java/org/dbsyncer/common/config/ThreadPoolConfig.java

@@ -1,6 +1,6 @@
-package org.dbsyncer.web.config;
+package org.dbsyncer.common.config;
 
-import org.springframework.beans.factory.annotation.Value;
+import org.springframework.boot.context.properties.ConfigurationProperties;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
 import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
@@ -14,19 +14,23 @@ import java.util.concurrent.RejectedExecutionHandler;
  * @date 2020-04-26 23:40
  */
 @Configuration
+@ConfigurationProperties(prefix = "dbsyncer.web.thread.pool")
 public class ThreadPoolConfig {
 
     /**
-     * 工作线程池队列容量
+     * 工作线程
      */
-    @Value(value = "${dbsyncer.web.thread.pool.queue.capacity}")
-    private int queueCapacity;
+    private int coreSize = Runtime.getRuntime().availableProcessors() * 2;
 
     /**
-     * 工作线程数
+     * 最大工作线程数
+     */
+    private int maxSize = 128;
+
+    /**
+     * 工作线任务队列
      */
-    @Value(value = "${dbsyncer.web.thread.pool.core.size}")
-    private int coreSize;
+    private int queueCapacity = 2000;
 
     @Bean("taskExecutor")
     public Executor taskExecutor() {
@@ -37,7 +41,7 @@ public class ThreadPoolConfig {
         executor.setCorePoolSize(coreSize);
         //最大线程数128:线程池最大的线程数,只有在缓冲队列满了之后才会申请超过核心线程数的线程
         //maxPoolSize 当系统负载大道最大值时,核心线程数已无法按时处理完所有任务,这是就需要增加线程.每秒200个任务需要20个线程,那么当每秒1000个任务时,则需要(1000-queueCapacity)*(20/200),即60个线程,可将maxPoolSize设置为60;
-        executor.setMaxPoolSize(128);
+        executor.setMaxPoolSize(maxSize);
         //缓冲队列:用来缓冲执行任务的队列
         executor.setQueueCapacity(queueCapacity);
         //允许线程的空闲时间30秒:当超过了核心线程出之外的线程在空闲时间到达之后会被销毁
@@ -70,4 +74,28 @@ public class ThreadPoolConfig {
         };
     }
 
+    public int getQueueCapacity() {
+        return queueCapacity;
+    }
+
+    public void setQueueCapacity(int queueCapacity) {
+        this.queueCapacity = queueCapacity;
+    }
+
+    public int getCoreSize() {
+        return coreSize;
+    }
+
+    public void setCoreSize(int coreSize) {
+        this.coreSize = coreSize;
+    }
+
+    public int getMaxSize() {
+        return maxSize;
+    }
+
+    public void setMaxSize(int maxSize) {
+        this.maxSize = maxSize;
+    }
+
 }

+ 1 - 1
dbsyncer-connector/src/main/java/org/dbsyncer/connector/database/ds/SimpleDataSource.java

@@ -14,7 +14,7 @@ import java.util.logging.Logger;
 
 public class SimpleDataSource implements DataSource, AutoCloseable {
 
-    private final BlockingQueue<SimpleConnection> pool = new LinkedBlockingQueue<>(2000);
+    private final BlockingQueue<SimpleConnection> pool = new LinkedBlockingQueue<>(500);
     private long lifeTime = 60 * 1000;
     private String driverClassName;
     private String url;

+ 2 - 0
dbsyncer-listener/src/main/java/org/dbsyncer/listener/file/FileExtractor.java

@@ -209,6 +209,8 @@ public class FileExtractor extends AbstractExtractor {
                     for (WatchEvent<?> event : watchEvents) {
                         parseEvent(event.context().toString());
                     }
+                } catch (ClosedWatchServiceException e) {
+                    break;
                 } catch (Exception e) {
                     logger.error(e.getMessage());
                 } finally {

+ 14 - 1
dbsyncer-listener/src/main/java/org/dbsyncer/listener/oracle/dcn/DBChangeNotification.java

@@ -23,6 +23,8 @@ import java.sql.SQLException;
 import java.util.*;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
 
 /**
  * 授予登录账号监听事件权限
@@ -56,6 +58,8 @@ public class DBChangeNotification {
     private Set<String> filterTable;
     private List<RowEventListener> listeners = new ArrayList<>();
     private BlockingQueue<DCNEvent> queue = new LinkedBlockingQueue<>(100);
+    private final Lock connectLock = new ReentrantLock();
+    private volatile boolean connected;
 
     public DBChangeNotification(String username, String password, String url) {
         this.username = username;
@@ -69,7 +73,13 @@ public class DBChangeNotification {
 
     public void start() throws SQLException {
         try {
+            connectLock.lock();
+            if (connected) {
+                logger.error("DBChangeNotification is already started");
+                return;
+            }
             conn = connect();
+            connected = true;
             statement = (OracleStatement) conn.createStatement();
             readTables();
 
@@ -112,6 +122,8 @@ public class DBChangeNotification {
             // to interrupt the thread otherwise it will be hanging around.
             close();
             throw ex;
+        } finally {
+            connectLock.unlock();
         }
     }
 
@@ -124,6 +136,7 @@ public class DBChangeNotification {
     }
 
     public void close() {
+        connected = false;
         if (null != worker && !worker.isInterrupted()) {
             worker.interrupt();
             worker = null;
@@ -323,7 +336,7 @@ public class DBChangeNotification {
 
         @Override
         public void run() {
-            while (!isInterrupted()) {
+            while (!isInterrupted() && connected) {
                 try {
                     // 取走BlockingQueue里排在首位的对象,若BlockingQueue为空,阻断进入等待状态直到Blocking有新的对象被加入为止
                     DCNEvent event = queue.take();

+ 1 - 1
dbsyncer-manager/src/main/java/org/dbsyncer/manager/puller/IncrementPuller.java

@@ -82,7 +82,7 @@ public class IncrementPuller extends AbstractPuller implements ScheduledTaskJob
 
     @PostConstruct
     private void init() {
-        scheduledTaskService.start(10000, this);
+        scheduledTaskService.start(3000, this);
     }
 
     @Override

+ 6 - 7
dbsyncer-monitor/src/main/java/org/dbsyncer/monitor/MonitorFactory.java

@@ -1,5 +1,6 @@
 package org.dbsyncer.monitor;
 
+import org.dbsyncer.common.config.ThreadPoolConfig;
 import org.dbsyncer.common.model.Paging;
 import org.dbsyncer.common.util.CollectionUtils;
 import org.dbsyncer.common.util.StringUtil;
@@ -17,7 +18,7 @@ import org.dbsyncer.storage.constant.ConfigConstant;
 import org.dbsyncer.storage.enums.StorageDataStatusEnum;
 import org.dbsyncer.storage.query.Query;
 import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.beans.factory.annotation.Value;
+import org.springframework.beans.factory.annotation.Qualifier;
 import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
 import org.springframework.stereotype.Component;
 
@@ -40,14 +41,12 @@ public class MonitorFactory implements Monitor {
     @Autowired
     private Manager manager;
 
+    @Qualifier("taskExecutor")
     @Autowired
     private Executor taskExecutor;
 
-    /**
-     * 工作线程池队列容量
-     */
-    @Value(value = "${dbsyncer.web.thread.pool.queue.capacity}")
-    private int queueCapacity;
+    @Autowired
+    private ThreadPoolConfig threadPoolConfig;
 
     @Override
     public Mapping getMapping(String mappingId) {
@@ -134,7 +133,7 @@ public class MonitorFactory implements Monitor {
         ThreadPoolExecutor pool = threadTask.getThreadPoolExecutor();
         BlockingQueue<Runnable> queue = pool.getQueue();
         report.setQueueUp(queue.size());
-        report.setQueueCapacity(queueCapacity);
+        report.setQueueCapacity(threadPoolConfig.getQueueCapacity());
         return report;
     }
 

+ 1 - 1
dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/AbstractBufferActuator.java

@@ -91,7 +91,7 @@ public abstract class AbstractBufferActuator<Request, Response> implements Buffe
             size = temp.size();
         } else {
             buffer.offer((Request) request);
-            size = temp.size();
+            size = buffer.size();
         }
 
         // TODO 临时解决方案:生产大于消费问题,限制生产速度

+ 4 - 1
dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/impl/FlushServiceImpl.java

@@ -2,6 +2,7 @@ package org.dbsyncer.parser.flush.impl;
 
 import com.alibaba.fastjson.JSONException;
 import org.dbsyncer.common.util.JsonUtil;
+import org.dbsyncer.common.util.StringUtil;
 import org.dbsyncer.parser.flush.BufferActuator;
 import org.dbsyncer.parser.flush.FlushService;
 import org.dbsyncer.parser.flush.model.StorageRequest;
@@ -35,6 +36,8 @@ public class FlushServiceImpl implements FlushService {
 
     private final Logger logger = LoggerFactory.getLogger(getClass());
 
+    private static final int MAX_ERROR_LENGTH = 1000;
+
     @Autowired
     private StorageService storageService;
 
@@ -62,7 +65,7 @@ public class FlushServiceImpl implements FlushService {
             params.put(ConfigConstant.CONFIG_MODEL_ID, String.valueOf(snowflakeIdWorker.nextId()));
             params.put(ConfigConstant.DATA_SUCCESS, success ? StorageDataStatusEnum.SUCCESS.getValue() : StorageDataStatusEnum.FAIL.getValue());
             params.put(ConfigConstant.DATA_EVENT, event);
-            params.put(ConfigConstant.DATA_ERROR, error);
+            params.put(ConfigConstant.DATA_ERROR, StringUtil.substring(error, 0, MAX_ERROR_LENGTH));
             try {
                 params.put(ConfigConstant.CONFIG_MODEL_JSON, JsonUtil.objToJson(r));
             } catch (JSONException e) {

+ 0 - 50
dbsyncer-web/src/main/java/org/dbsyncer/web/config/ScheduleConfig.java

@@ -1,50 +0,0 @@
-package org.dbsyncer.web.config;
-
-import org.springframework.context.annotation.Bean;
-import org.springframework.context.annotation.Configuration;
-import org.springframework.scheduling.annotation.SchedulingConfigurer;
-import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
-import org.springframework.scheduling.config.ScheduledTaskRegistrar;
-
-import java.util.concurrent.RejectedExecutionHandler;
-
-/**
- * @author yjwang
- * @date 2022/4/29 10:27
- */
-@Configuration
-public class ScheduleConfig implements SchedulingConfigurer {
-
-    @Override
-    public void configureTasks(ScheduledTaskRegistrar scheduledTaskRegistrar) {
-        scheduledTaskRegistrar.setTaskScheduler(taskScheduler());
-    }
-
-    @Bean(name = "taskScheduler", destroyMethod = "shutdown")
-    public ThreadPoolTaskScheduler taskScheduler() {
-        int poolSize = Runtime.getRuntime().availableProcessors() * 2;
-        ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
-        //核心线程池大小
-        scheduler.setPoolSize(poolSize);
-        //线程名字前缀
-        scheduler.setThreadNamePrefix("taskScheduler-");
-        //设置线程池关闭的时候等待所有任务都完成再继续销毁其他的Bean
-        scheduler.setWaitForTasksToCompleteOnShutdown(true);
-        //设置线程池中任务的等待时间,如果超过这个时候还没有销毁就强制销毁,以确保应用最后能够被关闭,而不是阻塞住
-        scheduler.setAwaitTerminationSeconds(60);
-        // 线程池满,拒绝策略
-        scheduler.setRejectedExecutionHandler(rejectedExecutionHandler());
-
-        return scheduler;
-    }
-
-    public RejectedExecutionHandler rejectedExecutionHandler() {
-        return (r, executor) -> {
-            try {
-                executor.getQueue().put(r);
-            } catch (InterruptedException e) {
-                e.printStackTrace();
-            }
-        };
-    }
-}

+ 3 - 5
dbsyncer-web/src/main/resources/application.properties

@@ -1,21 +1,19 @@
 server.ip=127.0.0.1
 server.port=18686
-
 #web
 dbsyncer.web.login.username=admin
 dbsyncer.web.login.password=0DPiKuNIrrVmD8IUCuw1hQxNqZc=
-dbsyncer.web.thread.pool.core.size=10
-dbsyncer.web.thread.pool.queue.capacity=2000
 server.servlet.session.timeout=1800
 server.servlet.context-path=/
-
+#dbsyncer.web.thread.pool.coreSize=8
+#dbsyncer.web.thread.pool.maxSize=128
+#dbsyncer.web.thread.pool.queueCapacity=2000
 #storage
 dbsyncer.storage.id=1
 #dbsyncer.storage.support.mysql.enabled=true
 #dbsyncer.storage.support.mysql.config.url=jdbc:mysql://127.0.0.1:3306/dbsyncer?rewriteBatchedStatements=true&seUnicode=true&characterEncoding=UTF8&serverTimezone=Asia/Shanghai&useSSL=false&verifyServerCertificate=false
 #dbsyncer.storage.support.mysql.config.username=root
 #dbsyncer.storage.support.mysql.config.password=123
-
 #parser
 #dbsyncer.parser.flush.full.enabled=true
 

+ 3 - 1
dbsyncer-web/src/main/resources/public/connector/addDqlMysql.html

@@ -31,7 +31,9 @@
     <div class="form-group">
         <label class="col-sm-2 control-label">URL <strong class="driverVerifcateRequired">*</strong></label>
         <div class="col-sm-10">
-            <textarea name="url" class="form-control dbsyncer_textarea_resize_none" maxlength="128" dbsyncer-valid="require" rows="5" th:text="${connector?.config?.url}?:'jdbc:mysql://127.0.0.1:3306/test?rewriteBatchedStatements=true&useUnicode=true&amp;characterEncoding=UTF8&amp;serverTimezone=Asia/Shanghai&amp;useSSL=false&amp;verifyServerCertificate=false&amp;autoReconnect=true&amp;failOverReadOnly=false'"></textarea>
+            <textarea name="url" class="form-control dbsyncer_textarea_resize_none" maxlength="1024"
+                      dbsyncer-valid="require" rows="5"
+                      th:text="${connector?.config?.url}?:'jdbc:mysql://127.0.0.1:3306/test?rewriteBatchedStatements=true&useUnicode=true&amp;characterEncoding=UTF8&amp;serverTimezone=Asia/Shanghai&amp;useSSL=false&amp;verifyServerCertificate=false&amp;autoReconnect=true&amp;failOverReadOnly=false'"></textarea>
         </div>
     </div>
     <div class="form-group">