AE86 2 tahun lalu
induk
melakukan
2b6324f7b9

+ 20 - 0
dbsyncer-storage/src/main/java/org/dbsyncer/storage/enums/IndexFieldResolverEnum.java

@@ -0,0 +1,20 @@
+package org.dbsyncer.storage.enums;
+
+import org.dbsyncer.storage.lucene.IndexFieldResolver;
+
+public enum IndexFieldResolverEnum {
+
+    STRING((f) -> f.stringValue()),
+
+    BINARY((f) -> f.binaryValue());
+
+    private IndexFieldResolver indexFieldResolver;
+
+    IndexFieldResolverEnum(IndexFieldResolver indexFieldResolver) {
+        this.indexFieldResolver = indexFieldResolver;
+    }
+
+    public IndexFieldResolver getIndexFieldResolver() {
+        return indexFieldResolver;
+    }
+}

+ 9 - 0
dbsyncer-storage/src/main/java/org/dbsyncer/storage/lucene/IndexFieldResolver.java

@@ -0,0 +1,9 @@
+package org.dbsyncer.storage.lucene;
+
+import org.apache.lucene.index.IndexableField;
+
+public interface IndexFieldResolver {
+
+    Object getValue(IndexableField field);
+
+}

+ 2 - 1
dbsyncer-storage/src/main/java/org/dbsyncer/storage/lucene/Shard.java

@@ -181,7 +181,8 @@ public class Shard {
                     }
                 }
 
-                r.put(f.name(), f.stringValue());
+                // 解析value类型
+                r.put(f.name(), option.getFieldResolver(f.name()).getValue(f));
             }
             list.add(r);
         }

+ 22 - 5
dbsyncer-storage/src/main/java/org/dbsyncer/storage/query/Option.java

@@ -5,8 +5,12 @@ import org.apache.lucene.search.highlight.Highlighter;
 import org.apache.lucene.search.highlight.QueryScorer;
 import org.apache.lucene.search.highlight.SimpleHTMLFormatter;
 import org.dbsyncer.common.util.CollectionUtils;
+import org.dbsyncer.storage.enums.IndexFieldResolverEnum;
+import org.dbsyncer.storage.lucene.IndexFieldResolver;
 
+import java.util.LinkedHashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.stream.Collectors;
 
@@ -17,13 +21,15 @@ import java.util.stream.Collectors;
  */
 public class Option {
 
-    private Query       query;
+    private Query query;
+
     private Set<String> highLightKeys;
-    private boolean     enableHighLightSearch;
+
+    private boolean enableHighLightSearch;
+
     private Highlighter highlighter = null;
 
-    public Option() {
-    }
+    private Map<String, IndexFieldResolverEnum> fieldResolvers = new LinkedHashMap<>();
 
     public Option(Query query) {
         this.query = query;
@@ -38,12 +44,23 @@ public class Option {
                     .collect(Collectors.toSet());
         }
         if (!CollectionUtils.isEmpty(highLightKeys)) {
-            enableHighLightSearch = true;
+            this.enableHighLightSearch = true;
             SimpleHTMLFormatter formatter = new SimpleHTMLFormatter("<span style='color:red'>", "</span>");
             highlighter = new Highlighter(formatter, new QueryScorer(query));
         }
     }
 
+    public IndexFieldResolver getFieldResolver(String name){
+        if(fieldResolvers.containsKey(name)){
+            return fieldResolvers.get(name).getIndexFieldResolver();
+        }
+        return IndexFieldResolverEnum.STRING.getIndexFieldResolver();
+    }
+
+    public void addIndexFieldResolverEnum(String name, IndexFieldResolverEnum fieldResolver){
+        fieldResolvers.putIfAbsent(name, fieldResolver);
+    }
+
     public Query getQuery() {
         return query;
     }

+ 27 - 31
dbsyncer-storage/src/main/test/BinlogMessageTest.java

@@ -29,7 +29,7 @@ import java.util.Queue;
  * @version 1.0.0
  * @date 2022/6/18 23:46
  */
-public class BinlogMessageTest {
+public class BinlogMessageTest extends AbstractBinlogRecorder {
 
     private final Logger logger = LoggerFactory.getLogger(getClass());
 
@@ -37,8 +37,6 @@ public class BinlogMessageTest {
 
     private BinlogColumnValue value = new BinlogColumnValue();
 
-    private MessageTest messageTest = new MessageTest();
-
     @Before
     public void init() throws IOException {
         context = new BinlogContext("WriterBinlog");
@@ -94,7 +92,7 @@ public class BinlogMessageTest {
         BinlogMap.Builder builder = BinlogMap.newBuilder();
         data.forEach((k, v) -> {
             if (null != v) {
-                ByteString bytes = messageTest.serializeValue(v);
+                ByteString bytes = serializeValue(v);
                 if (null != bytes) {
                     builder.putRow(k, bytes);
                 }
@@ -117,7 +115,7 @@ public class BinlogMessageTest {
         // short
         short s = 32767;
         logger.info("short1:{}", s);
-        ByteString shortBytes = messageTest.serializeValue(s);
+        ByteString shortBytes = serializeValue(s);
         logger.info("bytes:{}", shortBytes.toByteArray());
         value.setValue(shortBytes);
         short s2 = value.asShort();
@@ -126,7 +124,7 @@ public class BinlogMessageTest {
         // int
         int i = 1999999999;
         logger.info("int1:{}", i);
-        ByteString intBytes = messageTest.serializeValue(i);
+        ByteString intBytes = serializeValue(i);
         logger.info("bytes:{}", intBytes.toByteArray());
         value.setValue(intBytes);
         int i2 = value.asInteger();
@@ -135,7 +133,7 @@ public class BinlogMessageTest {
         // long
         long l = 8999999999999999999L;
         logger.info("long1:{}", l);
-        ByteString longBytes = messageTest.serializeValue(l);
+        ByteString longBytes = serializeValue(l);
         logger.info("bytes:{}", longBytes.toByteArray());
         value.setValue(longBytes);
         long l2 = value.asLong();
@@ -144,7 +142,7 @@ public class BinlogMessageTest {
         // float
         float f = 99999999999999999999999999999999999.99999999999999999999999999999999999f;
         logger.info("float1:{}", f);
-        ByteString floatBytes = messageTest.serializeValue(f);
+        ByteString floatBytes = serializeValue(f);
         logger.info("bytes:{}", floatBytes.toByteArray());
         value.setValue(floatBytes);
         float f2 = value.asFloat();
@@ -153,7 +151,7 @@ public class BinlogMessageTest {
         // double
         double d = 999999.9999999999999999999999999d;
         logger.info("double1:{}", d);
-        ByteString doubleBytes = messageTest.serializeValue(d);
+        ByteString doubleBytes = serializeValue(d);
         logger.info("bytes:{}", doubleBytes.toByteArray());
         value.setValue(doubleBytes);
         double d2 = value.asDouble();
@@ -162,7 +160,7 @@ public class BinlogMessageTest {
         // double
         BigDecimal b = new BigDecimal(8888888.888888888888888f);
         logger.info("bigDecimal1:{}", b);
-        ByteString bigDecimalBytes = messageTest.serializeValue(b);
+        ByteString bigDecimalBytes = serializeValue(b);
         logger.info("bytes:{}", bigDecimalBytes.toByteArray());
         value.setValue(bigDecimalBytes);
         BigDecimal b2 = value.asBigDecimal();
@@ -171,7 +169,7 @@ public class BinlogMessageTest {
         // boolean
         boolean bool = true;
         logger.info("bool1:{}", bool);
-        ByteString boolBytes = messageTest.serializeValue(bool);
+        ByteString boolBytes = serializeValue(bool);
         logger.info("bytes:{}", boolBytes.toByteArray());
         value.setValue(boolBytes);
         Boolean bool2 = value.asBoolean();
@@ -183,7 +181,7 @@ public class BinlogMessageTest {
         // timestamp
         Timestamp timestamp = Timestamp.valueOf(LocalDateTime.now());
         logger.info("timestamp1:{}, l:{}", timestamp, timestamp.getTime());
-        ByteString timestampBytes = messageTest.serializeValue(timestamp);
+        ByteString timestampBytes = serializeValue(timestamp);
         logger.info("bytes:{}", timestampBytes.toByteArray());
         value.setValue(timestampBytes);
         Timestamp timestamp2 = value.asTimestamp();
@@ -192,7 +190,7 @@ public class BinlogMessageTest {
         // date
         Date date = new Date(timestamp.getTime());
         logger.info("date1:{}, l:{}", date, date.getTime());
-        ByteString dateBytes = messageTest.serializeValue(date);
+        ByteString dateBytes = serializeValue(date);
         logger.info("bytes:{}", dateBytes.toByteArray());
         value.setValue(dateBytes);
         Date date2 = value.asDate();
@@ -201,33 +199,31 @@ public class BinlogMessageTest {
         // time
         Time time = new Time(timestamp.getTime());
         logger.info("time1:{}, l:{}", time, time.getTime());
-        ByteString timeBytes = messageTest.serializeValue(time);
+        ByteString timeBytes = serializeValue(time);
         logger.info("bytes:{}", timeBytes.toByteArray());
         value.setValue(timeBytes);
         Time time2 = value.asTime();
         logger.info("time2:{}, l:{}", time2, time2.getTime());
     }
 
-    final class MessageTest extends AbstractBinlogRecorder {
-        @Override
-        protected Queue getQueue() {
-            return null;
-        }
+    @Override
+    protected Queue getQueue() {
+        return null;
+    }
 
-        @Override
-        protected Object deserialize(BinlogMessage message) {
-            return null;
-        }
+    @Override
+    protected Object deserialize(BinlogMessage message) {
+        return null;
+    }
 
-        @Override
-        protected Object resolveValue(int type, ByteString v) {
-            return super.resolveValue(type, v);
-        }
+    @Override
+    protected Object resolveValue(int type, ByteString v) {
+        return super.resolveValue(type, v);
+    }
 
-        @Override
-        protected ByteString serializeValue(Object v) {
-            return super.serializeValue(v);
-        }
+    @Override
+    protected ByteString serializeValue(Object v) {
+        return super.serializeValue(v);
     }
 
 }

+ 154 - 0
dbsyncer-storage/src/main/test/ShardBinlogTest.java

@@ -0,0 +1,154 @@
+import com.google.protobuf.ByteString;
+import org.apache.lucene.document.Document;
+import org.apache.lucene.document.Field;
+import org.apache.lucene.document.StoredField;
+import org.apache.lucene.document.StringField;
+import org.apache.lucene.index.IndexReader;
+import org.apache.lucene.search.IndexSearcher;
+import org.apache.lucene.search.MatchAllDocsQuery;
+import org.apache.lucene.util.BytesRef;
+import org.dbsyncer.common.model.Paging;
+import org.dbsyncer.storage.binlog.AbstractBinlogRecorder;
+import org.dbsyncer.storage.binlog.proto.BinlogMap;
+import org.dbsyncer.storage.binlog.proto.BinlogMessage;
+import org.dbsyncer.storage.binlog.proto.EventEnum;
+import org.dbsyncer.storage.enums.IndexFieldResolverEnum;
+import org.dbsyncer.storage.lucene.Shard;
+import org.dbsyncer.storage.query.Option;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.nio.charset.Charset;
+import java.sql.Date;
+import java.sql.Timestamp;
+import java.time.LocalDateTime;
+import java.util.*;
+
+/**
+ * @author AE86
+ * @version 1.0.0
+ * @date 2022/6/18 23:46
+ */
+public class ShardBinlogTest extends AbstractBinlogRecorder {
+
+    private final Logger logger = LoggerFactory.getLogger(getClass());
+
+    private Shard shard;
+
+    @Before
+    public void init() throws IOException {
+        shard = new Shard("target/indexDir/");
+    }
+
+    @After
+    public void close() throws IOException {
+        shard.deleteAll();
+    }
+
+    @Test
+    public void testBinlogMessage() throws IOException {
+        List<Document> list = new ArrayList<>();
+        for (int i = 1; i <= 10000; i++) {
+            BinlogMessage message = genMessage("123456", i + "");
+            Document doc = new Document();
+            doc.add(new StringField("id", String.valueOf(i), Field.Store.YES));
+            BytesRef bytesRef = new BytesRef(message.toByteArray());
+            doc.add(new StoredField("content", bytesRef));
+            list.add(doc);
+
+            if (i % 1000 == 0) {
+                shard.insertBatch(list);
+                list.clear();
+            }
+        }
+
+        if (!list.isEmpty()) {
+            shard.insertBatch(list);
+        }
+        check();
+
+        Option option = new Option(new MatchAllDocsQuery());
+        option.addIndexFieldResolverEnum("content", IndexFieldResolverEnum.BINARY);
+        Paging paging = shard.query(option, 1, 10001, null);
+        List<Map> maps = (List<Map>) paging.getData();
+        for (Map m : maps) {
+            BytesRef ref = (BytesRef) m.get("content");
+            BinlogMessage message = BinlogMessage.parseFrom(ref.bytes);
+            Map<String, ByteString> rowMap = message.getData().getRowMap();
+            logger.info(rowMap.get("name").toStringUtf8());
+        }
+        logger.info("总条数:{}", paging.getTotal());
+    }
+
+    private BinlogMessage genMessage(String tableGroupId, String key) {
+        Map<String, Object> data = new HashMap<>();
+        data.put("id", 1L);
+        data.put("name", key + "中文");
+        data.put("age", 88);
+        data.put("bd", new BigDecimal(88));
+        data.put("sex", 1);
+        data.put("f", 88.88f);
+        data.put("d", 999.99d);
+        data.put("b", true);
+        short ss = 32767;
+        data.put("ss", ss);
+        data.put("bytes", "中文666".getBytes(Charset.defaultCharset()));
+        data.put("create_date", new Date(Timestamp.valueOf(LocalDateTime.now()).getTime()));
+        data.put("update_time", Timestamp.valueOf(LocalDateTime.now()).getTime());
+
+        BinlogMap.Builder builder = BinlogMap.newBuilder();
+        data.forEach((k, v) -> {
+            if (null != v) {
+                ByteString bytes = serializeValue(v);
+                if (null != bytes) {
+                    builder.putRow(k, bytes);
+                }
+            }
+        });
+
+        BinlogMessage build = BinlogMessage.newBuilder()
+                .setTableGroupId(tableGroupId)
+                .setEvent(EventEnum.UPDATE)
+                .setData(builder.build())
+                .build();
+        return build;
+    }
+
+    @Override
+    protected Queue getQueue() {
+        return null;
+    }
+
+    @Override
+    protected Object deserialize(BinlogMessage message) {
+        return null;
+    }
+
+    @Override
+    protected Object resolveValue(int type, ByteString v) {
+        return super.resolveValue(type, v);
+    }
+
+    @Override
+    protected ByteString serializeValue(Object v) {
+        return super.serializeValue(v);
+    }
+
+    private void check() throws IOException {
+        final IndexSearcher searcher = shard.getSearcher();
+        IndexReader reader = searcher.getIndexReader();
+        // 通过reader可以有效的获取到文档的数量
+        // 有效的索引文档
+        System.out.println("有效的索引文档:" + reader.numDocs());
+        // 总共的索引文档
+        System.out.println("总共的索引文档:" + reader.maxDoc());
+        // 删掉的索引文档,其实不恰当,应该是在回收站里的索引文档
+        System.out.println("删掉的索引文档:" + reader.numDeletedDocs());
+    }
+
+}