Răsfoiți Sursa

fixedbug parallelStream 并发下可能导致字段缺失

AE86 3 ani în urmă
părinte
comite
244cd0eb8b

+ 1 - 1
dbsyncer-manager/src/main/java/org/dbsyncer/manager/puller/IncrementPuller.java

@@ -302,7 +302,7 @@ public class IncrementPuller extends AbstractPuller implements ScheduledTaskJob
             // 处理过程有异常向上抛
             List<FieldPicker> pickers = tablePicker.get(rowChangedEvent.getSourceTableName());
             if (!CollectionUtils.isEmpty(pickers)) {
-                pickers.parallelStream().forEach(picker -> {
+                pickers.forEach(picker -> {
                     final Map<String, Object> before = picker.getColumns(rowChangedEvent.getBeforeData());
                     final Map<String, Object> after = picker.getColumns(rowChangedEvent.getAfterData());
                     if (picker.filter(StringUtil.equals(ConnectorConstant.OPERTION_DELETE, rowChangedEvent.getEvent()) ? before : after)) {

+ 1 - 1
dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/impl/FlushServiceImpl.java

@@ -60,7 +60,7 @@ public class FlushServiceImpl implements FlushService {
     @Override
     public void asyncWrite(String metaId, String event, boolean success, List<Map> data, String error) {
         long now = Instant.now().toEpochMilli();
-        List<Map> list = data.parallelStream().map(r -> {
+        List<Map> list = data.stream().map(r -> {
             Map<String, Object> params = new HashMap();
             params.put(ConfigConstant.CONFIG_MODEL_ID, String.valueOf(snowflakeIdWorker.nextId()));
             params.put(ConfigConstant.DATA_SUCCESS, success ? StorageDataStatusEnum.SUCCESS.getValue() : StorageDataStatusEnum.FAIL.getValue());

+ 1 - 4
dbsyncer-parser/src/main/java/org/dbsyncer/parser/util/ConvertUtil.java

@@ -20,10 +20,7 @@ public abstract class ConvertUtil {
      */
     public static void convert(List<Convert> convert, List<Map> data) {
         if (!CollectionUtils.isEmpty(convert) && !CollectionUtils.isEmpty(data)) {
-            // 并行流计算
-            data.parallelStream().forEach(row -> {
-                convert(convert, row);
-            });
+            data.forEach(row -> convert(convert, row));
         }
     }
 

+ 1 - 1
dbsyncer-storage/src/main/java/org/dbsyncer/storage/support/DiskStorageServiceImpl.java

@@ -135,7 +135,7 @@ public class DiskStorageServiceImpl extends AbstractStorageService {
     @Override
     public void insertData(StorageEnum type, String collection, List<Map> list) throws IOException {
         createShardIfNotExist(collection);
-        List<Document> docs = list.parallelStream().map(r -> ParamsUtil.convertData2Doc(r)).collect(Collectors.toList());
+        List<Document> docs = list.stream().map(r -> ParamsUtil.convertData2Doc(r)).collect(Collectors.toList());
         map.get(collection).insertBatch(docs);
     }
 

+ 3 - 3
dbsyncer-storage/src/main/java/org/dbsyncer/storage/support/MysqlStorageServiceImpl.java

@@ -374,13 +374,13 @@ public class MysqlStorageServiceImpl extends AbstractStorageService {
         // 开启高亮
         if (!CollectionUtils.isEmpty(list) && query.isEnableHighLightSearch()) {
             List<Param> highLight = query.getParams().stream().filter(p -> p.isHighlighter()).collect(Collectors.toList());
-            list.parallelStream().forEach(row -> {
+            list.forEach(row ->
                 highLight.forEach(p -> {
                     String text = String.valueOf(row.get(p.getKey()));
                     String replacement = new StringBuilder("<span style='color:red'>").append(p.getValue()).append("</span>").toString();
                     row.put(p.getKey(), StringUtil.replace(text, p.getValue(), replacement));
-                });
-            });
+                })
+            );
         }
     }