
Optimize sync message status

AE86 committed 2 years ago
commit 700b36c8c3
+ 3 - 4
dbsyncer-parser/src/main/java/org/dbsyncer/parser/strategy/impl/DisableWriterBufferActuatorStrategy.java

@@ -79,13 +79,12 @@ public final class DisableWriterBufferActuatorStrategy extends AbstractBinlogRec
         }
 
         // 1. Get the configuration
-        final String tableGroupId = message.getTableGroupId();
-        final TableGroup tableGroup = cacheService.get(tableGroupId, TableGroup.class);
+        final TableGroup tableGroup = cacheService.get(message.getTableGroupId(), TableGroup.class);
 
         // 2. Deserialize the data
-        final Picker picker = new Picker(tableGroup.getFieldMapping());
-        final Map<String, Field> fieldMap = picker.getTargetFieldMap();
         try {
+            final Picker picker = new Picker(tableGroup.getFieldMapping());
+            final Map<String, Field> fieldMap = picker.getTargetFieldMap();
             Map<String, Object> data = new HashMap<>();
             message.getData().getRowMap().forEach((k, v) -> {
                 if (fieldMap.containsKey(k)) {

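The first hunk inlines the tableGroupId lookup and moves the Picker construction and field-map extraction inside the try block, so a failure while resolving the field mapping is handled by the same catch as the row handling itself. A minimal sketch of that scoping pattern, using a hypothetical in-memory config map instead of the real dbsyncer cacheService / TableGroup / Picker types:

import java.util.HashMap;
import java.util.Map;

public class ScopedSetupSketch {

    // Hypothetical stand-in for cacheService: tableGroupId -> (source field -> target field).
    private static final Map<String, Map<String, String>> CONFIG_CACHE = new HashMap<>();

    static Map<String, Object> pickFields(String tableGroupId, Map<String, Object> rowMap) {
        try {
            // Setup now lives inside the try block: a missing or malformed
            // mapping falls into the same error path as the row mapping below.
            Map<String, String> fieldMap = CONFIG_CACHE.get(tableGroupId);
            Map<String, Object> data = new HashMap<>();
            rowMap.forEach((k, v) -> {
                if (fieldMap.containsKey(k)) {
                    data.put(fieldMap.get(k), v);
                }
            });
            return data;
        } catch (RuntimeException e) {
            // One catch covers both configuration lookup and field picking.
            System.err.println(e.getMessage());
            return new HashMap<>();
        }
    }
}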
+ 19 - 27
dbsyncer-storage/src/main/java/org/dbsyncer/storage/binlog/AbstractBinlogRecorder.java

@@ -207,43 +207,35 @@ public abstract class AbstractBinlogRecorder<Message> implements BinlogRecorder,
             }
 
             List<Map> list = (List<Map>) paging.getData();
-            int size = list.size();
-            List<Message> messageList = new ArrayList<>();
-            List<Document> docs = new ArrayList<>();
-            List<String> deleteIds = new ArrayList<>();
-            long now = Instant.now().toEpochMilli();
-            Map row = null;
+            final int size = list.size();
+            final List<Message> messages = new ArrayList<>(size);
+            final List<Document> updateDocs = new ArrayList<>(size);
+            final Term[] deleteIds = new Term[size];
             for (int i = 0; i < size; i++) {
+                Map row = list.get(i);
+                String id = (String) row.get(BinlogConstant.BINLOG_ID);
+                Integer status = (Integer) row.get(BinlogConstant.BINLOG_STATUS);
+                BytesRef ref = (BytesRef) row.get(BinlogConstant.BINLOG_CONTENT);
+                if(BinlogConstant.PROCESSING == status){
+                    logger.warn("存在超时未处理数据,正在重试,建议优化配置参数[max-processing-seconds={}].", binlogRecorderConfig.getMaxProcessingSeconds());
+                }
+                deleteIds[i] = new Term(BinlogConstant.BINLOG_ID, id);
+                String newId = String.valueOf(snowflakeIdWorker.nextId());
                 try {
-                    row = list.get(i);
-                    String id = (String) row.get(BinlogConstant.BINLOG_ID);
-                    Integer status = (Integer) row.get(BinlogConstant.BINLOG_STATUS);
-                    BytesRef ref = (BytesRef) row.get(BinlogConstant.BINLOG_CONTENT);
-                    Message message = deserialize(id, BinlogMessage.parseFrom(ref.bytes));
+                    Message message = deserialize(newId, BinlogMessage.parseFrom(ref.bytes));
                     if (null != message) {
-                        messageList.add(message);
-                    }
-                    if(BinlogConstant.PROCESSING == status){
-                        logger.warn("建议优化参数配置,当前存在超时未处理数据,正在重试.");
-                        continue;
+                        messages.add(message);
+                        updateDocs.add(ParamsUtil.convertBinlog2Doc(newId, BinlogConstant.PROCESSING, ref, Instant.now().toEpochMilli()));
                     }
-                    deleteIds.add(id);
-                    docs.add(ParamsUtil.convertBinlog2Doc(String.valueOf(snowflakeIdWorker.nextId()), BinlogConstant.PROCESSING, ref, now));
                 } catch (InvalidProtocolBufferException e) {
                     logger.error(e.getMessage());
                 }
             }
 
             // If the service is stopped while the message status is being updated, messages may be synchronized twice after a restart to guarantee data safety
-            shard.insertBatch(docs);
-            int deleteSize = deleteIds.size();
-            Term[] terms = new Term[deleteSize];
-            for (int i = 0; i < deleteSize; i++) {
-                terms[i] = new Term(BinlogConstant.BINLOG_ID, deleteIds.get(i));
-            }
-            shard.deleteBatch(terms);
-
-            getQueue().addAll(messageList);
+            shard.insertBatch(updateDocs);
+            shard.deleteBatch(deleteIds);
+            getQueue().addAll(messages);
         }
     }
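
The second hunk builds the replacement documents and the Term array for deletion in a single pass: each pending row is re-inserted under a fresh snowflake id with PROCESSING status, the original document is removed by id, and the insert is issued before the delete so that a crash in between can at worst replay a message, never lose one. A hedged standalone sketch of that insert-then-delete step against a plain Lucene IndexWriter (the field names, the PROCESSING value and nextId() are illustrative stand-ins, not the project's Shard API):

import java.io.IOException;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;

import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.StoredField;
import org.apache.lucene.document.StringField;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.Term;
import org.apache.lucene.util.BytesRef;

public class RequeueSketch {

    private static long counter;

    // Placeholder for SnowflakeIdWorker.nextId().
    private static synchronized long nextId() {
        return ++counter;
    }

    // Re-registers pending binlog rows under new ids marked PROCESSING,
    // then deletes the old documents in one batch call.
    static void requeue(IndexWriter writer, List<String> oldIds, List<BytesRef> contents) throws IOException {
        int size = oldIds.size();
        List<Document> updateDocs = new ArrayList<>(size);
        Term[] deleteIds = new Term[size];
        for (int i = 0; i < size; i++) {
            deleteIds[i] = new Term("id", oldIds.get(i));
            Document doc = new Document();
            doc.add(new StringField("id", String.valueOf(nextId()), Field.Store.YES));
            doc.add(new StoredField("status", 1 /* PROCESSING */));
            doc.add(new StoredField("content", contents.get(i)));
            doc.add(new StoredField("updateTime", Instant.now().toEpochMilli()));
            updateDocs.add(doc);
        }
        // Insert first, then delete: if the process dies in between, a restart
        // may replay a message twice but never drops one.
        writer.addDocuments(updateDocs);
        writer.deleteDocuments(deleteIds);
        writer.commit();
    }
}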