|
@@ -5,6 +5,7 @@ import org.dbsyncer.biz.BizException;
|
|
|
import org.dbsyncer.biz.checker.AbstractChecker;
|
|
|
import org.dbsyncer.common.util.CollectionUtils;
|
|
|
import org.dbsyncer.connector.config.Field;
|
|
|
+import org.dbsyncer.connector.config.Filter;
|
|
|
import org.dbsyncer.connector.config.MetaInfo;
|
|
|
import org.dbsyncer.connector.config.Table;
|
|
|
import org.dbsyncer.manager.Manager;
|
|
@@ -101,12 +102,15 @@ public class TableGroupChecker extends AbstractChecker {
|
|
|
|
|
|
public void setCommand(Mapping mapping, TableGroup tableGroup) {
|
|
|
TableGroup group = new TableGroup();
|
|
|
- group.setFieldMapping(tableGroup.getFieldMapping());
|
|
|
+ group.setFieldMapping(new ArrayList<>(tableGroup.getFieldMapping()));
|
|
|
group.setSourceTable(tableGroup.getSourceTable());
|
|
|
group.setTargetTable(tableGroup.getTargetTable());
|
|
|
// 默认使用全局的过滤条件
|
|
|
group.setFilter(CollectionUtils.isEmpty(tableGroup.getFilter()) ? mapping.getFilter() : tableGroup.getFilter());
|
|
|
|
|
|
+ // 添加增量配置事件和过滤条件字段
|
|
|
+ appendEventAndFilter(mapping, group);
|
|
|
+
|
|
|
Map<String, String> command = manager.getCommand(mapping, group);
|
|
|
tableGroup.setCommand(command);
|
|
|
|
|
@@ -136,6 +140,44 @@ public class TableGroupChecker extends AbstractChecker {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ private void appendEventAndFilter(Mapping mapping, TableGroup group) {
|
|
|
+ final List<FieldMapping> fieldMapping = group.getFieldMapping();
|
|
|
+
|
|
|
+ // 检查增量字段是否在映射关系中
|
|
|
+ String eventFieldName = mapping.getListener().getEventFieldName();
|
|
|
+ if (StringUtils.isNotBlank(eventFieldName)) {
|
|
|
+ Map<String, Field> fields = convert2Map(group.getSourceTable().getColumn());
|
|
|
+ addFieldMapping(fieldMapping, eventFieldName, fields);
|
|
|
+ }
|
|
|
+
|
|
|
+ // 检查过滤条件是否在映射关系中
|
|
|
+ List<Filter> filter = group.getFilter();
|
|
|
+ if (!CollectionUtils.isEmpty(filter)) {
|
|
|
+ Map<String, Field> fields = convert2Map(group.getSourceTable().getColumn());
|
|
|
+ filter.forEach(f -> addFieldMapping(fieldMapping, f.getName(), fields));
|
|
|
+ }
|
|
|
+
|
|
|
+ }
|
|
|
+
|
|
|
+ private void addFieldMapping(List<FieldMapping> fieldMapping, String name, Map<String, Field> fields) {
|
|
|
+ if (StringUtils.isNotBlank(name)) {
|
|
|
+ boolean exist = false;
|
|
|
+ for (FieldMapping m : fieldMapping) {
|
|
|
+ Field source = m.getSource();
|
|
|
+ if (null == source) {
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+ if (StringUtils.equals(source.getName(), name)) {
|
|
|
+ exist = true;
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ if (!exist && null != fields.get(name)) {
|
|
|
+ fieldMapping.add(new FieldMapping(fields.get(name), null));
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
private void mergeFieldMapping(TableGroup tableGroup) {
|
|
|
List<Field> sCol = tableGroup.getSourceTable().getColumn();
|
|
|
List<Field> tCol = tableGroup.getTargetTable().getColumn();
|
|
@@ -173,7 +215,7 @@ public class TableGroupChecker extends AbstractChecker {
|
|
|
* 解析映射关系
|
|
|
*
|
|
|
* @param tableGroup
|
|
|
- * @param json [{"source":"id","target":"id"}]
|
|
|
+ * @param json [{"source":"id","target":"id"}]
|
|
|
* @return
|
|
|
*/
|
|
|
private void setFieldMapping(TableGroup tableGroup, String json) {
|
|
@@ -194,7 +236,13 @@ public class TableGroupChecker extends AbstractChecker {
|
|
|
row = mapping.getJSONObject(i);
|
|
|
s = sMap.get(row.getString("source"));
|
|
|
t = tMap.get(row.getString("target"));
|
|
|
- t.setPk(row.getBoolean("pk"));
|
|
|
+ if (null == s && null == t) {
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+
|
|
|
+ if (null != t) {
|
|
|
+ t.setPk(row.getBoolean("pk"));
|
|
|
+ }
|
|
|
list.add(new FieldMapping(s, t));
|
|
|
}
|
|
|
tableGroup.setFieldMapping(list);
|