Browse Source

fix batch

AE86 2 years ago
parent
commit
923754ea9a

+ 7 - 18
dbsyncer-storage/src/main/java/org/dbsyncer/storage/binlog/AbstractBinlogRecorder.java

@@ -156,33 +156,22 @@ public abstract class AbstractBinlogRecorder<Message> implements BinlogRecorder,
             return;
         }
 
-        List<Map> maps = (List<Map>) paging.getData();
-        List<Term> tasks = new ArrayList<>();
-        maps.forEach(m -> {
+        List<Map> list = (List<Map>) paging.getData();
+        int size = list.size();
+        Term[] terms = new Term[size];
+        for (int i = 0; i < size; i++) {
             try {
-                BytesRef ref = (BytesRef) m.get(BINLOG_CONTENT);
+                BytesRef ref = (BytesRef) list.get(i).get(BINLOG_CONTENT);
                 Message message = deserialize(BinlogMessage.parseFrom(ref.bytes));
                 if (null != message) {
                     getQueue().offer(message);
                 }
-                tasks.add(new Term(BINLOG_ID, (String) m.get(BINLOG_ID)));
+                terms[i] = new Term(BINLOG_ID, (String) list.get(i).get(BINLOG_ID));
             } catch (InvalidProtocolBufferException e) {
                 logger.error(e.getMessage());
-            } catch (IOException e) {
-                logger.error(e.getMessage());
             }
-        });
-
-        if(!CollectionUtils.isEmpty(tasks)){
-            tasks.forEach(t -> {
-                try {
-                    // TODO 批量删除
-                    shard.delete(t);
-                } catch (IOException e) {
-                    logger.error(e.getMessage());
-                }
-            });
         }
+        shard.deleteBatch(terms);
     }
 
     private void flushMessage() throws IOException {

+ 6 - 0
dbsyncer-storage/src/main/java/org/dbsyncer/storage/lucene/Shard.java

@@ -83,6 +83,12 @@ public class Shard {
         }
     }
 
+    public void deleteBatch(Term... terms) throws IOException {
+        if (null != terms) {
+            execute(terms, () -> indexWriter.deleteDocuments(terms));
+        }
+    }
+
     public void deleteAll() throws IOException {
         // Fix Bug: this IndexReader is closed. 直接删除文件
         close();