
Optimize logic

穿云 committed 3 months ago
commit 08fc5bf93a

+ 0 - 1
dbsyncer-connector/dbsyncer-connector-base/pom.xml

@@ -68,7 +68,6 @@
             <version>${project.parent.version}</version>
         </dependency>
 
-
         <!-- dbsyncer-connector-sqlserver -->
         <dependency>
             <groupId>org.ghi</groupId>

+ 10 - 10
dbsyncer-connector/dbsyncer-connector-base/src/main/java/org/dbsyncer/connector/base/ConnectorFactory.java

@@ -184,17 +184,17 @@ public class ConnectorFactory implements DisposableBean {
     }
 
     public Result writer(PluginContext context) {
-        ConnectorInstance connectorInstance = context.getTargetConnectorInstance();
-        Assert.notNull(connectorInstance, "ConnectorInstance can not null");
-        ConnectorService connector = getConnectorService(connectorInstance.getConfig());
-        if (connector instanceof AbstractConnector) {
-            AbstractConnector conn = (AbstractConnector) connector;
+        ConnectorInstance targetInstance = context.getTargetConnectorInstance();
+        Assert.notNull(targetInstance, "targetConnectorInstance can not be null");
+        ConnectorService targetConnector = getConnectorService(targetInstance.getConfig());
+        if (targetConnector instanceof AbstractConnector) {
+            AbstractConnector conn = (AbstractConnector) targetConnector;
             try {
-                SchemaResolver schemaResolver = connector.getSchemaResolver();
-                if (context.isEnableSchemaResolver() && schemaResolver != null) {
-                    conn.convertProcessBeforeWriter(schemaResolver, context);
+                // Support the standard schema resolver
+                if (context.isEnableSchemaResolver() && targetConnector.getSchemaResolver() != null) {
+                    conn.convertProcessBeforeWriter(context, targetConnector.getSchemaResolver());
                 } else {
-                    conn.convertProcessBeforeWriter(connectorInstance, context);
+                    conn.convertProcessBeforeWriter(context, targetInstance);
                 }
             } catch (Exception e) {
                 Result result = new Result();
@@ -204,7 +204,7 @@ public class ConnectorFactory implements DisposableBean {
             }
         }
 
-        Result result = connector.writer(connectorInstance, context);
+        Result result = targetConnector.writer(targetInstance, context);
         Assert.notNull(result, "Connector writer batch result can not null");
         return result;
     }
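
The dispatch above hinges on two SchemaResolver operations: merge normalizes a raw source value into the standard intermediate type, and convert adapts a standard value to the concrete target field. A minimal sketch of that contract, inferred only from the call sites in this commit (the real interface in org.dbsyncer.sdk.schema may carry more methods):

    package org.dbsyncer.sdk.schema;

    import org.dbsyncer.sdk.model.Field;

    // Contract inferred from call sites in this commit; the actual interface may differ.
    public interface SchemaResolver {

        // Merge a raw source value into the standard (intermediate) data type.
        Object merge(Object value, Field field);

        // Convert a standard value into the type expected by the target field.
        Object convert(Object value, Field field);
    }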

+ 1 - 1
dbsyncer-connector/dbsyncer-connector-elasticsearch/src/main/java/org/dbsyncer/connector/elasticsearch/ElasticsearchConnector.java

@@ -261,7 +261,7 @@ public final class ElasticsearchConnector extends AbstractConnector implements C
             throw new ElasticsearchException("writer data can not be empty.");
         }
 
-        final Result result = new Result();
+        Result result = new Result();
         final List<Field> pkFields = PrimaryKeyUtil.findExistPrimaryKeyFields(context.getTargetFields());
         try {
             final BulkRequest request = new BulkRequest();

+ 0 - 0
dbsyncer-connector/dbsyncer-connector-sqlite/src/main/resources/static/img/sqlite.png → dbsyncer-connector/dbsyncer-connector-sqlite/src/main/resources/static/img/SQLite.png


+ 19 - 5
dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/impl/GeneralBufferActuator.java

@@ -16,7 +16,14 @@ import org.dbsyncer.parser.TableGroupContext;
 import org.dbsyncer.parser.ddl.DDLParser;
 import org.dbsyncer.parser.event.RefreshOffsetEvent;
 import org.dbsyncer.parser.flush.AbstractBufferActuator;
-import org.dbsyncer.parser.model.*;
+import org.dbsyncer.parser.model.Connector;
+import org.dbsyncer.parser.model.FieldMapping;
+import org.dbsyncer.parser.model.Mapping;
+import org.dbsyncer.parser.model.Meta;
+import org.dbsyncer.parser.model.TableGroup;
+import org.dbsyncer.parser.model.TableGroupPicker;
+import org.dbsyncer.parser.model.WriterRequest;
+import org.dbsyncer.parser.model.WriterResponse;
 import org.dbsyncer.parser.strategy.FlushStrategy;
 import org.dbsyncer.parser.util.ConvertUtil;
 import org.dbsyncer.plugin.PluginFactory;
@@ -28,6 +35,8 @@ import org.dbsyncer.sdk.enums.ChangedEventTypeEnum;
 import org.dbsyncer.sdk.model.ConnectorConfig;
 import org.dbsyncer.sdk.model.Field;
 import org.dbsyncer.sdk.model.MetaInfo;
+import org.dbsyncer.sdk.schema.SchemaResolver;
+import org.dbsyncer.sdk.spi.ConnectorService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.context.ApplicationContext;
@@ -166,8 +175,13 @@ public class GeneralBufferActuator extends AbstractBufferActuator<WriterRequest,
 
     private void distributeTableGroup(WriterResponse response, Mapping mapping, TableGroupPicker tableGroupPicker, List<Field> sourceFields, boolean enableFilter) {
         // 1. Map fields
+        boolean enableSchemaResolver = profileComponent.getSystemConfig().isEnableSchemaResolver();
+        ConnectorConfig sourceConfig = getConnectorConfig(mapping.getSourceConnectorId());
+        ConnectorService sourceConnector = connectorFactory.getConnectorService(sourceConfig.getConnectorType());
         List<Map> sourceDataList = new ArrayList<>();
-        List<Map> targetDataList = tableGroupPicker.getPicker().pickTargetData(sourceFields, enableFilter, response.getDataList(), sourceDataList);
+        List<Map> targetDataList = tableGroupPicker.getPicker()
+                .setSourceResolver(enableSchemaResolver ? sourceConnector.getSchemaResolver() : null)
+                .pickTargetData(sourceFields, enableFilter, response.getDataList(), sourceDataList);
         if (CollectionUtils.isEmpty(targetDataList)) {
             return;
         }
@@ -178,7 +192,7 @@ public class GeneralBufferActuator extends AbstractBufferActuator<WriterRequest,
 
         // 3. Plugin conversion
         final IncrementPluginContext context = new IncrementPluginContext();
-        context.setSourceConnectorInstance(connectorFactory.connect(getConnectorConfig(mapping.getSourceConnectorId())));
+        context.setSourceConnectorInstance(connectorFactory.connect(sourceConfig));
         context.setTargetConnectorInstance(connectorFactory.connect(getConnectorConfig(mapping.getTargetConnectorId())));
         context.setSourceTableName(tableGroup.getSourceTable().getName());
         context.setTargetTableName(tableGroup.getTargetTable().getName());
@@ -190,7 +204,7 @@ public class GeneralBufferActuator extends AbstractBufferActuator<WriterRequest,
         context.setTargetList(targetDataList);
         context.setPluginExtInfo(tableGroup.getPluginExtInfo());
         context.setForceUpdate(mapping.isForceUpdate());
-        context.setEnableSchemaResolver(profileComponent.getSystemConfig().isEnableSchemaResolver());
+        context.setEnableSchemaResolver(enableSchemaResolver);
         pluginFactory.process(tableGroup.getPlugin(), context, ProcessEnum.CONVERT);
 
         // 4. Execute batch synchronization
@@ -220,7 +234,7 @@ public class GeneralBufferActuator extends AbstractBufferActuator<WriterRequest,
                 String targetTableName = tableGroup.getTargetTable().getName();
                 List<FieldMapping> originalFieldMappings = tableGroup.getFieldMapping();
                 DDLConfig targetDDLConfig = ddlParser.parseDDlConfig(response.getSql(), tConnType, targetTableName, originalFieldMappings);
-                final ConnectorInstance tConnectorInstance = connectorFactory.connect(tConnConfig);
+                ConnectorInstance tConnectorInstance = connectorFactory.connect(tConnConfig);
                 Result result = connectorFactory.writerDDL(tConnectorInstance, targetDDLConfig);
                 result.setTableGroupId(tableGroup.getId());
                 result.setTargetTableGroupName(targetTableName);
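
Worth noting in the hunks above: the source connector is now resolved once per response, and its resolver is injected only when the system-wide switch is on, so a null resolver preserves the old pass-through behavior in Picker. A condensed sketch of that guard, with names taken from the diff and the surrounding method elided:

    // Resolve the source connector once, then opt in to normalization via the system switch.
    SchemaResolver resolver = enableSchemaResolver ? sourceConnector.getSchemaResolver() : null;
    List<Map> targetDataList = tableGroupPicker.getPicker()
            .setSourceResolver(resolver) // null => values are copied through unchanged
            .pickTargetData(sourceFields, enableFilter, response.getDataList(), sourceDataList);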

+ 9 - 0
dbsyncer-parser/src/main/java/org/dbsyncer/parser/impl/ParserComponentImpl.java

@@ -29,6 +29,8 @@ import org.dbsyncer.sdk.model.ConnectorConfig;
 import org.dbsyncer.sdk.model.MetaInfo;
 import org.dbsyncer.sdk.model.Table;
 import org.dbsyncer.sdk.plugin.PluginContext;
+import org.dbsyncer.sdk.schema.SchemaResolver;
+import org.dbsyncer.sdk.spi.ConnectorService;
 import org.dbsyncer.sdk.util.PrimaryKeyUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -154,6 +156,8 @@ public class ParserComponentImpl implements ParserComponent {
         context.setSupportedCursor(StringUtil.isNotBlank(command.get(ConnectorConstant.OPERTION_QUERY_CURSOR)));
         context.setPageSize(mapping.getReadNum());
         context.setEnableSchemaResolver(profileComponent.getSystemConfig().isEnableSchemaResolver());
+        ConnectorService sourceConnector = connectorFactory.getConnectorService(context.getSourceConnectorInstance().getConfig());
+        picker.setSourceResolver(context.isEnableSchemaResolver() ? sourceConnector.getSchemaResolver() : null);
         // 0. Plugin pre-processing
         pluginFactory.process(group.getPlugin(), context, ProcessEnum.BEFORE);
 
@@ -218,6 +222,11 @@ public class ParserComponentImpl implements ParserComponent {
         int batchSize = context.getBatchSize();
         // Total count
         int total = context.getTargetList().size();
+        // Single task
+        if (total <= batchSize) {
+            return connectorFactory.writer(context);
+        }
+
         // Batch task, split into sub-tasks
         int taskSize = total % batchSize == 0 ? total / batchSize : total / batchSize + 1;
         CountDownLatch latch = new CountDownLatch(taskSize);
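
The new guard skips the executor and CountDownLatch entirely when one write covers the whole payload; only larger payloads are split. A self-contained sketch of the planning logic (helper name hypothetical, the ceiling division mirrors the diff):

    // Hypothetical helper showing the fast path and the ceiling split.
    static int planTasks(int total, int batchSize) {
        if (total <= batchSize) {
            return 1; // single task: write directly, no latch needed
        }
        // Ceiling division, e.g. total = 2500, batchSize = 1000 -> 3 tasks.
        return total % batchSize == 0 ? total / batchSize : total / batchSize + 1;
    }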

+ 10 - 0
dbsyncer-parser/src/main/java/org/dbsyncer/parser/model/Picker.java

@@ -8,6 +8,7 @@ import org.dbsyncer.sdk.enums.OperationEnum;
 import org.dbsyncer.sdk.filter.CompareFilter;
 import org.dbsyncer.sdk.model.Field;
 import org.dbsyncer.sdk.model.Filter;
+import org.dbsyncer.sdk.schema.SchemaResolver;
 
 import java.sql.Date;
 import java.sql.Timestamp;
@@ -23,6 +24,7 @@ public class Picker {
     private final boolean enabledFilter;
     private List<Filter> add;
     private List<Filter> or;
+    private SchemaResolver sourceResolver;
 
     public Picker(TableGroup tableGroup) {
         if (!CollectionUtils.isEmpty(tableGroup.getFieldMapping())) {
@@ -174,6 +176,10 @@ public class Picker {
             }
             if (null != sField && null != tField) {
                 v = source.get(sField.getName());
+                // Merge into the standard data type
+                if (sourceResolver != null) {
+                    v = sourceResolver.merge(v, sField);
+                }
                 tFieldName = tField.getName();
                 // 映射值
                 if (!target.containsKey(tFieldName)) {
@@ -199,4 +205,8 @@ public class Picker {
         return Collections.unmodifiableList(fields);
     }
 
+    public Picker setSourceResolver(SchemaResolver sourceResolver) {
+        this.sourceResolver = sourceResolver;
+        return this;
+    }
 }
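
Since setSourceResolver returns this, callers can configure the resolver and pick data in one chain; inside pickTargetData each mapped source value is passed through merge before it is copied to the target row. A hedged usage sketch, assuming tableGroup, resolver and the data lists are already in scope:

    List<Map> sourceDataList = new ArrayList<>();
    List<Map> targetDataList = new Picker(tableGroup)
            .setSourceResolver(resolver) // null disables merging
            .pickTargetData(sourceFields, true, dataList, sourceDataList);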

+ 3 - 3
dbsyncer-plugin/src/main/java/org/dbsyncer/plugin/PluginFactory.java

@@ -80,7 +80,7 @@ public class PluginFactory implements DisposableBean {
 
     public synchronized void loadPlugins() {
         if (!CollectionUtils.isEmpty(plugins)) {
-            List<Plugin> unmodifiablePlugin = plugins.stream().filter(p -> p.isUnmodifiable()).collect(Collectors.toList());
+            List<Plugin> unmodifiablePlugin = plugins.stream().filter(Plugin::isUnmodifiable).collect(Collectors.toList());
             plugins.clear();
             plugins.addAll(unmodifiablePlugin);
         }
@@ -91,7 +91,7 @@ public class PluginFactory implements DisposableBean {
         }
         Collection<File> files = FileUtils.listFiles(new File(PLUGIN_PATH), new String[]{"jar"}, true);
         if (!CollectionUtils.isEmpty(files)) {
-            files.forEach(f -> loadPlugin(f));
+            files.forEach(this::loadPlugin);
         }
         logger.info("PreLoad plugin:{}", plugins.size());
     }
@@ -135,7 +135,7 @@ public class PluginFactory implements DisposableBean {
     }
 
     public String createPluginId(String pluginClassName, String pluginVersion) {
-        return new StringBuilder(pluginClassName).append("_").append(pluginVersion).toString();
+        return pluginClassName + "_" + pluginVersion;
     }
 
     /**

+ 4 - 6
dbsyncer-sdk/src/main/java/org/dbsyncer/sdk/connector/AbstractConnector.java

@@ -86,10 +86,10 @@ public abstract class AbstractConnector {
     /**
     * Convert field values
      *
-     * @param connectorInstance
      * @param context
+     * @param connectorInstance
      */
-    public void convertProcessBeforeWriter(ConnectorInstance connectorInstance, PluginContext context) {
+    public void convertProcessBeforeWriter(PluginContext context, ConnectorInstance connectorInstance) {
         if (CollectionUtils.isEmpty(context.getTargetFields()) || CollectionUtils.isEmpty(context.getTargetList())) {
             return;
         }
@@ -116,7 +116,7 @@ public abstract class AbstractConnector {
         }
     }
 
-    public void convertProcessBeforeWriter(SchemaResolver resolver, PluginContext context) {
+    public void convertProcessBeforeWriter(PluginContext context, SchemaResolver targetResolver) {
         if (CollectionUtils.isEmpty(context.getTargetFields()) || CollectionUtils.isEmpty(context.getTargetList())) {
             return;
         }
@@ -127,9 +127,7 @@ public abstract class AbstractConnector {
                     continue;
                 }
                 try {
-                    // Convert the value according to the target field type
-                    Object o = resolver.merge(row.get(f.getName()), f);
-                    row.put(f.getName(), resolver.convert(o, f));
+                    row.computeIfPresent(f.getName(), (k, v) -> targetResolver.convert(v, f));
                 } catch (Exception e) {
                     logger.error(String.format("convert value error: (%s, %s, %s)", context.getTargetTableName(), f.getName(), row.get(f.getName())), e);
                     throw new SdkException(e);
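
One behavioral nuance of the computeIfPresent rewrite: it converts a value only when the key is present with a non-null value, whereas the old merge-then-put wrote the key unconditionally; the merge step itself now happens earlier, in Picker. A small self-contained demonstration with illustrative values:

    import java.util.HashMap;
    import java.util.Map;

    public class ComputeIfPresentDemo {
        public static void main(String[] args) {
            Map<String, Object> row = new HashMap<>();
            row.put("age", "42");
            row.put("name", null);

            // Present and non-null: converted in place.
            row.computeIfPresent("age", (k, v) -> Integer.valueOf((String) v));
            // Null value: the function is skipped, unlike an unconditional put.
            row.computeIfPresent("name", (k, v) -> v.toString());

            System.out.println(row); // age is now an Integer, name is left untouched
        }
    }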