|
@@ -1,24 +1,23 @@
|
|
package org.dbsyncer.manager.puller.impl;
|
|
package org.dbsyncer.manager.puller.impl;
|
|
|
|
|
|
import org.dbsyncer.common.event.Event;
|
|
import org.dbsyncer.common.event.Event;
|
|
|
|
+import org.dbsyncer.common.util.CollectionUtils;
|
|
|
|
+import org.dbsyncer.connector.config.Table;
|
|
import org.dbsyncer.listener.DefaultExtractor;
|
|
import org.dbsyncer.listener.DefaultExtractor;
|
|
-import org.dbsyncer.listener.Extractor;
|
|
|
|
import org.dbsyncer.listener.Listener;
|
|
import org.dbsyncer.listener.Listener;
|
|
-import org.dbsyncer.listener.config.ListenerConfig;
|
|
|
|
import org.dbsyncer.manager.Manager;
|
|
import org.dbsyncer.manager.Manager;
|
|
|
|
+import org.dbsyncer.manager.config.FieldPicker;
|
|
import org.dbsyncer.manager.puller.AbstractPuller;
|
|
import org.dbsyncer.manager.puller.AbstractPuller;
|
|
-import org.dbsyncer.manager.puller.Increment;
|
|
|
|
import org.dbsyncer.parser.Parser;
|
|
import org.dbsyncer.parser.Parser;
|
|
-import org.dbsyncer.parser.model.Connector;
|
|
|
|
-import org.dbsyncer.parser.model.Mapping;
|
|
|
|
-import org.dbsyncer.parser.model.Meta;
|
|
|
|
-import org.dbsyncer.parser.model.TableGroup;
|
|
|
|
|
|
+import org.dbsyncer.parser.model.*;
|
|
import org.slf4j.Logger;
|
|
import org.slf4j.Logger;
|
|
import org.slf4j.LoggerFactory;
|
|
import org.slf4j.LoggerFactory;
|
|
import org.springframework.beans.factory.annotation.Autowired;
|
|
import org.springframework.beans.factory.annotation.Autowired;
|
|
import org.springframework.stereotype.Component;
|
|
import org.springframework.stereotype.Component;
|
|
import org.springframework.util.Assert;
|
|
import org.springframework.util.Assert;
|
|
|
|
|
|
|
|
+import java.util.ArrayList;
|
|
|
|
+import java.util.LinkedHashMap;
|
|
import java.util.List;
|
|
import java.util.List;
|
|
import java.util.Map;
|
|
import java.util.Map;
|
|
import java.util.concurrent.ConcurrentHashMap;
|
|
import java.util.concurrent.ConcurrentHashMap;
|
|
@@ -54,11 +53,15 @@ public class IncrementPuller extends AbstractPuller {
|
|
Connector connector = manager.getConnector(mapping.getSourceConnectorId());
|
|
Connector connector = manager.getConnector(mapping.getSourceConnectorId());
|
|
Assert.notNull(connector, "连接器不能为空.");
|
|
Assert.notNull(connector, "连接器不能为空.");
|
|
List<TableGroup> list = manager.getTableGroupAll(mappingId);
|
|
List<TableGroup> list = manager.getTableGroupAll(mappingId);
|
|
- Assert.notEmpty(list, "映射关系不能为空");
|
|
|
|
|
|
+ Assert.notEmpty(list, "映射关系不能为空.");
|
|
Meta meta = manager.getMeta(metaId);
|
|
Meta meta = manager.getMeta(metaId);
|
|
Assert.notNull(meta, "Meta不能为空.");
|
|
Assert.notNull(meta, "Meta不能为空.");
|
|
DefaultExtractor extractor = listener.createExtractor(connector.getConfig(), mapping.getListener(), meta.getMap());
|
|
DefaultExtractor extractor = listener.createExtractor(connector.getConfig(), mapping.getListener(), meta.getMap());
|
|
Assert.notNull(extractor, "未知的监听配置.");
|
|
Assert.notNull(extractor, "未知的监听配置.");
|
|
|
|
+ long now = System.currentTimeMillis();
|
|
|
|
+ meta.setBeginTime(now);
|
|
|
|
+ meta.setEndTime(now);
|
|
|
|
+ manager.editMeta(meta);
|
|
|
|
|
|
// 监听数据变更事件
|
|
// 监听数据变更事件
|
|
extractor.addListener(new DefaultListener(mapping, list));
|
|
extractor.addListener(new DefaultListener(mapping, list));
|
|
@@ -75,23 +78,12 @@ public class IncrementPuller extends AbstractPuller {
|
|
|
|
|
|
@Override
|
|
@Override
|
|
public void close(String metaId) {
|
|
public void close(String metaId) {
|
|
- Extractor extractor = map.get(metaId);
|
|
|
|
|
|
+ DefaultExtractor extractor = map.get(metaId);
|
|
if (null != extractor) {
|
|
if (null != extractor) {
|
|
|
|
+ extractor.clearAllListener();
|
|
extractor.close();
|
|
extractor.close();
|
|
- }
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- /**
|
|
|
|
- * TODO 更新待优化,存在性能问题
|
|
|
|
- *
|
|
|
|
- * @param metaId
|
|
|
|
- */
|
|
|
|
- private void flush(String metaId) {
|
|
|
|
- Meta meta = manager.getMeta(metaId);
|
|
|
|
- DefaultExtractor extractor = map.get(metaId);
|
|
|
|
- if (null != meta && null != extractor) {
|
|
|
|
- meta.setMap(extractor.getMap());
|
|
|
|
- manager.editMeta(meta);
|
|
|
|
|
|
+ finished(metaId);
|
|
|
|
+ logger.info("关闭成功:{}", metaId);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
@@ -104,23 +96,49 @@ public class IncrementPuller extends AbstractPuller {
|
|
|
|
|
|
private Mapping mapping;
|
|
private Mapping mapping;
|
|
private List<TableGroup> list;
|
|
private List<TableGroup> list;
|
|
|
|
+ private String metaId;
|
|
|
|
+ private Map<String, List<FieldPicker>> tablePicker;
|
|
|
|
|
|
public DefaultListener(Mapping mapping, List<TableGroup> list) {
|
|
public DefaultListener(Mapping mapping, List<TableGroup> list) {
|
|
this.mapping = mapping;
|
|
this.mapping = mapping;
|
|
this.list = list;
|
|
this.list = list;
|
|
|
|
+ this.metaId = mapping.getMetaId();
|
|
|
|
+ this.tablePicker = new LinkedHashMap<>();
|
|
|
|
+ list.forEach(t -> {
|
|
|
|
+ final Table table = t.getSourceTable();
|
|
|
|
+ final String tableName = table.getName();
|
|
|
|
+ tablePicker.putIfAbsent(tableName, new ArrayList<>());
|
|
|
|
+ tablePicker.get(tableName).add(new FieldPicker(t, table.getColumn(), t.getFieldMapping()));
|
|
|
|
+ });
|
|
}
|
|
}
|
|
|
|
|
|
@Override
|
|
@Override
|
|
public void changedEvent(String tableName, String event, List<Object> before, List<Object> after) {
|
|
public void changedEvent(String tableName, String event, List<Object> before, List<Object> after) {
|
|
- logger.info("监听数据>tableName:{},event:{},after:{}, after:{}", tableName, event, before, after);
|
|
|
|
|
|
+ logger.info("监听数据=> tableName:{}, event:{}, before:{}, after:{}", tableName, event, before, after);
|
|
|
|
+
|
|
// 处理过程有异常向上抛
|
|
// 处理过程有异常向上抛
|
|
- list.forEach(tableGroup -> parser.execute(mapping, tableGroup));
|
|
|
|
|
|
+ List<FieldPicker> pickers = tablePicker.get(tableName);
|
|
|
|
+ if (!CollectionUtils.isEmpty(pickers)) {
|
|
|
|
+ pickers.parallelStream().forEach(p -> {
|
|
|
|
+ DataEvent data = new DataEvent(event, p.getColumns(before), p.getColumns(after));
|
|
|
|
+ parser.execute(mapping, p.getTableGroup(), data);
|
|
|
|
+ });
|
|
|
|
+ }
|
|
|
|
+
|
|
}
|
|
}
|
|
|
|
|
|
@Override
|
|
@Override
|
|
public void flushEvent() {
|
|
public void flushEvent() {
|
|
- logger.info("flushEvent");
|
|
|
|
- flush(mapping.getMetaId());
|
|
|
|
|
|
+ // TODO 更新待优化,存在性能问题
|
|
|
|
+ DefaultExtractor extractor = map.get(metaId);
|
|
|
|
+ if (null != extractor) {
|
|
|
|
+ logger.info("flushEvent map:{}", extractor.getMap());
|
|
|
|
+ Meta meta = manager.getMeta(metaId);
|
|
|
|
+ if (null != meta) {
|
|
|
|
+ meta.setMap(extractor.getMap());
|
|
|
|
+ manager.editMeta(meta);
|
|
|
|
+ }
|
|
|
|
+ }
|
|
}
|
|
}
|
|
|
|
|
|
}
|
|
}
|