AE86 il y a 2 ans
Parent
commit
5cea3e7bd0

+ 12 - 14
dbsyncer-parser/src/main/java/org/dbsyncer/parser/strategy/impl/DisableWriterBufferActuatorStrategy.java

@@ -45,10 +45,21 @@ public final class DisableWriterBufferActuatorStrategy extends AbstractBinlogRec
         try {
             EventEnum eventEnum = EventEnum.valueOf(event.getEvent());
             Map<String, Object> data = StringUtil.equals(ConnectorConstant.OPERTION_DELETE, eventEnum.name()) ? event.getBefore() : event.getAfter();
+
+            BinlogMap.Builder dataBuilder = BinlogMap.newBuilder();
+            data.forEach((k, v) -> {
+                if (null != v) {
+                    ByteString bytes = serializeValue(v);
+                    if (null != bytes) {
+                        dataBuilder.putRow(k, bytes);
+                    }
+                }
+            });
+
             BinlogMessage builder = BinlogMessage.newBuilder()
                     .setTableGroupId(tableGroup.getId())
                     .setEvent(eventEnum)
-                    .setData(serialize(data))
+                    .setData(dataBuilder.build())
                     .build();
             flush(builder);
         } catch (Exception e) {
@@ -102,17 +113,4 @@ public final class DisableWriterBufferActuatorStrategy extends AbstractBinlogRec
         return new WriterRequest(tableGroupId, target, mapping.getMetaId(), mapping.getTargetConnectorId(), sourceTableName, targetTableName, event, picker.getTargetFields(), tableGroup.getCommand());
     }
 
-    private BinlogMap serialize(Map<String, Object> data) {
-        BinlogMap.Builder builder = BinlogMap.newBuilder();
-        data.forEach((k, v) -> {
-            if (null != v) {
-                ByteString bytes = serializeValue(v);
-                if (null != bytes) {
-                    builder.putRow(k, bytes);
-                }
-            }
-        });
-        return builder.build();
-    }
-
 }

+ 103 - 28
dbsyncer-storage/src/main/java/org/dbsyncer/storage/binlog/AbstractBinlogRecorder.java

@@ -1,16 +1,31 @@
 package org.dbsyncer.storage.binlog;
 
 import com.google.protobuf.ByteString;
+import com.google.protobuf.InvalidProtocolBufferException;
+import org.apache.lucene.document.Document;
+import org.apache.lucene.document.Field;
+import org.apache.lucene.document.StoredField;
+import org.apache.lucene.document.StringField;
+import org.apache.lucene.index.Term;
+import org.apache.lucene.search.MatchAllDocsQuery;
+import org.apache.lucene.util.BytesRef;
+import org.dbsyncer.common.model.Paging;
 import org.dbsyncer.common.scheduled.ScheduledTaskJob;
 import org.dbsyncer.common.scheduled.ScheduledTaskService;
+import org.dbsyncer.common.snowflake.SnowflakeIdWorker;
+import org.dbsyncer.common.util.CollectionUtils;
 import org.dbsyncer.storage.binlog.impl.BinlogColumnValue;
 import org.dbsyncer.storage.binlog.proto.BinlogMessage;
+import org.dbsyncer.storage.enums.IndexFieldResolverEnum;
+import org.dbsyncer.storage.lucene.Shard;
+import org.dbsyncer.storage.query.Option;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.DisposableBean;
 import org.springframework.beans.factory.annotation.Autowired;
 
 import javax.annotation.PostConstruct;
+import java.io.File;
 import java.io.IOException;
 import java.math.BigDecimal;
 import java.nio.ByteBuffer;
@@ -18,9 +33,9 @@ import java.sql.Date;
 import java.sql.Time;
 import java.sql.Timestamp;
 import java.sql.Types;
-import java.util.BitSet;
-import java.util.Queue;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.*;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 
@@ -36,11 +51,20 @@ public abstract class AbstractBinlogRecorder<Message> implements BinlogRecorder,
     @Autowired
     private ScheduledTaskService scheduledTaskService;
 
-    private static final long MAX_BATCH_COUNT = 100L;
+    @Autowired
+    private SnowflakeIdWorker snowflakeIdWorker;
+
+    private static final String BINLOG_ID = "id";
+
+    private static final String BINLOG_CONTENT = "c";
+
+    private static final int MAX_COUNT_SIZE = 20000;
+
+    private static final int SUBMIT_COUNT = 1000;
 
-    private static final long PERIOD = 3000;
+    private static final int PERIOD = 3000;
 
-    private static final long CONTEXT_PERIOD = 10_000;
+    private Queue<BinlogMessage> queue = new LinkedBlockingQueue(MAX_COUNT_SIZE);
 
     private static final ByteBuffer buffer = ByteBuffer.allocate(8);
 
@@ -50,14 +74,19 @@ public abstract class AbstractBinlogRecorder<Message> implements BinlogRecorder,
 
     private volatile boolean running;
 
-    private BinlogContext context;
+    /**
+     * 相对路径/data/binlog/
+     */
+    private static final String PATH = new StringBuilder(System.getProperty("user.dir")).append(File.separatorChar)
+            .append("data").append(File.separatorChar).append("data").append(File.separatorChar).toString();
+
+    private Shard shard;
 
     @PostConstruct
     private void init() throws IOException {
         // /data/binlog/WriterBinlog/
-        context = new BinlogContext(getTaskName());
+        shard = new Shard(PATH + getTaskName());
         scheduledTaskService.start(PERIOD, this);
-        scheduledTaskService.start(CONTEXT_PERIOD, context);
     }
 
     /**
@@ -86,7 +115,7 @@ public abstract class AbstractBinlogRecorder<Message> implements BinlogRecorder,
 
     @Override
     public void run() {
-        if (running || getQueue().size() > 500) {
+        if (running || getQueue().size() > (MAX_COUNT_SIZE - SUBMIT_COUNT)) {
             return;
         }
 
@@ -96,6 +125,7 @@ public abstract class AbstractBinlogRecorder<Message> implements BinlogRecorder,
             locked = binlogLock.tryLock();
             if (locked) {
                 running = true;
+                flushMessage();
                 doParse();
             }
         } catch (Exception e) {
@@ -109,28 +139,73 @@ public abstract class AbstractBinlogRecorder<Message> implements BinlogRecorder,
     }
 
     @Override
-    public void flush(BinlogMessage message) throws IOException {
-        context.write(message);
+    public void flush(BinlogMessage message) {
+        queue.offer(message);
     }
 
     @Override
-    public void destroy() {
-        context.close();
+    public void destroy() throws IOException {
+        shard.close();
     }
 
     private void doParse() throws IOException {
-        byte[] line;
-        AtomicInteger batchCounter = new AtomicInteger();
-        while (batchCounter.get() < MAX_BATCH_COUNT && null != (line = context.readLine())) {
-            Message message = deserialize(BinlogMessage.parseFrom(line));
-            if (null != message) {
-                getQueue().offer(message);
+        Option option = new Option(new MatchAllDocsQuery());
+        option.addIndexFieldResolverEnum(BINLOG_CONTENT, IndexFieldResolverEnum.BINARY);
+        Paging paging = shard.query(option, 1, SUBMIT_COUNT, null);
+        if (CollectionUtils.isEmpty(paging.getData())) {
+            return;
+        }
+
+        List<Map> maps = (List<Map>) paging.getData();
+        List<Term> tasks = new ArrayList<>();
+        maps.forEach(m -> {
+            try {
+                BytesRef ref = (BytesRef) m.get(BINLOG_CONTENT);
+                Message message = deserialize(BinlogMessage.parseFrom(ref.bytes));
+                if (null != message) {
+                    getQueue().offer(message);
+                }
+                tasks.add(new Term(BINLOG_ID, (String) m.get(BINLOG_ID)));
+            } catch (InvalidProtocolBufferException e) {
+                logger.error(e.getMessage());
+            } catch (IOException e) {
+                logger.error(e.getMessage());
+            }
+        });
+
+        if(!CollectionUtils.isEmpty(tasks)){
+            tasks.forEach(t -> {
+                try {
+                    // TODO 批量删除
+                    shard.delete(t);
+                } catch (IOException e) {
+                    logger.error(e.getMessage());
+                }
+            });
+        }
+    }
+
+    private void flushMessage() throws IOException {
+        if(queue.isEmpty()){
+            return;
+        }
+
+        List<Document> tasks = new ArrayList<>();
+        AtomicLong batchCounter = new AtomicLong();
+        Document doc;
+        while (!queue.isEmpty() && batchCounter.get() < SUBMIT_COUNT) {
+            BinlogMessage message = queue.poll();
+            if(null != message){
+                doc = new Document();
+                doc.add(new StringField(BINLOG_ID, String.valueOf(snowflakeIdWorker.nextId()), Field.Store.YES));
+                doc.add(new StoredField(BINLOG_CONTENT, new BytesRef(message.toByteArray())));
+                tasks.add(doc);
             }
-            batchCounter.getAndAdd(1);
+            batchCounter.incrementAndGet();
         }
 
-        if (batchCounter.get() > 0) {
-            context.flush();
+        if(!CollectionUtils.isEmpty(tasks)){
+            shard.insertBatch(tasks);
         }
     }
 
@@ -138,10 +213,10 @@ public abstract class AbstractBinlogRecorder<Message> implements BinlogRecorder,
      * Java语言提供了八种基本类型,六种数字类型(四个整数型,两个浮点型),一种字符类型,一种布尔型。
      * <p>
      * <ol>
-     *     <li>整数:包括int,short,byte,long</li>
-     *     <li>浮点型:float,double</li>
-     *     <li>字符:char</li>
-     *     <li>布尔:boolean</li>
+     * <li>整数:包括int,short,byte,long</li>
+     * <li>浮点型:float,double</li>
+     * <li>字符:char</li>
+     * <li>布尔:boolean</li>
      * </ol>
      *
      * <pre>
@@ -164,7 +239,7 @@ public abstract class AbstractBinlogRecorder<Message> implements BinlogRecorder,
         switch (type) {
             // 字节
             case "[B":
-            return ByteString.copyFrom((byte[]) v);
+                return ByteString.copyFrom((byte[]) v);
 
             // 字符串
             case "java.lang.String":