
Optimize shard

AE86 · 3 years ago
commit 33e199f4fa

+ 5 - 9
dbsyncer-listener/src/main/java/org/dbsyncer/listener/mysql/MysqlExtractor.java

@@ -234,7 +234,7 @@ public class MysqlExtractor extends AbstractExtractor {
 
             if (EventType.isUpdate(header.getEventType())) {
                 UpdateRowsEventData data = event.getData();
-                if (isFilterTable(data.getTableId(), ConnectorConstant.OPERTION_UPDATE)) {
+                if (isFilterTable(data.getTableId())) {
                     data.getRows().forEach(m -> {
                         List<Object> before = Stream.of(m.getKey()).collect(Collectors.toList());
                         List<Object> after = Stream.of(m.getValue()).collect(Collectors.toList());
@@ -246,7 +246,7 @@ public class MysqlExtractor extends AbstractExtractor {
             }
             if (EventType.isWrite(header.getEventType())) {
                 WriteRowsEventData data = event.getData();
-                if (isFilterTable(data.getTableId(), ConnectorConstant.OPERTION_INSERT)) {
+                if (isFilterTable(data.getTableId())) {
                     data.getRows().forEach(m -> {
                         List<Object> after = Stream.of(m).collect(Collectors.toList());
                         asyncSendRowChangedEvent(new RowChangedEvent(getTableName(data.getTableId()), ConnectorConstant.OPERTION_INSERT, Collections.EMPTY_LIST, after));
@@ -257,7 +257,7 @@ public class MysqlExtractor extends AbstractExtractor {
             }
             if (EventType.isDelete(header.getEventType())) {
                 DeleteRowsEventData data = event.getData();
-                if (isFilterTable(data.getTableId(), ConnectorConstant.OPERTION_DELETE)) {
+                if (isFilterTable(data.getTableId())) {
                     data.getRows().forEach(m -> {
                         List<Object> before = Stream.of(m).collect(Collectors.toList());
                         asyncSendRowChangedEvent(new RowChangedEvent(getTableName(data.getTableId()), ConnectorConstant.OPERTION_DELETE, before, Collections.EMPTY_LIST));
@@ -280,13 +280,9 @@ public class MysqlExtractor extends AbstractExtractor {
             return tables.get(tableId).getTable();
         }
 
-        private boolean isFilterTable(long tableId, String event) {
+        private boolean isFilterTable(long tableId) {
             final TableMapEventData tableMap = tables.get(tableId);
-            if (!StringUtil.equals(database, tableMap.getDatabase()) || !filterTable.contains(tableMap.getTable())) {
-                logger.info("Table[{}.{}] {}", tableMap.getDatabase(), tableMap.getTable(), event);
-                return false;
-            }
-            return true;
+            return StringUtil.equals(database, tableMap.getDatabase()) && filterTable.contains(tableMap.getTable());
         }
 
     }
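
Net effect in MysqlExtractor: isFilterTable() drops the event-type argument and the per-event info log, becoming a pure predicate over the cached TableMapEventData. A minimal standalone sketch of that predicate (TableFilter and the String[] table-map entries below are illustrative stand-ins, not the project's types):

import java.util.Map;
import java.util.Set;

class TableFilter {

    private final String database;            // database configured for the extractor
    private final Set<String> filterTable;    // tables selected for synchronization
    private final Map<Long, String[]> tables; // tableId -> {database, table}, from TABLE_MAP events

    TableFilter(String database, Set<String> filterTable, Map<Long, String[]> tables) {
        this.database = database;
        this.filterTable = filterTable;
        this.tables = tables;
    }

    // Pure predicate: true only when the row event belongs to the watched database
    // and one of the watched tables; no logging on the hot path.
    boolean isFilterTable(long tableId) {
        String[] tableMap = tables.get(tableId);
        return tableMap != null
                && database.equals(tableMap[0])
                && filterTable.contains(tableMap[1]);
    }
}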

+ 75 - 7
dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/FlushServiceImpl.java

@@ -18,9 +18,10 @@ import org.springframework.stereotype.Component;
 
 import javax.annotation.PostConstruct;
 import java.time.Instant;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.stream.Collectors;
 
@@ -47,6 +48,17 @@ public class FlushServiceImpl implements FlushService, ScheduledTaskJob, Disposa
     @Autowired
     private ScheduledTaskService scheduledTaskService;
 
+    @Autowired
+    private Executor taskExecutor;
+
+    private Queue<Task> buffer = new ConcurrentLinkedQueue();
+
+    private Queue<Task> temp = new ConcurrentLinkedQueue();
+
+    private final Object LOCK = new Object();
+
+    private volatile boolean running;
+
     private String key;
 
     @PostConstruct
@@ -87,14 +99,59 @@ public class FlushServiceImpl implements FlushService, ScheduledTaskJob, Disposa
             added.set(true);
             return params;
         }).collect(Collectors.toList());
-        storageService.addData(StorageEnum.DATA, metaId, list);
-    }
 
+        if (running) {
+            temp.offer(new Task(metaId, list));
+            return;
+        }
+
+        buffer.offer(new Task(metaId, list));
+    }
 
     @Override
     public void run() {
-        // TODO 批量写入同步数据
-        logger.info("run flush task");
+        if (running) {
+            return;
+        }
+        synchronized (LOCK) {
+            if (running) {
+                return;
+            }
+            running = true;
+            flush(buffer);
+            running = false;
+            try {
+                TimeUnit.MILLISECONDS.sleep(10);
+            } catch (InterruptedException e) {
+                logger.error(e.getMessage());
+            }
+            flush(temp);
+        }
+    }
+
+    private void flush(Queue<Task> buffer) {
+        if (!buffer.isEmpty()) {
+            final Map<String, List<Map>> task = new LinkedHashMap<>();
+            while (!buffer.isEmpty()) {
+                Task t = buffer.poll();
+                if (!task.containsKey(t.metaId)) {
+                    task.putIfAbsent(t.metaId, new LinkedList<>());
+                }
+                task.get(t.metaId).addAll(t.list);
+            }
+            task.forEach((metaId, list) -> {
+                taskExecutor.execute(() -> {
+                    long now = Instant.now().toEpochMilli();
+                    try {
+                        storageService.addData(StorageEnum.DATA, metaId, list);
+                    } catch (Exception e) {
+                        logger.error("[{}]-flush异常{}", metaId, list.size());
+                    }
+                    logger.info("[{}]-flush{}条,耗时{}秒", metaId, list.size(), (Instant.now().toEpochMilli() - now) / 1000);
+                });
+            });
+            task.clear();
+        }
     }
 
     @Override
@@ -102,4 +159,15 @@ public class FlushServiceImpl implements FlushService, ScheduledTaskJob, Disposa
         scheduledTaskService.stop(key);
         logger.info("Stopped scheduled task.");
     }
+
+    final class Task {
+        String metaId;
+        List<Map> list;
+
+        public Task(String metaId, List<Map> list) {
+            this.metaId = metaId;
+            this.list = list;
+        }
+    }
+
 }
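
Net effect in FlushServiceImpl: rows are no longer written to storage on every call; write() enqueues a Task, and the scheduled run() drains the queue, groups pending rows by metaId, and hands each batch to taskExecutor. While a drain is in progress, new writes go to the second queue (temp) and are picked up on the next pass. A standalone sketch of that double-buffer pattern, assuming a plain ExecutorService in place of the injected taskExecutor and leaving the storage call as a comment:

import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class BufferedFlusher {

    static final class Task {
        final String metaId;
        final List<Map<String, Object>> list;

        Task(String metaId, List<Map<String, Object>> list) {
            this.metaId = metaId;
            this.list = list;
        }
    }

    private final Queue<Task> buffer = new ConcurrentLinkedQueue<>();
    private final Queue<Task> temp = new ConcurrentLinkedQueue<>();
    private final Object lock = new Object();
    private volatile boolean running;
    private final ExecutorService executor = Executors.newFixedThreadPool(4);

    // Producer side: route to the temp queue while a flush cycle is running.
    void write(String metaId, List<Map<String, Object>> list) {
        (running ? temp : buffer).offer(new Task(metaId, list));
    }

    // Consumer side: called by a scheduled job; drains buffer, then temp.
    void run() {
        if (running) {
            return;
        }
        synchronized (lock) {
            if (running) {
                return;
            }
            running = true;
            try {
                flush(buffer);
            } finally {
                running = false;
            }
            flush(temp);
        }
    }

    private void flush(Queue<Task> queue) {
        // Merge all pending tasks so each metaId is persisted as one batch.
        Map<String, List<Map<String, Object>>> grouped = new LinkedHashMap<>();
        for (Task t; (t = queue.poll()) != null; ) {
            grouped.computeIfAbsent(t.metaId, k -> new LinkedList<>()).addAll(t.list);
        }
        grouped.forEach((metaId, rows) -> executor.execute(() -> {
            // storageService.addData(StorageEnum.DATA, metaId, rows) would run here.
        }));
    }
}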

+ 8 - 6
dbsyncer-storage/src/main/java/org/dbsyncer/storage/lucene/Shard.java

@@ -42,7 +42,7 @@ public class Shard {
 
     private IndexWriterConfig config;
 
-    private final Object lock = new Object();
+    private final Object LOCK = new Object();
 
     private static final int MAX_SIZE = 10000;
 
@@ -101,7 +101,7 @@ public class Shard {
         if (null != changeReader) {
             indexReader.close();
             indexReader = null;
-            synchronized (lock) {
+            synchronized (LOCK) {
                 if (null == indexReader) {
                     indexReader = changeReader;
                 }
@@ -192,10 +192,12 @@ public class Shard {
 
     private void execute(Object value, Callback callback) throws IOException {
         if (null != value) {
-            if (indexWriter.isOpen()) {
-                callback.execute();
-                indexWriter.commit();
-                return;
+            synchronized (LOCK) {
+                if (indexWriter.isOpen()) {
+                    callback.execute();
+                    indexWriter.commit();
+                    return;
+                }
             }
             logger.error(value.toString());
         }