Jelajahi Sumber

性能优化

AE86 1 tahun lalu
induk
melakukan
61c710ea15

+ 1 - 1
dbsyncer-biz/src/main/java/org/dbsyncer/biz/impl/MetricReporter.java

@@ -234,7 +234,7 @@ public class MetricReporter implements ScheduledTaskJob {
         msg.append("堆积").append(StringUtil.COLON).append(info.getQueueUp());
         msg.append(StringUtil.FORWARD_SLASH).append(bufferActuator.getQueueCapacity()).append(StringUtil.SPACE);
         msg.append(ThreadPoolMetricEnum.CORE_SIZE.getMetricName()).append(StringUtil.COLON).append(pool.getActiveCount());
-        msg.append(StringUtil.FORWARD_SLASH).append(pool.getCorePoolSize()).append(StringUtil.SPACE);
+        msg.append(StringUtil.FORWARD_SLASH).append(pool.getMaximumPoolSize()).append(StringUtil.SPACE);
         msg.append(ThreadPoolMetricEnum.COMPLETED.getMetricName()).append(StringUtil.COLON).append(pool.getCompletedTaskCount());
         info.setResponse(new MetricResponse(code, group, metricName, Arrays.asList(new Sample(StatisticEnum.COUNT.getTagValueRepresentation(), msg.toString()))));
         return info;

+ 14 - 1
dbsyncer-common/src/main/java/org/dbsyncer/common/config/GeneralBufferConfig.java

@@ -21,6 +21,11 @@ public class GeneralBufferConfig extends BufferActuatorConfig {
      */
     private int threadCoreSize = Runtime.getRuntime().availableProcessors() * 2;
 
+    /**
+     * 最大工作线程数
+     */
+    private int maxThreadSize = 16;
+
     /**
      * 工作线任务队列
      */
@@ -28,7 +33,7 @@ public class GeneralBufferConfig extends BufferActuatorConfig {
 
     @Bean(name = "generalExecutor", destroyMethod = "shutdown")
     public ThreadPoolTaskExecutor generalExecutor() {
-        return ThreadPoolUtil.newThreadPoolTaskExecutor(threadCoreSize, threadCoreSize, threadQueueCapacity, 30, "GeneralExecutor-");
+        return ThreadPoolUtil.newThreadPoolTaskExecutor(threadCoreSize, maxThreadSize, threadQueueCapacity, 30, "GeneralExecutor-");
     }
 
     public int getThreadCoreSize() {
@@ -39,6 +44,14 @@ public class GeneralBufferConfig extends BufferActuatorConfig {
         this.threadCoreSize = threadCoreSize;
     }
 
+    public int getMaxThreadSize() {
+        return maxThreadSize;
+    }
+
+    public void setMaxThreadSize(int maxThreadSize) {
+        this.maxThreadSize = maxThreadSize;
+    }
+
     public int getThreadQueueCapacity() {
         return threadQueueCapacity;
     }

+ 14 - 1
dbsyncer-common/src/main/java/org/dbsyncer/common/config/StorageConfig.java

@@ -22,6 +22,11 @@ public class StorageConfig extends BufferActuatorConfig {
      */
     private int threadCoreSize = Runtime.getRuntime().availableProcessors();
 
+    /**
+     * 最大工作线程数
+     */
+    private int maxThreadSize = 16;
+
     /**
      * 工作线任务队列
      */
@@ -44,7 +49,7 @@ public class StorageConfig extends BufferActuatorConfig {
 
     @Bean(name = "storageExecutor", destroyMethod = "shutdown")
     public ThreadPoolTaskExecutor storageExecutor() {
-        return ThreadPoolUtil.newThreadPoolTaskExecutor(threadCoreSize, threadCoreSize, threadQueueCapacity, 30, "StorageExecutor-");
+        return ThreadPoolUtil.newThreadPoolTaskExecutor(threadCoreSize, maxThreadSize, threadQueueCapacity, 30, "StorageExecutor-");
     }
 
     public int getThreadCoreSize() {
@@ -55,6 +60,14 @@ public class StorageConfig extends BufferActuatorConfig {
         this.threadCoreSize = threadCoreSize;
     }
 
+    public int getMaxThreadSize() {
+        return maxThreadSize;
+    }
+
+    public void setMaxThreadSize(int maxThreadSize) {
+        this.maxThreadSize = maxThreadSize;
+    }
+
     public int getThreadQueueCapacity() {
         return threadQueueCapacity;
     }

+ 13 - 0
dbsyncer-common/src/main/java/org/dbsyncer/common/config/TableGroupBufferConfig.java

@@ -23,6 +23,11 @@ public class TableGroupBufferConfig extends BufferActuatorConfig {
      */
     private int threadCoreSize = Runtime.getRuntime().availableProcessors() * 2;
 
+    /**
+     * 最大工作线程数
+     */
+    private int maxThreadSize = 10;
+
     /**
      * 工作线任务队列
      */
@@ -44,6 +49,14 @@ public class TableGroupBufferConfig extends BufferActuatorConfig {
         this.threadCoreSize = threadCoreSize;
     }
 
+    public int getMaxThreadSize() {
+        return maxThreadSize;
+    }
+
+    public void setMaxThreadSize(int maxThreadSize) {
+        this.maxThreadSize = maxThreadSize;
+    }
+
     public int getThreadQueueCapacity() {
         return threadQueueCapacity;
     }

+ 15 - 22
dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/impl/StorageBufferActuator.java

@@ -1,23 +1,22 @@
 package org.dbsyncer.parser.flush.impl;
 
 import org.dbsyncer.common.config.StorageConfig;
+import org.dbsyncer.parser.ProfileComponent;
 import org.dbsyncer.parser.flush.AbstractBufferActuator;
+import org.dbsyncer.parser.model.Mapping;
+import org.dbsyncer.parser.model.Meta;
 import org.dbsyncer.parser.model.StorageRequest;
 import org.dbsyncer.parser.model.StorageResponse;
 import org.dbsyncer.storage.StorageService;
 import org.dbsyncer.storage.enums.StorageEnum;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.springframework.stereotype.Component;
 
 import javax.annotation.PostConstruct;
 import javax.annotation.Resource;
-import java.time.Duration;
-import java.time.temporal.ChronoUnit;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.Executor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
 
 /**
  * 持久化执行器
@@ -29,9 +28,7 @@ import java.util.concurrent.locks.ReentrantLock;
 @Component
 public final class StorageBufferActuator extends AbstractBufferActuator<StorageRequest, StorageResponse> {
 
-    private final Duration offerInterval = Duration.of(500, ChronoUnit.MILLIS);
-    private final Lock queueLock = new ReentrantLock(true);
-    private final Condition queueCondition = queueLock.newCondition();
+    private final Logger logger = LoggerFactory.getLogger(getClass());
 
     @Resource
     private StorageConfig storageConfig;
@@ -42,6 +39,9 @@ public final class StorageBufferActuator extends AbstractBufferActuator<StorageR
     @Resource
     private Executor storageExecutor;
 
+    @Resource
+    private ProfileComponent profileComponent;
+
     @PostConstruct
     private void init() {
         setConfig(storageConfig);
@@ -61,24 +61,17 @@ public final class StorageBufferActuator extends AbstractBufferActuator<StorageR
 
     @Override
     protected void pull(StorageResponse response) {
-        storageService.addBatch(StorageEnum.DATA, response.getMetaId(), response.getDataList());
+        storageExecutor.execute(() -> storageService.addBatch(StorageEnum.DATA, response.getMetaId(), response.getDataList()));
     }
 
     @Override
     protected void offerFailed(BlockingQueue<StorageRequest> queue, StorageRequest request) {
-        final Lock lock = queueLock;
-        try {
-            // 公平锁,有序执行,容量上限,阻塞重试
-            lock.lock();
-            while (isRunning(request) && !queue.offer(request)) {
-                try {
-                    queueCondition.await(offerInterval.toMillis(), TimeUnit.MILLISECONDS);
-                } catch (InterruptedException e) {
-                    break;
-                }
+        Meta meta = profileComponent.getMeta(request.getMetaId());
+        if (meta != null) {
+            Mapping mapping = profileComponent.getMapping(meta.getMappingId());
+            if (mapping != null) {
+                logger.info("{}, data={}", mapping.getName(), request.getRow());
             }
-        } finally {
-            lock.unlock();
         }
     }
 

+ 2 - 1
dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/impl/TableGroupBufferActuator.java

@@ -56,9 +56,10 @@ public final class TableGroupBufferActuator extends GeneralBufferActuator implem
         super.buildQueueConfig();
         taskKey = UUIDUtil.getUUID();
         int coreSize = tableGroupBufferConfig.getThreadCoreSize();
+        int maxSize = tableGroupBufferConfig.getMaxThreadSize();
         int queueCapacity = tableGroupBufferConfig.getThreadQueueCapacity();
         String threadNamePrefix = new StringBuilder("TableGroupExecutor-").append(tableGroupId).append(StringUtil.SYMBOL).toString();
-        threadPoolTaskExecutor = ThreadPoolUtil.newThreadPoolTaskExecutor(coreSize, coreSize, queueCapacity, 30, threadNamePrefix);
+        threadPoolTaskExecutor = ThreadPoolUtil.newThreadPoolTaskExecutor(coreSize, maxSize, queueCapacity, 30, threadNamePrefix);
         running = true;
         scheduledTaskService.start(taskKey, tableGroupBufferConfig.getBufferPeriodMillisecond(), this);
     }

+ 1 - 13
dbsyncer-sdk/src/main/java/org/dbsyncer/sdk/connector/database/ds/SimpleDataSource.java

@@ -15,7 +15,6 @@ import java.sql.SQLFeatureNotSupportedException;
 import java.time.Instant;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.logging.Logger;
@@ -27,11 +26,6 @@ public class SimpleDataSource implements DataSource, AutoCloseable {
      */
     private final int MAX_IDLE = 300;
 
-    /**
-     * 连接上限后最大等待时间(秒)
-     */
-    private final int MAX_WAIT_SECONDS = 3;
-
     /**
      * 从缓存队列获取连接次数
      */
@@ -71,11 +65,7 @@ public class SimpleDataSource implements DataSource, AutoCloseable {
             lock.lock();
             //如果当前连接数大于或等于最大连接数
             if (activeNum.get() >= MAX_IDLE) {
-                //等待3秒
-                TimeUnit.SECONDS.sleep(MAX_WAIT_SECONDS);
-                if (activeNum.get() >= MAX_IDLE) {
-                    throw new SdkException(String.format("数据库连接数超过上限%d,url=%s", MAX_IDLE, url));
-                }
+                throw new SdkException(String.format("数据库连接数超过上限%d,url=%s", MAX_IDLE, url));
             }
             int time = MAX_PULL_TIME;
             while (time-- > 0){
@@ -94,8 +84,6 @@ public class SimpleDataSource implements DataSource, AutoCloseable {
 
             // 兜底方案,保证一定能获取连接
             return createConnection();
-        } catch (InterruptedException e) {
-            throw new SdkException(e);
         } finally {
             lock.unlock();
         }

+ 15 - 9
dbsyncer-web/src/main/resources/application.properties

@@ -17,21 +17,25 @@ dbsyncer.web.scheduler.pool-size=8
 # *********************** 通用执行器配置 ***********************
 # [GeneralBufferActuator]线程数
 dbsyncer.parser.general.thread-core-size=8
+# [GeneralBufferActuator]最大线程数
+dbsyncer.parser.general.max-thread-size=16
 # [GeneralBufferActuator]线程池队列
 dbsyncer.parser.general.thread-queue-capacity=64
 # [GeneralBufferActuator]单次执行任务数
-dbsyncer.parser.general.buffer-writer-count=100
+dbsyncer.parser.general.buffer-writer-count=1000
 # [GeneralBufferActuator]每次消费缓存队列的任务数
-dbsyncer.parser.general.buffer-pull-count=1000
+dbsyncer.parser.general.buffer-pull-count=20000
 # [GeneralBufferActuator]缓存队列容量
-dbsyncer.parser.general.buffer-queue-capacity=60000
+dbsyncer.parser.general.buffer-queue-capacity=100000
 # [GeneralBufferActuator]定时消费缓存队列间隔(毫秒)
 dbsyncer.parser.general.buffer-period-millisecond=300
 # *********************** 表执行器配置 ***********************
 # 每个驱动最多可分配的表执行器个数
 dbsyncer.parser.table.group.max-buffer-actuator-size=20
 # [TableGroupBufferActuator]线程数
-dbsyncer.parser.table.group.thread-core-size=10
+dbsyncer.parser.table.group.thread-core-size=2
+# [TableGroupBufferActuator]最大线程数
+dbsyncer.parser.table.group.max-thread-size=10
 # [TableGroupBufferActuator]线程池队列
 dbsyncer.parser.table.group.thread-queue-capacity=16
 # [TableGroupBufferActuator]单次执行任务数
@@ -52,20 +56,22 @@ dbsyncer.storage.support.mysql.config.username=root
 dbsyncer.storage.support.mysql.config.password=123
 # [StorageBufferActuator]线程数
 dbsyncer.storage.thread-core-size=4
+# [StorageBufferActuator]最大线程数
+dbsyncer.storage.max-thread-size=8
 # [StorageBufferActuator]线程池队列
-dbsyncer.storage.thread-queue-capacity=32
+dbsyncer.storage.thread-queue-capacity=64
 # [StorageBufferActuator]单次执行任务数
-dbsyncer.storage.buffer-writer-count=100
+dbsyncer.storage.buffer-writer-count=1000
 # [StorageBufferActuator]每次消费缓存队列的任务数
-dbsyncer.storage.buffer-pull-count=1000
+dbsyncer.storage.buffer-pull-count=20000
 # [StorageBufferActuator]缓存队列容量
-dbsyncer.storage.buffer-queue-capacity=50000
+dbsyncer.storage.buffer-queue-capacity=100000
 # [StorageBufferActuator]定时消费缓存队列间隔(毫秒)
 dbsyncer.storage.buffer-period-millisecond=300
 # 是否记录全量数据(false-关闭; true-开启)
 dbsyncer.storage.write.full.enabled=false
 # 是否记录同步成功数据(false-关闭; true-开启)
-dbsyncer.storage.write-success=false
+dbsyncer.storage.write-success=true
 # 是否记录同步失败数据(false-关闭; true-开启)
 dbsyncer.storage.write-fail=true
 # 记录同步失败日志最大长度