Explorar o código

启用持久化同步消息功能

AE86 %!s(int64=2) %!d(string=hai) anos
pai
achega
0c506027b4

+ 0 - 13
dbsyncer-common/src/main/java/org/dbsyncer/common/config/BinlogRecorderConfig.java

@@ -12,11 +12,6 @@ import org.springframework.context.annotation.Configuration;
 @ConfigurationProperties(prefix = "dbsyncer.storage.binlog.recorder")
 public class BinlogRecorderConfig {
 
-    /**
-     * 任务名称
-     */
-    private String taskName = "BinlogRecorder";
-
     /**
      * 批量同步数
      */
@@ -42,14 +37,6 @@ public class BinlogRecorderConfig {
      */
     private int readerPeriodMillisecond = 2000;
 
-    public String getTaskName() {
-        return taskName;
-    }
-
-    public void setTaskName(String taskName) {
-        this.taskName = taskName;
-    }
-
     public int getBatchCount() {
         return batchCount;
     }

+ 98 - 0
dbsyncer-parser/src/main/java/org/dbsyncer/parser/AbstractWriterBinlog.java

@@ -0,0 +1,98 @@
+package org.dbsyncer.parser;
+
+import com.google.protobuf.ByteString;
+import org.dbsyncer.cache.CacheService;
+import org.dbsyncer.common.util.CollectionUtils;
+import org.dbsyncer.connector.model.Field;
+import org.dbsyncer.parser.flush.BufferActuator;
+import org.dbsyncer.parser.model.Picker;
+import org.dbsyncer.parser.model.TableGroup;
+import org.dbsyncer.parser.model.WriterRequest;
+import org.dbsyncer.storage.binlog.AbstractBinlogRecorder;
+import org.dbsyncer.storage.binlog.proto.BinlogMap;
+import org.dbsyncer.storage.binlog.proto.BinlogMessage;
+import org.dbsyncer.storage.binlog.proto.EventEnum;
+import org.dbsyncer.storage.util.BinlogMessageUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Queue;
+
+public abstract class AbstractWriterBinlog extends AbstractBinlogRecorder<WriterRequest> {
+
+    private final Logger logger = LoggerFactory.getLogger(getClass());
+
+    @Autowired
+    private BufferActuator writerBufferActuator;
+
+    @Autowired
+    private CacheService cacheService;
+
+    protected void flush(String tableGroupId, String event, Map<String, Object> data) {
+        try {
+            BinlogMap.Builder dataBuilder = BinlogMap.newBuilder();
+            data.forEach((k, v) -> {
+                if (null != v) {
+                    ByteString bytes = BinlogMessageUtil.serializeValue(v);
+                    if (null != bytes) {
+                        dataBuilder.putRow(k, bytes);
+                    }
+                }
+            });
+
+            BinlogMessage builder = BinlogMessage.newBuilder()
+                    .setTableGroupId(tableGroupId)
+                    .setEvent(EventEnum.valueOf(event))
+                    .setData(dataBuilder.build())
+                    .build();
+            super.flush(builder);
+        } catch (Exception e) {
+            logger.error(e.getMessage());
+        }
+    }
+
+    @Override
+    protected WriterRequest deserialize(String messageId, BinlogMessage message) {
+        if (CollectionUtils.isEmpty(message.getData().getRowMap())) {
+            return null;
+        }
+
+        // 1、获取配置信息
+        final TableGroup tableGroup = cacheService.get(message.getTableGroupId(), TableGroup.class);
+
+        // 2、反序列数据
+        try {
+            final Picker picker = new Picker(tableGroup.getFieldMapping());
+            final Map<String, Field> fieldMap = picker.getTargetFieldMap();
+            Map<String, Object> data = new HashMap<>();
+            message.getData().getRowMap().forEach((k, v) -> {
+                if (fieldMap.containsKey(k)) {
+                    data.put(k, BinlogMessageUtil.deserializeValue(fieldMap.get(k).getType(), v));
+                }
+            });
+            return new WriterRequest(messageId, message.getTableGroupId(), message.getEvent().name(), data);
+        } catch (Exception e) {
+            logger.error(e.getMessage());
+        }
+        return null;
+    }
+
+    @Override
+    public String getTaskName() {
+        return "WriterBinlog";
+    }
+
+    @Override
+    public Queue getQueue() {
+        return writerBufferActuator.getQueue();
+    }
+
+    @Override
+    public int getQueueCapacity() {
+        return writerBufferActuator.getQueueCapacity();
+    }
+
+}

+ 2 - 3
dbsyncer-parser/src/main/java/org/dbsyncer/parser/strategy/ParserStrategy.java

@@ -15,10 +15,9 @@ public interface ParserStrategy {
     void execute(String tableGroupId, String event, Map<String, Object> data);
 
     /**
-     * 完成同步后,执行回调
+     * 完成同步后,执行回调删除消息
      *
      * @param messageIds
      */
-    default void complete(List<String> messageIds) {
-    }
+    void complete(List<String> messageIds);
 }

+ 4 - 84
dbsyncer-parser/src/main/java/org/dbsyncer/parser/strategy/impl/DisableWriterBufferActuatorStrategy.java

@@ -1,101 +1,21 @@
 package org.dbsyncer.parser.strategy.impl;
 
-import com.google.protobuf.ByteString;
-import org.dbsyncer.cache.CacheService;
-import org.dbsyncer.common.util.CollectionUtils;
-import org.dbsyncer.connector.model.Field;
-import org.dbsyncer.parser.flush.BufferActuator;
-import org.dbsyncer.parser.model.Picker;
-import org.dbsyncer.parser.model.TableGroup;
-import org.dbsyncer.parser.model.WriterRequest;
+import org.dbsyncer.parser.AbstractWriterBinlog;
 import org.dbsyncer.parser.strategy.ParserStrategy;
-import org.dbsyncer.storage.binlog.AbstractBinlogRecorder;
-import org.dbsyncer.storage.binlog.proto.BinlogMap;
-import org.dbsyncer.storage.binlog.proto.BinlogMessage;
-import org.dbsyncer.storage.binlog.proto.EventEnum;
-import org.dbsyncer.storage.util.BinlogMessageUtil;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
 
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Queue;
 
-public final class DisableWriterBufferActuatorStrategy extends AbstractBinlogRecorder<WriterRequest> implements ParserStrategy {
-
-    private final Logger logger = LoggerFactory.getLogger(getClass());
-
-    @Autowired
-    private BufferActuator writerBufferActuator;
-
-    @Autowired
-    private CacheService cacheService;
+public final class DisableWriterBufferActuatorStrategy extends AbstractWriterBinlog implements ParserStrategy {
 
     @Override
     public void execute(String tableGroupId, String event, Map<String, Object> data) {
-        try {
-            BinlogMap.Builder dataBuilder = BinlogMap.newBuilder();
-            data.forEach((k, v) -> {
-                if (null != v) {
-                    ByteString bytes = BinlogMessageUtil.serializeValue(v);
-                    if (null != bytes) {
-                        dataBuilder.putRow(k, bytes);
-                    }
-                }
-            });
-
-            BinlogMessage builder = BinlogMessage.newBuilder()
-                    .setTableGroupId(tableGroupId)
-                    .setEvent(EventEnum.valueOf(event))
-                    .setData(dataBuilder.build())
-                    .build();
-            super.flush(builder);
-        } catch (Exception e) {
-            logger.error(e.getMessage());
-        }
+        super.flush(tableGroupId, event, data);
     }
 
     @Override
     public void complete(List<String> messageIds) {
-        super.completeMessage(messageIds);
-    }
-
-    @Override
-    public Queue getQueue() {
-        return writerBufferActuator.getQueue();
-    }
-
-    @Override
-    public int getQueueCapacity() {
-        return writerBufferActuator.getQueueCapacity();
-    }
-
-    @Override
-    protected WriterRequest deserialize(String messageId, BinlogMessage message) {
-        if (CollectionUtils.isEmpty(message.getData().getRowMap())) {
-            return null;
-        }
-
-        // 1、获取配置信息
-        final TableGroup tableGroup = cacheService.get(message.getTableGroupId(), TableGroup.class);
-
-        // 2、反序列数据
-        try {
-            final Picker picker = new Picker(tableGroup.getFieldMapping());
-            final Map<String, Field> fieldMap = picker.getTargetFieldMap();
-            Map<String, Object> data = new HashMap<>();
-            message.getData().getRowMap().forEach((k, v) -> {
-                if (fieldMap.containsKey(k)) {
-                    data.put(k, BinlogMessageUtil.deserializeValue(fieldMap.get(k).getType(), v));
-                }
-            });
-            return new WriterRequest(messageId, message.getTableGroupId(), message.getEvent().name(), data);
-        } catch (Exception e) {
-            logger.error(e.getMessage());
-        }
-        return null;
+        super.complete(messageIds);
     }
 
 }

+ 12 - 18
dbsyncer-parser/src/main/java/org/dbsyncer/parser/strategy/impl/EnableWriterBufferActuatorStrategy.java

@@ -1,50 +1,44 @@
 package org.dbsyncer.parser.strategy.impl;
 
+import org.dbsyncer.parser.AbstractWriterBinlog;
 import org.dbsyncer.parser.flush.BufferActuator;
 import org.dbsyncer.parser.model.WriterRequest;
 import org.dbsyncer.parser.strategy.ParserStrategy;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
 import org.springframework.stereotype.Component;
 
 import javax.annotation.PostConstruct;
+import java.util.List;
 import java.util.Map;
-import java.util.concurrent.TimeUnit;
 
 @Component
 @ConditionalOnProperty(value = "dbsyncer.parser.flush.buffer.actuator.speed.enabled", havingValue = "true")
-public final class EnableWriterBufferActuatorStrategy implements ParserStrategy {
-
-    private final Logger logger = LoggerFactory.getLogger(getClass());
+public final class EnableWriterBufferActuatorStrategy extends AbstractWriterBinlog implements ParserStrategy {
 
     private static final double BUFFER_THRESHOLD = 0.8;
 
     @Autowired
     private BufferActuator writerBufferActuator;
 
-    private double limit;
+    private static double limit;
 
     @PostConstruct
     private void init() {
-        limit = Math.ceil(writerBufferActuator.getQueueCapacity() * BUFFER_THRESHOLD);
+        limit = Math.ceil(getQueueCapacity() * BUFFER_THRESHOLD);
     }
 
     @Override
     public void execute(String tableGroupId, String event, Map<String, Object> data) {
+        if (getQueue().size() >= limit) {
+            super.flush(tableGroupId, event, data);
+        }
         writerBufferActuator.offer(new WriterRequest(tableGroupId, event, data));
+    }
 
-        // 超过容量限制,限制生产速度
-        final int size = writerBufferActuator.getQueue().size();
-        if (size >= limit) {
-            try {
-                TimeUnit.SECONDS.sleep(30);
-                logger.warn("当前任务队列大小{}已达上限{},请稍等{}秒", size, limit, 30);
-            } catch (InterruptedException e) {
-                logger.error(e.getMessage());
-            }
-        }
+    @Override
+    public void complete(List<String> messageIds) {
+        super.complete(messageIds);
     }
 
 }

+ 6 - 12
dbsyncer-storage/src/main/java/org/dbsyncer/storage/binlog/AbstractBinlogRecorder.java

@@ -16,7 +16,6 @@ import org.dbsyncer.common.scheduled.ScheduledTaskJob;
 import org.dbsyncer.common.scheduled.ScheduledTaskService;
 import org.dbsyncer.common.snowflake.SnowflakeIdWorker;
 import org.dbsyncer.common.util.CollectionUtils;
-import org.dbsyncer.common.util.DateFormatUtil;
 import org.dbsyncer.storage.binlog.proto.BinlogMessage;
 import org.dbsyncer.storage.constant.BinlogConstant;
 import org.dbsyncer.storage.enums.IndexFieldResolverEnum;
@@ -51,8 +50,7 @@ public abstract class AbstractBinlogRecorder<Message> implements BinlogRecorder,
 
     private final Logger logger = LoggerFactory.getLogger(getClass());
 
-    private static final String PATH = new StringBuilder(System.getProperty("user.dir")).append(File.separatorChar).append("data").append(
-            File.separatorChar).append("data").append(File.separatorChar).toString();
+    private static final String PATH = new StringBuilder(System.getProperty("user.dir")).append(File.separatorChar).append("data").append(File.separatorChar).append("data").append(File.separatorChar).toString();
 
     @Autowired
     private ScheduledTaskService scheduledTaskService;
@@ -74,13 +72,13 @@ public abstract class AbstractBinlogRecorder<Message> implements BinlogRecorder,
     @PostConstruct
     private void init() throws IOException {
         queue = new LinkedBlockingQueue(binlogRecorderConfig.getQueueCapacity());
-        shard = new Shard(PATH + binlogRecorderConfig.getTaskName());
+        shard = new Shard(PATH + getTaskName());
         scheduledTaskService.start(binlogRecorderConfig.getWriterPeriodMillisecond(), writerTask);
         scheduledTaskService.start(binlogRecorderConfig.getReaderPeriodMillisecond(), readerTask);
     }
 
     /**
-     * 反序列化任务
+     * 反序列化消息
      *
      * @param message
      * @return
@@ -97,12 +95,8 @@ public abstract class AbstractBinlogRecorder<Message> implements BinlogRecorder,
         shard.close();
     }
 
-    /**
-     * 消息同步完成后,删除消息记录
-     *
-     * @param messageIds
-     */
-    protected void completeMessage(List<String> messageIds) {
+    @Override
+    public void complete(List<String> messageIds) {
         if (!CollectionUtils.isEmpty(messageIds)) {
             try {
                 int size = messageIds.size();
@@ -216,7 +210,7 @@ public abstract class AbstractBinlogRecorder<Message> implements BinlogRecorder,
                 String id = (String) row.get(BinlogConstant.BINLOG_ID);
                 Integer status = (Integer) row.get(BinlogConstant.BINLOG_STATUS);
                 BytesRef ref = (BytesRef) row.get(BinlogConstant.BINLOG_CONTENT);
-                if(BinlogConstant.PROCESSING == status){
+                if (BinlogConstant.PROCESSING == status) {
                     logger.warn("存在超时未处理数据,正在重试,建议优化配置参数[max-processing-seconds={}].", binlogRecorderConfig.getMaxProcessingSeconds());
                 }
                 deleteIds[i] = new Term(BinlogConstant.BINLOG_ID, id);

+ 17 - 0
dbsyncer-storage/src/main/java/org/dbsyncer/storage/binlog/BinlogRecorder.java

@@ -3,6 +3,7 @@ package org.dbsyncer.storage.binlog;
 import org.dbsyncer.storage.binlog.proto.BinlogMessage;
 
 import java.io.IOException;
+import java.util.List;
 import java.util.Queue;
 
 /**
@@ -12,6 +13,15 @@ import java.util.Queue;
  */
 public interface BinlogRecorder {
 
+    /**
+     * 获取任务名称
+     *
+     * @return
+     */
+    default String getTaskName() {
+        return getClass().getSimpleName();
+    }
+
     /**
      * 将任务序列化刷入磁盘
      *
@@ -19,6 +29,13 @@ public interface BinlogRecorder {
      */
     void flush(BinlogMessage message) throws IOException;
 
+    /**
+     * 消息同步完成后,删除消息记录
+     *
+     * @param messageIds
+     */
+    void complete(List<String> messageIds);
+
     /**
      * 获取缓存队列
      *

+ 1 - 1
dbsyncer-web/src/main/resources/application.properties

@@ -11,7 +11,7 @@ server.servlet.context-path=/
 #dbsyncer.web.thread.pool.queue-capacity=1000
 
 #parser
-dbsyncer.parser.flush.buffer.actuator.speed.enabled=true
+#dbsyncer.parser.flush.buffer.actuator.speed.enabled=true
 #dbsyncer.parser.flush.buffer.actuator.writer-batch-count=100
 #dbsyncer.parser.flush.buffer.actuator.batch-count=1000
 #dbsyncer.parser.flush.buffer.actuator.queue-capacity=50000