瀏覽代碼

优化逻辑

穿云 4 月之前
父節點
當前提交
f8173ee8e9

+ 9 - 0
dbsyncer-biz/src/main/java/org/dbsyncer/biz/TableGroupService.java

@@ -3,6 +3,7 @@
  */
 package org.dbsyncer.biz;
 
+import org.dbsyncer.parser.model.Mapping;
 import org.dbsyncer.parser.model.TableGroup;
 
 import java.util.List;
@@ -60,4 +61,12 @@ public interface TableGroupService {
      */
     List<TableGroup> getTableGroupAll(String mappingId);
 
+    /**
+     * 更新元信息
+     *
+     * @param mapping
+     * @param metaSnapshot
+     */
+    void updateMeta(Mapping mapping, String metaSnapshot);
+
 }

+ 2 - 94
dbsyncer-biz/src/main/java/org/dbsyncer/biz/checker/impl/mapping/MappingChecker.java

@@ -6,15 +6,12 @@ package org.dbsyncer.biz.checker.impl.mapping;
 import org.dbsyncer.biz.checker.AbstractChecker;
 import org.dbsyncer.biz.checker.MappingConfigChecker;
 import org.dbsyncer.biz.checker.impl.tablegroup.TableGroupChecker;
-import org.dbsyncer.common.util.CollectionUtils;
-import org.dbsyncer.common.util.JsonUtil;
 import org.dbsyncer.common.util.NumberUtil;
 import org.dbsyncer.common.util.StringUtil;
 import org.dbsyncer.parser.ProfileComponent;
 import org.dbsyncer.parser.model.ConfigModel;
 import org.dbsyncer.parser.model.Mapping;
 import org.dbsyncer.parser.model.Meta;
-import org.dbsyncer.parser.model.TableGroup;
 import org.dbsyncer.sdk.config.ListenerConfig;
 import org.dbsyncer.sdk.constant.ConfigConstant;
 import org.dbsyncer.sdk.enums.ListenerTypeEnum;
@@ -25,12 +22,8 @@ import org.springframework.stereotype.Component;
 import org.springframework.util.Assert;
 
 import javax.annotation.Resource;
-import java.time.Instant;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.stream.Collectors;
 
 /**
  * @author AE86
@@ -113,9 +106,8 @@ public class MappingChecker extends AbstractChecker {
         // 修改高级配置:过滤条件/转换配置/插件配置
         this.modifySuperConfigModel(mapping, params);
 
-        // 更新meta
-        String metaSnapshot = params.get("metaSnapshot");
-        updateMeta(mapping, metaSnapshot);
+        // 合并关联的映射关系配置
+        tableGroupChecker.batchMergeConfig(mapping, params);
 
         return mapping;
     }
@@ -131,48 +123,6 @@ public class MappingChecker extends AbstractChecker {
         mapping.setMetaId(id);
     }
 
-    /**
-     * 更新元信息
-     *
-     * @param mapping
-     */
-    public void updateMeta(Mapping mapping) {
-        updateMeta(mapping, null);
-    }
-
-    /**
-     * 合并关联的映射关系配置
-     *
-     * @param mapping
-     * @param params
-     */
-    public void batchMergeTableGroupConfig(Mapping mapping, Map<String, String> params) {
-        List<TableGroup> groupAll = profileComponent.getTableGroupAll(mapping.getId());
-        if (!CollectionUtils.isEmpty(groupAll)) {
-            // 手动排序
-            String[] sortedTableGroupIds = StringUtil.split(params.get("sortedTableGroupIds"), StringUtil.VERTICAL_LINE);
-            if (null != sortedTableGroupIds && sortedTableGroupIds.length > 0) {
-                Map<String, TableGroup> tableGroupMap = groupAll.stream().collect(Collectors.toMap(TableGroup::getId, f -> f, (k1, k2) -> k1));
-                groupAll.clear();
-                int size = sortedTableGroupIds.length;
-                int i = size;
-                while (i > 0) {
-                    TableGroup g = tableGroupMap.get(sortedTableGroupIds[size - i]);
-                    Assert.notNull(g, "Invalid sorted tableGroup.");
-                    g.setIndex(i);
-                    groupAll.add(g);
-                    i--;
-                }
-            }
-
-            // 合并配置
-            for (TableGroup g : groupAll) {
-                tableGroupChecker.mergeConfig(mapping, g);
-                profileComponent.editConfigModel(g);
-            }
-        }
-    }
-
     /**
      * 修改监听器配置
      *
@@ -188,46 +138,4 @@ public class MappingChecker extends AbstractChecker {
         listener.setEnableDDL(StringUtil.isNotBlank(params.get("enableDDL")));
     }
 
-    /**
-     * 更新元信息
-     *
-     * @param mapping
-     * @param metaSnapshot
-     */
-    private void updateMeta(Mapping mapping, String metaSnapshot) {
-        Meta meta = profileComponent.getMeta(mapping.getMetaId());
-        Assert.notNull(meta, "驱动meta不存在.");
-
-        // 清空状态
-        meta.clear();
-
-        // 手动配置增量点
-        if (StringUtil.isNotBlank(metaSnapshot)) {
-            Map snapshot = JsonUtil.jsonToObj(metaSnapshot, HashMap.class);
-            if (!CollectionUtils.isEmpty(snapshot)) {
-                meta.setSnapshot(snapshot);
-            }
-        }
-
-        getMetaTotal(meta, mapping.getModel());
-
-        meta.setUpdateTime(Instant.now().toEpochMilli());
-        profileComponent.editConfigModel(meta);
-    }
-
-    private void getMetaTotal(Meta meta, String model) {
-        // 全量同步
-        if (ModelEnum.isFull(model)) {
-            // 统计tableGroup总条数
-            AtomicLong count = new AtomicLong(0);
-            List<TableGroup> groupAll = profileComponent.getTableGroupAll(meta.getMappingId());
-            if (!CollectionUtils.isEmpty(groupAll)) {
-                for (TableGroup g : groupAll) {
-                    count.getAndAdd(g.getSourceTable().getCount());
-                }
-            }
-            meta.setTotal(count);
-        }
-    }
-
 }

+ 34 - 7
dbsyncer-biz/src/main/java/org/dbsyncer/biz/checker/impl/tablegroup/TableGroupChecker.java

@@ -27,13 +27,7 @@ import org.springframework.stereotype.Component;
 import org.springframework.util.Assert;
 
 import javax.annotation.Resource;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.LinkedHashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
+import java.util.*;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.stream.Collectors;
 
@@ -162,6 +156,39 @@ public class TableGroupChecker extends AbstractChecker {
         return new Table(tableName, metaInfo.getTableType(), metaInfo.getColumn(), metaInfo.getSql(), metaInfo.getIndexType());
     }
 
+    /**
+     * 合并表配置
+     *
+     * @param mapping
+     * @param params
+     */
+    public void batchMergeConfig(Mapping mapping, Map<String, String> params) {
+        List<TableGroup> groupAll = profileComponent.getTableGroupAll(mapping.getId());
+        if (!CollectionUtils.isEmpty(groupAll)) {
+            // 手动排序
+            String[] sortedTableGroupIds = StringUtil.split(params.get("sortedTableGroupIds"), StringUtil.VERTICAL_LINE);
+            if (null != sortedTableGroupIds && sortedTableGroupIds.length > 0) {
+                Map<String, TableGroup> tableGroupMap = groupAll.stream().collect(Collectors.toMap(TableGroup::getId, f -> f, (k1, k2) -> k1));
+                groupAll.clear();
+                int size = sortedTableGroupIds.length;
+                int i = size;
+                while (i > 0) {
+                    TableGroup g = tableGroupMap.get(sortedTableGroupIds[size - i]);
+                    Assert.notNull(g, "Invalid sorted tableGroup.");
+                    g.setIndex(i);
+                    groupAll.add(g);
+                    i--;
+                }
+            }
+
+            // 合并配置
+            for (TableGroup g : groupAll) {
+                mergeConfig(mapping, g);
+                profileComponent.editConfigModel(g);
+            }
+        }
+    }
+
     private void checkRepeatedTable(String mappingId, String sourceTable, String targetTable) {
         List<TableGroup> list = profileComponent.getTableGroupAll(mappingId);
         if (!CollectionUtils.isEmpty(list)) {

+ 1 - 1
dbsyncer-biz/src/main/java/org/dbsyncer/biz/impl/ConnectorServiceImpl.java

@@ -44,7 +44,7 @@ public class ConnectorServiceImpl extends BaseServiceImpl implements ConnectorSe
 
     private final Logger logger = LoggerFactory.getLogger(getClass());
 
-    private Map<String, Boolean> health = new LinkedHashMap<>();
+    private final Map<String, Boolean> health = new LinkedHashMap<>();
 
     @Resource
     private ProfileComponent profileComponent;

+ 5 - 7
dbsyncer-biz/src/main/java/org/dbsyncer/biz/impl/MappingServiceImpl.java

@@ -88,8 +88,7 @@ public class MappingServiceImpl extends BaseServiceImpl implements MappingServic
         String id = profileComponent.addConfigModel(model);
 
         // 匹配相似表 on
-        String autoMatchTable = params.get("autoMatchTable");
-        if (StringUtil.isNotBlank(autoMatchTable)) {
+        if (StringUtil.isNotBlank(params.get("autoMatchTable"))) {
             matchSimilarTable(model);
         }
 
@@ -135,7 +134,8 @@ public class MappingServiceImpl extends BaseServiceImpl implements MappingServic
             Mapping model = (Mapping) mappingChecker.checkEditConfigModel(params);
             log(LogType.MappingLog.UPDATE, model);
 
-            mappingChecker.batchMergeTableGroupConfig(model, params);
+            // 更新meta
+            tableGroupService.updateMeta(mapping, params.get("metaSnapshot"));
             return profileComponent.editConfigModel(model);
         }
     }
@@ -206,12 +206,11 @@ public class MappingServiceImpl extends BaseServiceImpl implements MappingServic
 
     @Override
     public List<MappingVo> getMappingAll() {
-        List<MappingVo> list = profileComponent.getMappingAll()
+        return profileComponent.getMappingAll()
                 .stream()
-                .map(m -> convertMapping2Vo(m))
+                .map(this::convertMapping2Vo)
                 .sorted(Comparator.comparing(MappingVo::getUpdateTime).reversed())
                 .collect(Collectors.toList());
-        return list;
     }
 
     @Override
@@ -337,7 +336,6 @@ public class MappingServiceImpl extends BaseServiceImpl implements MappingServic
                 }
             }
         }
-        mappingChecker.updateMeta(mapping);
     }
 
     private void clearMetaIfFinished(String metaId) {

+ 61 - 22
dbsyncer-biz/src/main/java/org/dbsyncer/biz/impl/TableGroupServiceImpl.java

@@ -4,25 +4,25 @@
 package org.dbsyncer.biz.impl;
 
 import org.dbsyncer.biz.TableGroupService;
-import org.dbsyncer.biz.checker.Checker;
 import org.dbsyncer.biz.checker.impl.tablegroup.TableGroupChecker;
 import org.dbsyncer.common.util.CollectionUtils;
+import org.dbsyncer.common.util.JsonUtil;
 import org.dbsyncer.common.util.StringUtil;
 import org.dbsyncer.parser.LogType;
 import org.dbsyncer.parser.ProfileComponent;
 import org.dbsyncer.parser.model.Mapping;
+import org.dbsyncer.parser.model.Meta;
 import org.dbsyncer.parser.model.TableGroup;
 import org.dbsyncer.sdk.constant.ConfigConstant;
+import org.dbsyncer.sdk.enums.ModelEnum;
 import org.dbsyncer.sdk.model.Field;
 import org.springframework.stereotype.Service;
 import org.springframework.util.Assert;
 
 import javax.annotation.Resource;
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
+import java.time.Instant;
+import java.util.*;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.Stream;
 
 /**
@@ -34,7 +34,7 @@ import java.util.stream.Stream;
 public class TableGroupServiceImpl extends BaseServiceImpl implements TableGroupService {
 
     @Resource
-    private Checker tableGroupChecker;
+    private TableGroupChecker tableGroupChecker;
 
     @Resource
     private ProfileComponent profileComponent;
@@ -42,7 +42,8 @@ public class TableGroupServiceImpl extends BaseServiceImpl implements TableGroup
     @Override
     public String add(Map<String, String> params) {
         String mappingId = params.get("mappingId");
-        assertRunning(profileComponent.getMapping(mappingId));
+        Mapping mapping = profileComponent.getMapping(mappingId);
+        assertRunning(mapping);
 
         synchronized (LOCK) {
             // table1, table2
@@ -63,7 +64,9 @@ public class TableGroupServiceImpl extends BaseServiceImpl implements TableGroup
             }
 
             // 合并驱动公共字段
-            mergeMappingColumn(mappingId);
+            mergeMappingColumn(mapping);
+            // 更新meta
+            updateMeta(mapping, null);
             return 1 < tableSize ? String.valueOf(tableSize) : id;
         }
     }
@@ -73,12 +76,15 @@ public class TableGroupServiceImpl extends BaseServiceImpl implements TableGroup
         String id = params.get(ConfigConstant.CONFIG_MODEL_ID);
         TableGroup tableGroup = profileComponent.getTableGroup(id);
         Assert.notNull(tableGroup, "Can not find tableGroup.");
-        assertRunning(profileComponent.getMapping(tableGroup.getMappingId()));
+        Mapping mapping = profileComponent.getMapping(tableGroup.getMappingId());
+        assertRunning(mapping);
 
         TableGroup model = (TableGroup) tableGroupChecker.checkEditConfigModel(params);
         log(LogType.TableGroupLog.UPDATE, model);
-
-        return profileComponent.editTableGroup(model);
+        profileComponent.editTableGroup(model);
+        // 更新meta
+        updateMeta(mapping, null);
+        return id;
     }
 
     @Override
@@ -87,9 +93,7 @@ public class TableGroupServiceImpl extends BaseServiceImpl implements TableGroup
         Assert.notNull(tableGroup, "Can not find tableGroup.");
         assertRunning(profileComponent.getMapping(tableGroup.getMappingId()));
 
-        TableGroupChecker checker = (TableGroupChecker) tableGroupChecker;
-        checker.refreshTableFields(tableGroup);
-
+        tableGroupChecker.refreshTableFields(tableGroup);
         return profileComponent.editTableGroup(tableGroup);
     }
 
@@ -97,7 +101,8 @@ public class TableGroupServiceImpl extends BaseServiceImpl implements TableGroup
     public boolean remove(String mappingId, String ids) {
         Assert.hasText(mappingId, "Mapping id can not be null");
         Assert.hasText(ids, "TableGroup ids can not be null");
-        assertRunning(profileComponent.getMapping(mappingId));
+        Mapping mapping = profileComponent.getMapping(mappingId);
+        assertRunning(mapping);
 
         // 批量删除表
         Stream.of(StringUtil.split(ids, ",")).parallel().forEach(id -> {
@@ -107,7 +112,7 @@ public class TableGroupServiceImpl extends BaseServiceImpl implements TableGroup
         });
 
         // 合并驱动公共字段
-        mergeMappingColumn(mappingId);
+        mergeMappingColumn(mapping);
 
         // 重置排序
         resetTableGroupAllIndex(mappingId);
@@ -126,6 +131,43 @@ public class TableGroupServiceImpl extends BaseServiceImpl implements TableGroup
         return profileComponent.getSortedTableGroupAll(mappingId);
     }
 
+    @Override
+    public void updateMeta(Mapping mapping, String metaSnapshot) {
+        Meta meta = profileComponent.getMeta(mapping.getMetaId());
+        Assert.notNull(meta, "驱动meta不存在.");
+
+        // 清空状态
+        meta.clear();
+
+        // 手动配置增量点
+        if (StringUtil.isNotBlank(metaSnapshot)) {
+            Map snapshot = JsonUtil.jsonToObj(metaSnapshot, HashMap.class);
+            if (!CollectionUtils.isEmpty(snapshot)) {
+                meta.setSnapshot(snapshot);
+            }
+        }
+
+        getMetaTotal(meta, mapping.getModel());
+
+        meta.setUpdateTime(Instant.now().toEpochMilli());
+        profileComponent.editConfigModel(meta);
+    }
+
+    private void getMetaTotal(Meta meta, String model) {
+        // 全量同步
+        if (ModelEnum.isFull(model)) {
+            // 统计tableGroup总条数
+            AtomicLong count = new AtomicLong(0);
+            List<TableGroup> groupAll = profileComponent.getTableGroupAll(meta.getMappingId());
+            if (!CollectionUtils.isEmpty(groupAll)) {
+                for (TableGroup g : groupAll) {
+                    count.getAndAdd(g.getSourceTable().getCount());
+                }
+            }
+            meta.setTotal(count);
+        }
+    }
+
     private void resetTableGroupAllIndex(String mappingId) {
         synchronized (LOCK) {
             List<TableGroup> list = profileComponent.getSortedTableGroupAll(mappingId);
@@ -140,11 +182,8 @@ public class TableGroupServiceImpl extends BaseServiceImpl implements TableGroup
         }
     }
 
-    private void mergeMappingColumn(String mappingId) {
-        List<TableGroup> groups = profileComponent.getTableGroupAll(mappingId);
-
-        Mapping mapping = profileComponent.getMapping(mappingId);
-        Assert.notNull(mapping, "mapping not exist.");
+    private void mergeMappingColumn(Mapping mapping) {
+        List<TableGroup> groups = profileComponent.getTableGroupAll(mapping.getId());
 
         List<Field> sourceColumn = null;
         List<Field> targetColumn = null;

+ 1 - 17
dbsyncer-connector/dbsyncer-connector-mysql/src/main/java/org/dbsyncer/connector/mysql/schema/MySQLSchemaResolver.java

@@ -5,7 +5,6 @@ package org.dbsyncer.connector.mysql.schema;
 
 import org.dbsyncer.connector.mysql.MySQLException;
 import org.dbsyncer.connector.mysql.schema.support.*;
-import org.dbsyncer.sdk.model.Field;
 import org.dbsyncer.sdk.schema.AbstractSchemaResolver;
 import org.dbsyncer.sdk.schema.DataType;
 
@@ -22,14 +21,10 @@ import java.util.stream.Stream;
  */
 public final class MySQLSchemaResolver extends AbstractSchemaResolver {
 
-    private MySQLBytesType bytesType;
-
     @Override
     protected void initDataTypeMapping(Map<String, DataType> mapping) {
-        bytesType = new MySQLBytesType();
         Stream.of(
-                new MySQLBooleanType(),
-                bytesType,
+                new MySQLBytesType(),
                 new MySQLByteType(),
                 new MySQLDateType(),
                 new MySQLDecimalType(),
@@ -49,15 +44,4 @@ public final class MySQLSchemaResolver extends AbstractSchemaResolver {
         }));
     }
 
-    @Override
-    protected DataType getDataType(Map<String, DataType> mapping, Field field) {
-        DataType dataType = super.getDataType(mapping, field);
-        // bit(n > 1)
-        if (dataType instanceof MySQLBooleanType) {
-            if (field.getColumnSize() > 1) {
-                return bytesType;
-            }
-        }
-        return dataType;
-    }
 }

+ 0 - 51
dbsyncer-connector/dbsyncer-connector-mysql/src/main/java/org/dbsyncer/connector/mysql/schema/support/MySQLBooleanType.java

@@ -1,51 +0,0 @@
-/**
- * DBSyncer Copyright 2020-2024 All Rights Reserved.
- */
-package org.dbsyncer.connector.mysql.schema.support;
-
-import org.dbsyncer.sdk.model.Field;
-import org.dbsyncer.sdk.schema.support.BooleanType;
-
-import java.util.Arrays;
-import java.util.BitSet;
-import java.util.Set;
-import java.util.stream.Collectors;
-
-/**
- * @Author 穿云
- * @Version 1.0.0
- * @Date 2024-11-26 22:59
- */
-public final class MySQLBooleanType extends BooleanType {
-
-    private enum TypeEnum {
-        BIT
-    }
-
-    @Override
-    public Set<String> getSupportedTypeName() {
-        return Arrays.stream(TypeEnum.values()).map(Enum::name).collect(Collectors.toSet());
-    }
-
-    @Override
-    protected Boolean merge(Object val, Field field) {
-        if (val instanceof Number) {
-            return ((Number) val).shortValue() == 1;
-        }
-        if (val instanceof BitSet) {
-            BitSet bitSet = (BitSet) val;
-            return bitSet.get(0);
-        }
-        return throwUnsupportedException(val, field);
-    }
-
-    @Override
-    protected Object convert(Object val, Field field) {
-        if (val instanceof Boolean) {
-            Boolean b = (Boolean) val;
-            return (short) (b ? 1 : 0);
-        }
-        return throwUnsupportedException(val, field);
-    }
-
-}

+ 6 - 0
dbsyncer-connector/dbsyncer-connector-mysql/src/main/java/org/dbsyncer/connector/mysql/schema/support/MySQLByteType.java

@@ -7,6 +7,7 @@ import org.dbsyncer.sdk.model.Field;
 import org.dbsyncer.sdk.schema.support.ByteType;
 
 import java.util.Arrays;
+import java.util.BitSet;
 import java.util.Set;
 import java.util.stream.Collectors;
 
@@ -18,6 +19,7 @@ import java.util.stream.Collectors;
 public final class MySQLByteType extends ByteType {
 
     private enum TypeEnum {
+        BIT,
         TINYINT
     }
 
@@ -31,6 +33,10 @@ public final class MySQLByteType extends ByteType {
         if (val instanceof Number) {
             return ((Number) val).byteValue();
         }
+        if (val instanceof BitSet) {
+            BitSet bitSet = (BitSet) val;
+            return (byte) (bitSet.get(0) ? 1 : 0);
+        }
         return throwUnsupportedException(val, field);
     }