AE86 1 tahun lalu
induk
melakukan
d15a1020cf

+ 1 - 1
dbsyncer-biz/src/main/java/org/dbsyncer/biz/checker/impl/mapping/LogConfigChecker.java

@@ -1,7 +1,7 @@
 package org.dbsyncer.biz.checker.impl.mapping;
 
 import org.dbsyncer.biz.checker.MappingConfigChecker;
-import org.dbsyncer.connector.config.ListenerConfig;
+import org.dbsyncer.sdk.config.ListenerConfig;
 import org.dbsyncer.sdk.enums.ListenerTypeEnum;
 import org.dbsyncer.parser.model.Mapping;
 import org.springframework.stereotype.Component;

+ 1 - 1
dbsyncer-biz/src/main/java/org/dbsyncer/biz/checker/impl/mapping/MappingChecker.java

@@ -8,7 +8,7 @@ import org.dbsyncer.common.util.CollectionUtils;
 import org.dbsyncer.common.util.JsonUtil;
 import org.dbsyncer.common.util.NumberUtil;
 import org.dbsyncer.common.util.StringUtil;
-import org.dbsyncer.connector.config.ListenerConfig;
+import org.dbsyncer.sdk.config.ListenerConfig;
 import org.dbsyncer.sdk.enums.ListenerTypeEnum;
 import org.dbsyncer.parser.ProfileComponent;
 import org.dbsyncer.sdk.enums.ModelEnum;

+ 1 - 1
dbsyncer-biz/src/main/java/org/dbsyncer/biz/checker/impl/mapping/TimingConfigChecker.java

@@ -2,7 +2,7 @@ package org.dbsyncer.biz.checker.impl.mapping;
 
 import org.dbsyncer.biz.checker.MappingConfigChecker;
 import org.dbsyncer.common.util.StringUtil;
-import org.dbsyncer.connector.config.ListenerConfig;
+import org.dbsyncer.sdk.config.ListenerConfig;
 import org.dbsyncer.sdk.enums.ListenerTypeEnum;
 import org.dbsyncer.parser.model.Mapping;
 import org.springframework.stereotype.Component;

+ 2 - 2
dbsyncer-connector/src/main/java/org/dbsyncer/connector/AbstractListener.java

@@ -1,8 +1,8 @@
 package org.dbsyncer.connector;
 
 import org.dbsyncer.common.util.CollectionUtils;
-import org.dbsyncer.connector.config.ListenerConfig;
 import org.dbsyncer.connector.scheduled.ScheduledTaskService;
+import org.dbsyncer.sdk.config.ListenerConfig;
 import org.dbsyncer.sdk.constant.ConnectorConstant;
 import org.dbsyncer.sdk.listener.ChangedEvent;
 import org.dbsyncer.sdk.listener.Listener;
@@ -28,6 +28,7 @@ import java.util.concurrent.TimeUnit;
 public abstract class AbstractListener implements Listener {
 
     private final Logger logger = LoggerFactory.getLogger(getClass());
+    private final int FLUSH_DELAYED_SECONDS = 20;
     protected ConnectorFactory connectorFactory;
     protected ScheduledTaskService scheduledTaskService;
     protected ConnectorConfig connectorConfig;
@@ -37,7 +38,6 @@ public abstract class AbstractListener implements Listener {
     protected Map<String, String> snapshot;
     protected String metaId;
     private Watcher watcher;
-    private final int FLUSH_DELAYED_SECONDS = 20;
 
     @Override
     public void register(Watcher watcher) {

+ 3 - 3
dbsyncer-manager/src/main/java/org/dbsyncer/manager/impl/IncrementPuller.java

@@ -3,7 +3,6 @@ package org.dbsyncer.manager.impl;
 import org.dbsyncer.common.util.CollectionUtils;
 import org.dbsyncer.connector.AbstractListener;
 import org.dbsyncer.connector.ConnectorFactory;
-import org.dbsyncer.connector.config.ListenerConfig;
 import org.dbsyncer.connector.quartz.AbstractQuartzListener;
 import org.dbsyncer.connector.quartz.TableGroupQuartzCommand;
 import org.dbsyncer.connector.scheduled.ScheduledTaskJob;
@@ -21,6 +20,7 @@ import org.dbsyncer.parser.model.Connector;
 import org.dbsyncer.parser.model.Mapping;
 import org.dbsyncer.parser.model.Meta;
 import org.dbsyncer.parser.model.TableGroup;
+import org.dbsyncer.sdk.config.ListenerConfig;
 import org.dbsyncer.sdk.enums.ListenerTypeEnum;
 import org.dbsyncer.sdk.listener.Listener;
 import org.dbsyncer.sdk.model.ChangedOffset;
@@ -152,13 +152,13 @@ public final class IncrementPuller extends AbstractPuller implements Application
         if (ListenerTypeEnum.isTiming(listenerType) && listener instanceof AbstractQuartzListener) {
             AbstractQuartzListener quartzListener = (AbstractQuartzListener) listener;
             quartzListener.setCommands(list.stream().map(t -> new TableGroupQuartzCommand(t.getSourceTable(), t.getCommand())).collect(Collectors.toList()));
-            quartzListener.register(new QuartzConsumer().init(bufferActuatorRouter, profileComponent, logService, meta, mapping, list));
+            quartzListener.register(new QuartzConsumer().init(bufferActuatorRouter, profileComponent, logService, meta.getId(), mapping, list));
         }
 
         // 基于日志抽取
         if (ListenerTypeEnum.isLog(listenerType) && listener instanceof AbstractListener) {
             AbstractListener abstractListener = (AbstractListener) listener;
-            abstractListener.register(new LogConsumer().init(bufferActuatorRouter, profileComponent, logService, meta, mapping, list));
+            abstractListener.register(new LogConsumer().init(bufferActuatorRouter, profileComponent, logService, meta.getId(), mapping, list));
         }
 
         if (listener instanceof AbstractListener) {

+ 13 - 9
dbsyncer-parser/src/main/java/org/dbsyncer/parser/consumer/AbstractConsumer.java

@@ -26,15 +26,15 @@ public abstract class AbstractConsumer<E extends ChangedEvent> implements Watche
     private BufferActuatorRouter bufferActuatorRouter;
     private ProfileComponent profileComponent;
     private LogService logService;
-    private Meta meta;
+    private String metaId;
     protected Mapping mapping;
     protected List<TableGroup> tableGroups;
 
-    public AbstractConsumer init(BufferActuatorRouter bufferActuatorRouter, ProfileComponent profileComponent, LogService logService, Meta meta, Mapping mapping, List<TableGroup> tableGroups) {
+    public AbstractConsumer init(BufferActuatorRouter bufferActuatorRouter, ProfileComponent profileComponent, LogService logService, String metaId, Mapping mapping, List<TableGroup> tableGroups) {
         this.bufferActuatorRouter = bufferActuatorRouter;
         this.profileComponent = profileComponent;
         this.logService = logService;
-        this.meta = meta;
+        this.metaId = metaId;
         this.mapping = mapping;
         this.tableGroups = tableGroups;
         postProcessBeforeInitialization();
@@ -50,7 +50,7 @@ public abstract class AbstractConsumer<E extends ChangedEvent> implements Watche
 
     @Override
     public void changeEvent(ChangedEvent event) {
-        event.getChangedOffset().setMetaId(meta.getId());
+        event.getChangedOffset().setMetaId(metaId);
         if (event instanceof DDLChangedEvent) {
             onDDLChanged((DDLChangedEvent) event);
             return;
@@ -60,8 +60,11 @@ public abstract class AbstractConsumer<E extends ChangedEvent> implements Watche
 
     @Override
     public void flushEvent(Map<String, String> snapshot) {
-        meta.setSnapshot(snapshot);
-        profileComponent.editConfigModel(meta);
+        Meta meta = profileComponent.getMeta(metaId);
+        if (meta != null) {
+            meta.setSnapshot(snapshot);
+            profileComponent.editConfigModel(meta);
+        }
     }
 
     @Override
@@ -71,14 +74,15 @@ public abstract class AbstractConsumer<E extends ChangedEvent> implements Watche
 
     @Override
     public long getMetaUpdateTime() {
-        return meta.getUpdateTime();
+        Meta meta = profileComponent.getMeta(metaId);
+        return meta != null ? meta.getUpdateTime() : 0L;
     }
 
     protected void bind(String tableGroupId) {
-        bufferActuatorRouter.bind(meta.getId(), tableGroupId);
+        bufferActuatorRouter.bind(metaId, tableGroupId);
     }
 
     protected void execute(String tableGroupId, ChangedEvent event) {
-        bufferActuatorRouter.execute(meta.getId(), tableGroupId, event);
+        bufferActuatorRouter.execute(metaId, tableGroupId, event);
     }
 }

+ 15 - 13
dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/AbstractFlushStrategy.java

@@ -4,7 +4,7 @@ import org.dbsyncer.common.config.StorageConfig;
 import org.dbsyncer.common.model.Result;
 import org.dbsyncer.common.util.CollectionUtils;
 import org.dbsyncer.common.util.StringUtil;
-import org.dbsyncer.parser.ProfileComponent;
+import org.dbsyncer.parser.CacheService;
 import org.dbsyncer.parser.model.Meta;
 import org.dbsyncer.parser.strategy.FlushStrategy;
 import org.springframework.util.Assert;
@@ -23,7 +23,7 @@ public abstract class AbstractFlushStrategy implements FlushStrategy {
     private FlushService flushService;
 
     @Resource
-    private ProfileComponent profileComponent;
+    private CacheService cacheService;
 
     @Resource
     private StorageConfig storageConfig;
@@ -38,27 +38,29 @@ public abstract class AbstractFlushStrategy implements FlushStrategy {
         flush(metaId, result, event);
     }
 
-    protected void flush(String metaId, Result result, String event) {
+    protected void refreshTotal(String metaId, Result writer) {
+        Assert.hasText(metaId, "Meta id can not be empty.");
+        Meta meta = cacheService.get(metaId, Meta.class);
+        if (meta != null) {
+            meta.getFail().getAndAdd(writer.getFailData().size());
+            meta.getSuccess().getAndAdd(writer.getSuccessData().size());
+            meta.setUpdateTime(Instant.now().toEpochMilli());
+        }
+    }
+
+    private void flush(String metaId, Result result, String event) {
         refreshTotal(metaId, result);
 
+        // 是否写失败数据
         if (storageConfig.isWriteFail() && !CollectionUtils.isEmpty(result.getFailData())) {
             final String error = StringUtil.substring(result.getError().toString(), 0, storageConfig.getMaxErrorLength());
             flushService.asyncWrite(metaId, result.getTableGroupId(), result.getTargetTableGroupName(), event, false, result.getFailData(), error);
         }
 
-        // 是否写增量数据
+        // 是否写成功数据
         if (storageConfig.isWriteSuccess() && !CollectionUtils.isEmpty(result.getSuccessData())) {
             flushService.asyncWrite(metaId, result.getTableGroupId(), result.getTargetTableGroupName(), event, true, result.getSuccessData(), "");
         }
     }
 
-    protected void refreshTotal(String metaId, Result writer) {
-        Assert.hasText(metaId, "Meta id can not be empty.");
-        Meta meta = profileComponent.getMeta(metaId);
-        Assert.notNull(meta, "Meta can not be null.");
-        meta.getFail().getAndAdd(writer.getFailData().size());
-        meta.getSuccess().getAndAdd(writer.getSuccessData().size());
-        meta.setUpdateTime(Instant.now().toEpochMilli());
-    }
-
 }

+ 1 - 1
dbsyncer-parser/src/main/java/org/dbsyncer/parser/model/Mapping.java

@@ -1,6 +1,6 @@
 package org.dbsyncer.parser.model;
 
-import org.dbsyncer.connector.config.ListenerConfig;
+import org.dbsyncer.sdk.config.ListenerConfig;
 import org.dbsyncer.sdk.enums.ModelEnum;
 import org.dbsyncer.sdk.model.Field;
 import org.dbsyncer.storage.constant.ConfigConstant;

+ 1 - 1
dbsyncer-connector/src/main/java/org/dbsyncer/connector/config/ListenerConfig.java → dbsyncer-sdk/src/main/java/org/dbsyncer/sdk/config/ListenerConfig.java

@@ -1,4 +1,4 @@
-package org.dbsyncer.connector.config;
+package org.dbsyncer.sdk.config;
 
 import org.dbsyncer.sdk.enums.ListenerTypeEnum;