Browse Source

优化性能

AE86 2 years ago
parent
commit
b5f5ff1f7d

+ 1 - 1
dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/impl/StorageBufferActuator.java

@@ -21,7 +21,7 @@ public class StorageBufferActuator extends AbstractBufferActuator<StorageRequest
 
     @Override
     public int getQueueCapacity() {
-        return 1_0000;
+        return super.getQueueCapacity() / 4;
     }
 
     @Override

+ 9 - 4
dbsyncer-parser/src/main/java/org/dbsyncer/parser/strategy/impl/DisableWriterBufferActuatorStrategy.java

@@ -68,13 +68,18 @@ public final class DisableWriterBufferActuatorStrategy extends AbstractBinlogRec
     }
 
     @Override
-    protected String getTaskName() {
-        return "WriterBinlog";
+    public Queue getQueue() {
+        return writerBufferActuator.getQueue();
     }
 
     @Override
-    protected Queue getQueue() {
-        return writerBufferActuator.getQueue();
+    public int getQueueCapacity() {
+        return writerBufferActuator.getQueueCapacity();
+    }
+
+    @Override
+    protected String getTaskName() {
+        return "WriterBinlog";
     }
 
     @Override

+ 78 - 72
dbsyncer-storage/src/main/java/org/dbsyncer/storage/binlog/AbstractBinlogRecorder.java

@@ -35,7 +35,6 @@ import java.sql.Timestamp;
 import java.sql.Types;
 import java.util.*;
 import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 
@@ -44,7 +43,7 @@ import java.util.concurrent.locks.ReentrantLock;
  * @version 1.0.0
  * @date 2022/6/8 0:53
  */
-public abstract class AbstractBinlogRecorder<Message> implements BinlogRecorder, ScheduledTaskJob, DisposableBean {
+public abstract class AbstractBinlogRecorder<Message> implements BinlogRecorder, DisposableBean {
 
     private final Logger logger = LoggerFactory.getLogger(getClass());
 
@@ -58,35 +57,29 @@ public abstract class AbstractBinlogRecorder<Message> implements BinlogRecorder,
 
     private static final String BINLOG_CONTENT = "c";
 
-    private static final int MAX_COUNT_SIZE = 20000;
-
     private static final int SUBMIT_COUNT = 1000;
 
-    private static final int PERIOD = 3000;
-
-    private Queue<BinlogMessage> queue = new LinkedBlockingQueue(MAX_COUNT_SIZE);
+    private static final Queue<BinlogMessage> queue = new LinkedBlockingQueue(10000);
 
     private static final ByteBuffer buffer = ByteBuffer.allocate(8);
 
     private static final BinlogColumnValue value = new BinlogColumnValue();
 
-    private final Lock lock = new ReentrantLock(true);
-
-    private volatile boolean running;
-
-    /**
-     * 相对路径/data/binlog/
-     */
     private static final String PATH = new StringBuilder(System.getProperty("user.dir")).append(File.separatorChar)
             .append("data").append(File.separatorChar).append("data").append(File.separatorChar).toString();
 
     private Shard shard;
 
+    private WriterTask writerTask = new WriterTask();
+
+    private ReaderTask readerTask = new ReaderTask();
+
     @PostConstruct
     private void init() throws IOException {
-        // /data/binlog/WriterBinlog/
+        // /data/data/WriterBinlog/
         shard = new Shard(PATH + getTaskName());
-        scheduledTaskService.start(PERIOD, this);
+        scheduledTaskService.start(500, writerTask);
+        scheduledTaskService.start(2000, readerTask);
     }
 
     /**
@@ -98,13 +91,6 @@ public abstract class AbstractBinlogRecorder<Message> implements BinlogRecorder,
         return getClass().getSimpleName();
     }
 
-    /**
-     * 获取缓存队列
-     *
-     * @return
-     */
-    protected abstract Queue getQueue();
-
     /**
      * 反序列化任务
      *
@@ -113,31 +99,6 @@ public abstract class AbstractBinlogRecorder<Message> implements BinlogRecorder,
      */
     protected abstract Message deserialize(BinlogMessage message);
 
-    @Override
-    public void run() {
-        if (running || getQueue().size() > (MAX_COUNT_SIZE - SUBMIT_COUNT)) {
-            return;
-        }
-
-        final Lock binlogLock = lock;
-        boolean locked = false;
-        try {
-            locked = binlogLock.tryLock();
-            if (locked) {
-                running = true;
-                flushMessage();
-                doParse();
-            }
-        } catch (Exception e) {
-            logger.error(e.getMessage());
-        } finally {
-            if (locked) {
-                running = false;
-                binlogLock.unlock();
-            }
-        }
-    }
-
     @Override
     public void flush(BinlogMessage message) {
         queue.offer(message);
@@ -174,30 +135,6 @@ public abstract class AbstractBinlogRecorder<Message> implements BinlogRecorder,
         shard.deleteBatch(terms);
     }
 
-    private void flushMessage() throws IOException {
-        if(queue.isEmpty()){
-            return;
-        }
-
-        List<Document> tasks = new ArrayList<>();
-        AtomicLong batchCounter = new AtomicLong();
-        Document doc;
-        while (!queue.isEmpty() && batchCounter.get() < SUBMIT_COUNT) {
-            BinlogMessage message = queue.poll();
-            if(null != message){
-                doc = new Document();
-                doc.add(new StringField(BINLOG_ID, String.valueOf(snowflakeIdWorker.nextId()), Field.Store.YES));
-                doc.add(new StoredField(BINLOG_CONTENT, new BytesRef(message.toByteArray())));
-                tasks.add(doc);
-            }
-            batchCounter.incrementAndGet();
-        }
-
-        if(!CollectionUtils.isEmpty(tasks)){
-            shard.insertBatch(tasks);
-        }
-    }
-
     /**
      * Java语言提供了八种基本类型,六种数字类型(四个整数型,两个浮点型),一种字符类型,一种布尔型。
      * <p>
@@ -374,4 +311,73 @@ public abstract class AbstractBinlogRecorder<Message> implements BinlogRecorder,
         }
     }
 
+    /**
+     * 合并缓存队列任务到磁盘
+     */
+    final class WriterTask implements ScheduledTaskJob {
+
+        @Override
+        public void run() {
+            if (queue.isEmpty()) {
+                return;
+            }
+
+            List<Document> tasks = new ArrayList<>();
+            int count = 0;
+            Document doc;
+            while (!queue.isEmpty() && count < SUBMIT_COUNT) {
+                BinlogMessage message = queue.poll();
+                if (null != message) {
+                    doc = new Document();
+                    doc.add(new StringField(BINLOG_ID, String.valueOf(snowflakeIdWorker.nextId()), Field.Store.YES));
+                    doc.add(new StoredField(BINLOG_CONTENT, new BytesRef(message.toByteArray())));
+                    tasks.add(doc);
+                }
+                count++;
+            }
+
+            if (!CollectionUtils.isEmpty(tasks)) {
+                try {
+                    shard.insertBatch(tasks);
+                } catch (IOException e) {
+                    logger.error(e.getMessage());
+                }
+            }
+        }
+    }
+
+    /**
+     * 从磁盘读取日志到任务队列
+     */
+    final class ReaderTask implements ScheduledTaskJob {
+
+        private final Lock lock = new ReentrantLock(true);
+
+        private volatile boolean running;
+
+        @Override
+        public void run() {
+            if (running || (SUBMIT_COUNT * 2) + getQueue().size() >= getQueueCapacity()) {
+                return;
+            }
+
+            final Lock binlogLock = lock;
+            boolean locked = false;
+            try {
+                locked = binlogLock.tryLock();
+                if (locked) {
+                    running = true;
+                    doParse();
+                }
+            } catch (Exception e) {
+                logger.error(e.getMessage());
+            } finally {
+                if (locked) {
+                    running = false;
+                    binlogLock.unlock();
+                }
+            }
+        }
+    }
+
 }

+ 15 - 0
dbsyncer-storage/src/main/java/org/dbsyncer/storage/binlog/BinlogRecorder.java

@@ -3,6 +3,7 @@ package org.dbsyncer.storage.binlog;
 import org.dbsyncer.storage.binlog.proto.BinlogMessage;
 
 import java.io.IOException;
+import java.util.Queue;
 
 /**
  * @author AE86
@@ -18,4 +19,18 @@ public interface BinlogRecorder {
      */
     void flush(BinlogMessage message) throws IOException;
 
+    /**
+     * 获取缓存队列
+     *
+     * @return
+     */
+    Queue getQueue();
+
+    /**
+     * 获取缓存队列容量
+     *
+     * @return
+     */
+    int getQueueCapacity();
+
 }

+ 5 - 10
dbsyncer-storage/src/main/test/BinlogMessageTest.java

@@ -207,23 +207,18 @@ public class BinlogMessageTest extends AbstractBinlogRecorder {
     }
 
     @Override
-    protected Queue getQueue() {
+    public Queue getQueue() {
         return null;
     }
 
     @Override
-    protected Object deserialize(BinlogMessage message) {
-        return null;
+    public int getQueueCapacity() {
+        return 0;
     }
 
     @Override
-    protected Object resolveValue(int type, ByteString v) {
-        return super.resolveValue(type, v);
-    }
-
-    @Override
-    protected ByteString serializeValue(Object v) {
-        return super.serializeValue(v);
+    protected Object deserialize(BinlogMessage message) {
+        return null;
     }
 
 }

+ 5 - 10
dbsyncer-storage/src/main/test/ShardBinlogTest.java

@@ -120,23 +120,18 @@ public class ShardBinlogTest extends AbstractBinlogRecorder {
     }
 
     @Override
-    protected Queue getQueue() {
+    public Queue getQueue() {
         return null;
     }
 
     @Override
-    protected Object deserialize(BinlogMessage message) {
-        return null;
+    public int getQueueCapacity() {
+        return 0;
     }
 
     @Override
-    protected Object resolveValue(int type, ByteString v) {
-        return super.resolveValue(type, v);
-    }
-
-    @Override
-    protected ByteString serializeValue(Object v) {
-        return super.serializeValue(v);
+    protected Object deserialize(BinlogMessage message) {
+        return null;
     }
 
     private void check() throws IOException {