|
@@ -2,17 +2,22 @@ package org.dbsyncer.manager.extractor.impl;
|
|
|
|
|
|
import org.dbsyncer.common.event.IncrementRefreshEvent;
|
|
import org.dbsyncer.common.event.IncrementRefreshEvent;
|
|
import org.dbsyncer.common.model.Task;
|
|
import org.dbsyncer.common.model.Task;
|
|
|
|
+import org.dbsyncer.common.util.StringUtil;
|
|
import org.dbsyncer.listener.Listener;
|
|
import org.dbsyncer.listener.Listener;
|
|
import org.dbsyncer.listener.config.ListenerConfig;
|
|
import org.dbsyncer.listener.config.ListenerConfig;
|
|
import org.dbsyncer.manager.Manager;
|
|
import org.dbsyncer.manager.Manager;
|
|
import org.dbsyncer.manager.enums.TaskEnum;
|
|
import org.dbsyncer.manager.enums.TaskEnum;
|
|
import org.dbsyncer.manager.extractor.AbstractExtractor;
|
|
import org.dbsyncer.manager.extractor.AbstractExtractor;
|
|
|
|
+import org.dbsyncer.manager.extractor.Increment;
|
|
import org.dbsyncer.parser.model.Connector;
|
|
import org.dbsyncer.parser.model.Connector;
|
|
import org.dbsyncer.parser.model.Mapping;
|
|
import org.dbsyncer.parser.model.Mapping;
|
|
import org.dbsyncer.parser.model.Meta;
|
|
import org.dbsyncer.parser.model.Meta;
|
|
import org.slf4j.Logger;
|
|
import org.slf4j.Logger;
|
|
import org.slf4j.LoggerFactory;
|
|
import org.slf4j.LoggerFactory;
|
|
|
|
+import org.springframework.beans.BeansException;
|
|
import org.springframework.beans.factory.annotation.Autowired;
|
|
import org.springframework.beans.factory.annotation.Autowired;
|
|
|
|
+import org.springframework.context.ApplicationContext;
|
|
|
|
+import org.springframework.context.ApplicationContextAware;
|
|
import org.springframework.context.ApplicationListener;
|
|
import org.springframework.context.ApplicationListener;
|
|
import org.springframework.stereotype.Component;
|
|
import org.springframework.stereotype.Component;
|
|
import org.springframework.util.Assert;
|
|
import org.springframework.util.Assert;
|
|
@@ -28,7 +33,7 @@ import java.util.concurrent.ConcurrentHashMap;
|
|
* @date 2020/04/26 15:28
|
|
* @date 2020/04/26 15:28
|
|
*/
|
|
*/
|
|
@Component
|
|
@Component
|
|
-public class IncrementExtractor extends AbstractExtractor implements ApplicationListener<IncrementRefreshEvent> {
|
|
|
|
|
|
+public class IncrementExtractor extends AbstractExtractor implements ApplicationContextAware, ApplicationListener<IncrementRefreshEvent> {
|
|
|
|
|
|
private final Logger logger = LoggerFactory.getLogger(getClass());
|
|
private final Logger logger = LoggerFactory.getLogger(getClass());
|
|
|
|
|
|
@@ -40,26 +45,30 @@ public class IncrementExtractor extends AbstractExtractor implements Application
|
|
|
|
|
|
private Map<String, Task> map = new ConcurrentHashMap<>();
|
|
private Map<String, Task> map = new ConcurrentHashMap<>();
|
|
|
|
|
|
|
|
+ private Map<String, Increment> handle;
|
|
|
|
+
|
|
|
|
+ @Override
|
|
|
|
+ public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
|
|
|
|
+ handle = applicationContext.getBeansOfType(Increment.class);
|
|
|
|
+ }
|
|
|
|
+
|
|
@Override
|
|
@Override
|
|
public void asyncStart(Mapping mapping) {
|
|
public void asyncStart(Mapping mapping) {
|
|
ListenerConfig listenerConfig = mapping.getListener();
|
|
ListenerConfig listenerConfig = mapping.getListener();
|
|
Connector connector = manager.getConnector(mapping.getSourceConnectorId());
|
|
Connector connector = manager.getConnector(mapping.getSourceConnectorId());
|
|
Assert.notNull(connector, "连接器不能为空.");
|
|
Assert.notNull(connector, "连接器不能为空.");
|
|
// log/timing
|
|
// log/timing
|
|
- String type = listenerConfig.getListenerType();
|
|
|
|
- Task task = TaskEnum.getIncrementTask(type);
|
|
|
|
- Assert.notNull(task, "未知的增量同步方式.");
|
|
|
|
|
|
+ String type = StringUtil.toLowerCaseFirstOne(listenerConfig.getListenerType()).concat("Increment");
|
|
|
|
+ Increment increment = handle.get(type);
|
|
|
|
+ Assert.notNull(increment, "未知的增量同步方式.");
|
|
|
|
|
|
final String metaId = mapping.getMetaId();
|
|
final String metaId = mapping.getMetaId();
|
|
- task.setId(metaId);
|
|
|
|
- map.putIfAbsent(metaId, task);
|
|
|
|
|
|
+ map.putIfAbsent(metaId, new Task(metaId));
|
|
|
|
|
|
try {
|
|
try {
|
|
// 执行任务
|
|
// 执行任务
|
|
logger.info("启动任务:{}", metaId);
|
|
logger.info("启动任务:{}", metaId);
|
|
- Task t = map.get(metaId);
|
|
|
|
- listener.execute(t, listenerConfig, connector.getConfig());
|
|
|
|
-
|
|
|
|
|
|
+ // TODO increment
|
|
} catch (Exception e) {
|
|
} catch (Exception e) {
|
|
// TODO 记录错误日志
|
|
// TODO 记录错误日志
|
|
logger.error(e.getMessage());
|
|
logger.error(e.getMessage());
|