1
0
AE86 3 жил өмнө
parent
commit
9b390f068f

+ 3 - 4
dbsyncer-manager/src/main/java/org/dbsyncer/manager/puller/impl/FullPuller.java → dbsyncer-manager/src/main/java/org/dbsyncer/manager/puller/FullPuller.java

@@ -1,15 +1,14 @@
-package org.dbsyncer.manager.puller.impl;
+package org.dbsyncer.manager.puller;
 
-import org.dbsyncer.parser.event.FullRefreshEvent;
-import org.dbsyncer.parser.model.Task;
 import org.dbsyncer.manager.Manager;
-import org.dbsyncer.manager.puller.AbstractPuller;
 import org.dbsyncer.parser.Parser;
+import org.dbsyncer.parser.event.FullRefreshEvent;
 import org.dbsyncer.parser.logger.LogService;
 import org.dbsyncer.parser.logger.LogType;
 import org.dbsyncer.parser.model.Mapping;
 import org.dbsyncer.parser.model.Meta;
 import org.dbsyncer.parser.model.TableGroup;
+import org.dbsyncer.parser.model.Task;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;

+ 3 - 4
dbsyncer-manager/src/main/java/org/dbsyncer/manager/puller/impl/IncrementPuller.java → dbsyncer-manager/src/main/java/org/dbsyncer/manager/puller/IncrementPuller.java

@@ -1,4 +1,4 @@
-package org.dbsyncer.manager.puller.impl;
+package org.dbsyncer.manager.puller;
 
 import org.dbsyncer.common.event.Event;
 import org.dbsyncer.common.event.RowChangedEvent;
@@ -21,7 +21,6 @@ import org.dbsyncer.listener.quartz.AbstractQuartzExtractor;
 import org.dbsyncer.manager.Manager;
 import org.dbsyncer.manager.ManagerException;
 import org.dbsyncer.manager.config.FieldPicker;
-import org.dbsyncer.manager.puller.AbstractPuller;
 import org.dbsyncer.parser.Parser;
 import org.dbsyncer.parser.logger.LogService;
 import org.dbsyncer.parser.logger.LogType;
@@ -88,7 +87,7 @@ public class IncrementPuller extends AbstractPuller implements ScheduledTaskJob,
         key = UUIDUtil.getUUID();
         String cron = "*/10 * * * * ?";
         scheduledTaskService.start(key, cron, this);
-        logger.info("[{}], Started persistence task {}", cron, key);
+        logger.info("[{}], Started scheduled task", cron);
     }
 
     @Override
@@ -141,7 +140,7 @@ public class IncrementPuller extends AbstractPuller implements ScheduledTaskJob,
     @Override
     public void destroy() {
         scheduledTaskService.stop(key);
-        logger.info("Stopped persistence task {}", key);
+        logger.info("Stopped scheduled task.");
     }
 
     private AbstractExtractor getExtractor(Mapping mapping, Connector connector, List<TableGroup> list, Meta meta)

+ 35 - 2
dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/FlushServiceImpl.java

@@ -1,7 +1,10 @@
 package org.dbsyncer.parser.flush;
 
 import com.alibaba.fastjson.JSONException;
+import org.dbsyncer.common.scheduled.ScheduledTaskJob;
+import org.dbsyncer.common.scheduled.ScheduledTaskService;
 import org.dbsyncer.common.util.JsonUtil;
+import org.dbsyncer.common.util.UUIDUtil;
 import org.dbsyncer.storage.SnowflakeIdWorker;
 import org.dbsyncer.storage.StorageService;
 import org.dbsyncer.storage.constant.ConfigConstant;
@@ -9,11 +12,15 @@ import org.dbsyncer.storage.enums.StorageDataStatusEnum;
 import org.dbsyncer.storage.enums.StorageEnum;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.DisposableBean;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
 
+import javax.annotation.PostConstruct;
 import java.time.Instant;
-import java.util.*;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.stream.Collectors;
 
@@ -27,7 +34,7 @@ import java.util.stream.Collectors;
  * @date 2020/05/19 18:38
  */
 @Component
-public class FlushServiceImpl implements FlushService {
+public class FlushServiceImpl implements FlushService, ScheduledTaskJob, DisposableBean {
 
     private final Logger logger = LoggerFactory.getLogger(getClass());
 
@@ -37,6 +44,19 @@ public class FlushServiceImpl implements FlushService {
     @Autowired
     private SnowflakeIdWorker snowflakeIdWorker;
 
+    @Autowired
+    private ScheduledTaskService scheduledTaskService;
+
+    private String key;
+
+    @PostConstruct
+    private void init() {
+        key = UUIDUtil.getUUID();
+        String cron = "*/3 * * * * ?";
+        scheduledTaskService.start(key, cron, this);
+        logger.info("[{}], Started scheduled task", cron);
+    }
+
     @Override
     public void asyncWrite(String type, String error) {
         Map<String, Object> params = new HashMap();
@@ -69,4 +89,17 @@ public class FlushServiceImpl implements FlushService {
         }).collect(Collectors.toList());
         storageService.addData(StorageEnum.DATA, metaId, list);
     }
+
+
+    @Override
+    public void run() {
+        // TODO 批量写入同步数据
+        logger.info("run flush task");
+    }
+
+    @Override
+    public void destroy() {
+        scheduledTaskService.stop(key);
+        logger.info("Stopped scheduled task.");
+    }
 }

+ 1 - 1
dbsyncer-parser/src/main/java/org/dbsyncer/parser/strategy/impl/EnableFlushStrategy.java

@@ -12,7 +12,7 @@ import org.springframework.stereotype.Component;
  * @date 2021/11/18 22:21
  */
 @Component
-@ConditionalOnProperty(prefix = "dbsyncer.parser.flush", value = "enabled", havingValue = "true")
+@ConditionalOnProperty(value = "dbsyncer.parser.flush.enabled", havingValue = "true")
 public final class EnableFlushStrategy extends AbstractFlushStrategy {
 
 }

+ 4 - 6
dbsyncer-storage/src/main/java/org/dbsyncer/storage/lucene/Shard.java

@@ -192,12 +192,10 @@ public class Shard {
 
     private void execute(Object value, Callback callback) throws IOException {
         if (null != value) {
-            synchronized(indexWriter){
-                if (indexWriter.isOpen()) {
-                    callback.execute();
-                    indexWriter.commit();
-                    return;
-                }
+            if (indexWriter.isOpen()) {
+                callback.execute();
+                indexWriter.commit();
+                return;
             }
             logger.error(value.toString());
         }

+ 1 - 1
dbsyncer-storage/src/main/java/org/dbsyncer/storage/support/MysqlStorageServiceImpl.java

@@ -49,7 +49,7 @@ import java.util.stream.Stream;
  * @date 2019/9/10 23:22
  */
 @Component
-@ConditionalOnProperty(value = "dbsyncer.storage.support.mysql", havingValue = "true")
+@ConditionalOnProperty(value = "dbsyncer.storage.support.mysql.enabled", havingValue = "true")
 @ConfigurationProperties(prefix = "dbsyncer.storage.support.mysql")
 public class MysqlStorageServiceImpl extends AbstractStorageService {
 

+ 2 - 2
dbsyncer-web/src/main/resources/application.properties

@@ -4,14 +4,14 @@ server.port=18686
 #web
 dbsyncer.web.login.username=admin
 dbsyncer.web.login.password=0DPiKuNIrrVmD8IUCuw1hQxNqZc=
-dbsyncer.web.thread.pool.core.size=10
+dbsyncer.web.thread.pool.core.size=20
 dbsyncer.web.thread.pool.queue.capacity=2000
 server.servlet.session.timeout=1800
 server.servlet.context-path=/
 
 #storage
 dbsyncer.storage.id=1
-#dbsyncer.storage.support.mysql=true
+#dbsyncer.storage.support.mysql.enabled=true
 #dbsyncer.storage.support.mysql.config.url=jdbc:mysql://127.0.0.1:3306/dbsyncer?rewriteBatchedStatements=true&seUnicode=true&characterEncoding=UTF8&serverTimezone=Asia/Shanghai&useSSL=false&verifyServerCertificate=false
 #dbsyncer.storage.support.mysql.config.username=root
 #dbsyncer.storage.support.mysql.config.password=123