Browse Source

!72 merge
Merge pull request !72 from AE86/V_1.0.0_Beta

AE86 2 years ago
parent
commit
f7ac6f9c2b
24 changed files with 260 additions and 187 deletions
  1. 1 0
      dbsyncer-biz/src/main/java/org/dbsyncer/biz/checker/impl/mapping/MappingChecker.java
  2. 4 2
      dbsyncer-biz/src/main/java/org/dbsyncer/biz/impl/ProjectGroupServiceImpl.java
  3. 10 0
      dbsyncer-biz/src/main/java/org/dbsyncer/biz/vo/ProjectGroupVo.java
  4. 1 3
      dbsyncer-common/src/main/java/org/dbsyncer/common/model/Paging.java
  5. 12 0
      dbsyncer-common/src/main/java/org/dbsyncer/common/util/DateFormatUtil.java
  6. 2 2
      dbsyncer-manager/src/main/java/org/dbsyncer/manager/ManagerFactory.java
  7. 39 24
      dbsyncer-manager/src/main/java/org/dbsyncer/manager/puller/FullPuller.java
  8. 26 26
      dbsyncer-manager/src/main/java/org/dbsyncer/manager/puller/IncrementPuller.java
  9. 1 3
      dbsyncer-manager/src/main/java/org/dbsyncer/manager/puller/Puller.java
  10. 3 1
      dbsyncer-parser/src/main/java/org/dbsyncer/parser/Parser.java
  11. 17 4
      dbsyncer-parser/src/main/java/org/dbsyncer/parser/ParserFactory.java
  12. 20 8
      dbsyncer-parser/src/main/java/org/dbsyncer/parser/convert/handler/StringToTimestampHandler.java
  13. 13 2
      dbsyncer-parser/src/main/java/org/dbsyncer/parser/model/Mapping.java
  14. 35 7
      dbsyncer-storage/src/main/java/org/dbsyncer/storage/AbstractStorageService.java
  15. 0 1
      dbsyncer-storage/src/main/java/org/dbsyncer/storage/lucene/Shard.java
  16. 24 30
      dbsyncer-storage/src/main/java/org/dbsyncer/storage/support/MysqlStorageServiceImpl.java
  17. 2 2
      dbsyncer-web/src/main/java/org/dbsyncer/web/controller/index/IndexController.java
  18. 28 34
      dbsyncer-web/src/main/resources/public/index/index.html
  19. 6 1
      dbsyncer-web/src/main/resources/public/mapping/editFull.html
  20. 1 1
      dbsyncer-web/src/main/resources/public/monitor/monitor.html
  21. 5 26
      dbsyncer-web/src/main/resources/static/css/common.css
  22. 1 0
      dbsyncer-web/src/main/resources/static/css/index/index.css
  23. 1 1
      dbsyncer-web/src/main/resources/static/js/index/index.js
  24. 8 9
      dbsyncer-web/src/main/resources/static/js/monitor/index.js

+ 1 - 0
dbsyncer-biz/src/main/java/org/dbsyncer/biz/checker/impl/mapping/MappingChecker.java

@@ -93,6 +93,7 @@ public class MappingChecker extends AbstractChecker {
         // 全量配置
         mapping.setReadNum(NumberUtil.toInt(params.get("readNum"), mapping.getReadNum()));
         mapping.setBatchNum(NumberUtil.toInt(params.get("batchNum"), mapping.getBatchNum()));
+        mapping.setThreadNum(NumberUtil.toInt(params.get("threadNum"), mapping.getThreadNum()));
 
         // 增量配置(日志/定时)
         String incrementStrategy = params.get("incrementStrategy");

+ 4 - 2
dbsyncer-biz/src/main/java/org/dbsyncer/biz/impl/ProjectGroupServiceImpl.java

@@ -71,8 +71,11 @@ public class ProjectGroupServiceImpl extends BaseServiceImpl implements ProjectG
     @Override
     public ProjectGroupVo getProjectGroup(String id) {
         ProjectGroupVo vo = new ProjectGroupVo();
+        List<Connector> connectors = connectorService.getConnectorAll();
+        vo.setConnectorSize(CollectionUtils.isEmpty(connectors) ? 0 : connectors.size());
+
         if (StringUtil.isBlank(id)) {
-            vo.setConnectors(connectorService.getConnectorAll());
+            vo.setConnectors(connectors);
             vo.setMappings(mappingService.getMappingAll());
             return vo;
         }
@@ -87,7 +90,6 @@ public class ProjectGroupServiceImpl extends BaseServiceImpl implements ProjectG
         List<String> connectorIds = projectGroup.getConnectorIds();
         if (!CollectionUtils.isEmpty(connectorIds)) {
             Set<String> connectorIdSet = new HashSet<>(connectorIds);
-            List<Connector> connectors = connectorService.getConnectorAll();
             if (!CollectionUtils.isEmpty(connectors)) {
                 vo.setConnectors(connectors.stream()
                         .filter((connector -> connectorIdSet.contains(connector.getId())))

+ 10 - 0
dbsyncer-biz/src/main/java/org/dbsyncer/biz/vo/ProjectGroupVo.java

@@ -13,10 +13,20 @@ import java.util.List;
  */
 public class ProjectGroupVo extends ProjectGroup {
 
+    private int connectorSize;
+
     private List<Connector> connectors = new ArrayList<>();
 
     private List<MappingVo> mappings = new ArrayList<>();
 
+    public int getConnectorSize() {
+        return connectorSize;
+    }
+
+    public void setConnectorSize(int connectorSize) {
+        this.connectorSize = connectorSize;
+    }
+
     public List<Connector> getConnectors() {
         return connectors;
     }

+ 1 - 3
dbsyncer-common/src/main/java/org/dbsyncer/common/model/Paging.java

@@ -4,16 +4,14 @@ import java.util.Collection;
 import java.util.Collections;
 
 public class Paging {
-
     private long total;
     private int pageNum;
     private int pageSize;
-    private Collection data;
+    private Collection data = Collections.EMPTY_LIST;
 
     public Paging(int pageNum, int pageSize) {
         this.pageNum = pageNum;
         this.pageSize = pageSize;
-        this.data = Collections.EMPTY_LIST;
     }
 
     public long getTotal() {

+ 12 - 0
dbsyncer-common/src/main/java/org/dbsyncer/common/util/DateFormatUtil.java

@@ -2,6 +2,9 @@ package org.dbsyncer.common.util;
 
 import java.sql.Date;
 import java.sql.Timestamp;
+import java.text.DateFormat;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
 import java.time.*;
 import java.time.format.*;
 import java.time.temporal.ChronoField;
@@ -13,6 +16,10 @@ public abstract class DateFormatUtil {
      * yyyy-MM-dd HH:mm:ss
      */
     public static final DateTimeFormatter CHINESE_STANDARD_TIME_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
+    /**
+     * yyyy-MM-dd'T'HH:mm:ss.SSSz
+     */
+    public static final DateFormat GMT_FORMATTER = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSz");
     /**
      * yyyy-MM-dd
      */
@@ -87,6 +94,11 @@ public abstract class DateFormatUtil {
     public static Timestamp stringToTimestamp(String s) {
         return Timestamp.valueOf(LocalDateTime.from(CHINESE_STANDARD_TIME_FORMATTER.parse(s)));
     }
+
+    public static Timestamp stringToTimestamp(String s, DateFormat formatter) throws ParseException {
+        return new Timestamp(formatter.parse(s).getTime());
+    }
+
     public static OffsetTime timeWithTimeZone(String s) {
         return OffsetTime.parse(s, TIME_TZ_FORMAT).withOffsetSameInstant(ZoneOffset.UTC);
     }

+ 2 - 2
dbsyncer-manager/src/main/java/org/dbsyncer/manager/ManagerFactory.java

@@ -369,7 +369,7 @@ public class ManagerFactory implements Manager, ApplicationListener<ClosedEvent>
         // 标记运行中
         changeMetaState(mapping.getMetaId(), MetaEnum.RUNNING);
 
-        puller.asyncStart(mapping);
+        puller.start(mapping);
     }
 
     @Override
@@ -412,4 +412,4 @@ public class ManagerFactory implements Manager, ApplicationListener<ClosedEvent>
         return puller;
     }
 
-}
+}

+ 39 - 24
dbsyncer-manager/src/main/java/org/dbsyncer/manager/puller/FullPuller.java

@@ -20,6 +20,8 @@ import java.time.Instant;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 
 /**
  * 全量同步
@@ -45,28 +47,11 @@ public class FullPuller extends AbstractPuller implements ApplicationListener<Fu
     private Map<String, Task> map = new ConcurrentHashMap<>();
 
     @Override
-    public void asyncStart(Mapping mapping) {
-        final String mappingId = mapping.getId();
-        final String metaId = mapping.getMetaId();
-        map.putIfAbsent(metaId, new Task(metaId));
-
-        try {
-            List<TableGroup> list = manager.getTableGroupAll(mappingId);
-            Assert.notEmpty(list, "映射关系不能为空");
-
-            // 执行任务
-            logger.info("启动任务:{}", metaId);
-            Task task = map.get(metaId);
-            doTask(task, mapping, list);
-
-        } catch (Exception e) {
-            logger.error(e.getMessage());
-            logService.log(LogType.SystemLog.ERROR, e.getMessage());
-        } finally {
-            map.remove(metaId);
-            publishClosedEvent(metaId);
-            logger.info("结束任务:{}", metaId);
-        }
+    public void start(Mapping mapping) {
+        FullWorker worker = new FullWorker(mapping);
+        worker.setName(new StringBuilder("full-worker-").append(mapping.getId()).toString());
+        worker.setDaemon(false);
+        worker.start();
     }
 
     @Override
@@ -83,7 +68,7 @@ public class FullPuller extends AbstractPuller implements ApplicationListener<Fu
         flush(event.getTask());
     }
 
-    private void doTask(Task task, Mapping mapping, List<TableGroup> list) {
+    private void doTask(Task task, Mapping mapping, List<TableGroup> list, ExecutorService executorService) {
         // 记录开始时间
         long now = Instant.now().toEpochMilli();
         task.setBeginTime(now);
@@ -94,7 +79,7 @@ public class FullPuller extends AbstractPuller implements ApplicationListener<Fu
             if (!task.isRunning()) {
                 break;
             }
-            parser.execute(task, mapping, t);
+            parser.execute(task, mapping, t, executorService);
         }
 
         // 记录结束时间
@@ -111,4 +96,34 @@ public class FullPuller extends AbstractPuller implements ApplicationListener<Fu
         manager.editMeta(meta);
     }
 
+    final class FullWorker extends Thread {
+        Mapping mapping;
+        List<TableGroup> list;
+
+        public FullWorker(Mapping mapping) {
+            this.mapping = mapping;
+            this.list = manager.getTableGroupAll(mapping.getId());
+            Assert.notEmpty(list, "映射关系不能为空");
+        }
+
+        @Override
+        public void run() {
+            final String metaId = mapping.getMetaId();
+            logger.info("开始全量同步:{}, {}", metaId, mapping.getName());
+            try {
+                map.putIfAbsent(metaId, new Task(metaId));
+                final ExecutorService executor = Executors.newFixedThreadPool(mapping.getThreadNum());
+                Task task = map.get(metaId);
+                doTask(task, mapping, list, executor);
+            } catch (Exception e) {
+                logger.error(e.getMessage());
+                logService.log(LogType.SystemLog.ERROR, e.getMessage());
+            } finally {
+                map.remove(metaId);
+                publishClosedEvent(metaId);
+                logger.info("结束全量同步:{}, {}", metaId, mapping.getName());
+            }
+        }
+    }
+
 }

+ 26 - 26
dbsyncer-manager/src/main/java/org/dbsyncer/manager/puller/IncrementPuller.java

@@ -5,12 +5,10 @@ import org.dbsyncer.common.event.RowChangedEvent;
 import org.dbsyncer.common.scheduled.ScheduledTaskJob;
 import org.dbsyncer.common.scheduled.ScheduledTaskService;
 import org.dbsyncer.common.util.CollectionUtils;
-import org.dbsyncer.common.util.StringUtil;
 import org.dbsyncer.connector.ConnectorFactory;
 import org.dbsyncer.connector.config.ConnectorConfig;
 import org.dbsyncer.connector.model.Field;
 import org.dbsyncer.connector.model.Table;
-import org.dbsyncer.connector.constant.ConnectorConstant;
 import org.dbsyncer.listener.AbstractExtractor;
 import org.dbsyncer.listener.Extractor;
 import org.dbsyncer.listener.Listener;
@@ -86,32 +84,34 @@ public class IncrementPuller extends AbstractPuller implements ScheduledTaskJob
     }
 
     @Override
-    public void asyncStart(Mapping mapping) {
+    public void start(Mapping mapping) {
         final String mappingId = mapping.getId();
         final String metaId = mapping.getMetaId();
-        try {
-            Connector connector = manager.getConnector(mapping.getSourceConnectorId());
-            Assert.notNull(connector, "连接器不能为空.");
-            List<TableGroup> list = manager.getTableGroupAll(mappingId);
-            Assert.notEmpty(list, "映射关系不能为空.");
-            Meta meta = manager.getMeta(metaId);
-            Assert.notNull(meta, "Meta不能为空.");
-            AbstractExtractor extractor = getExtractor(mapping, connector, list, meta);
-
-            long now = Instant.now().toEpochMilli();
-            meta.setBeginTime(now);
-            meta.setEndTime(now);
-            manager.editMeta(meta);
-            map.putIfAbsent(metaId, extractor);
-
-            // 执行任务
-            logger.info("启动成功:{}", metaId);
-            map.get(metaId).start();
-        } catch (Exception e) {
-            close(metaId);
-            logService.log(LogType.TableGroupLog.INCREMENT_FAILED, e.getMessage());
-            logger.error("运行异常,结束任务{}:{}", metaId, e.getMessage());
-        }
+        logger.info("开始增量同步:{}, {}", metaId, mapping.getName());
+        Connector connector = manager.getConnector(mapping.getSourceConnectorId());
+        Assert.notNull(connector, "连接器不能为空.");
+        List<TableGroup> list = manager.getTableGroupAll(mappingId);
+        Assert.notEmpty(list, "映射关系不能为空.");
+        Meta meta = manager.getMeta(metaId);
+        Assert.notNull(meta, "Meta不能为空.");
+
+        Thread worker = new Thread(()->{
+            try {
+                long now = Instant.now().toEpochMilli();
+                meta.setBeginTime(now);
+                meta.setEndTime(now);
+                manager.editMeta(meta);
+                map.putIfAbsent(metaId, getExtractor(mapping, connector, list, meta));
+                map.get(metaId).start();
+            } catch (Exception e) {
+                close(metaId);
+                logService.log(LogType.TableGroupLog.INCREMENT_FAILED, e.getMessage());
+                logger.error("运行异常,结束增量同步{}:{}", metaId, e.getMessage());
+            }
+        });
+        worker.setName(new StringBuilder("increment-worker-").append(mapping.getId()).toString());
+        worker.setDaemon(false);
+        worker.start();
     }
 
     @Override

+ 1 - 3
dbsyncer-manager/src/main/java/org/dbsyncer/manager/puller/Puller.java

@@ -1,12 +1,10 @@
 package org.dbsyncer.manager.puller;
 
 import org.dbsyncer.parser.model.Mapping;
-import org.springframework.scheduling.annotation.Async;
 
 public interface Puller {
 
-    @Async("taskExecutor")
-    void asyncStart(Mapping mapping);
+    void start(Mapping mapping);
 
     void close(String metaId);
 

+ 3 - 1
dbsyncer-parser/src/main/java/org/dbsyncer/parser/Parser.java

@@ -16,6 +16,7 @@ import org.dbsyncer.storage.enums.StorageDataStatusEnum;
 
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ExecutorService;
 
 /**
  * @author AE86
@@ -148,8 +149,9 @@ public interface Parser {
      * @param task
      * @param mapping
      * @param tableGroup
+     * @param executorService
      */
-    void execute(Task task, Mapping mapping, TableGroup tableGroup);
+    void execute(Task task, Mapping mapping, TableGroup tableGroup, ExecutorService executorService);
 
     /**
      * 增量同步

+ 17 - 4
dbsyncer-parser/src/main/java/org/dbsyncer/parser/ParserFactory.java

@@ -5,7 +5,6 @@ import org.dbsyncer.common.event.RowChangedEvent;
 import org.dbsyncer.common.model.Result;
 import org.dbsyncer.common.util.CollectionUtils;
 import org.dbsyncer.common.util.JsonUtil;
-import org.dbsyncer.common.util.StringUtil;
 import org.dbsyncer.connector.ConnectorFactory;
 import org.dbsyncer.connector.ConnectorMapper;
 import org.dbsyncer.connector.config.CommandConfig;
@@ -49,6 +48,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
 
 /**
  * @author AE86
@@ -227,7 +227,7 @@ public class ParserFactory implements Parser {
     }
 
     @Override
-    public void execute(Task task, Mapping mapping, TableGroup tableGroup) {
+    public void execute(Task task, Mapping mapping, TableGroup tableGroup, ExecutorService executorService) {
         final String metaId = task.getId();
         final String sourceConnectorId = mapping.getSourceConnectorId();
         final String targetConnectorId = mapping.getTargetConnectorId();
@@ -280,7 +280,8 @@ public class ParserFactory implements Parser {
             pluginFactory.convert(group.getPlugin(), data, target);
 
             // 5、写入目标源
-            Result writer = writeBatch(new BatchWriter(tConnectorMapper, command, sTableName, ConnectorConstant.OPERTION_INSERT, picker.getTargetFields(), target, batchSize));
+            BatchWriter batchWriter = new BatchWriter(tConnectorMapper, command, sTableName, ConnectorConstant.OPERTION_INSERT, picker.getTargetFields(), target, batchSize);
+            Result writer = writeBatch(batchWriter, executorService);
 
             // 6、更新结果
             flush(task, writer);
@@ -288,11 +289,12 @@ public class ParserFactory implements Parser {
             // 7、判断尾页
             if (data.size() < pageSize) {
                 params.clear();
-                logger.info("完成全量同步任务:{}, [{}] >> [{}]", metaId, sTableName, tTableName);
+                logger.info("完成全量:{}, [{}] >> [{}]", metaId, sTableName, tTableName);
                 break;
             }
 
             // 8、更新分页数
+            // TODO 记录表offset
             params.put(ParserEnum.PAGE_INDEX.getCode(), String.valueOf(++pageIndex));
         }
     }
@@ -323,6 +325,17 @@ public class ParserFactory implements Parser {
      */
     @Override
     public Result writeBatch(BatchWriter batchWriter) {
+        return writeBatch(batchWriter, taskExecutor);
+    }
+
+    /**
+     * 批量写入
+     *
+     * @param batchWriter
+     * @param taskExecutor
+     * @return
+     */
+    private Result writeBatch(BatchWriter batchWriter, Executor taskExecutor) {
         List<Map> dataList = batchWriter.getDataList();
         int batchSize = batchWriter.getBatchSize();
         String tableName = batchWriter.getTableName();

+ 20 - 8
dbsyncer-parser/src/main/java/org/dbsyncer/parser/convert/handler/StringToTimestampHandler.java

@@ -4,6 +4,8 @@ import org.dbsyncer.common.column.Lexer;
 import org.dbsyncer.common.util.DateFormatUtil;
 import org.dbsyncer.parser.convert.AbstractHandler;
 
+import java.text.ParseException;
+
 /**
  * 字符串转Timestamp
  *
@@ -14,19 +16,29 @@ import org.dbsyncer.parser.convert.AbstractHandler;
 public class StringToTimestampHandler extends AbstractHandler {
 
     @Override
-    public Object convert(String args, Object value) {
+    public Object convert(String args, Object value) throws ParseException {
         if (value instanceof String) {
             String s = (String) value;
             // 2020-7-12 00:00:00
-            if(s.length() < 19){
-                s = format(s);
+            if (s.length() < 19) {
+                return DateFormatUtil.stringToTimestamp(format(s));
+            }
+
+            // 2022-07-21T05:35:34.000+0800
+            if (s.length() == 28) {
+                return DateFormatUtil.stringToTimestamp(s, DateFormatUtil.GMT_FORMATTER);
+            }
+
+            // 2022-07-21T05:35:34.000+08:00
+            if (s.length() == 29) {
+                s = s.replaceAll(":[^:]*$", "00");
+                return DateFormatUtil.stringToTimestamp(s, DateFormatUtil.GMT_FORMATTER);
             }
-            value = DateFormatUtil.stringToTimestamp(s);
         }
         return value;
     }
 
-    private String format(String s){
+    private String format(String s) {
         StringBuilder buf = new StringBuilder();
         Lexer lexer = new Lexer(s);
         char comma = '-';
@@ -53,13 +65,13 @@ public class StringToTimestampHandler extends AbstractHandler {
 
     private void nextToken(Lexer lexer, StringBuilder buf, char comma, boolean appendComma) {
         buf.append(fillZero(lexer.nextToken(comma)));
-        if(appendComma){
+        if (appendComma) {
             buf.append(comma);
         }
     }
 
-    private String fillZero(String s){
-        if(s.length() < 2){
+    private String fillZero(String s) {
+        if (s.length() < 2) {
             return String.format("%02d", Integer.parseInt(s));
         }
         return s;

+ 13 - 2
dbsyncer-parser/src/main/java/org/dbsyncer/parser/model/Mapping.java

@@ -46,6 +46,11 @@ public class Mapping extends AbstractConfigModel {
     // 单次写入
     private int batchNum = 1000;
 
+    /**
+     * 线程数
+     */
+    private int threadNum = Runtime.getRuntime().availableProcessors() * 2;
+
     public String getSourceConnectorId() {
         return sourceConnectorId;
     }
@@ -118,9 +123,15 @@ public class Mapping extends AbstractConfigModel {
         return batchNum;
     }
 
-    public Mapping setBatchNum(int batchNum) {
+    public void setBatchNum(int batchNum) {
         this.batchNum = batchNum;
-        return this;
     }
 
+    public int getThreadNum() {
+        return threadNum;
+    }
+
+    public void setThreadNum(int threadNum) {
+        this.threadNum = threadNum;
+    }
 }

+ 35 - 7
dbsyncer-storage/src/main/java/org/dbsyncer/storage/AbstractStorageService.java

@@ -14,6 +14,8 @@ import java.io.File;
 import java.io.IOException;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
 
 /**
  * @author AE86
@@ -24,6 +26,10 @@ public abstract class AbstractStorageService implements StorageService, Disposab
 
     private final Logger logger = LoggerFactory.getLogger(getClass());
 
+    private final Lock lock = new ReentrantLock(true);
+
+    private volatile boolean tryDeleteAll;
+
     @Autowired
     private Map<String, Strategy> map;
 
@@ -55,14 +61,27 @@ public abstract class AbstractStorageService implements StorageService, Disposab
 
     @Override
     public Paging query(Query query) {
+        if(tryDeleteAll){
+            return new Paging(query.getPageNum(), query.getPageSize());
+        }
+
+        boolean locked = false;
         try {
-            String collection = getCollection(query.getType(), query.getCollection());
-            query.setCollection(collection);
-            return select(query);
-        } catch (IOException e) {
+            locked = lock.tryLock();
+            if (locked) {
+                String collection = getCollection(query.getType(), query.getCollection());
+                query.setCollection(collection);
+                return select(query);
+            }
+        } catch (Exception e) {
             logger.error("query collectionId:{}, params:{}, failed:{}", query.getCollection(), query.getParams(), e.getMessage());
             throw new StorageException(e);
+        } finally {
+            if (locked) {
+                lock.unlock();
+            }
         }
+        return new Paging(query.getPageNum(), query.getPageSize());
     }
 
     @Override
@@ -143,12 +162,21 @@ public abstract class AbstractStorageService implements StorageService, Disposab
 
     @Override
     public void clear(StorageEnum type, String collectionId) {
+        boolean locked = false;
         try {
-            String collection = getCollection(type, collectionId);
-            deleteAll(type, collection);
-        } catch (IOException e) {
+            locked = lock.tryLock();
+            if (locked) {
+                tryDeleteAll = true;
+                deleteAll(type, getCollection(type, collectionId));
+            }
+        } catch (Exception e) {
             logger.error("clear collectionId:{}, failed:{}", collectionId, e.getMessage());
             throw new StorageException(e);
+        } finally {
+            if (locked) {
+                tryDeleteAll = false;
+                lock.unlock();
+            }
         }
     }
 

+ 0 - 1
dbsyncer-storage/src/main/java/org/dbsyncer/storage/lucene/Shard.java

@@ -127,7 +127,6 @@ public class Shard {
         Paging paging = new Paging(pageNum, pageSize);
         paging.setTotal(topDocs.totalHits);
         if(option.isQueryTotal()){
-            paging.setData(Collections.EMPTY_LIST);
             return paging;
         }
 

+ 24 - 30
dbsyncer-storage/src/main/java/org/dbsyncer/storage/support/MysqlStorageServiceImpl.java

@@ -61,7 +61,6 @@ public class MysqlStorageServiceImpl extends AbstractStorageService {
     private static final String TRUNCATE_TABLE = "TRUNCATE TABLE %s";
     private static final String TABLE_CREATE_TIME = "create_time";
     private static final String TABLE_UPDATE_TIME = "update_time";
-    private final Object LOCK = new Object();
 
     @Autowired
     private ConnectorFactory connectorFactory;
@@ -90,26 +89,23 @@ public class MysqlStorageServiceImpl extends AbstractStorageService {
 
     @Override
     public Paging select(Query query) {
-        synchronized (LOCK){
-            Executor executor = getExecutor(query.getType(), query.getCollection());
-            Paging paging = new Paging(query.getPageNum(), query.getPageSize());
-            List<Object> queryCountArgs = new ArrayList<>();
-            String queryCountSql = buildQueryCountSql(query, executor, queryCountArgs);
-            Long total = connectorMapper.execute(databaseTemplate -> databaseTemplate.queryForObject(queryCountSql, queryCountArgs.toArray(), Long.class));
-            if (query.isQueryTotal()) {
-                paging.setTotal(total);
-                paging.setData(Collections.EMPTY_LIST);
-                return paging;
-            }
-
-            List<Object> queryArgs = new ArrayList<>();
-            String querySql = buildQuerySql(query, executor, queryArgs);
-            List<Map<String, Object>> data = connectorMapper.execute(databaseTemplate -> databaseTemplate.queryForList(querySql, queryArgs.toArray()));
-            replaceHighLight(query, data);
-            paging.setData(data);
+        Paging paging = new Paging(query.getPageNum(), query.getPageSize());
+        Executor executor = getExecutor(query.getType(), query.getCollection());
+        List<Object> queryCountArgs = new ArrayList<>();
+        String queryCountSql = buildQueryCountSql(query, executor, queryCountArgs);
+        Long total = connectorMapper.execute(databaseTemplate -> databaseTemplate.queryForObject(queryCountSql, queryCountArgs.toArray(), Long.class));
+        if (query.isQueryTotal()) {
             paging.setTotal(total);
             return paging;
         }
+
+        List<Object> queryArgs = new ArrayList<>();
+        String querySql = buildQuerySql(query, executor, queryArgs);
+        List<Map<String, Object>> data = connectorMapper.execute(databaseTemplate -> databaseTemplate.queryForList(querySql, queryArgs.toArray()));
+        replaceHighLight(query, data);
+        paging.setData(data);
+        paging.setTotal(total);
+        return paging;
     }
 
     @Override
@@ -137,19 +133,17 @@ public class MysqlStorageServiceImpl extends AbstractStorageService {
 
     @Override
     public void deleteAll(StorageEnum type, String table) {
-        synchronized (LOCK){
-            Executor executor = getExecutor(type, table);
-            if (executor.isSystemType()) {
-                String sql = String.format(TRUNCATE_TABLE, PREFIX_TABLE.concat(table));
-                executeSql(sql);
-                return;
-            }
+        Executor executor = getExecutor(type, table);
+        if (executor.isSystemType()) {
+            String sql = String.format(TRUNCATE_TABLE, PREFIX_TABLE.concat(table));
+            executeSql(sql);
+            return;
+        }
 
-            if (tables.containsKey(table)) {
-                tables.remove(table);
-                String sql = String.format(DROP_TABLE, PREFIX_TABLE.concat(table));
-                executeSql(sql);
-            }
+        if (tables.containsKey(table)) {
+            tables.remove(table);
+            String sql = String.format(DROP_TABLE, PREFIX_TABLE.concat(table));
+            executeSql(sql);
         }
     }
 

+ 2 - 2
dbsyncer-web/src/main/java/org/dbsyncer/web/controller/index/IndexController.java

@@ -27,6 +27,7 @@ public class IndexController {
     @GetMapping("")
     public String index(HttpServletRequest request, ModelMap model, String projectGroupId) {
         ProjectGroupVo projectGroup = projectGroupService.getProjectGroup(projectGroupId);
+        model.put("connectorSize", projectGroup.getConnectorSize());
         model.put("connectors", projectGroup.getConnectors());
         model.put("mappings", projectGroup.getMappings());
         model.put("projectGroupId", projectGroupId);
@@ -40,5 +41,4 @@ public class IndexController {
         return RestResult.restSuccess(new VersionVo(appConfig.getName(), appConfig.getCopyright()));
     }
 
-}
-
+}

+ 28 - 34
dbsyncer-web/src/main/resources/public/index/index.html

@@ -7,55 +7,49 @@
     <div class="row">
         <form class="form-horizontal" role="form" method="post">
             <!-- 分组管理 -->
-            <div class="col-md-12">
-                <!-- 分组开始位置 -->
+            <div class="col-md-12" th:if="${projectGroups?.size() gt 0}">
+                <!-- 显示分组开始位置 -->
                 <div class="form-group">
                     <div class="col-md-12">
-                        <button type="button" class="btn btn-primary" id="addProjectGroupBtn">
-                            <span class="fa fa-plus"></span>添加分组([[${projectGroups?.size()} ?: 0]])
-                        </button>
-                    </div>
-                </div>
-                <!-- 显示分组 -->
-                <div class="row" th:if="${projectGroups?.size() gt 0}">
-                    <div class="col-md-12">
-                        <div class="panel panel-default">
-                            <div class="panel-body">
-                                <div class="row">
-                                    <div class="col-md-4">
-                                        <select id="projectGroup" name="projectGroup"
-                                                class="form-control select-control">
-                                            <option value="" th:text="全部" selected/>
-                                            <option th:each="g,s:${projectGroups}" th:value="${g?.id}" th:text="${g?.name}" th:selected="${g?.id eq projectGroupId}"/>
-                                        </select>
-                                    </div>
-                                    <div class="col-md-6"></div>
-                                    <div class="col-md-2 text-right">
-                                        <div th:if="${not #strings.isEmpty(projectGroupId) }">
-                                            <button type="button" class="btn btn-primary" id="editProjectGroupBtn">
-                                                <span class="fa fa-pencil"></span>修改
-                                            </button>
-                                            <button type="button" class="btn btn-default" id="removeProjectGroupBtn">
-                                                <span class="fa fa-times"></span>删除
-                                            </button>
-                                        </div>
-                                    </div>
+                        <div class="row">
+                            <div class="col-md-4">
+                                <select id="projectGroup" name="projectGroup" class="form-control select-control">
+                                    <option value="" th:text="全部" selected/>
+                                    <option th:each="g,s:${projectGroups}" th:value="${g?.id}" th:text="${g?.name}" th:selected="${g?.id eq projectGroupId}"/>
+                                </select>
+                            </div>
+                            <div class="col-md-6"></div>
+                            <div class="col-md-2 text-right">
+                                <div th:if="${not #strings.isEmpty(projectGroupId) }">
+                                    <button type="button" class="btn btn-primary" id="editProjectGroupBtn">
+                                        <span class="fa fa-pencil"></span>修改
+                                    </button>
+                                    <button type="button" class="btn btn-default" id="removeProjectGroupBtn">
+                                        <span class="fa fa-times"></span>删除
+                                    </button>
                                 </div>
                             </div>
                         </div>
                     </div>
                 </div>
+                <hr class="simple" color="#6f5499" />
+                <!-- 显示分组结束位置 -->
             </div>
 
             <!-- 连接管理 -->
             <div class="col-md-12">
                 <!-- 连接开始位置 -->
                 <div class="form-group">
-                    <div class="col-md-12">
+                    <div class="col-md-10">
                         <button type="button" class="btn btn-primary" id="indexAddConnectorBtn">
                             <span class="fa fa-plus"></span>添加连接([[${connectors?.size()} ?: 0]])
                         </button>
                     </div>
+                    <div class="col-md-2 text-right">
+                        <button type="button" class="btn btn-default" id="addProjectGroupBtn" th:if="${connectorSize gt 0}">
+                            <span class="fa fa-plus"></span>添加分组([[${projectGroups?.size()} ?: 0]])
+                        </button>
+                    </div>
                 </div>
                 <!-- 显示连接列表 -->
                 <div class="row" th:if="${connectors?.size() gt 0}">
@@ -96,7 +90,7 @@
             <!-- 驱动管理 -->
             <div class="col-md-12">
                 <!-- 驱动开始位置 -->
-                <div class="form-group" th:if="${connectors?.size() gt 0}">
+                <div class="form-group" th:if="${connectorSize gt 0}">
                     <div class="col-md-12">
                         <button type="button" class="btn btn-primary" id="indexAddMappingBtn">
                             <span class="fa fa-plus"></span>添加驱动([[${mappings?.size()} ?: 0]])
@@ -114,7 +108,7 @@
                                     <div class="col-md-4" th:each="m,state : ${mappings}">
                                         <div th:id="${m?.id}" th:class="${m?.meta?.model eq '全量'} ? 'jumbotron dbsyncer_block dbsyncer_block_full' : 'jumbotron dbsyncer_block dbsyncer_block_increment'">
                                             <!--驱动标题信息 -->
-                                            <div class="row text-center" th:text="${m?.name}" th:title="${m?.name}"></div>
+                                            <div class="row text-center dbsyncer_driver_name" th:text="${m?.name}" th:title="${m?.name}"></div>
 
                                             <div class="row">
                                                 <!--左边驱动信息 -->

+ 6 - 1
dbsyncer-web/src/main/resources/public/mapping/editFull.html

@@ -19,7 +19,12 @@
                     <input type="number" name="batchNum" class="form-control" min="1" dbsyncer-valid="require" th:value="${mapping?.batchNum}">
                 </div>
             </div>
-            <div class="col-md-4"></div>
+            <div class="col-md-4">
+                <label class="col-sm-3 control-label text-right">线程数<strong class="driverVerifcateRequired">*</strong></label>
+                <div class="col-sm-9">
+                    <input type="number" name="threadNum" class="form-control" min="1" dbsyncer-valid="require" th:value="${mapping?.threadNum}">
+                </div>
+            </div>
         </div>
     </div>
 </div>

+ 1 - 1
dbsyncer-web/src/main/resources/public/monitor/monitor.html

@@ -84,7 +84,7 @@
             </div>
 
             <!-- 数据 -->
-            <div class="col-md-12">
+            <div class="col-md-12" th:if="${meta?.size() gt 0}">
                 <div class="panel-group">
                     <div class="panel panel-default">
                         <div class="panel-heading">

+ 5 - 26
dbsyncer-web/src/main/resources/static/css/common.css

@@ -1,36 +1,15 @@
 @CHARSET "UTF-8";
-
-.dbsyncer_img {
-    border-radius: 5%;
-}
-
-.dbsyncer_pointer {
-    cursor: pointer;
-}
+.dbsyncer_img {border-radius: 5%;}
+.dbsyncer_pointer {cursor: pointer;}
 .dbsyncer_over_hidden{overflow: hidden; text-overflow:ellipsis; white-space: nowrap;}
 .dbsyncer_block {border:1px solid #ffffff;}
 .dbsyncer_block:hover { cursor:pointer; background-color: #EBEBEB; -moz-box-shadow:2px 2px 5px; -webkit-box-shadow:2px 2px 5px; box-shadow:2px 2px 5px; -webkit-transform:translateY(-3px); transform: translateY(-3px); transition: all 0.3s ease-in-out;}
 .driverVerifcateRequired{color:#d9534f;}
 .dbsyncerVerifcateError{border-color: #b94a48;}
 .footerContainer{width:100%;position:relative;bottom:0;background-color:white;}
-/**
- * 强制单词换行
- */
-.driver_break_word {
-    word-break: break-all;
-    word-wrap: break-word;
-    white-space: normal
-}
-
-.driver_hidden_word {
-    white-space: nowrap;
-    overflow: hidden;
-    text-overflow: ellipsis
-}
-
-.fa_gray {
-    color: gray
-}
+.driver_break_word {word-break: break-all; word-wrap: break-word;white-space: normal}
+.driver_hidden_word {white-space: nowrap; overflow: hidden; text-overflow: ellipsis}
+.fa_gray {color: gray}
 .fa_blueviolet {color: blueviolet}
 .dbsyncer_btn-info { background-color: #E7EDF8;}
 .dbsyncer_btn-info.active, .dbsyncer_btn-info.focus, .dbsyncer_btn-info:active, .dbsyncer_btn-info:focus, .dbsyncer_btn-info:hover, .open>.dropdown-toggle.dbsyncer_btn-info {background-color: #D4DDED;}

+ 1 - 0
dbsyncer-web/src/main/resources/static/css/index/index.css

@@ -23,5 +23,6 @@
 .mappingList .dropdown-menu {right: 12px;left: auto; top: 28px;}
 .mappingList .dropdown-menu i{font-size: 20px;}
 
+.dbsyncer_driver_name { margin-bottom: 5px;}
 .dbsyncer_block_increment { background: -webkit-linear-gradient(left, #ffffff, #dffbdc);}
 .dbsyncer_block_full { background: -webkit-linear-gradient(left, #ffffff, #dcf5fb);}

+ 1 - 1
dbsyncer-web/src/main/resources/static/js/index/index.js

@@ -181,7 +181,7 @@ function doPost(url) {
 
 $(function () {
     // 初始化select插件
-    initSelectIndex($(".select-control"), 1);
+    initSelectIndex($(".select-control"));
     bindAddProjectGroup();
     var $projectGroupSelect = $("#projectGroup");
     bindEditProjectGroup($projectGroupSelect);

+ 8 - 9
dbsyncer-web/src/main/resources/static/js/monitor/index.js

@@ -1,13 +1,12 @@
 function formatDate(time) {
-   var date = new Date(time);
-  var YY = date.getFullYear() + '-';
-  var MM = (date.getMonth() + 1 < 10 ? '0' + (date.getMonth() + 1) : date.getMonth() + 1) + '-';
-  var DD = (date.getDate() < 10 ? '0' + (date.getDate()) : date.getDate());
-  var hh = (date.getHours() < 10 ? '0' + date.getHours() : date.getHours()) + ':';
-  var mm = (date.getMinutes() < 10 ? '0' + date.getMinutes() : date.getMinutes()) + ':';
-  var ss = (date.getSeconds() < 10 ? '0' + date.getSeconds() : date.getSeconds());
-
-  return YY + MM + DD +" "+hh + mm + ss;
+    var date = new Date(time);
+    var YY = date.getFullYear() + '-';
+    var MM = (date.getMonth() + 1 < 10 ? '0' + (date.getMonth() + 1) : date.getMonth() + 1) + '-';
+    var DD = (date.getDate() < 10 ? '0' + (date.getDate()) : date.getDate());
+    var hh = (date.getHours() < 10 ? '0' + date.getHours() : date.getHours()) + ':';
+    var mm = (date.getMinutes() < 10 ? '0' + date.getMinutes() : date.getMinutes()) + ':';
+    var ss = (date.getSeconds() < 10 ? '0' + date.getSeconds() : date.getSeconds());
+    return YY + MM + DD +" "+hh + mm + ss;
 }
 
 // 查看详细数据