소스 검색

fixed unsafe list, NPE

穿云 3 달 전
부모
커밋
3aae32ad00
15개의 변경된 파일, 50개의 추가 그리고 53개의 삭제
  1. 2 2
      dbsyncer-biz/src/main/java/org/dbsyncer/biz/checker/AbstractChecker.java
  2. 10 12
      dbsyncer-biz/src/main/java/org/dbsyncer/biz/impl/ConnectorServiceImpl.java
  3. 2 2
      dbsyncer-biz/src/main/java/org/dbsyncer/biz/impl/DataSyncServiceImpl.java
  4. 3 4
      dbsyncer-biz/src/main/java/org/dbsyncer/biz/impl/MonitorServiceImpl.java
  5. 2 1
      dbsyncer-connector/dbsyncer-connector-elasticsearch/src/main/java/org/dbsyncer/connector/elasticsearch/ElasticsearchConnector.java
  6. 4 4
      dbsyncer-connector/dbsyncer-connector-file/src/main/java/org/dbsyncer/connector/file/model/FileResolver.java
  7. 8 6
      dbsyncer-connector/dbsyncer-connector-kafka/src/main/java/org/dbsyncer/connector/kafka/serialization/JsonToMapDeserializer.java
  8. 2 2
      dbsyncer-connector/dbsyncer-connector-postgresql/src/main/java/org/dbsyncer/connector/postgresql/decoder/impl/PgOutputMessageDecoder.java
  9. 3 6
      dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/AbstractBufferActuator.java
  10. 2 2
      dbsyncer-parser/src/main/java/org/dbsyncer/parser/model/Meta.java
  11. 2 2
      dbsyncer-plugin/src/main/java/org/dbsyncer/plugin/PluginFactory.java
  12. 3 3
      dbsyncer-sdk/src/main/java/org/dbsyncer/sdk/config/DatabaseConfig.java
  13. 2 2
      dbsyncer-sdk/src/main/java/org/dbsyncer/sdk/connector/AbstractConnector.java
  14. 2 2
      dbsyncer-sdk/src/main/java/org/dbsyncer/sdk/schema/AbstractSchemaResolver.java
  15. 3 3
      dbsyncer-storage/src/main/java/org/dbsyncer/storage/lucene/Shard.java

+ 2 - 2
dbsyncer-biz/src/main/java/org/dbsyncer/biz/checker/AbstractChecker.java

@@ -22,9 +22,9 @@ import org.springframework.util.Assert;
 import javax.annotation.Resource;
 import java.time.Instant;
 import java.util.HashMap;
-import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 
 /**
  * @author AE86
@@ -74,7 +74,7 @@ public abstract class AbstractChecker implements Checker {
     protected void modifySuperConfigModel(AbstractConfigModel model, Map<String, String> params) {
         // 全局参数
         String mappingParams = params.get("params");
-        model.setParams(StringUtil.isNotBlank(mappingParams) ? JsonUtil.jsonToObj(mappingParams, Map.class) : new LinkedHashMap());
+        model.setParams(StringUtil.isNotBlank(mappingParams) ? JsonUtil.jsonToObj(mappingParams, Map.class) : new ConcurrentHashMap<>());
 
         // 过滤条件
         String filterJson = params.get("filter");

+ 10 - 12
dbsyncer-biz/src/main/java/org/dbsyncer/biz/impl/ConnectorServiceImpl.java

@@ -16,8 +16,8 @@ import org.dbsyncer.parser.ProfileComponent;
 import org.dbsyncer.parser.model.ConfigModel;
 import org.dbsyncer.parser.model.Connector;
 import org.dbsyncer.parser.model.Mapping;
-import org.dbsyncer.sdk.model.ConnectorConfig;
 import org.dbsyncer.sdk.constant.ConfigConstant;
+import org.dbsyncer.sdk.model.ConnectorConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.stereotype.Service;
@@ -25,13 +25,12 @@ import org.springframework.util.Assert;
 
 import javax.annotation.Resource;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashSet;
-import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.stream.Collectors;
 
 /**
@@ -44,7 +43,7 @@ public class ConnectorServiceImpl extends BaseServiceImpl implements ConnectorSe
 
     private final Logger logger = LoggerFactory.getLogger(getClass());
 
-    private final Map<String, Boolean> health = new LinkedHashMap<>();
+    private final Map<String, Boolean> health = new ConcurrentHashMap<>();
 
     @Resource
     private ProfileComponent profileComponent;
@@ -117,17 +116,16 @@ public class ConnectorServiceImpl extends BaseServiceImpl implements ConnectorSe
 
     @Override
     public List<Connector> getConnectorAll() {
-        List<Connector> list = profileComponent.getConnectorAll()
+        return profileComponent.getConnectorAll()
                 .stream()
                 .sorted(Comparator.comparing(Connector::getUpdateTime).reversed())
                 .collect(Collectors.toList());
-        return list;
     }
 
     @Override
     public List<String> getConnectorTypeAll() {
         ArrayList<String> connectorTypes = new ArrayList<>(connectorFactory.getConnectorTypeAll());
-        Collections.sort(connectorTypes, Comparator.comparing(String::toString));
+        connectorTypes.sort(Comparator.comparing(String::toString));
         return connectorTypes;
     }
 
@@ -150,14 +148,14 @@ public class ConnectorServiceImpl extends BaseServiceImpl implements ConnectorSe
 
         // 移除删除的连接器
         Set<String> remove = new HashSet<>();
-        health.keySet().forEach(k -> {
-            if (!exist.contains(k)) {
-                remove.add(k);
+        for (Map.Entry<String, Boolean> entry : health.entrySet()) {
+            if (!exist.contains(entry.getKey())) {
+                remove.add(entry.getKey());
             }
-        });
+        }
 
         if (!CollectionUtils.isEmpty(remove)) {
-            remove.forEach(k -> health.remove(k));
+            remove.forEach(health::remove);
         }
     }
 

+ 2 - 2
dbsyncer-biz/src/main/java/org/dbsyncer/biz/impl/DataSyncServiceImpl.java

@@ -39,9 +39,9 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.stream.Collectors;
 
 /**
@@ -183,7 +183,7 @@ public class DataSyncServiceImpl implements DataSyncService {
 
     private Map getData(String metaId, String messageId) {
         Query query = new Query(1, 1);
-        Map<String, FieldResolver> fieldResolvers = new LinkedHashMap<>();
+        Map<String, FieldResolver> fieldResolvers = new ConcurrentHashMap<>();
         fieldResolvers.put(ConfigConstant.BINLOG_DATA, (FieldResolver<IndexableField>) field -> field.binaryValue().bytes);
         query.setFieldResolverMap(fieldResolvers);
         query.addFilter(ConfigConstant.CONFIG_MODEL_ID, messageId);

+ 3 - 4
dbsyncer-biz/src/main/java/org/dbsyncer/biz/impl/MonitorServiceImpl.java

@@ -14,7 +14,6 @@ import org.dbsyncer.biz.metric.MetricDetailFormatter;
 import org.dbsyncer.biz.metric.impl.CpuMetricDetailFormatter;
 import org.dbsyncer.biz.metric.impl.DiskMetricDetailFormatter;
 import org.dbsyncer.biz.metric.impl.DoubleRoundMetricDetailFormatter;
-import org.dbsyncer.biz.metric.impl.GCMetricDetailFormatter;
 import org.dbsyncer.biz.metric.impl.MemoryMetricDetailFormatter;
 import org.dbsyncer.biz.metric.impl.ValueMetricDetailFormatter;
 import org.dbsyncer.biz.model.AppReportMetric;
@@ -59,9 +58,9 @@ import java.time.LocalDateTime;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Comparator;
-import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.stream.Collectors;
 
 /**
@@ -95,7 +94,7 @@ public class MonitorServiceImpl extends BaseServiceImpl implements MonitorServic
     @Resource
     private SystemConfigService systemConfigService;
 
-    private Map<String, MetricDetailFormatter> metricDetailFormatterMap = new LinkedHashMap<>();
+    private Map<String, MetricDetailFormatter> metricDetailFormatterMap = new ConcurrentHashMap<>();
 
     @PostConstruct
     private void init() {
@@ -262,7 +261,7 @@ public class MonitorServiceImpl extends BaseServiceImpl implements MonitorServic
             return new Paging(pageNum, pageSize);
         }
         Query query = new Query(pageNum, pageSize);
-        Map<String, FieldResolver> fieldResolvers = new LinkedHashMap<>();
+        Map<String, FieldResolver> fieldResolvers = new ConcurrentHashMap<>();
         fieldResolvers.put(ConfigConstant.BINLOG_DATA, (FieldResolver<IndexableField>) field -> field.binaryValue().bytes);
         query.setFieldResolverMap(fieldResolvers);
 

+ 2 - 1
dbsyncer-connector/dbsyncer-connector-elasticsearch/src/main/java/org/dbsyncer/connector/elasticsearch/ElasticsearchConnector.java

@@ -66,6 +66,7 @@ import org.slf4j.LoggerFactory;
 import java.io.IOException;
 import java.sql.Types;
 import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.stream.Collectors;
 
 /**
@@ -82,7 +83,7 @@ public final class ElasticsearchConnector extends AbstractConnector implements C
     public static final String _SOURCE_INDEX = "_source_index";
     private final String _TARGET_INDEX = "_target_index";
     private final String _TYPE = "_type";
-    private final Map<String, FilterMapper> filters = new LinkedHashMap<>();
+    private final Map<String, FilterMapper> filters = new ConcurrentHashMap<>();
     private final ESConfigValidator configValidator = new ESConfigValidator();
 
     public ElasticsearchConnector() {

+ 4 - 4
dbsyncer-connector/dbsyncer-connector-file/src/main/java/org/dbsyncer/connector/file/model/FileResolver.java

@@ -9,9 +9,9 @@ import org.dbsyncer.connector.file.column.impl.FileColumnValue;
 import org.dbsyncer.sdk.model.Field;
 
 import java.util.ArrayList;
-import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 
 /**
  * @Author AE86
@@ -20,11 +20,11 @@ import java.util.Map;
  */
 public class FileResolver {
 
-    private ColumnValue value = new FileColumnValue();
+    private final ColumnValue value = new FileColumnValue();
 
     public Map<String, Object> parseMap(List<Field> fields, char separator, String line) {
-        Map<String, Object> row = new LinkedHashMap<>();
-        parse(fields, separator, line, (key, value) -> row.put(key, value));
+        Map<String, Object> row = new ConcurrentHashMap<>();
+        parse(fields, separator, line, row::put);
         return row;
     }
 

+ 8 - 6
dbsyncer-connector/dbsyncer-connector-kafka/src/main/java/org/dbsyncer/connector/kafka/serialization/JsonToMapDeserializer.java

@@ -8,8 +8,8 @@ import org.apache.kafka.common.serialization.Deserializer;
 import org.dbsyncer.common.util.JsonUtil;
 
 import java.io.UnsupportedEncodingException;
-import java.util.LinkedHashMap;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 
 /**
  * @Author AE86
@@ -23,19 +23,21 @@ public class JsonToMapDeserializer implements Deserializer<Map> {
     public void configure(Map<String, ?> configs, boolean isKey) {
         String propertyName = isKey ? "key.deserializer.encoding" : "value.deserializer.encoding";
         Object encodingValue = configs.get(propertyName);
-        if (encodingValue == null)
+        if (encodingValue == null) {
             encodingValue = configs.get("deserializer.encoding");
-        if (encodingValue != null && encodingValue instanceof String)
+        }
+        if (encodingValue != null && encodingValue instanceof String) {
             encoding = (String) encodingValue;
+        }
     }
 
     @Override
     public Map deserialize(String topic, byte[] data) {
         try {
-            if (data == null)
+            if (data == null) {
                 return null;
-            else
-                return JsonUtil.jsonToObj(new String(data, encoding), LinkedHashMap.class);
+            }
+            return JsonUtil.jsonToObj(new String(data, encoding), ConcurrentHashMap.class);
         } catch (UnsupportedEncodingException e) {
             throw new SerializationException("Error when deserializing byte[] to string due to unsupported encoding " + encoding);
         }

+ 2 - 2
dbsyncer-connector/dbsyncer-connector-postgresql/src/main/java/org/dbsyncer/connector/postgresql/decoder/impl/PgOutputMessageDecoder.java

@@ -21,9 +21,9 @@ import org.springframework.util.Assert;
 import java.nio.ByteBuffer;
 import java.time.LocalDateTime;
 import java.util.ArrayList;
-import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 
 /**
  * @Author AE86
@@ -36,7 +36,7 @@ public class PgOutputMessageDecoder extends AbstractMessageDecoder {
 
     private static final LocalDateTime PG_EPOCH = LocalDateTime.of(2000, 1, 1, 0, 0, 0);
     private static final String GET_TABLE_SCHEMA = "select t.oid,t.relname as tableName from pg_class t inner join (select ns.oid as nspoid, ns.nspname from pg_namespace ns where ns.nspname = '%s') as n on n.nspoid = t.relnamespace where relkind = 'r'";
-    private static final Map<Integer, TableId> tables = new LinkedHashMap<>();
+    private static final Map<Integer, TableId> tables = new ConcurrentHashMap<>();
     private ConnectorService connectorService;
     private DatabaseConnectorInstance connectorInstance;
 

+ 3 - 6
dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/AbstractBufferActuator.java

@@ -17,10 +17,10 @@ import org.springframework.util.Assert;
 import javax.annotation.Resource;
 import java.lang.reflect.ParameterizedType;
 import java.time.Instant;
-import java.util.LinkedHashMap;
 import java.util.Map;
 import java.util.Queue;
 import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.Lock;
@@ -207,14 +207,11 @@ public abstract class AbstractBufferActuator<Request extends BufferRequest, Resp
         }
 
         AtomicLong batchCounter = new AtomicLong();
-        Map<String, Response> map = new LinkedHashMap<>();
+        Map<String, Response> map = new ConcurrentHashMap<>();
         while (!queue.isEmpty() && batchCounter.get() < config.getBufferPullCount()) {
             Request poll = queue.poll();
             String key = getPartitionKey(poll);
-            if (!map.containsKey(key)) {
-                map.putIfAbsent(key, responseClazz.newInstance());
-            }
-            Response response = map.get(key);
+            Response response = map.putIfAbsent(key, responseClazz.newInstance());
             partition(poll, response);
             batchCounter.incrementAndGet();
 

+ 2 - 2
dbsyncer-parser/src/main/java/org/dbsyncer/parser/model/Meta.java

@@ -3,8 +3,8 @@ package org.dbsyncer.parser.model;
 import org.dbsyncer.parser.enums.MetaEnum;
 import org.dbsyncer.sdk.constant.ConfigConstant;
 
-import java.util.LinkedHashMap;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicLong;
 
 /**
@@ -50,7 +50,7 @@ public class Meta extends ConfigModel {
         this.total = new AtomicLong(0);
         this.success = new AtomicLong(0);
         this.fail = new AtomicLong(0);
-        this.snapshot = new LinkedHashMap<>();
+        this.snapshot = new ConcurrentHashMap<>();
         this.beginTime = 0L;
         this.endTime = 0L;
     }

+ 2 - 2
dbsyncer-plugin/src/main/java/org/dbsyncer/plugin/PluginFactory.java

@@ -24,11 +24,11 @@ import java.net.URL;
 import java.net.URLClassLoader;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.LinkedHashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.ServiceLoader;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.stream.Collectors;
 
 /**
@@ -55,7 +55,7 @@ public class PluginFactory implements DisposableBean {
 
     private final List<Plugin> plugins = new LinkedList<>();
 
-    private final Map<String, PluginService> service = new LinkedHashMap<>();
+    private final Map<String, PluginService> service = new ConcurrentHashMap<>();
 
     @Resource
     private ApplicationContext applicationContext;

+ 3 - 3
dbsyncer-sdk/src/main/java/org/dbsyncer/sdk/config/DatabaseConfig.java

@@ -6,9 +6,9 @@ package org.dbsyncer.sdk.config;
 import org.dbsyncer.sdk.model.ConnectorConfig;
 import org.dbsyncer.sdk.model.SqlTable;
 
-import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 
 /**
  * @author AE86
@@ -61,14 +61,14 @@ public class DatabaseConfig extends ConnectorConfig {
     /**
      * 参数配置
      */
-    private Map<String, String> properties = new LinkedHashMap<>();
+    private Map<String, String> properties = new ConcurrentHashMap<>();
 
     public String getProperty(String key) {
         return properties.get(key);
     }
 
     public String getProperty(String key, String defaultValue) {
-        return properties.containsKey(key) ? properties.get(key) : defaultValue;
+        return properties.getOrDefault(key, defaultValue);
     }
 
     public String getDriverClassName() {

+ 2 - 2
dbsyncer-sdk/src/main/java/org/dbsyncer/sdk/connector/AbstractConnector.java

@@ -40,14 +40,14 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.sql.Types;
-import java.util.LinkedHashMap;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 
 public abstract class AbstractConnector {
 
     private final Logger logger = LoggerFactory.getLogger(getClass());
 
-    protected final Map<Integer, ValueMapper> VALUE_MAPPERS = new LinkedHashMap<>();
+    protected final Map<Integer, ValueMapper> VALUE_MAPPERS = new ConcurrentHashMap<>();
 
     public AbstractConnector() {
         // 常用类型

+ 2 - 2
dbsyncer-sdk/src/main/java/org/dbsyncer/sdk/schema/AbstractSchemaResolver.java

@@ -7,8 +7,8 @@ import org.dbsyncer.sdk.SdkException;
 import org.dbsyncer.sdk.model.Field;
 import org.springframework.util.Assert;
 
-import java.util.LinkedHashMap;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 
 /**
  * @Author 穿云
@@ -17,7 +17,7 @@ import java.util.Map;
  */
 public abstract class AbstractSchemaResolver implements SchemaResolver {
 
-    private final Map<String, DataType> mapping = new LinkedHashMap<>();
+    private final Map<String, DataType> mapping = new ConcurrentHashMap<>();
 
     public AbstractSchemaResolver() {
         initDataTypeMapping(mapping);

+ 3 - 3
dbsyncer-storage/src/main/java/org/dbsyncer/storage/lucene/Shard.java

@@ -35,9 +35,9 @@ import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.util.ArrayList;
 import java.util.Iterator;
-import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 
 /**
  * @Author AE86
@@ -188,7 +188,7 @@ public class Shard {
             // 取得对应的文档对象
             doc = searcher.doc(docs[begin++].doc);
             iterator = doc.iterator();
-            r = new LinkedHashMap<>();
+            r = new ConcurrentHashMap();
             while (iterator.hasNext()) {
                 f = iterator.next();
 
@@ -204,7 +204,7 @@ public class Shard {
                             continue;
                         }
                     } catch (InvalidTokenOffsetsException e) {
-                        e.printStackTrace();
+                        logger.error(e.getLocalizedMessage(), e);
                     }
                 }