AE86 4 years ago
parent
commit
294cfe13c8

+ 5 - 2
dbsyncer-manager/src/main/java/org/dbsyncer/manager/ManagerFactory.java

@@ -237,7 +237,9 @@ public class ManagerFactory implements Manager, ApplicationContextAware, Applica
 
     @Override
     public List<Map> queryData(Query query, String collectionId) {
-        return dataTemplate.query(StorageEnum.DATA, query, collectionId);
+        query.setType(StorageEnum.DATA.getType());
+        query.setCollection(collectionId);
+        return dataTemplate.query(query);
     }
 
     @Override
@@ -247,7 +249,8 @@ public class ManagerFactory implements Manager, ApplicationContextAware, Applica
 
     @Override
     public List<Map> queryLog(Query query) {
-        return dataTemplate.query(StorageEnum.LOG, query, null);
+        query.setType(StorageEnum.LOG.getType());
+        return dataTemplate.query(query);
     }
 
     @Override

+ 2 - 2
dbsyncer-manager/src/main/java/org/dbsyncer/manager/template/impl/DataTemplate.java

@@ -26,8 +26,8 @@ public final class DataTemplate {
     @Autowired
     private StorageService storageService;
 
-    public List<Map> query(StorageEnum type, Query query, String collectionId) {
-        return storageService.query(type, query, collectionId);
+    public List<Map> query(Query query) {
+        return storageService.query(query);
     }
 
     public void clear(StorageEnum type, String collectionId) {

+ 2 - 1
dbsyncer-manager/src/main/java/org/dbsyncer/manager/template/impl/PreloadTemplate.java

@@ -54,9 +54,10 @@ public final class PreloadTemplate extends AbstractTemplate implements Applicati
 
     public void execute(PreloadConfig config) {
         Query query = new Query();
+        query.setType(StorageEnum.CONFIG.getType());
         String filterType = config.getFilterType();
         query.put(ConfigConstant.CONFIG_MODEL_TYPE, filterType);
-        List<Map> list = storageService.query(StorageEnum.CONFIG, query);
+        List<Map> list = storageService.query(query);
         boolean empty = CollectionUtils.isEmpty(list);
         logger.info("PreLoad {}:{}", filterType, empty ? 0 : list.size());
         if (!empty) {

+ 38 - 28
dbsyncer-storage/src/main/java/org/dbsyncer/storage/AbstractStorageService.java

@@ -11,6 +11,7 @@ import org.springframework.context.ApplicationContext;
 import org.springframework.context.ApplicationContextAware;
 import org.springframework.util.Assert;
 
+import java.io.File;
 import java.io.IOException;
 import java.util.List;
 import java.util.Map;
@@ -26,31 +27,31 @@ public abstract class AbstractStorageService implements StorageService, Applicat
 
     private Map<String, Strategy> map;
 
-    public abstract List<Map> select(String collectionId, Query query) throws IOException;
+    public abstract List<Map> select(Query query) throws IOException;
 
-    public abstract void insert(String collectionId, Map params) throws IOException;
+    public abstract void insert(StorageEnum type, String collection, Map params) throws IOException;
 
-    public abstract void update(String collectionId, Map params) throws IOException;
+    public abstract void update(StorageEnum type, String collection, Map params) throws IOException;
 
-    public abstract void delete(String collectionId, String id) throws IOException;
+    public abstract void delete(StorageEnum type, String collection, String id) throws IOException;
 
-    public abstract void deleteAll(String collectionId) throws IOException;
+    public abstract void deleteAll(StorageEnum type, String collection) throws IOException;
 
     /**
      * 记录日志
      *
-     * @param collectionId
+     * @param collection
      * @param params
      */
-    public abstract void insertLog(String collectionId, Map<String, Object> params) throws IOException;
+    public abstract void insertLog(StorageEnum type, String collection, Map<String, Object> params) throws IOException;
 
     /**
      * 记录错误数据
      *
-     * @param collectionId
+     * @param collection
      * @param list
      */
-    public abstract void insertData(String collectionId, List<Map> list) throws IOException;
+    public abstract void insertData(StorageEnum type, String collection, List<Map> list) throws IOException;
 
     @Override
     public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
@@ -58,17 +59,13 @@ public abstract class AbstractStorageService implements StorageService, Applicat
     }
 
     @Override
-    public List<Map> query(StorageEnum type, Query query) {
-        return query(type, query, null);
-    }
-
-    @Override
-    public List<Map> query(StorageEnum type, Query query, String collectionId) {
+    public List<Map> query(Query query) {
         try {
-            collectionId = getCollectionId(type, collectionId);
-            return select(collectionId, query);
+            String collection = getCollection(query.getType(), query.getCollection());
+            query.setCollection(collection);
+            return select(query);
         } catch (IOException e) {
-            logger.error("query collectionId:{}, query:{}, failed:{}", collectionId, query, e.getMessage());
+            logger.error("query collectionId:{}, params:{}, failed:{}", query.getCollection(), query.getParams(), e.getMessage());
             throw new StorageException(e);
         }
     }
@@ -83,7 +80,8 @@ public abstract class AbstractStorageService implements StorageService, Applicat
         Assert.notNull(params, "Params can not be null.");
         logger.debug("collectionId:{}, params:{}", collectionId, params);
         try {
-            insert(getCollectionId(type, collectionId), params);
+            String collection = getCollection(type, collectionId);
+            insert(type, collection, params);
         } catch (IOException e) {
             logger.error("add collectionId:{}, params:{}, failed:{}", collectionId, params, e.getMessage());
             throw new StorageException(e);
@@ -100,7 +98,8 @@ public abstract class AbstractStorageService implements StorageService, Applicat
         Assert.notNull(params, "Params can not be null.");
         logger.debug("collectionId:{}, params:{}", collectionId, params);
         try {
-            update(getCollectionId(type, collectionId), params);
+            String collection = getCollection(type, collectionId);
+            update(type, collection, params);
         } catch (IOException e) {
             logger.error("edit collectionId:{}, params:{}, failed:{}", collectionId, params, e.getMessage());
             throw new StorageException(e);
@@ -117,7 +116,8 @@ public abstract class AbstractStorageService implements StorageService, Applicat
         Assert.hasText(id, "ID can not be null.");
         logger.debug("collectionId:{}, id:{}", collectionId, id);
         try {
-            delete(getCollectionId(type, collectionId), id);
+            String collection = getCollection(type, collectionId);
+            delete(type, collection, id);
         } catch (IOException e) {
             logger.error("remove collectionId:{}, id:{}, failed:{}", collectionId, id, e.getMessage());
             throw new StorageException(e);
@@ -127,7 +127,8 @@ public abstract class AbstractStorageService implements StorageService, Applicat
     @Override
     public void addLog(StorageEnum type, Map<String, Object> params) {
         try {
-            insertLog(getCollectionId(type, null), params);
+            String collection = getCollection(type, null);
+            insertLog(type, collection, params);
         } catch (IOException e) {
             logger.error(e.getMessage());
             throw new StorageException(e);
@@ -137,7 +138,8 @@ public abstract class AbstractStorageService implements StorageService, Applicat
     @Override
     public void addData(StorageEnum type, String collectionId, List<Map> list) {
         try {
-            insertData(getCollectionId(type, collectionId), list);
+            String collection = getCollection(type, collectionId);
+            insertData(type, collection, list);
         } catch (IOException e) {
             logger.error(e.getMessage());
             throw new StorageException(e);
@@ -147,18 +149,26 @@ public abstract class AbstractStorageService implements StorageService, Applicat
     @Override
     public void clear(StorageEnum type, String collectionId) {
         try {
-            deleteAll(getCollectionId(type, collectionId));
+            String collection = getCollection(type, collectionId);
+            deleteAll(type, collection);
         } catch (IOException e) {
             logger.error("clear collectionId:{}, failed:{}", collectionId, e.getMessage());
             throw new StorageException(e);
         }
     }
 
-    private String getCollectionId(StorageEnum type, String collectionId) {
-        Assert.notNull(type, "StorageEnum can not be null.");
-        Strategy strategy = map.get(type.getType().concat("Strategy"));
+    protected String getSeparator(){
+        return File.separator;
+    }
+
+    private String getCollection(StorageEnum type, String collection) {
+        Assert.notNull(type, "StorageEnum type can not be null.");
+        return getCollection(type.getType(), collection);
+    }
+    private String getCollection(String type, String collection) {
+        Strategy strategy = map.get(type.concat("Strategy"));
         Assert.notNull(strategy, "Strategy does not exist.");
-        return strategy.createCollectionId(collectionId);
+        return strategy.createCollectionId(getSeparator(), collection);
     }
 
 }

+ 1 - 3
dbsyncer-storage/src/main/java/org/dbsyncer/storage/StorageService.java

@@ -13,9 +13,7 @@ import java.util.Map;
  */
 public interface StorageService {
 
-    List<Map> query(StorageEnum type, Query query);
-
-    List<Map> query(StorageEnum type, Query query, String collectionId);
+    List<Map> query(Query query);
 
     void add(StorageEnum type, Map params);
 

+ 25 - 0
dbsyncer-storage/src/main/java/org/dbsyncer/storage/query/Query.java

@@ -1,5 +1,7 @@
 package org.dbsyncer.storage.query;
 
+import org.dbsyncer.storage.enums.StorageEnum;
+
 import java.util.ArrayList;
 import java.util.List;
 
@@ -10,6 +12,13 @@ import java.util.List;
  */
 public class Query {
 
+    /**
+     * {@link StorageEnum}
+     */
+    private String type;
+
+    private String collection;
+
     private List<Param> params;
 
     private int pageNum = 1;
@@ -34,6 +43,22 @@ public class Query {
         params.add(new Param(key, value, highlighter));
     }
 
+    public String getType() {
+        return type;
+    }
+
+    public void setType(String type) {
+        this.type = type;
+    }
+
+    public String getCollection() {
+        return collection;
+    }
+
+    public void setCollection(String collection) {
+        this.collection = collection;
+    }
+
     public List<Param> getParams() {
         return params;
     }

+ 2 - 1
dbsyncer-storage/src/main/java/org/dbsyncer/storage/strategy/Strategy.java

@@ -21,9 +21,10 @@ public interface Strategy {
     /**
      * 创建集合ID
      *
+     * @param separator
      * @param id
      * @return
      */
-    String createCollectionId(String id);
+    String createCollectionId(String separator, String id);
 
 }

+ 1 - 1
dbsyncer-storage/src/main/java/org/dbsyncer/storage/strategy/impl/ConfigStrategy.java

@@ -15,7 +15,7 @@ import org.springframework.stereotype.Component;
 public class ConfigStrategy implements Strategy {
 
     @Override
-    public String createCollectionId(String id) {
+    public String createCollectionId(String separator, String id) {
         return StorageEnum.CONFIG.getType();
     }
 }

+ 2 - 6
dbsyncer-storage/src/main/java/org/dbsyncer/storage/strategy/impl/DataStrategy.java

@@ -5,8 +5,6 @@ import org.dbsyncer.storage.strategy.Strategy;
 import org.springframework.stereotype.Component;
 import org.springframework.util.Assert;
 
-import java.io.File;
-
 /**
  * 数据:全量或增量数据
  *
@@ -17,12 +15,10 @@ import java.io.File;
 @Component
 public class DataStrategy implements Strategy {
 
-    private static final String COLLECTION_ID = StorageEnum.DATA.getType() + File.separator;
-
     @Override
-    public String createCollectionId(String id) {
+    public String createCollectionId(String separator, String id) {
         Assert.hasText(id, "Id can not be empty.");
         // 同步数据较多,根据不同的驱动生成集合ID: data/123
-        return COLLECTION_ID + id;
+        return new StringBuilder(StorageEnum.DATA.getType()).append(separator).append(id).toString();
     }
 }

+ 1 - 1
dbsyncer-storage/src/main/java/org/dbsyncer/storage/strategy/impl/LogStrategy.java

@@ -15,7 +15,7 @@ import org.springframework.stereotype.Component;
 public class LogStrategy implements Strategy {
 
     @Override
-    public String createCollectionId(String id) {
+    public String createCollectionId(String separator, String id) {
         return StorageEnum.LOG.getType();
     }
 }

+ 22 - 22
dbsyncer-storage/src/main/java/org/dbsyncer/storage/support/DiskStorageServiceImpl.java

@@ -59,12 +59,12 @@ public class DiskStorageServiceImpl extends AbstractStorageService {
     }
 
     @Override
-    public List<Map> select(String collectionId, Query query) throws IOException {
-        Shard shard = map.get(collectionId);
+    public List<Map> select(Query query) throws IOException {
+        Shard shard = map.get(query.getCollection());
 
         // 检查是否存在历史
         if (null == shard) {
-            shard = cacheShardIfExist(collectionId);
+            shard = cacheShardIfExist(query.getCollection());
         }
 
         if (null != shard) {
@@ -79,7 +79,7 @@ public class DiskStorageServiceImpl extends AbstractStorageService {
                 BooleanQuery.Builder builder = new BooleanQuery.Builder();
                 params.forEach(p -> builder.add(new TermQuery(new Term(p.getKey(), p.getValue())), BooleanClause.Occur.MUST));
                 BooleanQuery q = builder.build();
-                return shard.query(new Option(q, query.getParams()), pageNum, pageSize, sort);
+                return shard.query(new Option(q, params), pageNum, pageSize, sort);
             }
 
             return shard.query(new Option(new MatchAllDocsQuery()), pageNum, pageSize, sort);
@@ -88,49 +88,49 @@ public class DiskStorageServiceImpl extends AbstractStorageService {
     }
 
     @Override
-    public void insert(String collectionId, Map params) throws IOException {
-        createShardIfNotExist(collectionId);
+    public void insert(StorageEnum type, String collection, Map params) throws IOException {
+        createShardIfNotExist(collection);
         Document doc = ParamsUtil.convertConfig2Doc(params);
-        map.get(collectionId).insert(doc);
+        map.get(collection).insert(doc);
     }
 
     @Override
-    public void update(String collectionId, Map params) throws IOException {
-        createShardIfNotExist(collectionId);
+    public void update(StorageEnum type, String collection, Map params) throws IOException {
+        createShardIfNotExist(collection);
         Document doc = ParamsUtil.convertConfig2Doc(params);
         IndexableField field = doc.getField(ConfigConstant.CONFIG_MODEL_ID);
-        map.get(collectionId).update(new Term(ConfigConstant.CONFIG_MODEL_ID, field.stringValue()), doc);
+        map.get(collection).update(new Term(ConfigConstant.CONFIG_MODEL_ID, field.stringValue()), doc);
     }
 
     @Override
-    public void delete(String collectionId, String id) throws IOException {
-        createShardIfNotExist(collectionId);
-        map.get(collectionId).delete(new Term(ConfigConstant.CONFIG_MODEL_ID, id));
+    public void delete(StorageEnum type, String collection, String id) throws IOException {
+        createShardIfNotExist(collection);
+        map.get(collection).delete(new Term(ConfigConstant.CONFIG_MODEL_ID, id));
     }
 
     @Override
-    public void deleteAll(String collectionId) throws IOException {
+    public void deleteAll(StorageEnum type, String collection) throws IOException {
         synchronized (this){
-            Shard shard = map.get(collectionId);
+            Shard shard = map.get(collection);
             if (null != shard) {
                 shard.deleteAll();
-                map.remove(collectionId);
+                map.remove(collection);
             }
         }
     }
 
     @Override
-    public void insertLog(String collectionId, Map<String, Object> params) throws IOException {
-        createShardIfNotExist(collectionId);
+    public void insertLog(StorageEnum type, String collection, Map<String, Object> params) throws IOException {
+        createShardIfNotExist(collection);
         Document doc = ParamsUtil.convertLog2Doc(params);
-        map.get(collectionId).insert(doc);
+        map.get(collection).insert(doc);
     }
 
     @Override
-    public void insertData(String collectionId, List<Map> list) throws IOException {
-        createShardIfNotExist(collectionId);
+    public void insertData(StorageEnum type, String collection, List<Map> list) throws IOException {
+        createShardIfNotExist(collection);
         List<Document> docs = list.parallelStream().map(r -> ParamsUtil.convertData2Doc(r)).collect(Collectors.toList());
-        map.get(collectionId).insertBatch(docs);
+        map.get(collection).insertBatch(docs);
     }
 
     /**

+ 22 - 16
dbsyncer-storage/src/main/java/org/dbsyncer/storage/support/MysqlStorageServiceImpl.java

@@ -9,6 +9,7 @@ import org.dbsyncer.connector.mysql.MysqlConnector;
 import org.dbsyncer.connector.util.DatabaseUtil;
 import org.dbsyncer.connector.util.JDBCUtil;
 import org.dbsyncer.storage.AbstractStorageService;
+import org.dbsyncer.storage.StorageException;
 import org.dbsyncer.storage.constant.ConfigConstant;
 import org.dbsyncer.storage.enums.StorageEnum;
 import org.dbsyncer.storage.query.Query;
@@ -84,7 +85,7 @@ public class MysqlStorageServiceImpl extends AbstractStorageService {
             field.setAccessible(true);
             database = (String) field.get(delegate);
         } catch (Exception e) {
-            logger.error(e.getMessage());
+            throw new StorageException(e.getMessage());
         } finally {
             JDBCUtil.close(conn);
         }
@@ -94,45 +95,45 @@ public class MysqlStorageServiceImpl extends AbstractStorageService {
     }
 
     @Override
-    public List<Map> select(String table, Query query) {
-        getExecutor(table, StorageEnum.CONFIG.getType());
+    public List<Map> select(Query query) {
+        getExecutor(query.getType(), query.getCollection());
 
         return null;
     }
 
     @Override
-    public void insert(String table, Map params) {
-        getExecutor(table, StorageEnum.CONFIG.getType());
+    public void insert(StorageEnum type, String table, Map params) {
+        getExecutor(type.getType(), table);
 
     }
 
     @Override
-    public void update(String table, Map params) {
-        getExecutor(table, StorageEnum.CONFIG.getType());
+    public void update(StorageEnum type, String table, Map params) {
+        getExecutor(type.getType(), table);
 
     }
 
     @Override
-    public void delete(String table, String id) {
-        getExecutor(table, StorageEnum.CONFIG.getType());
+    public void delete(StorageEnum type, String table, String id) {
+        getExecutor(type.getType(), table);
 
     }
 
     @Override
-    public void deleteAll(String table) {
-        getExecutor(table, StorageEnum.CONFIG.getType());
+    public void deleteAll(StorageEnum type, String table) {
+        getExecutor(type.getType(), table);
 
     }
 
     @Override
-    public void insertLog(String table, Map<String, Object> params) {
-        getExecutor(table, StorageEnum.LOG.getType());
+    public void insertLog(StorageEnum type, String table, Map<String, Object> params) {
+        getExecutor(type.getType(), table);
 
     }
 
     @Override
-    public void insertData(String table, List<Map> list) {
-        getExecutor(table, StorageEnum.DATA.getType());
+    public void insertData(StorageEnum type, String table, List<Map> list) {
+        getExecutor(type.getType(), table);
 
     }
 
@@ -141,7 +142,12 @@ public class MysqlStorageServiceImpl extends AbstractStorageService {
         DatabaseUtil.close(jdbcTemplate);
     }
 
-    private Executor getExecutor(String table, String group) {
+    @Override
+    protected String getSeparator() {
+        return "_";
+    }
+
+    private Executor getExecutor(String group, String table) {
         // 获取模板
         Executor executor = tables.get(group);
         Assert.notNull(executor, "未知的存储类型");