|
@@ -30,7 +30,6 @@ import org.dbsyncer.parser.model.TableGroup;
|
|
|
import org.dbsyncer.parser.util.PickerUtil;
|
|
|
import org.slf4j.Logger;
|
|
|
import org.slf4j.LoggerFactory;
|
|
|
-import org.springframework.beans.factory.annotation.Qualifier;
|
|
|
import org.springframework.stereotype.Component;
|
|
|
import org.springframework.util.Assert;
|
|
|
|
|
@@ -47,7 +46,6 @@ import java.util.List;
|
|
|
import java.util.Map;
|
|
|
import java.util.Set;
|
|
|
import java.util.concurrent.ConcurrentHashMap;
|
|
|
-import java.util.concurrent.Executor;
|
|
|
import java.util.concurrent.atomic.AtomicInteger;
|
|
|
import java.util.stream.Collectors;
|
|
|
|
|
@@ -81,10 +79,6 @@ public class IncrementPuller extends AbstractPuller implements ScheduledTaskJob
|
|
|
@Resource
|
|
|
private ConnectorFactory connectorFactory;
|
|
|
|
|
|
- @Qualifier("taskExecutor")
|
|
|
- @Resource
|
|
|
- private Executor taskExecutor;
|
|
|
-
|
|
|
private Map<String, Extractor> map = new ConcurrentHashMap<>();
|
|
|
|
|
|
@PostConstruct
|
|
@@ -127,7 +121,6 @@ public class IncrementPuller extends AbstractPuller implements ScheduledTaskJob
|
|
|
public void close(String metaId) {
|
|
|
Extractor extractor = map.get(metaId);
|
|
|
if (null != extractor) {
|
|
|
- extractor.clearAllListener();
|
|
|
extractor.close();
|
|
|
}
|
|
|
map.remove(metaId);
|
|
@@ -141,50 +134,56 @@ public class IncrementPuller extends AbstractPuller implements ScheduledTaskJob
|
|
|
map.forEach((k, v) -> v.flushEvent());
|
|
|
}
|
|
|
|
|
|
- private AbstractExtractor getExtractor(Mapping mapping, Connector connector, List<TableGroup> list, Meta meta)
|
|
|
- throws InstantiationException, IllegalAccessException {
|
|
|
+ private AbstractExtractor getExtractor(Mapping mapping, Connector connector, List<TableGroup> list, Meta meta) throws InstantiationException, IllegalAccessException {
|
|
|
AbstractConnectorConfig connectorConfig = connector.getConfig();
|
|
|
ListenerConfig listenerConfig = mapping.getListener();
|
|
|
|
|
|
// timing/log
|
|
|
final String listenerType = listenerConfig.getListenerType();
|
|
|
|
|
|
+ AbstractExtractor extractor = null;
|
|
|
// 默认定时抽取
|
|
|
if (ListenerTypeEnum.isTiming(listenerType)) {
|
|
|
- AbstractQuartzExtractor extractor = listener.getExtractor(ListenerTypeEnum.TIMING, connectorConfig.getConnectorType(), AbstractQuartzExtractor.class);
|
|
|
- extractor.setCommands(list.stream().map(t -> {
|
|
|
+ AbstractQuartzExtractor quartzExtractor = listener.getExtractor(ListenerTypeEnum.TIMING, connectorConfig.getConnectorType(), AbstractQuartzExtractor.class);
|
|
|
+ quartzExtractor.setCommands(list.stream().map(t -> {
|
|
|
String pk = PrimaryKeyUtil.findOriginalTablePrimaryKey(t.getSourceTable());
|
|
|
return new TableGroupCommand(pk, t.getCommand());
|
|
|
}).collect(Collectors.toList()));
|
|
|
- setExtractorConfig(extractor, connectorConfig, listenerConfig, meta.getSnapshot(), new QuartzListener(mapping, list));
|
|
|
- return extractor;
|
|
|
+ quartzExtractor.register(new QuartzListener(mapping, list));
|
|
|
+ extractor = quartzExtractor;
|
|
|
}
|
|
|
|
|
|
// 基于日志抽取
|
|
|
if (ListenerTypeEnum.isLog(listenerType)) {
|
|
|
- AbstractExtractor extractor = listener.getExtractor(ListenerTypeEnum.LOG, connectorConfig.getConnectorType(), AbstractExtractor.class);
|
|
|
+ extractor = listener.getExtractor(ListenerTypeEnum.LOG, connectorConfig.getConnectorType(), AbstractExtractor.class);
|
|
|
+ extractor.register(new LogListener(mapping, list, extractor));
|
|
|
+ }
|
|
|
+
|
|
|
+ if (null != extractor) {
|
|
|
Set<String> filterTable = new HashSet<>();
|
|
|
- LogListener logListener = new LogListener(mapping, list, extractor);
|
|
|
- logListener.getTablePicker().forEach((k, fieldPickers) -> filterTable.add(k));
|
|
|
+ List<Table> sourceTable = new ArrayList<>();
|
|
|
+ list.forEach(t -> {
|
|
|
+ Table table = t.getSourceTable();
|
|
|
+ if (!filterTable.contains(t.getName())) {
|
|
|
+ sourceTable.add(table);
|
|
|
+ }
|
|
|
+ filterTable.add(table.getName());
|
|
|
+ });
|
|
|
+
|
|
|
+ extractor.setConnectorFactory(connectorFactory);
|
|
|
+ extractor.setScheduledTaskService(scheduledTaskService);
|
|
|
+ extractor.setConnectorConfig(connectorConfig);
|
|
|
+ extractor.setListenerConfig(listenerConfig);
|
|
|
extractor.setFilterTable(filterTable);
|
|
|
- setExtractorConfig(extractor, connectorConfig, listenerConfig, meta.getSnapshot(), logListener);
|
|
|
+ extractor.setSourceTable(sourceTable);
|
|
|
+ extractor.setSnapshot(meta.getSnapshot());
|
|
|
+ extractor.setMetaId(meta.getId());
|
|
|
return extractor;
|
|
|
}
|
|
|
|
|
|
throw new ManagerException("未知的监听配置.");
|
|
|
}
|
|
|
|
|
|
- private void setExtractorConfig(AbstractExtractor extractor, AbstractConnectorConfig connector, ListenerConfig listener,
|
|
|
- Map<String, String> snapshot, AbstractListener event) {
|
|
|
- extractor.setConnectorFactory(connectorFactory);
|
|
|
- extractor.setScheduledTaskService(scheduledTaskService);
|
|
|
- extractor.setConnectorConfig(connector);
|
|
|
- extractor.setListenerConfig(listener);
|
|
|
- extractor.setSnapshot(snapshot);
|
|
|
- extractor.addListener(event);
|
|
|
- extractor.setMetaId(event.metaId);
|
|
|
- }
|
|
|
-
|
|
|
abstract class AbstractListener implements Event {
|
|
|
private static final int FLUSH_DELAYED_SECONDS = 30;
|
|
|
protected Mapping mapping;
|
|
@@ -245,11 +244,11 @@ public class IncrementPuller extends AbstractPuller implements ScheduledTaskJob
|
|
|
|
|
|
private List<FieldPicker> tablePicker;
|
|
|
|
|
|
- public QuartzListener(Mapping mapping, List<TableGroup> list) {
|
|
|
+ public QuartzListener(Mapping mapping, List<TableGroup> tableGroups) {
|
|
|
this.mapping = mapping;
|
|
|
this.metaId = mapping.getMetaId();
|
|
|
this.tablePicker = new LinkedList<>();
|
|
|
- list.forEach(t -> tablePicker.add(new FieldPicker(PickerUtil.mergeTableGroupConfig(mapping, t))));
|
|
|
+ tableGroups.forEach(t -> tablePicker.add(new FieldPicker(PickerUtil.mergeTableGroupConfig(mapping, t))));
|
|
|
}
|
|
|
|
|
|
@Override
|
|
@@ -290,13 +289,13 @@ public class IncrementPuller extends AbstractPuller implements ScheduledTaskJob
|
|
|
private AtomicInteger eventCounter;
|
|
|
private static final int MAX_LOG_CACHE_SIZE = 128;
|
|
|
|
|
|
- public LogListener(Mapping mapping, List<TableGroup> list, Extractor extractor) {
|
|
|
+ public LogListener(Mapping mapping, List<TableGroup> tableGroups, Extractor extractor) {
|
|
|
this.mapping = mapping;
|
|
|
this.metaId = mapping.getMetaId();
|
|
|
this.extractor = extractor;
|
|
|
this.tablePicker = new LinkedHashMap<>();
|
|
|
this.eventCounter = new AtomicInteger();
|
|
|
- list.forEach(t -> {
|
|
|
+ tableGroups.forEach(t -> {
|
|
|
final Table table = t.getSourceTable();
|
|
|
final String tableName = table.getName();
|
|
|
List<Field> pkList = t.getTargetTable().getColumn().stream().filter(field -> field.isPk()).collect(Collectors.toList());
|
|
@@ -328,10 +327,6 @@ public class IncrementPuller extends AbstractPuller implements ScheduledTaskJob
|
|
|
eventCounter.set(0);
|
|
|
}
|
|
|
}
|
|
|
-
|
|
|
- public Map<String, List<FieldPicker>> getTablePicker() {
|
|
|
- return tablePicker;
|
|
|
- }
|
|
|
}
|
|
|
|
|
|
}
|