1
0

FieldPicker.java 4.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142
  1. package org.dbsyncer.manager.config;
  2. import org.apache.commons.lang.StringUtils;
  3. import org.dbsyncer.common.util.CollectionUtils;
  4. import org.dbsyncer.connector.CompareFilter;
  5. import org.dbsyncer.connector.config.Field;
  6. import org.dbsyncer.connector.config.Filter;
  7. import org.dbsyncer.connector.enums.FilterEnum;
  8. import org.dbsyncer.connector.enums.OperationEnum;
  9. import org.dbsyncer.parser.model.FieldMapping;
  10. import org.dbsyncer.parser.model.TableGroup;
  11. import org.springframework.util.Assert;
  12. import java.util.*;
  13. import java.util.stream.Collectors;
  14. public class FieldPicker {
  15. private TableGroup tableGroup;
  16. private List<Field> pkList;
  17. private List<Node> index;
  18. private int indexSize;
  19. private boolean filterSwitch;
  20. private List<Filter> add;
  21. private List<Filter> or;
  22. public FieldPicker(TableGroup tableGroup) {
  23. this.tableGroup = tableGroup;
  24. }
  25. public FieldPicker(TableGroup tableGroup, List<Field> pkList, List<Filter> filter, List<Field> column, List<FieldMapping> fieldMapping) {
  26. this.tableGroup = tableGroup;
  27. this.pkList = pkList;
  28. init(filter, column, fieldMapping);
  29. }
  30. public Map<String, Object> getColumns(List<Object> list) {
  31. if (!CollectionUtils.isEmpty(list)) {
  32. Map<String, Object> data = new HashMap<>(indexSize);
  33. final int size = list.size() - 1;
  34. index.parallelStream().forEach(node -> {
  35. if (node.i <= size) {
  36. data.put(node.name, list.get(node.i));
  37. }
  38. });
  39. return data;
  40. }
  41. return Collections.EMPTY_MAP;
  42. }
  43. /**
  44. * 根据过滤条件过滤
  45. *
  46. * @param row
  47. * @return
  48. */
  49. public boolean filter(Map<String, Object> row) {
  50. if (!filterSwitch) {
  51. return true;
  52. }
  53. // where (id > 1 and id < 100) or (id = 100 or id =101)
  54. // 或 关系(成立任意条件)
  55. CompareFilter filter = null;
  56. Object value = null;
  57. for (Filter f : or) {
  58. value = row.get(f.getName());
  59. if (null == value) {
  60. continue;
  61. }
  62. filter = FilterEnum.getCompareFilter(f.getFilter());
  63. if (filter.compare(String.valueOf(value), f.getValue())) {
  64. return true;
  65. }
  66. }
  67. boolean pass = false;
  68. // 并 关系(成立所有条件)
  69. for (Filter f : add) {
  70. value = row.get(f.getName());
  71. if (null == value) {
  72. continue;
  73. }
  74. filter = FilterEnum.getCompareFilter(f.getFilter());
  75. if (!filter.compare(String.valueOf(value), f.getValue())) {
  76. return false;
  77. }
  78. pass = true;
  79. }
  80. return pass;
  81. }
  82. private void init(List<Filter> filter, List<Field> column, List<FieldMapping> fieldMapping) {
  83. // 解析过滤条件
  84. if ((filterSwitch = !CollectionUtils.isEmpty(filter))) {
  85. add = filter.stream().filter(f -> StringUtils.equals(f.getOperation(), OperationEnum.AND.getName())).collect(
  86. Collectors.toList());
  87. or = filter.stream().filter(f -> StringUtils.equals(f.getOperation(), OperationEnum.OR.getName())).collect(Collectors.toList());
  88. }
  89. // column => [1, 86, 0, 中文, 2020-05-15T12:17:22.000+0800, 备注信息]
  90. Assert.notEmpty(column, "读取字段不能为空.");
  91. Assert.notEmpty(fieldMapping, "映射关系不能为空.");
  92. // 找到同步字段 => [{source.name}]
  93. Set<String> key = fieldMapping.stream().filter(m -> null != m.getSource()).map(m -> m.getSource().getName()).collect(Collectors.toSet());
  94. // 记录字段索引 [{"ID":0},{"NAME":1}]
  95. index = new LinkedList<>();
  96. int size = column.size();
  97. String k = null;
  98. for (int i = 0; i < size; i++) {
  99. k = column.get(i).getName();
  100. if (key.contains(k)) {
  101. index.add(new Node(k, i));
  102. }
  103. }
  104. Assert.notEmpty(index, "同步映射关系不能为空.");
  105. this.indexSize = index.size();
  106. }
  107. public TableGroup getTableGroup() {
  108. return tableGroup;
  109. }
  110. public String getPk() {
  111. return pkList.get(0).getName();
  112. }
  113. final class Node {
  114. // 属性
  115. String name;
  116. // 索引
  117. int i;
  118. public Node(String name, int i) {
  119. this.name = name;
  120. this.i = i;
  121. }
  122. }
  123. }