Parcourir la source

!158 merge
Merge pull request !158 from AE86/design_consumer

AE86 il y a 1 an
Parent
commit
c3615d4376
29 fichiers modifiés avec 182 ajouts et 152 suppressions
  1. 1 1
      dbsyncer-biz/pom.xml
  2. 4 7
      dbsyncer-biz/src/main/java/org/dbsyncer/biz/impl/MonitorServiceImpl.java
  3. 1 1
      dbsyncer-cache/pom.xml
  4. 1 1
      dbsyncer-cluster/pom.xml
  5. 1 1
      dbsyncer-common/pom.xml
  6. 1 1
      dbsyncer-common/src/main/java/org/dbsyncer/common/util/BeanUtil.java
  7. 6 0
      dbsyncer-common/src/main/java/org/dbsyncer/common/util/StringUtil.java
  8. 1 1
      dbsyncer-connector/pom.xml
  9. 1 1
      dbsyncer-listener/pom.xml
  10. 1 1
      dbsyncer-manager/pom.xml
  11. 6 0
      dbsyncer-manager/src/main/java/org/dbsyncer/manager/puller/BufferActuatorRouter.java
  12. 1 1
      dbsyncer-monitor/pom.xml
  13. 29 17
      dbsyncer-monitor/src/main/java/org/dbsyncer/monitor/MonitorFactory.java
  14. 46 0
      dbsyncer-monitor/src/main/java/org/dbsyncer/monitor/enums/BufferActuatorMetricEnum.java
  15. 7 23
      dbsyncer-monitor/src/main/java/org/dbsyncer/monitor/enums/ThreadPoolMetricEnum.java
  16. 1 1
      dbsyncer-parser/pom.xml
  17. 8 0
      dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/BufferActuator.java
  18. 5 0
      dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/impl/GeneralBufferActuator.java
  19. 27 2
      dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/impl/StorageBufferActuator.java
  20. 1 1
      dbsyncer-plugin/pom.xml
  21. 1 1
      dbsyncer-storage/pom.xml
  22. 1 0
      dbsyncer-storage/src/main/java/org/dbsyncer/storage/enums/StorageEnum.java
  23. 0 21
      dbsyncer-storage/src/main/java/org/dbsyncer/storage/strategy/impl/BinlogStrategy.java
  24. 13 6
      dbsyncer-storage/src/main/java/org/dbsyncer/storage/support/DiskStorageServiceImpl.java
  25. 9 35
      dbsyncer-storage/src/main/java/org/dbsyncer/storage/support/MysqlStorageServiceImpl.java
  26. 0 20
      dbsyncer-storage/src/main/java/org/dbsyncer/storage/util/DocumentUtil.java
  27. 1 1
      dbsyncer-web/pom.xml
  28. 7 7
      dbsyncer-web/src/main/resources/application.properties
  29. 1 1
      pom.xml

+ 1 - 1
dbsyncer-biz/pom.xml

@@ -5,7 +5,7 @@
 	<parent>
         <artifactId>dbsyncer</artifactId>
         <groupId>org.ghi</groupId>
-        <version>1.2.4-RC_0825</version>
+        <version>1.2.4-RC_0901</version>
     </parent>
 	<modelVersion>4.0.0</modelVersion>
 	<artifactId>dbsyncer-biz</artifactId>

+ 4 - 7
dbsyncer-biz/src/main/java/org/dbsyncer/biz/impl/MonitorServiceImpl.java

@@ -25,9 +25,9 @@ import org.dbsyncer.common.util.NumberUtil;
 import org.dbsyncer.common.util.StringUtil;
 import org.dbsyncer.connector.enums.FilterEnum;
 import org.dbsyncer.monitor.Monitor;
+import org.dbsyncer.monitor.enums.BufferActuatorMetricEnum;
 import org.dbsyncer.monitor.enums.DiskMetricEnum;
 import org.dbsyncer.monitor.enums.MetricEnum;
-import org.dbsyncer.monitor.enums.ThreadPoolMetricEnum;
 import org.dbsyncer.monitor.model.AppReportMetric;
 import org.dbsyncer.monitor.model.MetricResponse;
 import org.dbsyncer.parser.enums.ModelEnum;
@@ -89,12 +89,9 @@ public class MonitorServiceImpl extends BaseServiceImpl implements MonitorServic
 
     @PostConstruct
     private void init() {
-        metricDetailFormatterMap.putIfAbsent(ThreadPoolMetricEnum.CORE_SIZE.getCode(), new ValueMetricDetailFormatter());
-        metricDetailFormatterMap.putIfAbsent(ThreadPoolMetricEnum.TASK_SUBMITTED.getCode(), new ValueMetricDetailFormatter());
-        metricDetailFormatterMap.putIfAbsent(ThreadPoolMetricEnum.QUEUE_UP.getCode(), new ValueMetricDetailFormatter());
-        metricDetailFormatterMap.putIfAbsent(ThreadPoolMetricEnum.ACTIVE.getCode(), new ValueMetricDetailFormatter());
-        metricDetailFormatterMap.putIfAbsent(ThreadPoolMetricEnum.COMPLETED.getCode(), new ValueMetricDetailFormatter());
-        metricDetailFormatterMap.putIfAbsent(ThreadPoolMetricEnum.REMAINING_CAPACITY.getCode(), new ValueMetricDetailFormatter());
+        metricDetailFormatterMap.putIfAbsent(BufferActuatorMetricEnum.GENERAL.getCode(), new ValueMetricDetailFormatter());
+        metricDetailFormatterMap.putIfAbsent(BufferActuatorMetricEnum.STORAGE.getCode(), new ValueMetricDetailFormatter());
+        metricDetailFormatterMap.putIfAbsent(BufferActuatorMetricEnum.TABLE_GROUP.getCode(), new ValueMetricDetailFormatter());
         metricDetailFormatterMap.putIfAbsent(MetricEnum.THREADS_LIVE.getCode(), new DoubleRoundMetricDetailFormatter());
         metricDetailFormatterMap.putIfAbsent(MetricEnum.THREADS_PEAK.getCode(), new DoubleRoundMetricDetailFormatter());
         metricDetailFormatterMap.putIfAbsent(MetricEnum.MEMORY_USED.getCode(), new MemoryMetricDetailFormatter());

+ 1 - 1
dbsyncer-cache/pom.xml

@@ -4,7 +4,7 @@
 	<parent>
         <artifactId>dbsyncer</artifactId>
         <groupId>org.ghi</groupId>
-        <version>1.2.4-RC_0825</version>
+        <version>1.2.4-RC_0901</version>
     </parent>
 	<modelVersion>4.0.0</modelVersion>
 	<artifactId>dbsyncer-cache</artifactId>

+ 1 - 1
dbsyncer-cluster/pom.xml

@@ -5,7 +5,7 @@
     <parent>
         <artifactId>dbsyncer</artifactId>
         <groupId>org.ghi</groupId>
-        <version>1.2.4-RC_0825</version>
+        <version>1.2.4-RC_0901</version>
     </parent>
     <modelVersion>4.0.0</modelVersion>
     <artifactId>dbsyncer-cluster</artifactId>

+ 1 - 1
dbsyncer-common/pom.xml

@@ -5,7 +5,7 @@
     <parent>
         <artifactId>dbsyncer</artifactId>
         <groupId>org.ghi</groupId>
-        <version>1.2.4-RC_0825</version>
+        <version>1.2.4-RC_0901</version>
     </parent>
     <modelVersion>4.0.0</modelVersion>
     <artifactId>dbsyncer-common</artifactId>

+ 1 - 1
dbsyncer-common/src/main/java/org/dbsyncer/common/util/BeanUtil.java

@@ -80,4 +80,4 @@ public abstract class BeanUtil {
         return null;
     }
 
-}
+}

+ 6 - 0
dbsyncer-common/src/main/java/org/dbsyncer/common/util/StringUtil.java

@@ -8,6 +8,12 @@ public abstract class StringUtil {
 
     public static final String SYMBOL = "-";
 
+    public static final String COLON = ":";
+
+    public static final String COMMA = ",";
+
+    public static final String SPACE = " ";
+
     public static boolean equals(CharSequence cs1, CharSequence cs2) {
         return StringUtils.equals(cs1, cs2);
     }

+ 1 - 1
dbsyncer-connector/pom.xml

@@ -5,7 +5,7 @@
     <parent>
         <artifactId>dbsyncer</artifactId>
         <groupId>org.ghi</groupId>
-        <version>1.2.4-RC_0825</version>
+        <version>1.2.4-RC_0901</version>
     </parent>
     <modelVersion>4.0.0</modelVersion>
     <artifactId>dbsyncer-connector</artifactId>

+ 1 - 1
dbsyncer-listener/pom.xml

@@ -5,7 +5,7 @@
     <parent>
         <artifactId>dbsyncer</artifactId>
         <groupId>org.ghi</groupId>
-        <version>1.2.4-RC_0825</version>
+        <version>1.2.4-RC_0901</version>
     </parent>
     <modelVersion>4.0.0</modelVersion>
     <artifactId>dbsyncer-listener</artifactId>

+ 1 - 1
dbsyncer-manager/pom.xml

@@ -5,7 +5,7 @@
     <parent>
         <artifactId>dbsyncer</artifactId>
         <groupId>org.ghi</groupId>
-        <version>1.2.4-RC_0825</version>
+        <version>1.2.4-RC_0901</version>
     </parent>
     <modelVersion>4.0.0</modelVersion>
     <artifactId>dbsyncer-manager</artifactId>

+ 6 - 0
dbsyncer-manager/src/main/java/org/dbsyncer/manager/puller/BufferActuatorRouter.java

@@ -11,6 +11,7 @@ import org.springframework.beans.factory.DisposableBean;
 import org.springframework.stereotype.Component;
 
 import javax.annotation.Resource;
+import java.util.Collections;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicLong;
@@ -87,4 +88,9 @@ public final class BufferActuatorRouter implements DisposableBean {
         router.values().forEach(map -> map.values().forEach(actuator -> total.addAndGet(actuator.getQueueCapacity())));
         return total;
     }
+
+    public Map<String, Map<String, TableGroupBufferActuator>> getRouter() {
+        return Collections.unmodifiableMap(router);
+    }
+
 }

+ 1 - 1
dbsyncer-monitor/pom.xml

@@ -5,7 +5,7 @@
     <parent>
         <artifactId>dbsyncer</artifactId>
         <groupId>org.ghi</groupId>
-        <version>1.2.4-RC_0825</version>
+        <version>1.2.4-RC_0901</version>
     </parent>
     <modelVersion>4.0.0</modelVersion>
     <artifactId>dbsyncer-monitor</artifactId>

+ 29 - 17
dbsyncer-monitor/src/main/java/org/dbsyncer/monitor/MonitorFactory.java

@@ -9,6 +9,7 @@ import org.dbsyncer.common.util.StringUtil;
 import org.dbsyncer.connector.constant.ConnectorConstant;
 import org.dbsyncer.manager.Manager;
 import org.dbsyncer.manager.puller.BufferActuatorRouter;
+import org.dbsyncer.monitor.enums.BufferActuatorMetricEnum;
 import org.dbsyncer.monitor.enums.MetricEnum;
 import org.dbsyncer.monitor.enums.StatisticEnum;
 import org.dbsyncer.monitor.enums.ThreadPoolMetricEnum;
@@ -39,7 +40,6 @@ import java.util.Collections;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.Executor;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.atomic.AtomicLong;
 
@@ -56,9 +56,6 @@ public class MonitorFactory implements Monitor, ScheduledTaskJob {
     @Resource
     private Manager manager;
 
-    @Resource
-    private Executor generalExecutor;
-
     @Resource
     private BufferActuator generalBufferActuator;
 
@@ -77,6 +74,8 @@ public class MonitorFactory implements Monitor, ScheduledTaskJob {
 
     private final MappingReportMetric mappingReportMetric = new MappingReportMetric();
 
+    private final int SHOW_BUFFER_ACTUATOR_SIZE = 6;
+
     @PostConstruct
     private void init() {
         scheduledTaskService.start(5000, this);
@@ -175,19 +174,36 @@ public class MonitorFactory implements Monitor, ScheduledTaskJob {
 
     @Override
     public List<MetricResponse> getMetricInfo() {
-        ThreadPoolTaskExecutor threadTask = (ThreadPoolTaskExecutor) generalExecutor;
-        ThreadPoolExecutor pool = threadTask.getThreadPoolExecutor();
-
         List<MetricResponse> list = new ArrayList<>();
-        list.add(createThreadPoolMetricResponse(ThreadPoolMetricEnum.CORE_SIZE, pool.getCorePoolSize()));
-        list.add(createThreadPoolMetricResponse(ThreadPoolMetricEnum.TASK_SUBMITTED, pool.getTaskCount()));
-        list.add(createThreadPoolMetricResponse(ThreadPoolMetricEnum.QUEUE_UP, pool.getQueue().size()));
-        list.add(createThreadPoolMetricResponse(ThreadPoolMetricEnum.ACTIVE, pool.getActiveCount()));
-        list.add(createThreadPoolMetricResponse(ThreadPoolMetricEnum.COMPLETED, pool.getCompletedTaskCount()));
-        list.add(createThreadPoolMetricResponse(ThreadPoolMetricEnum.REMAINING_CAPACITY, pool.getQueue().remainingCapacity()));
+        collect(list, generalBufferActuator, BufferActuatorMetricEnum.GENERAL);
+        collect(list, storageBufferActuator, BufferActuatorMetricEnum.STORAGE);
+        if (!CollectionUtils.isEmpty(bufferActuatorRouter.getRouter())) {
+            List<MetricResponse> tableResult = new ArrayList<>();
+            bufferActuatorRouter.getRouter().forEach((metaId, group) -> {
+                group.forEach((k, bufferActuator) ->
+                        collect(tableResult, bufferActuator, BufferActuatorMetricEnum.TABLE_GROUP)
+                );
+            });
+            // TODO sort by active
+            list.addAll(tableResult.size() <= SHOW_BUFFER_ACTUATOR_SIZE ? tableResult : tableResult.subList(0, SHOW_BUFFER_ACTUATOR_SIZE));
+        }
         return list;
     }
 
+    private void collect(List<MetricResponse> result, BufferActuator bufferActuator, BufferActuatorMetricEnum metricEnum) {
+        ThreadPoolTaskExecutor threadTask = (ThreadPoolTaskExecutor) bufferActuator.getExecutor();
+        ThreadPoolExecutor pool = threadTask.getThreadPoolExecutor();
+        StringBuilder msg = new StringBuilder();
+        msg.append(ThreadPoolMetricEnum.CORE_SIZE.getMetricName()).append(StringUtil.COLON).append(pool.getCorePoolSize()).append(StringUtil.SPACE);
+        msg.append(ThreadPoolMetricEnum.QUEUE_UP.getMetricName()).append(StringUtil.COLON).append(pool.getQueue().size()).append(StringUtil.SPACE);
+        msg.append(ThreadPoolMetricEnum.ACTIVE.getMetricName()).append(StringUtil.COLON).append(pool.getActiveCount()).append(StringUtil.SPACE);
+        msg.append(ThreadPoolMetricEnum.REMAINING_CAPACITY.getMetricName()).append(StringUtil.COLON).append(pool.getQueue().remainingCapacity()).append(StringUtil.SPACE);
+        msg.append("堆积").append(StringUtil.COLON).append(bufferActuator.getQueue().size()).append(StringUtil.SPACE);
+        msg.append("容量").append(StringUtil.COLON).append(bufferActuator.getQueueCapacity()).append(StringUtil.SPACE);
+        msg.append(ThreadPoolMetricEnum.COMPLETED.getMetricName()).append(StringUtil.COLON).append(pool.getCompletedTaskCount());
+        result.add(new MetricResponse(metricEnum.getCode(), metricEnum.getGroup(), metricEnum.getMetricName(), Arrays.asList(new Sample(StatisticEnum.COUNT.getTagValueRepresentation(), msg.toString()))));
+    }
+
     @Override
     public AppReportMetric getAppReportMetric() {
         queryTime = LocalDateTime.now();
@@ -282,10 +298,6 @@ public class MonitorFactory implements Monitor, ScheduledTaskJob {
         return queryMappingMetricCount(metaAll, (query) -> query.addFilter(ConfigConstant.DATA_EVENT, ConnectorConstant.OPERTION_DELETE));
     }
 
-    private MetricResponse createThreadPoolMetricResponse(ThreadPoolMetricEnum metricEnum, Object value) {
-        return new MetricResponse(metricEnum.getCode(), metricEnum.getGroup(), metricEnum.getMetricName(), Arrays.asList(new Sample(StatisticEnum.COUNT.getTagValueRepresentation(), value)));
-    }
-
     private long queryMappingMetricCount(List<Meta> metaAll, QueryMappingOperation operation) {
         AtomicLong total = new AtomicLong(0);
         if (!CollectionUtils.isEmpty(metaAll)) {

+ 46 - 0
dbsyncer-monitor/src/main/java/org/dbsyncer/monitor/enums/BufferActuatorMetricEnum.java

@@ -0,0 +1,46 @@
+package org.dbsyncer.monitor.enums;
+
+/**
+ * 缓存执行器指标
+ *
+ * @author AE86
+ * @version 1.0.0
+ * @date 2021/07/23 0:19
+ */
+public enum BufferActuatorMetricEnum {
+
+    /**
+     * 持久化执行器
+     */
+    STORAGE("buffer.actuator.storage", "持久化执行器", ""),
+    /**
+     * 通用执行器
+     */
+    GENERAL("buffer.actuator.general", "通用执行器", ""),
+    /**
+     * 表执行器
+     */
+    TABLE_GROUP("buffer.actuator.table.group", "表执行器", "");
+
+    private String code;
+    private String group;
+    private String metricName;
+
+    BufferActuatorMetricEnum(String code, String group, String metricName) {
+        this.code = code;
+        this.group = group;
+        this.metricName = metricName;
+    }
+
+    public String getCode() {
+        return code;
+    }
+
+    public String getGroup() {
+        return group;
+    }
+
+    public String getMetricName() {
+        return metricName;
+    }
+}

+ 7 - 23
dbsyncer-monitor/src/main/java/org/dbsyncer/monitor/enums/ThreadPoolMetricEnum.java

@@ -10,48 +10,32 @@ package org.dbsyncer.monitor.enums;
 public enum ThreadPoolMetricEnum {
 
     /**
-     * 已提交
+     * 核心线程
      */
-    CORE_SIZE("thread.pool.core.size", "线程池", "核心数"),
-    /**
-     * 已提交
-     */
-    TASK_SUBMITTED("thread.pool.task.submitted", "线程池", "已提交"),
+    CORE_SIZE("线程"),
     /**
      * 排队中
      */
-    QUEUE_UP("thread.pool.queue.up", "线程池", "排队中"),
+    QUEUE_UP("排队中"),
     /**
      * 处理中
      */
-    ACTIVE("thread.pool.active", "线程池", "处理中"),
+    ACTIVE("处理中"),
     /**
      * 已完成
      */
-    COMPLETED("thread.pool.completed", "线程池", "已完成"),
+    COMPLETED("完成"),
     /**
      * 空闲队列
      */
-    REMAINING_CAPACITY("thread.pool.remaining.capacity", "线程池", "空闲队列");
+    REMAINING_CAPACITY("空闲队列");
 
-    private String code;
-    private String group;
     private String metricName;
 
-    ThreadPoolMetricEnum(String code, String group, String metricName) {
-        this.code = code;
-        this.group = group;
+    ThreadPoolMetricEnum(String metricName) {
         this.metricName = metricName;
     }
 
-    public String getCode() {
-        return code;
-    }
-
-    public String getGroup() {
-        return group;
-    }
-
     public String getMetricName() {
         return metricName;
     }

+ 1 - 1
dbsyncer-parser/pom.xml

@@ -5,7 +5,7 @@
     <parent>
         <artifactId>dbsyncer</artifactId>
         <groupId>org.ghi</groupId>
-        <version>1.2.4-RC_0825</version>
+        <version>1.2.4-RC_0901</version>
     </parent>
     <modelVersion>4.0.0</modelVersion>
     <artifactId>dbsyncer-parser</artifactId>

+ 8 - 0
dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/BufferActuator.java

@@ -1,6 +1,7 @@
 package org.dbsyncer.parser.flush;
 
 import java.util.Queue;
+import java.util.concurrent.Executor;
 
 /**
  * @author AE86
@@ -30,4 +31,11 @@ public interface BufferActuator {
      */
     int getQueueCapacity();
 
+    /**
+     * 获取线程池
+     *
+     * @return
+     */
+    Executor getExecutor();
+
 }

+ 5 - 0
dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/impl/GeneralBufferActuator.java

@@ -134,6 +134,11 @@ public class GeneralBufferActuator extends AbstractBufferActuator<WriterRequest,
         pluginFactory.postProcessAfter(group.getPlugin(), context);
     }
 
+    @Override
+    public Executor getExecutor() {
+        return generalExecutor;
+    }
+
     /**
      * 获取连接器配置
      *

+ 27 - 2
dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/impl/StorageBufferActuator.java

@@ -6,13 +6,17 @@ import org.dbsyncer.parser.model.StorageRequest;
 import org.dbsyncer.parser.model.StorageResponse;
 import org.dbsyncer.storage.StorageService;
 import org.dbsyncer.storage.enums.StorageEnum;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.springframework.stereotype.Component;
 
 import javax.annotation.PostConstruct;
 import javax.annotation.Resource;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executor;
 
 /**
- * 持久化任务缓冲执行器
+ * 持久化执行器
  *
  * @author AE86
  * @version 1.0.0
@@ -21,12 +25,17 @@ import javax.annotation.Resource;
 @Component
 public final class StorageBufferActuator extends AbstractBufferActuator<StorageRequest, StorageResponse> {
 
+    private final Logger logger = LoggerFactory.getLogger(getClass());
+
     @Resource
     private StorageConfig storageConfig;
 
     @Resource
     private StorageService storageService;
 
+    @Resource
+    private Executor storageExecutor;
+
     @PostConstruct
     public void init() {
         setConfig(storageConfig);
@@ -46,7 +55,23 @@ public final class StorageBufferActuator extends AbstractBufferActuator<StorageR
 
     @Override
     protected void pull(StorageResponse response) {
-        storageService.addBatch(StorageEnum.DATA, response.getMetaId(), response.getDataList());
+        final CountDownLatch latch = new CountDownLatch(1);
+        storageExecutor.execute(() -> {
+            try {
+                storageService.addBatch(StorageEnum.DATA, response.getMetaId(), response.getDataList());
+            } finally {
+                latch.countDown();
+            }
+        });
+        try {
+            latch.await();
+        } catch (InterruptedException e) {
+            logger.error(e.getMessage());
+        }
     }
 
+    @Override
+    public Executor getExecutor() {
+        return storageExecutor;
+    }
 }

+ 1 - 1
dbsyncer-plugin/pom.xml

@@ -5,7 +5,7 @@
     <parent>
         <artifactId>dbsyncer</artifactId>
         <groupId>org.ghi</groupId>
-        <version>1.2.4-RC_0825</version>
+        <version>1.2.4-RC_0901</version>
     </parent>
     <modelVersion>4.0.0</modelVersion>
     <artifactId>dbsyncer-plugin</artifactId>

+ 1 - 1
dbsyncer-storage/pom.xml

@@ -5,7 +5,7 @@
     <parent>
         <artifactId>dbsyncer</artifactId>
         <groupId>org.ghi</groupId>
-        <version>1.2.4-RC_0825</version>
+        <version>1.2.4-RC_0901</version>
     </parent>
     <modelVersion>4.0.0</modelVersion>
     <artifactId>dbsyncer-storage</artifactId>

+ 1 - 0
dbsyncer-storage/src/main/java/org/dbsyncer/storage/enums/StorageEnum.java

@@ -22,6 +22,7 @@ public enum StorageEnum {
     /**
      * Binlog:缓存队列数据
      */
+    @Deprecated
     BINLOG("binlog");
 
     private String type;

+ 0 - 21
dbsyncer-storage/src/main/java/org/dbsyncer/storage/strategy/impl/BinlogStrategy.java

@@ -1,21 +0,0 @@
-package org.dbsyncer.storage.strategy.impl;
-
-import org.dbsyncer.storage.enums.StorageEnum;
-import org.dbsyncer.storage.strategy.Strategy;
-import org.springframework.stereotype.Component;
-
-/**
- * 缓存队列数据
- *
- * @author AE86
- * @version 1.0.0
- * @date 2019/11/15 22:39
- */
-@Component
-public class BinlogStrategy implements Strategy {
-
-    @Override
-    public String createSharding(String separator, String collectionId) {
-        return StorageEnum.BINLOG.getType();
-    }
-}

+ 13 - 6
dbsyncer-storage/src/main/java/org/dbsyncer/storage/support/DiskStorageServiceImpl.java

@@ -1,8 +1,13 @@
 package org.dbsyncer.storage.support;
 
+import org.apache.commons.io.FileUtils;
 import org.apache.lucene.document.Document;
 import org.apache.lucene.index.Term;
-import org.apache.lucene.search.*;
+import org.apache.lucene.search.BooleanClause;
+import org.apache.lucene.search.BooleanQuery;
+import org.apache.lucene.search.MatchAllDocsQuery;
+import org.apache.lucene.search.Sort;
+import org.apache.lucene.search.SortField;
 import org.apache.lucene.search.highlight.Highlighter;
 import org.apache.lucene.search.highlight.QueryScorer;
 import org.apache.lucene.search.highlight.SimpleHTMLFormatter;
@@ -24,7 +29,11 @@ import org.dbsyncer.storage.util.DocumentUtil;
 import javax.annotation.PostConstruct;
 import java.io.File;
 import java.io.IOException;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 
 /**
@@ -46,10 +55,11 @@ public class DiskStorageServiceImpl extends AbstractStorageService {
 
     @PostConstruct
     private void init() {
+        // 废弃binlog
+        FileUtils.deleteQuietly(new File(PATH + "binlog"));
         // 创建配置和日志索引shard
         getShard(getSharding(StorageEnum.CONFIG, null));
         getShard(getSharding(StorageEnum.LOG, null));
-        getShard(getSharding(StorageEnum.BINLOG, null));
     }
 
     @Override
@@ -211,9 +221,6 @@ public class DiskStorageServiceImpl extends AbstractStorageService {
                 case CONFIG:
                     docs.add(DocumentUtil.convertConfig2Doc(r));
                     break;
-                case BINLOG:
-                    docs.add(DocumentUtil.convertBinlog2Doc(r));
-                    break;
                 default:
                     break;
             }

+ 9 - 35
dbsyncer-storage/src/main/java/org/dbsyncer/storage/support/MysqlStorageServiceImpl.java

@@ -65,7 +65,6 @@ public class MysqlStorageServiceImpl extends AbstractStorageService {
     private static final String SHOW_DATA_TABLE = "show tables where Tables_in_%s like \"%s\"";
     private static final String DROP_TABLE = "DROP TABLE %s";
     private static final String TRUNCATE_TABLE = "TRUNCATE TABLE %s";
-    private static final String UPGRADE_SQL = "upgrade";
 
     @Autowired
     private ConnectorFactory connectorFactory;
@@ -333,38 +332,18 @@ public class MysqlStorageServiceImpl extends AbstractStorageService {
     }
 
     private void initUpgradeSql() {
-        // show tables where Tables_in_dbsyncer like "dbsyncer_data%"
-        String sql = String.format(SHOW_DATA_TABLE, database, PREFIX_TABLE.concat(StorageEnum.DATA.getType()).concat("%"));
-        List<String> tables = null;
         try {
-            tables = connectorMapper.execute(databaseTemplate -> databaseTemplate.queryForList(sql, String.class));
-        } catch (EmptyResultDataAccessException e) {
-            // 没有可更新的表
-        }
-        if (CollectionUtils.isEmpty(tables)) {
-            return;
-        }
-        final String queryColumnCount = "SELECT count(*) FROM information_schema.columns WHERE table_name = '%s' and column_name = 'DATA'";
-        tables.forEach(table -> {
-            try {
-                String query = String.format(queryColumnCount, table);
-                // 是否已升级
-                int count = connectorMapper.execute(databaseTemplate -> databaseTemplate.queryForObject(query, Integer.class));
-                if (count == 0) {
-                    String ddlSql = readSql(UPGRADE_SQL, false, table);
-                    Stream.of(StringUtil.split(ddlSql, ";")).forEach(ddl -> executeSql(ddl));
+            executeSql("drop table if exists `dbsyncer_binlog`;");
+        } catch (Exception e) {
+            if (e.getCause() instanceof SQLSyntaxErrorException) {
+                SQLSyntaxErrorException ex = (SQLSyntaxErrorException) e.getCause();
+                if (ex.getSQLState().equals("42S21")) {
+                    // ignore
+                    return;
                 }
-            } catch (Exception e) {
-                if (e.getCause() instanceof SQLSyntaxErrorException) {
-                    SQLSyntaxErrorException ex = (SQLSyntaxErrorException) e.getCause();
-                    if (ex.getSQLState().equals("42S21")) {
-                        // ignore
-                        return;
-                    }
-                }
-                logger.error(e.getMessage());
             }
-        });
+            logger.error(e.getMessage());
+        }
     }
 
     private void initTable() throws InterruptedException {
@@ -377,17 +356,12 @@ public class MysqlStorageServiceImpl extends AbstractStorageService {
         builder.build(ConfigConstant.CONFIG_MODEL_ID, ConfigConstant.CONFIG_MODEL_TYPE, ConfigConstant.CONFIG_MODEL_CREATE_TIME, ConfigConstant.CONFIG_MODEL_JSON);
         List<Field> logFields = builder.getFields();
 
-        // 缓存任务
-        builder.build(ConfigConstant.CONFIG_MODEL_ID, ConfigConstant.BINLOG_STATUS, ConfigConstant.CONFIG_MODEL_CREATE_TIME, ConfigConstant.BINLOG_DATA);
-        List<Field> binlogFields = builder.getFields();
-
         // 数据
         builder.build(ConfigConstant.CONFIG_MODEL_ID, ConfigConstant.DATA_SUCCESS, ConfigConstant.DATA_TABLE_GROUP_ID, ConfigConstant.DATA_TARGET_TABLE_NAME, ConfigConstant.DATA_EVENT, ConfigConstant.DATA_ERROR, ConfigConstant.CONFIG_MODEL_CREATE_TIME, ConfigConstant.BINLOG_DATA);
         List<Field> dataFields = builder.getFields();
 
         tables.computeIfAbsent(StorageEnum.CONFIG.getType(), k -> new Executor(k, configFields, true, true));
         tables.computeIfAbsent(StorageEnum.LOG.getType(), k -> new Executor(k, logFields, true, false));
-        tables.computeIfAbsent(StorageEnum.BINLOG.getType(), k -> new Executor(k, binlogFields, true, false));
         tables.computeIfAbsent(StorageEnum.DATA.getType(), k -> new Executor(k, dataFields, false, false));
         // 创建表
         tables.forEach((tableName, e) -> {

+ 0 - 20
dbsyncer-storage/src/main/java/org/dbsyncer/storage/util/DocumentUtil.java

@@ -119,24 +119,4 @@ public abstract class DocumentUtil {
         return doc;
     }
 
-    public static Document convertBinlog2Doc(Map params) {
-        Document doc = new Document();
-        String id = (String) params.get(ConfigConstant.CONFIG_MODEL_ID);
-        doc.add(new StringField(ConfigConstant.CONFIG_MODEL_ID, id, Field.Store.YES));
-
-        Integer status = (Integer) params.get(ConfigConstant.BINLOG_STATUS);
-        doc.add(new IntPoint(ConfigConstant.BINLOG_STATUS, status));
-        doc.add(new StoredField(ConfigConstant.BINLOG_STATUS, status));
-
-        byte[] bytes = (byte[]) params.get(ConfigConstant.BINLOG_DATA);
-        doc.add(new BinaryDocValuesField(ConfigConstant.BINLOG_DATA, new BytesRef(bytes)));
-        doc.add(new StoredField(ConfigConstant.BINLOG_DATA, bytes));
-
-        Long createTime = (Long) params.get(ConfigConstant.CONFIG_MODEL_CREATE_TIME);
-        doc.add(new LongPoint(ConfigConstant.CONFIG_MODEL_CREATE_TIME, createTime));
-        doc.add(new StoredField(ConfigConstant.CONFIG_MODEL_CREATE_TIME, createTime));
-        doc.add(new NumericDocValuesField(ConfigConstant.CONFIG_MODEL_CREATE_TIME, createTime));
-        return doc;
-    }
-
 }

+ 1 - 1
dbsyncer-web/pom.xml

@@ -5,7 +5,7 @@
     <parent>
         <artifactId>dbsyncer</artifactId>
         <groupId>org.ghi</groupId>
-        <version>1.2.4-RC_0825</version>
+        <version>1.2.4-RC_0901</version>
     </parent>
     <modelVersion>4.0.0</modelVersion>
     <artifactId>dbsyncer-web</artifactId>

+ 7 - 7
dbsyncer-web/src/main/resources/application.properties

@@ -16,24 +16,24 @@ dbsyncer.web.scheduler.pool-size=8
 #parser
 # *********************** 通用执行器配置 ***********************
 # [GeneralBufferActuator]线程数
-dbsyncer.parser.general.thread-core-size=10
+dbsyncer.parser.general.thread-core-size=8
 # [GeneralBufferActuator]线程池队列
-dbsyncer.parser.general.thread-queue-capacity=1000
+dbsyncer.parser.general.thread-queue-capacity=64
 # [GeneralBufferActuator]单次执行任务数
 dbsyncer.parser.general.buffer-writer-count=100
 # [GeneralBufferActuator]每次消费缓存队列的任务数
 dbsyncer.parser.general.buffer-pull-count=1000
 # [GeneralBufferActuator]缓存队列容量
-dbsyncer.parser.general.buffer-queue-capacity=50000
+dbsyncer.parser.general.buffer-queue-capacity=80000
 # [GeneralBufferActuator]定时消费缓存队列间隔(毫秒)
 dbsyncer.parser.general.buffer-period-millisecond=300
 # *********************** 表执行器配置 ***********************
 # 每个驱动最多可分配的表执行器个数
 dbsyncer.parser.table.group.max-buffer-actuator-size=10
 # [TableGroupBufferActuator]线程数
-dbsyncer.parser.table.group.thread-core-size=5
+dbsyncer.parser.table.group.thread-core-size=2
 # [TableGroupBufferActuator]线程池队列
-dbsyncer.parser.table.group.thread-queue-capacity=1000
+dbsyncer.parser.table.group.thread-queue-capacity=16
 # [TableGroupBufferActuator]单次执行任务数
 dbsyncer.parser.table.group.buffer-writer-count=100
 # [TableGroupBufferActuator]每次消费缓存队列的任务数
@@ -51,9 +51,9 @@ dbsyncer.storage.support.mysql.config.url=jdbc:mysql://127.0.0.1:3306/dbsyncer?r
 dbsyncer.storage.support.mysql.config.username=root
 dbsyncer.storage.support.mysql.config.password=123
 # [StorageBufferActuator]线程数
-dbsyncer.storage.thread-core-size=5
+dbsyncer.storage.thread-core-size=4
 # [StorageBufferActuator]线程池队列
-dbsyncer.storage.thread-queue-capacity=500
+dbsyncer.storage.thread-queue-capacity=32
 # [StorageBufferActuator]单次执行任务数
 dbsyncer.storage.buffer-writer-count=100
 # [StorageBufferActuator]每次消费缓存队列的任务数

+ 1 - 1
pom.xml

@@ -6,7 +6,7 @@
 
     <groupId>org.ghi</groupId>
     <artifactId>dbsyncer</artifactId>
-    <version>1.2.4-RC_0825</version>
+    <version>1.2.4-RC_0901</version>
     <packaging>pom</packaging>
     <name>dbsyncer</name>
     <url>https://gitee.com/ghi/dbsyncer</url>