AE86 2 lat temu
rodzic
commit
5799a202b2

+ 3 - 2
dbsyncer-manager/src/main/java/org/dbsyncer/manager/command/PersistenceCommand.java

@@ -2,6 +2,7 @@ package org.dbsyncer.manager.command;
 
 import org.dbsyncer.manager.Command;
 import org.dbsyncer.storage.StorageService;
+import org.dbsyncer.storage.enums.StorageEnum;
 
 import java.util.Map;
 
@@ -17,12 +18,12 @@ public class PersistenceCommand implements Command {
     }
 
     public boolean addConfig() {
-        storageService.addConfig(params);
+        storageService.add(StorageEnum.CONFIG, params);
         return true;
     }
 
     public boolean editConfig() {
-        storageService.editConfig(params);
+        storageService.edit(StorageEnum.CONFIG, params);
         return true;
     }
 

+ 2 - 1
dbsyncer-manager/src/main/java/org/dbsyncer/manager/template/OperationTemplate.java

@@ -13,6 +13,7 @@ import org.dbsyncer.manager.model.QueryConfig;
 import org.dbsyncer.parser.model.ConfigModel;
 import org.dbsyncer.parser.util.ConfigModelUtil;
 import org.dbsyncer.storage.StorageService;
+import org.dbsyncer.storage.enums.StorageEnum;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.BeanUtils;
@@ -130,7 +131,7 @@ public final class OperationTemplate {
             }
         }
         cacheService.remove(id);
-        storageService.removeConfig(id);
+        storageService.remove(StorageEnum.CONFIG, id);
     }
 
     public String getGroupId(ConfigModel model, GroupStrategyEnum strategy) {

+ 2 - 1
dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/impl/FlushServiceImpl.java

@@ -11,6 +11,7 @@ import org.dbsyncer.parser.model.StorageRequest;
 import org.dbsyncer.storage.StorageService;
 import org.dbsyncer.storage.constant.ConfigConstant;
 import org.dbsyncer.storage.enums.StorageDataStatusEnum;
+import org.dbsyncer.storage.enums.StorageEnum;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -54,7 +55,7 @@ public class FlushServiceImpl implements FlushService {
         params.put(ConfigConstant.CONFIG_MODEL_TYPE, type);
         params.put(ConfigConstant.CONFIG_MODEL_JSON, substring(error));
         params.put(ConfigConstant.CONFIG_MODEL_CREATE_TIME, Instant.now().toEpochMilli());
-        storageService.addLog(params);
+        storageService.add(StorageEnum.LOG, params);
     }
 
     @Override

+ 2 - 1
dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/impl/StorageBufferActuator.java

@@ -4,6 +4,7 @@ import org.dbsyncer.parser.flush.AbstractBufferActuator;
 import org.dbsyncer.parser.model.StorageRequest;
 import org.dbsyncer.parser.model.StorageResponse;
 import org.dbsyncer.storage.StorageService;
+import org.dbsyncer.storage.enums.StorageEnum;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
 
@@ -31,6 +32,6 @@ public class StorageBufferActuator extends AbstractBufferActuator<StorageRequest
 
     @Override
     protected void pull(StorageResponse response) {
-        storageService.addData(response.getMetaId(), response.getDataList());
+        storageService.addBatch(StorageEnum.DATA, response.getMetaId(), response.getDataList());
     }
 }

+ 53 - 16
dbsyncer-storage/src/main/java/org/dbsyncer/storage/AbstractStorageService.java

@@ -1,6 +1,7 @@
 package org.dbsyncer.storage;
 
 import org.dbsyncer.common.model.Paging;
+import org.dbsyncer.common.util.CollectionUtils;
 import org.dbsyncer.storage.enums.StorageEnum;
 import org.dbsyncer.storage.query.Query;
 import org.dbsyncer.storage.strategy.Strategy;
@@ -37,7 +38,7 @@ public abstract class AbstractStorageService implements StorageService, Disposab
 
     protected abstract void batchUpdate(StorageEnum type, String sharding, List<Map> list);
 
-    protected abstract void batchDelete(StorageEnum type, String sharding, List<String> list);
+    protected abstract void batchDelete(StorageEnum type, String sharding, List<String> ids);
 
     protected String getSharding(StorageEnum type, String collectionId) {
         Assert.notNull(type, "StorageEnum type can not be null.");
@@ -90,33 +91,69 @@ public abstract class AbstractStorageService implements StorageService, Disposab
     }
 
     @Override
-    public void addConfig(Map params) {
-        StorageEnum type = StorageEnum.CONFIG;
-        batchInsert(type, getSharding(type, null), newArrayList(params));
+    public void add(StorageEnum type, Map params) {
+        add(type, null, params);
     }
 
     @Override
-    public void editConfig(Map params) {
-        StorageEnum type = StorageEnum.CONFIG;
-        batchUpdate(type, getSharding(type, null), newArrayList(params));
+    public void add(StorageEnum type, String metaId, Map params) {
+        addBatch(type, metaId, newArrayList(params));
     }
 
     @Override
-    public void removeConfig(String id) {
-        StorageEnum type = StorageEnum.CONFIG;
-        batchDelete(type, getSharding(type, null), newArrayList(id));
+    public void addBatch(StorageEnum type, List<Map> list) {
+        addBatch(type, null, list);
     }
 
     @Override
-    public void addLog(Map<String, Object> params) {
-        StorageEnum type = StorageEnum.LOG;
-        batchInsert(type, getSharding(type, null), newArrayList(params));
+    public void addBatch(StorageEnum type, String metaId, List<Map> list) {
+        if (!CollectionUtils.isEmpty(list)) {
+            batchInsert(type, getSharding(type, metaId), list);
+        }
+    }
+
+    @Override
+    public void edit(StorageEnum type, Map params) {
+        edit(type, null, params);
+    }
+
+    @Override
+    public void edit(StorageEnum type, String metaId, Map params) {
+        editBatch(type, metaId, newArrayList(params));
+    }
+
+    @Override
+    public void editBatch(StorageEnum type, List<Map> list) {
+        editBatch(type, null, list);
+    }
+
+    @Override
+    public void editBatch(StorageEnum type, String metaId, List<Map> list) {
+        if (!CollectionUtils.isEmpty(list)) {
+            batchUpdate(type, getSharding(type, metaId), list);
+        }
+    }
+
+    @Override
+    public void remove(StorageEnum type, String id) {
+        remove(type, null, id);
+    }
+
+    @Override
+    public void remove(StorageEnum type, String metaId, String id) {
+        removeBatch(type, metaId, newArrayList(id));
+    }
+
+    @Override
+    public void removeBatch(StorageEnum type, List<String> ids) {
+        removeBatch(type, null, ids);
     }
 
     @Override
-    public void addData(String collectionId, List<Map> list) {
-        StorageEnum type = StorageEnum.DATA;
-        batchInsert(type, getSharding(type, collectionId), list);
+    public void removeBatch(StorageEnum type, String metaId, List<String> ids) {
+        if (!CollectionUtils.isEmpty(ids)) {
+            batchDelete(type, getSharding(type, metaId), ids);
+        }
     }
 
     private List<Map> newArrayList(Map params) {

+ 80 - 14
dbsyncer-storage/src/main/java/org/dbsyncer/storage/StorageService.java

@@ -26,43 +26,109 @@ public interface StorageService {
      * 清空数据/日志
      *
      * @param type
-     * @param collectionId
+     * @param metaId
      */
-    void clear(StorageEnum type, String collectionId);
+    void clear(StorageEnum type, String metaId);
 
     /**
-     * 添加配置
+     * 添加
      *
+     * @param type
      * @param params
      */
-    void addConfig(Map params);
+    void add(StorageEnum type, Map params);
 
     /**
-     * 修改配置
+     * 添加
      *
+     * @param type
+     * @param metaId
      * @param params
      */
-    void editConfig(Map params);
+    void add(StorageEnum type, String metaId, Map params);
 
     /**
-     * 删除配置
+     * 批量添加
      *
-     * @param id
+     * @param type
+     * @param list
+     */
+    void addBatch(StorageEnum type, List<Map> list);
+
+    /**
+     * 批量添加
+     *
+     * @param type
+     * @param metaId
+     * @param list
+     */
+    void addBatch(StorageEnum type, String metaId, List<Map> list);
+
+    /**
+     * 修改
+     *
+     * @param type
+     * @param params
      */
-    void removeConfig(String id);
+    void edit(StorageEnum type, Map params);
 
     /**
-     * 记录日志
+     * 修改
      *
+     * @param type
+     * @param metaId
      * @param params
      */
-    void addLog(Map<String,Object> params);
+    void edit(StorageEnum type, String metaId, Map params);
+
+    /**
+     * 批量修改
+     *
+     * @param type
+     * @param list
+     */
+    void editBatch(StorageEnum type, List<Map> list);
 
     /**
-     * 记录数据
+     * 批量修改
      *
-     * @param collectionId
+     * @param type
+     * @param metaId
+     * @param list
+     */
+    void editBatch(StorageEnum type, String metaId, List<Map> list);
+
+    /**
+     * 删除
+     *
+     * @param type
+     * @param id
+     */
+    void remove(StorageEnum type, String id);
+
+    /**
+     * 删除
+     *
+     * @param type
+     * @param metaId
+     * @param id
+     */
+    void remove(StorageEnum type, String metaId, String id);
+
+    /**
+     * 批量删除
+     *
+     * @param type
+     * @param ids
+     */
+    void removeBatch(StorageEnum type, List<String> ids);
+
+    /**
+     * 批量删除
+     *
+     * @param type
+     * @param metaId
      * @param list
      */
-    void addData(String collectionId, List<Map> list);
+    void removeBatch(StorageEnum type, String metaId, List<String> ids);
 }

+ 5 - 1
dbsyncer-storage/src/main/java/org/dbsyncer/storage/enums/StorageEnum.java

@@ -18,7 +18,11 @@ public enum StorageEnum {
     /**
      * 数据:全量或增量数据
      */
-    DATA("data");
+    DATA("data"),
+    /**
+     * Binlog:缓存队列数据
+     */
+    BINLOG("binlog");
 
     private String type;
 

+ 3 - 3
dbsyncer-storage/src/main/java/org/dbsyncer/storage/support/DiskStorageServiceImpl.java

@@ -104,12 +104,12 @@ public class DiskStorageServiceImpl extends AbstractStorageService {
     }
 
     @Override
-    protected void batchDelete(StorageEnum type, String sharding, List<String> list) {
+    protected void batchDelete(StorageEnum type, String sharding, List<String> ids) {
         Shard shard = getShard(sharding);
-        int size = list.size();
+        int size = ids.size();
         Term[] terms = new Term[size];
         for (int i = 0; i < size; i++) {
-            terms[i] = getPrimaryKeyTerm(list.get(i));
+            terms[i] = getPrimaryKeyTerm(ids.get(i));
         }
         try {
             shard.deleteBatch(terms);

+ 2 - 6
dbsyncer-storage/src/main/java/org/dbsyncer/storage/support/MysqlStorageServiceImpl.java

@@ -160,14 +160,10 @@ public class MysqlStorageServiceImpl extends AbstractStorageService {
     }
 
     @Override
-    protected void batchDelete(StorageEnum type, String sharding, List<String> list) {
-        if (CollectionUtils.isEmpty(list)) {
-            return;
-        }
-
+    protected void batchDelete(StorageEnum type, String sharding, List<String> ids) {
         final Executor executor = getExecutor(type, sharding);
         final String sql = executor.getDelete();
-        final List<Object[]> args = list.stream().map(id -> new Object[]{id}).collect(Collectors.toList());
+        final List<Object[]> args = ids.stream().map(id -> new Object[]{id}).collect(Collectors.toList());
         connectorMapper.execute(databaseTemplate -> databaseTemplate.batchUpdate(sql, args));
     }