Преглед на файлове

!106 fixbug
Merge pull request !106 from AE86/V_1.0.0_RC

AE86 преди 2 години
родител
ревизия
9f8029cc86
променени са 26 файла, в които са добавени 715 реда и са изтрити 415 реда
  1. 1 1
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/es/ESConnector.java
  2. 2 2
      dbsyncer-manager/src/main/java/org/dbsyncer/manager/Manager.java
  3. 4 5
      dbsyncer-manager/src/main/java/org/dbsyncer/manager/ManagerFactory.java
  4. 5 8
      dbsyncer-manager/src/main/java/org/dbsyncer/manager/command/PersistenceCommand.java
  5. 2 2
      dbsyncer-manager/src/main/java/org/dbsyncer/manager/enums/CommandEnum.java
  6. 1 1
      dbsyncer-manager/src/main/java/org/dbsyncer/manager/template/OperationTemplate.java
  7. 2 2
      dbsyncer-monitor/src/main/java/org/dbsyncer/monitor/Monitor.java
  8. 11 7
      dbsyncer-monitor/src/main/java/org/dbsyncer/monitor/MonitorFactory.java
  9. 32 28
      dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/AbstractBufferActuator.java
  10. 1 1
      dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/impl/FlushServiceImpl.java
  11. 1 1
      dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/impl/StorageBufferActuator.java
  12. 81 104
      dbsyncer-storage/src/main/java/org/dbsyncer/storage/AbstractStorageService.java
  13. 95 16
      dbsyncer-storage/src/main/java/org/dbsyncer/storage/StorageService.java
  14. 196 0
      dbsyncer-storage/src/main/java/org/dbsyncer/storage/binlog/AbstractBinlogService.java
  15. 5 0
      dbsyncer-storage/src/main/java/org/dbsyncer/storage/constant/ConfigConstant.java
  16. 5 1
      dbsyncer-storage/src/main/java/org/dbsyncer/storage/enums/StorageEnum.java
  17. 28 23
      dbsyncer-storage/src/main/java/org/dbsyncer/storage/lucene/Shard.java
  18. 5 5
      dbsyncer-storage/src/main/java/org/dbsyncer/storage/query/Query.java
  19. 3 3
      dbsyncer-storage/src/main/java/org/dbsyncer/storage/strategy/Strategy.java
  20. 21 0
      dbsyncer-storage/src/main/java/org/dbsyncer/storage/strategy/impl/BinlogStrategy.java
  21. 1 1
      dbsyncer-storage/src/main/java/org/dbsyncer/storage/strategy/impl/ConfigStrategy.java
  22. 3 3
      dbsyncer-storage/src/main/java/org/dbsyncer/storage/strategy/impl/DataStrategy.java
  23. 1 1
      dbsyncer-storage/src/main/java/org/dbsyncer/storage/strategy/impl/LogStrategy.java
  24. 84 80
      dbsyncer-storage/src/main/java/org/dbsyncer/storage/support/DiskStorageServiceImpl.java
  25. 100 120
      dbsyncer-storage/src/main/java/org/dbsyncer/storage/support/MysqlStorageServiceImpl.java
  26. 25 0
      dbsyncer-storage/src/main/java/org/dbsyncer/storage/util/DocumentUtil.java

+ 1 - 1
dbsyncer-connector/src/main/java/org/dbsyncer/connector/es/ESConnector.java

@@ -87,7 +87,7 @@ public final class ESConnector extends AbstractConnector implements Connector<ES
 
     @Override
     public String getConnectorMapperCacheKey(ESConfig config) {
-        return String.format("%s-%s-%s-%", config.getConnectorType(), config.getUrl(), config.getIndex(), config.getUsername());
+        return String.format("%s-%s-%s-%s", config.getConnectorType(), config.getUrl(), config.getIndex(), config.getUsername());
     }
 
     @Override

+ 2 - 2
dbsyncer-manager/src/main/java/org/dbsyncer/manager/Manager.java

@@ -107,9 +107,9 @@ public interface Manager extends Executor {
     List<Meta> getMetaAll();
 
     // Data
-    Paging queryData(Query query, String collectionId);
+    Paging queryData(Query query);
 
-    void clearData(String collectionId);
+    void clearData(String metaId);
 
     // Log
     Paging queryLog(Query query);

+ 4 - 5
dbsyncer-manager/src/main/java/org/dbsyncer/manager/ManagerFactory.java

@@ -220,20 +220,19 @@ public class ManagerFactory implements Manager, ApplicationListener<ClosedEvent>
     }
 
     @Override
-    public Paging queryData(Query query, String collectionId) {
+    public Paging queryData(Query query) {
         query.setType(StorageEnum.DATA);
-        query.setCollection(collectionId);
         return storageService.query(query);
     }
 
     @Override
-    public void clearData(String collectionId) {
-        Meta meta = getMeta(collectionId);
+    public void clearData(String metaId) {
+        Meta meta = getMeta(metaId);
         Mapping mapping = getMapping(meta.getMappingId());
         String model = ModelEnum.getModelEnum(mapping.getModel()).getName();
         LogType.MappingLog log = LogType.MappingLog.CLEAR_DATA;
         logService.log(log, "%s:%s(%s)", log.getMessage(), mapping.getName(), model);
-        storageService.clear(StorageEnum.DATA, collectionId);
+        storageService.clear(StorageEnum.DATA, metaId);
     }
 
     @Override

+ 5 - 8
dbsyncer-manager/src/main/java/org/dbsyncer/manager/command/PersistenceCommand.java

@@ -10,23 +10,20 @@ public class PersistenceCommand implements Command {
 
     private StorageService storageService;
 
-    private StorageEnum type;
-
     private Map params;
 
-    public PersistenceCommand(StorageService storageService, StorageEnum type, Map params) {
+    public PersistenceCommand(StorageService storageService, Map params) {
         this.storageService = storageService;
-        this.type = type;
         this.params = params;
     }
 
-    public boolean add() {
-        storageService.add(type, params);
+    public boolean addConfig() {
+        storageService.add(StorageEnum.CONFIG, params);
         return true;
     }
 
-    public boolean edit() {
-        storageService.edit(type, params);
+    public boolean editConfig() {
+        storageService.edit(StorageEnum.CONFIG, params);
         return true;
     }
 

+ 2 - 2
dbsyncer-manager/src/main/java/org/dbsyncer/manager/enums/CommandEnum.java

@@ -15,12 +15,12 @@ public enum CommandEnum {
     /**
      * 添加
      */
-    OPR_ADD("add", (cmd) -> ((PersistenceCommand) cmd).add()),
+    OPR_ADD("add", (cmd) -> ((PersistenceCommand) cmd).addConfig()),
 
     /**
      * 修改
      */
-    OPR_EDIT("edit", (cmd) -> ((PersistenceCommand) cmd).edit()),
+    OPR_EDIT("edit", (cmd) -> ((PersistenceCommand) cmd).editConfig()),
 
     /**
      * 预加载SystemConfig

+ 1 - 1
dbsyncer-manager/src/main/java/org/dbsyncer/manager/template/OperationTemplate.java

@@ -96,7 +96,7 @@ public final class OperationTemplate {
         logger.debug("params:{}", params);
         CommandEnum cmd = config.getCommandEnum();
         Assert.notNull(cmd, "CommandEnum can not be null.");
-        cmd.getCommandExecutor().execute(new PersistenceCommand(storageService, StorageEnum.CONFIG, params));
+        cmd.getCommandExecutor().execute(new PersistenceCommand(storageService, params));
 
         // 3、缓存
         cache(model, config.getGroupStrategyEnum());

+ 2 - 2
dbsyncer-monitor/src/main/java/org/dbsyncer/monitor/Monitor.java

@@ -21,9 +21,9 @@ public interface Monitor {
 
     List<Meta> getMetaAll();
 
-    Paging queryData(String id, int pageNum, int pageSize, String error, String success);
+    Paging queryData(String metaId, int pageNum, int pageSize, String error, String success);
 
-    void clearData(String collectionId);
+    void clearData(String metaId);
 
     Paging queryLog(int pageNum, int pageSize, String json);
 

+ 11 - 7
dbsyncer-monitor/src/main/java/org/dbsyncer/monitor/MonitorFactory.java

@@ -20,6 +20,7 @@ import org.dbsyncer.parser.model.Mapping;
 import org.dbsyncer.parser.model.Meta;
 import org.dbsyncer.storage.constant.ConfigConstant;
 import org.dbsyncer.storage.enums.StorageDataStatusEnum;
+import org.dbsyncer.storage.enums.StorageEnum;
 import org.dbsyncer.storage.query.Query;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -85,9 +86,9 @@ public class MonitorFactory implements Monitor, ScheduledTaskJob {
     }
 
     @Override
-    public Paging queryData(String id, int pageNum, int pageSize, String error, String success) {
+    public Paging queryData(String metaId, int pageNum, int pageSize, String error, String success) {
         // 没有驱动
-        if (StringUtil.isBlank(id)) {
+        if (StringUtil.isBlank(metaId)) {
             return new Paging(pageNum, pageSize);
         }
 
@@ -98,12 +99,13 @@ public class MonitorFactory implements Monitor, ScheduledTaskJob {
         }
         // 查询是否成功, 默认查询失败
         query.addFilter(ConfigConstant.DATA_SUCCESS, StringUtil.isNotBlank(success) ? success : StorageDataStatusEnum.FAIL.getCode(), false, true);
-        return manager.queryData(query, id);
+        query.setMetaId(metaId);
+        return manager.queryData(query);
     }
 
     @Override
-    public void clearData(String collectionId) {
-        manager.clearData(collectionId);
+    public void clearData(String metaId) {
+        manager.clearData(metaId);
     }
 
     @Override
@@ -249,10 +251,12 @@ public class MonitorFactory implements Monitor, ScheduledTaskJob {
         AtomicLong total = new AtomicLong(0);
         if (!CollectionUtils.isEmpty(metaAll)) {
             Query query = new Query(1, 1);
+            query.setQueryTotal(true);
+            query.setType(StorageEnum.DATA);
             operation.apply(query);
             metaAll.forEach(meta -> {
-                query.setQueryTotal(true);
-                Paging paging = manager.queryData(query, meta.getId());
+                query.setMetaId(meta.getId());
+                Paging paging = manager.queryData(query);
                 total.getAndAdd(paging.getTotal());
             });
         }

+ 32 - 28
dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/AbstractBufferActuator.java

@@ -130,37 +130,41 @@ public abstract class AbstractBufferActuator<Request, Response> implements Buffe
     }
 
     private void flush(Queue<Request> queue) throws IllegalAccessException, InstantiationException {
-        if (!queue.isEmpty()) {
-            AtomicLong batchCounter = new AtomicLong();
-            final Map<String, Response> map = new LinkedHashMap<>();
-            while (!queue.isEmpty() && batchCounter.get() < bufferActuatorConfig.getBatchCount()) {
-                Request poll = queue.poll();
-                String key = getPartitionKey(poll);
-                if (!map.containsKey(key)) {
-                    map.putIfAbsent(key, responseClazz.newInstance());
-                }
-                Response response = map.get(key);
-                partition(poll, response);
-                batchCounter.incrementAndGet();
-
-                Request next = queue.peek();
-                if (null != next && skipPartition(next, response)) {
-                    break;
-                }
+        if (queue.isEmpty()) {
+            return;
+        }
+
+        AtomicLong batchCounter = new AtomicLong();
+        Map<String, Response> map = new LinkedHashMap<>();
+        while (!queue.isEmpty() && batchCounter.get() < bufferActuatorConfig.getBatchCount()) {
+            Request poll = queue.poll();
+            String key = getPartitionKey(poll);
+            if (!map.containsKey(key)) {
+                map.putIfAbsent(key, responseClazz.newInstance());
             }
+            Response response = map.get(key);
+            partition(poll, response);
+            batchCounter.incrementAndGet();
 
-            map.forEach((key, flushTask) -> {
-                long now = Instant.now().toEpochMilli();
-                try {
-                    pull(flushTask);
-                } catch (Exception e) {
-                    logger.error("[{}]异常{}", key);
-                }
-                final BufferResponse task = (BufferResponse) flushTask;
-                logger.info("[{}{}]{}条,耗时{}毫秒", key, task.getSuffixName(), task.getTaskSize(), (Instant.now().toEpochMilli() - now));
-            });
-            map.clear();
+            Request next = queue.peek();
+            if (null != next && skipPartition(next, response)) {
+                break;
+            }
         }
+
+        map.forEach((key, flushTask) -> {
+            long now = Instant.now().toEpochMilli();
+            try {
+                pull(flushTask);
+            } catch (Exception e) {
+                logger.error("[{}]异常{}", key);
+            }
+            final BufferResponse task = (BufferResponse) flushTask;
+            logger.info("[{}{}]{}, {}ms", key, task.getSuffixName(), task.getTaskSize(), (Instant.now().toEpochMilli() - now));
+        });
+        map.clear();
+        map = null;
+        batchCounter = null;
     }
 
 }

+ 1 - 1
dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/impl/FlushServiceImpl.java

@@ -55,7 +55,7 @@ public class FlushServiceImpl implements FlushService {
         params.put(ConfigConstant.CONFIG_MODEL_TYPE, type);
         params.put(ConfigConstant.CONFIG_MODEL_JSON, substring(error));
         params.put(ConfigConstant.CONFIG_MODEL_CREATE_TIME, Instant.now().toEpochMilli());
-        storageService.addLog(StorageEnum.LOG, params);
+        storageService.add(StorageEnum.LOG, params);
     }
 
     @Override

+ 1 - 1
dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/impl/StorageBufferActuator.java

@@ -32,6 +32,6 @@ public class StorageBufferActuator extends AbstractBufferActuator<StorageRequest
 
     @Override
     protected void pull(StorageResponse response) {
-        storageService.addData(StorageEnum.DATA, response.getMetaId(), response.getDataList());
+        storageService.addBatch(StorageEnum.DATA, response.getMetaId(), response.getDataList());
     }
 }

+ 81 - 104
dbsyncer-storage/src/main/java/org/dbsyncer/storage/AbstractStorageService.java

@@ -1,17 +1,16 @@
 package org.dbsyncer.storage;
 
 import org.dbsyncer.common.model.Paging;
+import org.dbsyncer.common.util.CollectionUtils;
 import org.dbsyncer.storage.enums.StorageEnum;
 import org.dbsyncer.storage.query.Query;
 import org.dbsyncer.storage.strategy.Strategy;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.DisposableBean;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.util.Assert;
 
 import java.io.File;
-import java.io.IOException;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.locks.Lock;
@@ -24,44 +23,37 @@ import java.util.concurrent.locks.ReentrantLock;
  */
 public abstract class AbstractStorageService implements StorageService, DisposableBean {
 
-    private final Logger logger = LoggerFactory.getLogger(getClass());
+    @Autowired
+    private Map<String, Strategy> map;
 
     private final Lock lock = new ReentrantLock(true);
 
     private volatile boolean tryDeleteAll;
 
-    @Autowired
-    private Map<String, Strategy> map;
-
-    public abstract Paging select(Query query) throws IOException;
+    protected abstract Paging select(String sharding, Query query);
 
-    public abstract void insert(StorageEnum type, String collection, Map params) throws IOException;
+    protected abstract void deleteAll(String sharding);
 
-    public abstract void update(StorageEnum type, String collection, Map params) throws IOException;
+    protected abstract void batchInsert(StorageEnum type, String sharding, List<Map> list);
 
-    public abstract void delete(StorageEnum type, String collection, String id) throws IOException;
+    protected abstract void batchUpdate(StorageEnum type, String sharding, List<Map> list);
 
-    public abstract void deleteAll(StorageEnum type, String collection) throws IOException;
+    protected abstract void batchDelete(StorageEnum type, String sharding, List<String> ids);
 
-    /**
-     * 记录日志
-     *
-     * @param collection
-     * @param params
-     */
-    public abstract void insertLog(StorageEnum type, String collection, Map<String, Object> params) throws IOException;
+    protected String getSharding(StorageEnum type, String collectionId) {
+        Assert.notNull(type, "StorageEnum type can not be null.");
+        Strategy strategy = map.get(type.getType().concat("Strategy"));
+        Assert.notNull(strategy, "Strategy does not exist.");
+        return strategy.createSharding(getSeparator(), collectionId);
+    }
 
-    /**
-     * 记录错误数据
-     *
-     * @param collection
-     * @param list
-     */
-    public abstract void insertData(StorageEnum type, String collection, List<Map> list) throws IOException;
+    protected String getSeparator() {
+        return File.separator;
+    }
 
     @Override
     public Paging query(Query query) {
-        if(tryDeleteAll){
+        if (tryDeleteAll) {
             return new Paging(query.getPageNum(), query.getPageSize());
         }
 
@@ -69,13 +61,9 @@ public abstract class AbstractStorageService implements StorageService, Disposab
         try {
             locked = lock.tryLock();
             if (locked) {
-                String collection = getCollection(query.getType(), query.getCollection());
-                query.setCollection(collection);
-                return select(query);
+                String sharding = getSharding(query.getType(), query.getMetaId());
+                return select(sharding, query);
             }
-        } catch (Exception e) {
-            logger.error("query collectionId:{}, params:{}, failed:{}", query.getCollection(), query.getParams(), e.getMessage());
-            throw new StorageException(e);
         } finally {
             if (locked) {
                 lock.unlock();
@@ -84,111 +72,100 @@ public abstract class AbstractStorageService implements StorageService, Disposab
         return new Paging(query.getPageNum(), query.getPageSize());
     }
 
+    @Override
+    public void clear(StorageEnum type, String metaId) {
+        boolean locked = false;
+        try {
+            locked = lock.tryLock();
+            if (locked) {
+                tryDeleteAll = true;
+                String sharding = getSharding(type, metaId);
+                deleteAll(sharding);
+            }
+        } finally {
+            if (locked) {
+                tryDeleteAll = false;
+                lock.unlock();
+            }
+        }
+    }
+
     @Override
     public void add(StorageEnum type, Map params) {
-        add(type, params, null);
+        add(type, null, params);
     }
 
     @Override
-    public void add(StorageEnum type, Map params, String collectionId) {
-        Assert.notNull(params, "Params can not be null.");
-        logger.debug("collectionId:{}, params:{}", collectionId, params);
-        try {
-            String collection = getCollection(type, collectionId);
-            insert(type, collection, params);
-        } catch (IOException e) {
-            logger.error("add collectionId:{}, params:{}, failed:{}", collectionId, params, e.getMessage());
-            throw new StorageException(e);
+    public void add(StorageEnum type, String metaId, Map params) {
+        addBatch(type, metaId, newArrayList(params));
+    }
+
+    @Override
+    public void addBatch(StorageEnum type, List<Map> list) {
+        addBatch(type, null, list);
+    }
+
+    @Override
+    public void addBatch(StorageEnum type, String metaId, List<Map> list) {
+        if (!CollectionUtils.isEmpty(list)) {
+            batchInsert(type, getSharding(type, metaId), list);
         }
     }
 
     @Override
     public void edit(StorageEnum type, Map params) {
-        edit(type, params, null);
+        edit(type, null, params);
     }
 
     @Override
-    public void edit(StorageEnum type, Map params, String collectionId) {
-        Assert.notNull(params, "Params can not be null.");
-        logger.debug("collectionId:{}, params:{}", collectionId, params);
-        try {
-            String collection = getCollection(type, collectionId);
-            update(type, collection, params);
-        } catch (IOException e) {
-            logger.error("edit collectionId:{}, params:{}, failed:{}", collectionId, params, e.getMessage());
-            throw new StorageException(e);
-        }
+    public void edit(StorageEnum type, String metaId, Map params) {
+        editBatch(type, metaId, newArrayList(params));
     }
 
     @Override
-    public void remove(StorageEnum type, String id) {
-        remove(type, id, null);
+    public void editBatch(StorageEnum type, List<Map> list) {
+        editBatch(type, null, list);
     }
 
     @Override
-    public void remove(StorageEnum type, String id, String collectionId) {
-        Assert.hasText(id, "ID can not be null.");
-        logger.debug("collectionId:{}, id:{}", collectionId, id);
-        try {
-            String collection = getCollection(type, collectionId);
-            delete(type, collection, id);
-        } catch (IOException e) {
-            logger.error("remove collectionId:{}, id:{}, failed:{}", collectionId, id, e.getMessage());
-            throw new StorageException(e);
+    public void editBatch(StorageEnum type, String metaId, List<Map> list) {
+        if (!CollectionUtils.isEmpty(list)) {
+            batchUpdate(type, getSharding(type, metaId), list);
         }
     }
 
     @Override
-    public void addLog(StorageEnum type, Map<String, Object> params) {
-        try {
-            String collection = getCollection(type, null);
-            insertLog(type, collection, params);
-        } catch (IOException e) {
-            logger.error(e.getMessage());
-            throw new StorageException(e);
-        }
+    public void remove(StorageEnum type, String id) {
+        remove(type, null, id);
     }
 
     @Override
-    public void addData(StorageEnum type, String collectionId, List<Map> list) {
-        try {
-            String collection = getCollection(type, collectionId);
-            insertData(type, collection, list);
-        } catch (IOException e) {
-            logger.error(e.getMessage());
-            throw new StorageException(e);
-        }
+    public void remove(StorageEnum type, String metaId, String id) {
+        removeBatch(type, metaId, newArrayList(id));
     }
 
     @Override
-    public void clear(StorageEnum type, String collectionId) {
-        boolean locked = false;
-        try {
-            locked = lock.tryLock();
-            if (locked) {
-                tryDeleteAll = true;
-                deleteAll(type, getCollection(type, collectionId));
-            }
-        } catch (Exception e) {
-            logger.error("clear collectionId:{}, failed:{}", collectionId, e.getMessage());
-            throw new StorageException(e);
-        } finally {
-            if (locked) {
-                tryDeleteAll = false;
-                lock.unlock();
-            }
+    public void removeBatch(StorageEnum type, List<String> ids) {
+        removeBatch(type, null, ids);
+    }
+
+    @Override
+    public void removeBatch(StorageEnum type, String metaId, List<String> ids) {
+        if (!CollectionUtils.isEmpty(ids)) {
+            batchDelete(type, getSharding(type, metaId), ids);
         }
     }
 
-    protected String getSeparator() {
-        return File.separator;
+    private List<Map> newArrayList(Map params) {
+        List<Map> list = new ArrayList<>();
+        list.add(params);
+        return list;
     }
 
-    private String getCollection(StorageEnum type, String collection) {
-        Assert.notNull(type, "StorageEnum type can not be null.");
-        Strategy strategy = map.get(type.getType().concat("Strategy"));
-        Assert.notNull(strategy, "Strategy does not exist.");
-        return strategy.createCollectionId(getSeparator(), collection);
+    private List<String> newArrayList(String id) {
+        List<String> list = new ArrayList<>();
+        list.add(id);
+        return list;
     }
 
 }

+ 95 - 16
dbsyncer-storage/src/main/java/org/dbsyncer/storage/StorageService.java

@@ -14,42 +14,121 @@ import java.util.Map;
  */
 public interface StorageService {
 
+    /**
+     * 查询所有数据
+     *
+     * @param query
+     * @return
+     */
     Paging query(Query query);
 
-    void add(StorageEnum type, Map params);
+    /**
+     * 清空数据/日志
+     *
+     * @param type
+     * @param metaId
+     */
+    void clear(StorageEnum type, String metaId);
 
-    void add(StorageEnum type, Map params, String collectionId);
+    /**
+     * 添加
+     *
+     * @param type
+     * @param params
+     */
+    void add(StorageEnum type, Map params);
 
-    void edit(StorageEnum type, Map params);
+    /**
+     * 添加
+     *
+     * @param type
+     * @param metaId
+     * @param params
+     */
+    void add(StorageEnum type, String metaId, Map params);
 
-    void edit(StorageEnum type, Map params, String collectionId);
+    /**
+     * 批量添加
+     *
+     * @param type
+     * @param list
+     */
+    void addBatch(StorageEnum type, List<Map> list);
 
-    void remove(StorageEnum type, String id);
+    /**
+     * 批量添加
+     *
+     * @param type
+     * @param metaId
+     * @param list
+     */
+    void addBatch(StorageEnum type, String metaId, List<Map> list);
 
-    void remove(StorageEnum type, String id, String collectionId);
+    /**
+     * 修改
+     *
+     * @param type
+     * @param params
+     */
+    void edit(StorageEnum type, Map params);
 
     /**
-     * 记录日志
+     * 修改
      *
-     * @param log
+     * @param type
+     * @param metaId
      * @param params
      */
-    void addLog(StorageEnum log, Map<String,Object> params);
+    void edit(StorageEnum type, String metaId, Map params);
+
+    /**
+     * 批量修改
+     *
+     * @param type
+     * @param list
+     */
+    void editBatch(StorageEnum type, List<Map> list);
 
     /**
-     * 记录数据
+     * 批量修改
      *
-     * @param data
-     * @param collectionId
+     * @param type
+     * @param metaId
      * @param list
      */
-    void addData(StorageEnum data, String collectionId, List<Map> list);
+    void editBatch(StorageEnum type, String metaId, List<Map> list);
 
     /**
-     * 清空数据/日志
+     * 删除
+     *
+     * @param type
+     * @param id
+     */
+    void remove(StorageEnum type, String id);
+
+    /**
+     * 删除
+     *
+     * @param type
+     * @param metaId
+     * @param id
+     */
+    void remove(StorageEnum type, String metaId, String id);
+
+    /**
+     * 批量删除
+     *
+     * @param type
+     * @param ids
+     */
+    void removeBatch(StorageEnum type, List<String> ids);
+
+    /**
+     * 批量删除
      *
      * @param type
-     * @param collectionId
+     * @param metaId
+     * @param list
      */
-    void clear(StorageEnum type, String collectionId);
+    void removeBatch(StorageEnum type, String metaId, List<String> ids);
 }

+ 196 - 0
dbsyncer-storage/src/main/java/org/dbsyncer/storage/binlog/AbstractBinlogService.java

@@ -0,0 +1,196 @@
+package org.dbsyncer.storage.binlog;
+
+import com.google.protobuf.InvalidProtocolBufferException;
+import org.dbsyncer.common.config.BinlogRecorderConfig;
+import org.dbsyncer.common.model.Paging;
+import org.dbsyncer.common.scheduled.ScheduledTaskJob;
+import org.dbsyncer.common.scheduled.ScheduledTaskService;
+import org.dbsyncer.common.snowflake.SnowflakeIdWorker;
+import org.dbsyncer.common.util.CollectionUtils;
+import org.dbsyncer.storage.StorageService;
+import org.dbsyncer.storage.binlog.proto.BinlogMessage;
+import org.dbsyncer.storage.constant.BinlogConstant;
+import org.dbsyncer.storage.constant.ConfigConstant;
+import org.dbsyncer.storage.enums.StorageEnum;
+import org.dbsyncer.storage.query.Query;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+
+import javax.annotation.PostConstruct;
+import java.time.Instant;
+import java.util.*;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * @author AE86
+ * @version 1.0.0
+ * @date 2022/11/25 0:53
+ */
+public abstract class AbstractBinlogService<Message> implements BinlogRecorder {
+
+    private final Logger logger = LoggerFactory.getLogger(getClass());
+
+    @Autowired
+    private StorageService storageService;
+
+    @Autowired
+    private ScheduledTaskService scheduledTaskService;
+
+    @Autowired
+    private SnowflakeIdWorker snowflakeIdWorker;
+
+    @Autowired
+    private BinlogRecorderConfig binlogRecorderConfig;
+
+    private Queue<BinlogMessage> queue;
+
+    private WriterTask writerTask = new WriterTask();
+
+    private ReaderTask readerTask = new ReaderTask();
+
+    @PostConstruct
+    private void init() {
+        queue = new LinkedBlockingQueue(binlogRecorderConfig.getQueueCapacity());
+        scheduledTaskService.start(binlogRecorderConfig.getWriterPeriodMillisecond(), writerTask);
+        scheduledTaskService.start(binlogRecorderConfig.getReaderPeriodMillisecond(), readerTask);
+    }
+
+    /**
+     * 反序列化消息
+     *
+     * @param message
+     * @return
+     */
+    protected abstract Message deserialize(String messageId, BinlogMessage message);
+
+    @Override
+    public void flush(BinlogMessage message) {
+        queue.offer(message);
+    }
+
+    @Override
+    public void complete(List<String> messageIds) {
+        if (!CollectionUtils.isEmpty(messageIds)) {
+            storageService.removeBatch(StorageEnum.BINLOG, messageIds);
+        }
+    }
+
+    /**
+     * 合并缓存队列任务到磁盘
+     */
+    final class WriterTask implements ScheduledTaskJob {
+
+        @Override
+        public void run() {
+            if (queue.isEmpty()) {
+                return;
+            }
+
+            List<Map> tasks = new ArrayList<>();
+            int count = 0;
+            long now = Instant.now().toEpochMilli();
+            Map task = null;
+            while (!queue.isEmpty() && count < binlogRecorderConfig.getBatchCount()) {
+                BinlogMessage message = queue.poll();
+                if (null != message) {
+                    task = new HashMap();
+                    task.put(ConfigConstant.CONFIG_MODEL_ID, String.valueOf(snowflakeIdWorker.nextId()));
+                    task.put(ConfigConstant.BINLOG_STATUS, BinlogConstant.READY);
+                    task.put(ConfigConstant.CONFIG_MODEL_JSON, message.toByteArray());
+                    task.put(ConfigConstant.CONFIG_MODEL_CREATE_TIME, now);
+                    tasks.add(task);
+                }
+                count++;
+            }
+
+            if (!CollectionUtils.isEmpty(tasks)) {
+                storageService.addBatch(StorageEnum.BINLOG, tasks);
+            }
+            tasks = null;
+        }
+    }
+
+    /**
+     * 从磁盘读取日志到任务队列
+     */
+    final class ReaderTask implements ScheduledTaskJob {
+
+        private final Lock lock = new ReentrantLock(true);
+
+        private volatile boolean running;
+
+        @Override
+        public void run() {
+            // 读取任务数 >= 1/2缓存同步队列容量则继续等待
+            if (running || binlogRecorderConfig.getBatchCount() + getQueue().size() >= getQueueCapacity() / 2) {
+                return;
+            }
+
+            final Lock binlogLock = lock;
+            boolean locked = false;
+            try {
+                locked = binlogLock.tryLock();
+                if (locked) {
+                    running = true;
+                    doParse();
+                }
+            } catch (Exception e) {
+                logger.error(e.getMessage());
+            } finally {
+                if (locked) {
+                    running = false;
+                    binlogLock.unlock();
+                }
+            }
+        }
+
+        private void doParse() {
+            //  TODO 查询[待处理] 或 [处理中 & 处理超时]
+            Query query = new Query();
+            query.setType(StorageEnum.BINLOG);
+            query.addFilter(ConfigConstant.BINLOG_STATUS, String.valueOf(BinlogConstant.READY));
+            query.setPageNum(1);
+            query.setPageSize(binlogRecorderConfig.getBatchCount());
+            Paging paging = storageService.query(query);
+            if (CollectionUtils.isEmpty(paging.getData())) {
+                return;
+            }
+
+            List<Map> list = (List<Map>) paging.getData();
+            final int size = list.size();
+            final List<Message> messages = new ArrayList<>(size);
+            final List<Map> updateTasks = new ArrayList<>(size);
+            boolean existProcessing = false;
+            for (int i = 0; i < size; i++) {
+                Map row = list.get(i);
+                String id = (String) row.get(ConfigConstant.CONFIG_MODEL_ID);
+                Integer status = (Integer) row.get(ConfigConstant.BINLOG_STATUS);
+                byte[] bytes = (byte[]) row.get(ConfigConstant.CONFIG_MODEL_JSON);
+                if (BinlogConstant.PROCESSING == status) {
+                    existProcessing = true;
+                }
+                try {
+                    Message message = deserialize(id, BinlogMessage.parseFrom(bytes));
+                    if (null != message) {
+                        messages.add(message);
+                        row.put(ConfigConstant.CONFIG_MODEL_CREATE_TIME, Instant.now().toEpochMilli());
+                        updateTasks.add(row);
+                    }
+                } catch (InvalidProtocolBufferException e) {
+                    logger.error(e.getMessage());
+                }
+            }
+            if (existProcessing) {
+                logger.warn("存在超时未处理数据,正在重试,建议优化配置参数[max-processing-seconds={}].", binlogRecorderConfig.getMaxProcessingSeconds());
+            }
+
+            // 如果在更新消息状态的过程中服务被中止,为保证数据的安全性,重启后消息可能会同步2次)
+            storageService.editBatch(StorageEnum.BINLOG, updateTasks);
+            getQueue().addAll(messages);
+        }
+    }
+
+}

+ 5 - 0
dbsyncer-storage/src/main/java/org/dbsyncer/storage/constant/ConfigConstant.java

@@ -37,4 +37,9 @@ public class ConfigConstant {
     public static final String DATA_EVENT = "event";
     public static final String DATA_ERROR = "error";
 
+    /**
+     * Binlog
+     */
+    public static final String BINLOG_STATUS = "status";
+
 }

+ 5 - 1
dbsyncer-storage/src/main/java/org/dbsyncer/storage/enums/StorageEnum.java

@@ -18,7 +18,11 @@ public enum StorageEnum {
     /**
      * 数据:全量或增量数据
      */
-    DATA("data");
+    DATA("data"),
+    /**
+     * Binlog:缓存队列数据
+     */
+    BINLOG("binlog");
 
     private String type;
 

+ 28 - 23
dbsyncer-storage/src/main/java/org/dbsyncer/storage/lucene/Shard.java

@@ -11,6 +11,7 @@ import org.apache.lucene.search.highlight.InvalidTokenOffsetsException;
 import org.apache.lucene.store.Directory;
 import org.apache.lucene.store.FSDirectory;
 import org.dbsyncer.common.model.Paging;
+import org.dbsyncer.storage.StorageException;
 import org.dbsyncer.storage.query.Option;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -46,21 +47,25 @@ public class Shard {
 
     private static final int MAX_SIZE = 10000;
 
-    public Shard(String path) throws IOException {
-        // 索引存放的位置,设置在当前目录中
-        Path dir = Paths.get(path);
-        indexPath = new File(dir.toUri());
-        directory = FSDirectory.open(dir);
-        // 分词器
-        analyzer = new SmartChineseAnalyzer();
-        // 创建索引写入配置
-        config = new IndexWriterConfig(analyzer);
-        // 默认32M, 减少合并次数
-        config.setRAMBufferSizeMB(32);
-        // 创建索引写入对象
-        indexWriter = new IndexWriter(directory, config);
-        // 创建索引的读取器
-        indexReader = DirectoryReader.open(indexWriter);
+    public Shard(String path) {
+        try {
+            // 索引存放的位置,设置在当前目录中
+            Path dir = Paths.get(path);
+            indexPath = new File(dir.toUri());
+            directory = FSDirectory.open(dir);
+            // 分词器
+            analyzer = new SmartChineseAnalyzer();
+            // 创建索引写入配置
+            config = new IndexWriterConfig(analyzer);
+            // 默认32M, 减少合并次数
+            config.setRAMBufferSizeMB(32);
+            // 创建索引写入对象
+            indexWriter = new IndexWriter(directory, config);
+            // 创建索引的读取器
+            indexReader = DirectoryReader.open(indexWriter);
+        } catch (IOException e) {
+            throw new StorageException(e);
+        }
     }
 
     public void insert(Document doc) throws IOException {
@@ -89,11 +94,15 @@ public class Shard {
         }
     }
 
-    public void deleteAll() throws IOException {
+    public void deleteAll() {
         // Fix Bug: this IndexReader is closed. 直接删除文件
-        close();
-        directory.close();
-        FileUtils.deleteDirectory(indexPath);
+        try {
+            close();
+            directory.close();
+            FileUtils.deleteDirectory(indexPath);
+        } catch (IOException e) {
+            throw new StorageException(e);
+        }
     }
 
     public void close() throws IOException {
@@ -117,10 +126,6 @@ public class Shard {
         return new IndexSearcher(indexReader);
     }
 
-    public Analyzer getAnalyzer() {
-        return analyzer;
-    }
-
     public Paging query(Option option, int pageNum, int pageSize, Sort sort) throws IOException {
         final IndexSearcher searcher = getSearcher();
         final TopDocs topDocs = getTopDocs(searcher, option.getQuery(), MAX_SIZE, sort);

+ 5 - 5
dbsyncer-storage/src/main/java/org/dbsyncer/storage/query/Query.java

@@ -17,7 +17,7 @@ public class Query {
      */
     private StorageEnum type;
 
-    private String collection;
+    private String metaId;
 
     private List<Param> params;
 
@@ -65,12 +65,12 @@ public class Query {
         this.type = type;
     }
 
-    public String getCollection() {
-        return collection;
+    public String getMetaId() {
+        return metaId;
     }
 
-    public void setCollection(String collection) {
-        this.collection = collection;
+    public void setMetaId(String metaId) {
+        this.metaId = metaId;
     }
 
     public List<Param> getParams() {

+ 3 - 3
dbsyncer-storage/src/main/java/org/dbsyncer/storage/strategy/Strategy.java

@@ -19,12 +19,12 @@ package org.dbsyncer.storage.strategy;
 public interface Strategy {
 
     /**
-     * 创建集合ID
+     * 创建分片
      *
      * @param separator
-     * @param id
+     * @param collectionId
      * @return
      */
-    String createCollectionId(String separator, String id);
+    String createSharding(String separator, String collectionId);
 
 }

+ 21 - 0
dbsyncer-storage/src/main/java/org/dbsyncer/storage/strategy/impl/BinlogStrategy.java

@@ -0,0 +1,21 @@
+package org.dbsyncer.storage.strategy.impl;
+
+import org.dbsyncer.storage.enums.StorageEnum;
+import org.dbsyncer.storage.strategy.Strategy;
+import org.springframework.stereotype.Component;
+
+/**
+ * 缓存队列数据
+ *
+ * @author AE86
+ * @version 1.0.0
+ * @date 2019/11/15 22:39
+ */
+@Component
+public class BinlogStrategy implements Strategy {
+
+    @Override
+    public String createSharding(String separator, String collectionId) {
+        return StorageEnum.BINLOG.getType();
+    }
+}

+ 1 - 1
dbsyncer-storage/src/main/java/org/dbsyncer/storage/strategy/impl/ConfigStrategy.java

@@ -15,7 +15,7 @@ import org.springframework.stereotype.Component;
 public class ConfigStrategy implements Strategy {
 
     @Override
-    public String createCollectionId(String separator, String id) {
+    public String createSharding(String separator, String collectionId) {
         return StorageEnum.CONFIG.getType();
     }
 }

+ 3 - 3
dbsyncer-storage/src/main/java/org/dbsyncer/storage/strategy/impl/DataStrategy.java

@@ -16,9 +16,9 @@ import org.springframework.util.Assert;
 public class DataStrategy implements Strategy {
 
     @Override
-    public String createCollectionId(String separator, String id) {
-        Assert.hasText(id, "Id can not be empty.");
+    public String createSharding(String separator, String collectionId) {
+        Assert.hasText(collectionId, "The collectionId is empty.");
         // 同步数据较多,根据不同的驱动生成集合ID: data/123
-        return new StringBuilder(StorageEnum.DATA.getType()).append(separator).append(id).toString();
+        return new StringBuilder(StorageEnum.DATA.getType()).append(separator).append(collectionId).toString();
     }
 }

+ 1 - 1
dbsyncer-storage/src/main/java/org/dbsyncer/storage/strategy/impl/LogStrategy.java

@@ -15,7 +15,7 @@ import org.springframework.stereotype.Component;
 public class LogStrategy implements Strategy {
 
     @Override
-    public String createCollectionId(String separator, String id) {
+    public String createSharding(String separator, String collectionId) {
         return StorageEnum.LOG.getType();
     }
 }

+ 84 - 80
dbsyncer-storage/src/main/java/org/dbsyncer/storage/support/DiskStorageServiceImpl.java

@@ -2,7 +2,6 @@ package org.dbsyncer.storage.support;
 
 import org.apache.lucene.document.Document;
 import org.apache.lucene.document.IntPoint;
-import org.apache.lucene.index.IndexableField;
 import org.apache.lucene.index.Term;
 import org.apache.lucene.search.*;
 import org.dbsyncer.common.model.Paging;
@@ -17,16 +16,14 @@ import org.dbsyncer.storage.query.Option;
 import org.dbsyncer.storage.query.Param;
 import org.dbsyncer.storage.query.Query;
 import org.dbsyncer.storage.util.DocumentUtil;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import javax.annotation.PostConstruct;
 import java.io.File;
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.stream.Collectors;
 
 /**
  * 将数据存储在磁盘,基于lucene实现
@@ -37,9 +34,7 @@ import java.util.stream.Collectors;
  */
 public class DiskStorageServiceImpl extends AbstractStorageService {
 
-    private final Logger logger = LoggerFactory.getLogger(getClass());
-
-    private Map<String, Shard> map = new ConcurrentHashMap();
+    private Map<String, Shard> shards = new ConcurrentHashMap();
 
     /**
      * 相对路径/data/
@@ -49,28 +44,16 @@ public class DiskStorageServiceImpl extends AbstractStorageService {
 
     @PostConstruct
     private void init() {
-        try {
-            // 创建配置和日志索引shard
-            String config = StorageEnum.CONFIG.getType();
-            map.putIfAbsent(config, new Shard(PATH + config));
-
-            String log = StorageEnum.LOG.getType();
-            map.putIfAbsent(log, new Shard(PATH + log));
-        } catch (IOException e) {
-            throw new StorageException(e);
-        }
+        // 创建配置和日志索引shard
+        getShard(getSharding(StorageEnum.CONFIG, null));
+        getShard(getSharding(StorageEnum.LOG, null));
+        getShard(getSharding(StorageEnum.BINLOG, null));
     }
 
     @Override
-    public Paging select(Query query) throws IOException {
-        Shard shard = map.get(query.getCollection());
-
-        // 检查是否存在历史
-        if (null == shard) {
-            shard = cacheShardIfExist(query.getCollection());
-        }
-
-        if (null != shard) {
+    protected Paging select(String sharding, Query query) {
+        try {
+            Shard shard = getShard(sharding);
             int pageNum = query.getPageNum() <= 0 ? 1 : query.getPageNum();
             int pageSize = query.getPageSize() <= 0 ? 20 : query.getPageSize();
             boolean queryTotal = query.isQueryTotal();
@@ -93,54 +76,95 @@ public class DiskStorageServiceImpl extends AbstractStorageService {
             }
 
             return shard.query(new Option(new MatchAllDocsQuery(), queryTotal, null), pageNum, pageSize, sort);
+        } catch (IOException e) {
+            throw new StorageException(e);
         }
-        return new Paging(query.getPageNum(), query.getPageSize());
     }
 
     @Override
-    public void insert(StorageEnum type, String collection, Map params) throws IOException {
-        createShardIfNotExist(collection);
-        Document doc = DocumentUtil.convertConfig2Doc(params);
-        map.get(collection).insert(doc);
+    public void deleteAll(String sharding) {
+        shards.computeIfPresent(sharding, (k, v) -> {
+            v.deleteAll();
+            return v;
+        });
+        shards.remove(sharding);
     }
 
     @Override
-    public void update(StorageEnum type, String collection, Map params) throws IOException {
-        createShardIfNotExist(collection);
-        Document doc = DocumentUtil.convertConfig2Doc(params);
-        IndexableField field = doc.getField(ConfigConstant.CONFIG_MODEL_ID);
-        map.get(collection).update(new Term(ConfigConstant.CONFIG_MODEL_ID, field.stringValue()), doc);
+    protected void batchInsert(StorageEnum type, String sharding, List<Map> list) {
+        batchExecute(type, sharding, list, (shard, docs) -> shard.insertBatch(docs));
     }
 
     @Override
-    public void delete(StorageEnum type, String collection, String id) throws IOException {
-        createShardIfNotExist(collection);
-        map.get(collection).delete(new Term(ConfigConstant.CONFIG_MODEL_ID, id));
+    protected void batchUpdate(StorageEnum type, String sharding, List<Map> list) {
+        batchExecute(type, sharding, list, (shard, docs) -> {
+            for (Document doc : docs) {
+                shard.update(getPrimaryKeyTerm(doc), doc);
+            }
+        });
     }
 
     @Override
-    public void deleteAll(StorageEnum type, String collection) throws IOException {
-        synchronized (this) {
-            Shard shard = map.get(collection);
-            if (null != shard) {
-                shard.deleteAll();
-                map.remove(collection);
-            }
+    protected void batchDelete(StorageEnum type, String sharding, List<String> ids) {
+        Shard shard = getShard(sharding);
+        int size = ids.size();
+        Term[] terms = new Term[size];
+        for (int i = 0; i < size; i++) {
+            terms[i] = getPrimaryKeyTerm(ids.get(i));
+        }
+        try {
+            shard.deleteBatch(terms);
+        } catch (IOException e) {
+            throw new StorageException(e);
         }
     }
 
     @Override
-    public void insertLog(StorageEnum type, String collection, Map<String, Object> params) throws IOException {
-        createShardIfNotExist(collection);
-        Document doc = DocumentUtil.convertLog2Doc(params);
-        map.get(collection).insert(doc);
+    public void destroy() throws Exception {
+        for (Map.Entry<String, Shard> m : shards.entrySet()) {
+            m.getValue().close();
+        }
+        shards.clear();
     }
 
-    @Override
-    public void insertData(StorageEnum type, String collection, List<Map> list) throws IOException {
-        createShardIfNotExist(collection);
-        List<Document> docs = list.stream().map(r -> DocumentUtil.convertData2Doc(r)).collect(Collectors.toList());
-        map.get(collection).insertBatch(docs);
+    private Term getPrimaryKeyTerm(Document doc) {
+        return new Term(ConfigConstant.CONFIG_MODEL_ID, doc.getField(ConfigConstant.CONFIG_MODEL_ID).stringValue());
+    }
+
+    private Term getPrimaryKeyTerm(String id) {
+        return new Term(ConfigConstant.CONFIG_MODEL_ID, id);
+    }
+
+    private void batchExecute(StorageEnum type, String sharding, List<Map> list, ExecuteMapper mapper) {
+        if (CollectionUtils.isEmpty(list)) {
+            return;
+        }
+
+        Shard shard = getShard(sharding);
+        List<Document> docs = new ArrayList<>();
+        list.forEach(r -> {
+            switch (type) {
+                case DATA:
+                    docs.add(DocumentUtil.convertData2Doc(r));
+                    break;
+                case LOG:
+                    docs.add(DocumentUtil.convertLog2Doc(r));
+                    break;
+                case CONFIG:
+                    docs.add(DocumentUtil.convertConfig2Doc(r));
+                    break;
+                case BINLOG:
+                    docs.add(DocumentUtil.convertBinlog2Doc(r));
+                    break;
+                default:
+                    break;
+            }
+        });
+        try {
+            mapper.apply(shard, docs);
+        } catch (IOException e) {
+            throw new StorageException(e);
+        }
     }
 
     /**
@@ -149,34 +173,14 @@ public class DiskStorageServiceImpl extends AbstractStorageService {
      * <p>/data/log</p>
      * <p>/data/data/123</p>
      *
-     * @param collectionId
+     * @param sharding
      * @throws IOException
      */
-    private void createShardIfNotExist(String collectionId) throws IOException {
-        synchronized (this) {
-            if (null == map.get(collectionId)) {
-                map.putIfAbsent(collectionId, new Shard(PATH + collectionId));
-            }
-        }
-    }
-
-    private Shard cacheShardIfExist(String collectionId) {
-        String path = PATH + collectionId;
-        if (new File(path).exists()) {
-            try {
-                map.putIfAbsent(collectionId, new Shard(path));
-            } catch (IOException e) {
-                logger.error(e.getMessage());
-            }
-        }
-        return map.get(collectionId);
+    private Shard getShard(String sharding) {
+        return shards.computeIfAbsent(sharding, k -> new Shard(PATH + k));
     }
 
-    @Override
-    public void destroy() throws Exception {
-        for (Map.Entry<String, Shard> m : map.entrySet()) {
-            m.getValue().close();
-        }
-        map.clear();
+    interface ExecuteMapper {
+        void apply(Shard shard, List<Document> docs) throws IOException;
     }
 }

+ 100 - 120
dbsyncer-storage/src/main/java/org/dbsyncer/storage/support/MysqlStorageServiceImpl.java

@@ -1,5 +1,6 @@
 package org.dbsyncer.storage.support;
 
+import org.apache.commons.io.IOUtils;
 import org.dbsyncer.common.model.Paging;
 import org.dbsyncer.common.util.CollectionUtils;
 import org.dbsyncer.common.util.StringUtil;
@@ -14,7 +15,6 @@ import org.dbsyncer.connector.enums.SqlBuilderEnum;
 import org.dbsyncer.connector.model.Field;
 import org.dbsyncer.connector.util.DatabaseUtil;
 import org.dbsyncer.storage.AbstractStorageService;
-import org.dbsyncer.storage.StorageException;
 import org.dbsyncer.storage.constant.ConfigConstant;
 import org.dbsyncer.storage.enums.StorageEnum;
 import org.dbsyncer.storage.query.Param;
@@ -30,7 +30,10 @@ import org.springframework.stereotype.Component;
 import org.springframework.util.Assert;
 
 import javax.annotation.PostConstruct;
-import java.io.*;
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
 import java.sql.SQLSyntaxErrorException;
 import java.sql.Types;
 import java.util.ArrayList;
@@ -92,14 +95,19 @@ public class MysqlStorageServiceImpl extends AbstractStorageService {
     }
 
     @Override
-    public Paging select(Query query) {
+    protected String getSeparator() {
+        return "_";
+    }
+
+    @Override
+    protected Paging select(String sharding, Query query) {
         Paging paging = new Paging(query.getPageNum(), query.getPageSize());
-        Executor executor = getExecutor(query.getType(), query.getCollection());
+        Executor executor = getExecutor(query.getType(), sharding);
         List<Object> queryCountArgs = new ArrayList<>();
         String queryCountSql = buildQueryCountSql(query, executor, queryCountArgs);
         Long total = connectorMapper.execute(databaseTemplate -> databaseTemplate.queryForObject(queryCountSql, queryCountArgs.toArray(), Long.class));
+        paging.setTotal(total);
         if (query.isQueryTotal()) {
-            paging.setTotal(total);
             return paging;
         }
 
@@ -108,60 +116,55 @@ public class MysqlStorageServiceImpl extends AbstractStorageService {
         List<Map<String, Object>> data = connectorMapper.execute(databaseTemplate -> databaseTemplate.queryForList(querySql, queryArgs.toArray()));
         replaceHighLight(query, data);
         paging.setData(data);
-        paging.setTotal(total);
         return paging;
     }
 
     @Override
-    public void insert(StorageEnum type, String table, Map params) {
-        executeInsert(type, table, params);
+    protected void deleteAll(String sharding) {
+        tables.computeIfPresent(sharding, (k, executor) -> {
+            String sql = getExecutorSql(executor, k);
+            executeSql(sql);
+            return executor;
+        });
+        tables.remove(sharding);
     }
 
     @Override
-    public void update(StorageEnum type, String table, Map params) {
-        Executor executor = getExecutor(type, table);
-        String sql = executor.getUpdate();
-        List<Object> args = getUpdateArgs(executor, params);
-        int update = connectorMapper.execute(databaseTemplate -> databaseTemplate.update(sql, args.toArray()));
-        Assert.isTrue(update > 0, "update failed");
-    }
+    protected void batchInsert(StorageEnum type, String sharding, List<Map> list) {
+        batchExecute(type, sharding, list, new ExecuteMapper() {
+            @Override
+            public String getSql(Executor executor) {
+                return executor.getInsert();
+            }
 
-    @Override
-    public void delete(StorageEnum type, String table, String id) {
-        Executor executor = getExecutor(type, table);
-        String sql = executor.getDelete();
-        int delete = connectorMapper.execute(databaseTemplate -> databaseTemplate.update(sql, new Object[]{id}));
-        Assert.isTrue(delete > 0, "delete failed");
+            @Override
+            public Object[] getArgs(Executor executor, Map params) {
+                return getInsertArgs(executor, params);
+            }
+        });
     }
 
     @Override
-    public void deleteAll(StorageEnum type, String table) {
-        Executor executor = getExecutor(type, table);
-        if (executor.isSystemType()) {
-            String sql = String.format(TRUNCATE_TABLE, PREFIX_TABLE.concat(table));
-            executeSql(sql);
-            return;
-        }
-
-        if (tables.containsKey(table)) {
-            tables.remove(table);
-            String sql = String.format(DROP_TABLE, PREFIX_TABLE.concat(table));
-            executeSql(sql);
-        }
-    }
+    protected void batchUpdate(StorageEnum type, String sharding, List<Map> list) {
+        batchExecute(type, sharding, list, new ExecuteMapper() {
+            @Override
+            public String getSql(Executor executor) {
+                return executor.getUpdate();
+            }
 
-    @Override
-    public void insertLog(StorageEnum type, String table, Map<String, Object> params) {
-        executeInsert(type, table, params);
+            @Override
+            public Object[] getArgs(Executor executor, Map params) {
+                return getUpdateArgs(executor, params);
+            }
+        });
     }
 
     @Override
-    public void insertData(StorageEnum type, String table, List<Map> list) {
-        if (!CollectionUtils.isEmpty(list)) {
-            Executor executor = getExecutor(type, table);
-            List<Object[]> args = list.stream().map(row -> getArgs(executor, row).toArray()).collect(Collectors.toList());
-            connectorMapper.execute(databaseTemplate -> databaseTemplate.batchUpdate(executor.getInsert(), args));
-        }
+    protected void batchDelete(StorageEnum type, String sharding, List<String> ids) {
+        final Executor executor = getExecutor(type, sharding);
+        final String sql = executor.getDelete();
+        final List<Object[]> args = ids.stream().map(id -> new Object[]{id}).collect(Collectors.toList());
+        connectorMapper.execute(databaseTemplate -> databaseTemplate.batchUpdate(sql, args));
     }
 
     @Override
@@ -169,28 +172,36 @@ public class MysqlStorageServiceImpl extends AbstractStorageService {
         connectorMapper.close();
     }
 
-    @Override
-    protected String getSeparator() {
-        return "_";
+    private void batchExecute(StorageEnum type, String sharding, List<Map> list, ExecuteMapper mapper) {
+        if (CollectionUtils.isEmpty(list)) {
+            return;
+        }
+
+        final Executor executor = getExecutor(type, sharding);
+        final String sql = mapper.getSql(executor);
+        final List<Object[]> args = list.stream().map(row -> mapper.getArgs(executor, row)).collect(Collectors.toList());
+        connectorMapper.execute(databaseTemplate -> databaseTemplate.batchUpdate(sql, args));
     }
 
-    private int executeInsert(StorageEnum type, String table, Map params) {
-        Executor executor = getExecutor(type, table);
-        String sql = executor.getInsert();
-        List<Object> args = getArgs(executor, params);
-        int insert = connectorMapper.execute(databaseTemplate -> databaseTemplate.update(sql, args.toArray()));
-        if (insert < 1) {
-            logger.error("table:{}, params:{}");
-            throw new StorageException("insert failed");
-        }
-        return insert;
+    private Executor getExecutor(StorageEnum type, String sharding) {
+        return tables.computeIfAbsent(sharding, (table) -> {
+            Executor dataTemplate = tables.get(type.getType());
+            Assert.notNull(dataTemplate, "未知的存储类型");
+
+            Executor newExecutor = new Executor(dataTemplate.getType(), dataTemplate.getFields(), dataTemplate.isSystemTable(), dataTemplate.isOrderByUpdateTime());
+            return createTableIfNotExist(table, newExecutor);
+        });
     }
 
-    private List<Object> getArgs(Executor executor, Map params) {
-        return executor.getFields().stream().map(f -> params.get(f.getLabelName())).collect(Collectors.toList());
+    private String getExecutorSql(Executor executor, String sharding) {
+        return executor.isSystemTable() ? String.format(TRUNCATE_TABLE, PREFIX_TABLE.concat(sharding)) : String.format(DROP_TABLE, PREFIX_TABLE.concat(sharding));
     }
 
-    private List<Object> getUpdateArgs(Executor executor, Map params) {
+    private Object[] getInsertArgs(Executor executor, Map params) {
+        return executor.getFields().stream().map(f -> params.get(f.getLabelName())).collect(Collectors.toList()).toArray();
+    }
+
+    private Object[] getUpdateArgs(Executor executor, Map params) {
         List<Object> args = new ArrayList<>();
         Object pk = null;
         for (Field f : executor.getFields()) {
@@ -203,29 +214,7 @@ public class MysqlStorageServiceImpl extends AbstractStorageService {
 
         Assert.notNull(pk, "The primaryKey is null.");
         args.add(pk);
-        return args;
-    }
-
-    private Executor getExecutor(StorageEnum type, String table) {
-        // 获取模板
-        Executor executor = tables.get(type.getType());
-        Assert.notNull(executor, "未知的存储类型");
-
-        if (tables.containsKey(table)) {
-            return tables.get(table);
-        }
-        synchronized (tables) {
-            // 检查本地缓存
-            if (tables.containsKey(table)) {
-                return tables.get(table);
-            }
-            // 不存在
-            Executor newExecutor = new Executor(executor.getGroup(), executor.getFields(), executor.isDynamicTableName(), executor.isSystemType(), executor.isOrderByUpdateTime());
-            createTableIfNotExist(table, newExecutor);
-
-            tables.putIfAbsent(table, newExecutor);
-            return newExecutor;
-        }
+        return args.toArray();
     }
 
     private String buildQuerySql(Query query, Executor executor, List<Object> args) {
@@ -265,7 +254,7 @@ public class MysqlStorageServiceImpl extends AbstractStorageService {
                 // name=?
                 sql.append(p.getKey()).append(p.isHighlighter() ? " LIKE ?" : "=?");
                 args.add(p.isHighlighter() ? new StringBuilder("%").append(p.getValue()).append("%") : p.getValue());
-                flag.compareAndSet(false, true);
+                flag.set(true);
             });
         }
     }
@@ -320,12 +309,12 @@ public class MysqlStorageServiceImpl extends AbstractStorageService {
         builder.build(ConfigConstant.CONFIG_MODEL_ID, ConfigConstant.DATA_SUCCESS, ConfigConstant.DATA_TABLE_GROUP_ID, ConfigConstant.DATA_TARGET_TABLE_NAME, ConfigConstant.DATA_EVENT, ConfigConstant.DATA_ERROR, ConfigConstant.CONFIG_MODEL_CREATE_TIME, ConfigConstant.CONFIG_MODEL_JSON);
         List<Field> dataFields = builder.getFields();
 
-        tables.putIfAbsent(StorageEnum.CONFIG.getType(), new Executor(StorageEnum.CONFIG, configFields, false, true, true));
-        tables.putIfAbsent(StorageEnum.LOG.getType(), new Executor(StorageEnum.LOG, logFields, false, true, false));
-        tables.putIfAbsent(StorageEnum.DATA.getType(), new Executor(StorageEnum.DATA, dataFields, true, false, false));
+        tables.computeIfAbsent(StorageEnum.CONFIG.getType(), k -> new Executor(k, configFields, true, true));
+        tables.computeIfAbsent(StorageEnum.LOG.getType(), k -> new Executor(k, logFields, true, false));
+        tables.computeIfAbsent(StorageEnum.DATA.getType(), k -> new Executor(k, dataFields, false, false));
         // 创建表
         tables.forEach((tableName, e) -> {
-            if (!e.isDynamicTableName()) {
+            if (e.isSystemTable()) {
                 createTableIfNotExist(tableName, e);
             }
         });
@@ -334,7 +323,7 @@ public class MysqlStorageServiceImpl extends AbstractStorageService {
         TimeUnit.SECONDS.sleep(1);
     }
 
-    private void createTableIfNotExist(String table, Executor executor) {
+    private Executor createTableIfNotExist(String table, Executor executor) {
         table = PREFIX_TABLE.concat(table);
         // show tables where Tables_in_dbsyncer = "dbsyncer_config"
         String sql = String.format(SHOW_TABLE, database, table);
@@ -342,7 +331,7 @@ public class MysqlStorageServiceImpl extends AbstractStorageService {
             connectorMapper.execute(databaseTemplate -> databaseTemplate.queryForMap(sql));
         } catch (EmptyResultDataAccessException e) {
             // 不存在表
-            String ddl = readSql(executor.getGroup().getType(), executor.isDynamicTableName(), table);
+            String ddl = readSql(executor.getType(), executor.isSystemTable(), table);
             logger.info(ddl);
             executeSql(ddl);
         }
@@ -355,9 +344,10 @@ public class MysqlStorageServiceImpl extends AbstractStorageService {
         String update = SqlBuilderEnum.UPDATE.getSqlBuilder().buildSql(config);
         String delete = SqlBuilderEnum.DELETE.getSqlBuilder().buildSql(config);
         executor.setTable(table).setQuery(query).setInsert(insert).setUpdate(update).setDelete(delete);
+        return executor;
     }
 
-    private String readSql(String type, boolean dynamicTableName, String table) {
+    private String readSql(String type, boolean systemTable, String table) {
         String template = PREFIX_TABLE.concat(type);
         String filePath = "/".concat(template).concat(".sql");
 
@@ -376,28 +366,18 @@ public class MysqlStorageServiceImpl extends AbstractStorageService {
         } catch (IOException e) {
             logger.error("failed read file:{}", filePath);
         } finally {
-            close(bf);
-            close(isr);
-            close(in);
+            IOUtils.closeQuietly(bf);
+            IOUtils.closeQuietly(isr);
+            IOUtils.closeQuietly(in);
         }
 
         // 动态替换表名
-        if (dynamicTableName) {
+        if (!systemTable) {
             return StringUtil.replaceOnce(res.toString(), template, table);
         }
         return res.toString();
     }
 
-    private void close(Closeable closeable) {
-        if (null != closeable) {
-            try {
-                closeable.close();
-            } catch (IOException e) {
-                logger.error(e.getMessage());
-            }
-        }
-    }
-
     private void executeSql(String ddl) {
         connectorMapper.execute(databaseTemplate -> {
             databaseTemplate.execute(ddl);
@@ -470,17 +450,15 @@ public class MysqlStorageServiceImpl extends AbstractStorageService {
         private String insert;
         private String update;
         private String delete;
-        private StorageEnum group;
+        private String type;
         private List<Field> fields;
-        private boolean dynamicTableName;
-        private boolean systemType;
+        private boolean systemTable;
         private boolean orderByUpdateTime;
 
-        public Executor(StorageEnum group, List<Field> fields, boolean dynamicTableName, boolean systemType, boolean orderByUpdateTime) {
-            this.group = group;
+        public Executor(String type, List<Field> fields, boolean systemTable, boolean orderByUpdateTime) {
+            this.type = type;
             this.fields = fields;
-            this.dynamicTableName = dynamicTableName;
-            this.systemType = systemType;
+            this.systemTable = systemTable;
             this.orderByUpdateTime = orderByUpdateTime;
         }
 
@@ -529,20 +507,16 @@ public class MysqlStorageServiceImpl extends AbstractStorageService {
             return this;
         }
 
-        public StorageEnum getGroup() {
-            return group;
+        public String getType() {
+            return type;
         }
 
         public List<Field> getFields() {
             return fields;
         }
 
-        public boolean isDynamicTableName() {
-            return dynamicTableName;
-        }
-
-        public boolean isSystemType() {
-            return systemType;
+        public boolean isSystemTable() {
+            return systemTable;
         }
 
         public boolean isOrderByUpdateTime() {
@@ -550,4 +524,10 @@ public class MysqlStorageServiceImpl extends AbstractStorageService {
         }
 
     }
+
+    interface ExecuteMapper {
+        String getSql(Executor executor);
+
+        Object[] getArgs(Executor executor, Map params);
+    }
 }

+ 25 - 0
dbsyncer-storage/src/main/java/org/dbsyncer/storage/util/DocumentUtil.java

@@ -94,6 +94,8 @@ public abstract class DocumentUtil {
         Document doc = new Document();
         String id = (String) params.get(ConfigConstant.CONFIG_MODEL_ID);
         Integer success = (Integer) params.get(ConfigConstant.DATA_SUCCESS);
+        String tableGroupId = (String) params.get(ConfigConstant.DATA_TABLE_GROUP_ID);
+        String targetTableName = (String) params.get(ConfigConstant.DATA_TARGET_TABLE_NAME);
         String event = (String) params.get(ConfigConstant.DATA_EVENT);
         String error = (String) params.get(ConfigConstant.DATA_ERROR);
         String json = (String) params.get(ConfigConstant.CONFIG_MODEL_JSON);
@@ -102,6 +104,8 @@ public abstract class DocumentUtil {
         doc.add(new StringField(ConfigConstant.CONFIG_MODEL_ID, id, Field.Store.YES));
         doc.add(new IntPoint(ConfigConstant.DATA_SUCCESS, success));
         doc.add(new StoredField(ConfigConstant.DATA_SUCCESS, success));
+        doc.add(new StringField(ConfigConstant.DATA_TABLE_GROUP_ID, tableGroupId, Field.Store.YES));
+        doc.add(new StringField(ConfigConstant.DATA_TARGET_TABLE_NAME, targetTableName, Field.Store.YES));
         doc.add(new StringField(ConfigConstant.DATA_EVENT, event, Field.Store.YES));
         doc.add(new TextField(ConfigConstant.DATA_ERROR, error, Field.Store.YES));
         doc.add(new StoredField(ConfigConstant.CONFIG_MODEL_JSON, json));
@@ -112,6 +116,7 @@ public abstract class DocumentUtil {
         return doc;
     }
 
+    @Deprecated
     public static Document convertBinlog2Doc(String messageId, int status, BytesRef bytes, long updateTime) {
         Document doc = new Document();
         doc.add(new StringField(BinlogConstant.BINLOG_ID, messageId, Field.Store.YES));
@@ -128,4 +133,24 @@ public abstract class DocumentUtil {
         return doc;
     }
 
+    public static Document convertBinlog2Doc(Map params) {
+        Document doc = new Document();
+        String id = (String) params.get(ConfigConstant.CONFIG_MODEL_ID);
+        doc.add(new StringField(ConfigConstant.CONFIG_MODEL_ID, id, Field.Store.YES));
+
+        Integer status = (Integer) params.get(ConfigConstant.BINLOG_STATUS);
+        doc.add(new IntPoint(ConfigConstant.BINLOG_STATUS, status));
+        doc.add(new StoredField(ConfigConstant.BINLOG_STATUS, status));
+
+        byte[] bytes = (byte[]) params.get(ConfigConstant.CONFIG_MODEL_JSON);
+        doc.add(new BinaryDocValuesField(ConfigConstant.CONFIG_MODEL_JSON, new BytesRef(bytes)));
+        doc.add(new StoredField(ConfigConstant.CONFIG_MODEL_JSON, bytes));
+
+        Long createTime = (Long) params.get(ConfigConstant.CONFIG_MODEL_CREATE_TIME);
+        doc.add(new LongPoint(ConfigConstant.CONFIG_MODEL_CREATE_TIME, createTime));
+        doc.add(new StoredField(ConfigConstant.CONFIG_MODEL_CREATE_TIME, createTime));
+        doc.add(new NumericDocValuesField(ConfigConstant.CONFIG_MODEL_CREATE_TIME, createTime));
+        return doc;
+    }
+
 }