Forráskód Böngészése

新增字段解析器开关

穿云 3 hónapja
szülő
commit
9a6670c17a

+ 1 - 1
dbsyncer-connector/dbsyncer-connector-base/src/main/java/org/dbsyncer/connector/base/ConnectorFactory.java

@@ -183,7 +183,7 @@ public class ConnectorFactory implements DisposableBean {
             AbstractConnector conn = (AbstractConnector) connector;
             try {
                 SchemaResolver schemaResolver = connector.getSchemaResolver();
-                if (schemaResolver != null) {
+                if (config.isEnableSchemaResolver() && schemaResolver != null) {
                     conn.convertProcessBeforeWriter(schemaResolver, config);
                 } else {
                     conn.convertProcessBeforeWriter(connectorInstance, config);

+ 1 - 1
dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/impl/GeneralBufferActuator.java

@@ -67,7 +67,7 @@ public class GeneralBufferActuator extends AbstractBufferActuator<WriterRequest,
     private ParserComponent parserComponent;
 
     @Resource
-    protected ProfileComponent profileComponent;
+    private ProfileComponent profileComponent;
 
     @Resource
     private PluginFactory pluginFactory;

+ 4 - 2
dbsyncer-parser/src/main/java/org/dbsyncer/parser/impl/ParserComponentImpl.java

@@ -220,7 +220,8 @@ public class ParserComponentImpl implements ParserComponent {
         int total = dataList.size();
         // 单次任务
         if (total <= batchSize) {
-            return connectorFactory.writer(context.getTargetConnectorInstance(), new WriterBatchConfig(tableName, event, command, fields, dataList, context.isForceUpdate()));
+            WriterBatchConfig batchConfig = new WriterBatchConfig(tableName, event, command, fields, dataList, context.isForceUpdate(), context.isEnableSchemaResolver());
+            return connectorFactory.writer(context.getTargetConnectorInstance(), batchConfig);
         }
 
         // 批量任务, 拆分
@@ -242,7 +243,8 @@ public class ParserComponentImpl implements ParserComponent {
 
             executor.execute(() -> {
                 try {
-                    Result w = connectorFactory.writer(context.getTargetConnectorInstance(), new WriterBatchConfig(tableName, event, command, fields, data, context.isForceUpdate()));
+                    WriterBatchConfig batchConfig = new WriterBatchConfig(tableName, event, command, fields, data, context.isForceUpdate(), context.isEnableSchemaResolver());
+                    Result w = connectorFactory.writer(context.getTargetConnectorInstance(), batchConfig);
                     result.addSuccessData(w.getSuccessData());
                     result.addFailData(w.getFailData());
                     result.getError().append(w.getError());

+ 16 - 7
dbsyncer-sdk/src/main/java/org/dbsyncer/sdk/config/WriterBatchConfig.java

@@ -10,35 +10,40 @@ public class WriterBatchConfig {
     /**
      * 表名
      */
-    private String tableName;
+    private final String tableName;
     /**
      * 事件
      */
-    private String event;
+    private final String event;
     /**
      * 执行命令
      */
-    private Map<String, String> command;
+    private final Map<String, String> command;
     /**
      * 字段信息
      */
-    private List<Field> fields;
+    private final List<Field> fields;
     /**
      * 集合数据
      */
-    private List<Map> data;
+    private final List<Map> data;
     /**
      * 覆盖写入
      */
-    private boolean forceUpdate;
+    private final boolean forceUpdate;
+    /**
+     * 是否启用字段解析器
+     */
+    private final boolean enableSchemaResolver;
 
-    public WriterBatchConfig(String tableName, String event, Map<String, String> command, List<Field> fields, List<Map> data, boolean forceUpdate) {
+    public WriterBatchConfig(String tableName, String event, Map<String, String> command, List<Field> fields, List<Map> data, boolean forceUpdate, boolean enableSchemaResolver) {
         this.tableName = tableName;
         this.event = event;
         this.command = command;
         this.fields = fields;
         this.data = data;
         this.forceUpdate = forceUpdate;
+        this.enableSchemaResolver = enableSchemaResolver;
     }
 
     public String getTableName() {
@@ -64,4 +69,8 @@ public class WriterBatchConfig {
     public boolean isForceUpdate() {
         return forceUpdate;
     }
+
+    public boolean isEnableSchemaResolver() {
+        return enableSchemaResolver;
+    }
 }

+ 2 - 2
dbsyncer-sdk/src/main/java/org/dbsyncer/sdk/listener/AbstractListener.java

@@ -57,7 +57,7 @@ public abstract class AbstractListener<C extends ConnectorInstance> implements L
                     break;
                 case ConnectorConstant.OPERTION_INSERT:
                     // 是否支持监听新增事件
-                     processEvent(listenerConfig.isEnableInsert(), event);
+                    processEvent(listenerConfig.isEnableInsert(), event);
                     break;
                 case ConnectorConstant.OPERTION_DELETE:
                     // 是否支持监听删除事件
@@ -65,7 +65,7 @@ public abstract class AbstractListener<C extends ConnectorInstance> implements L
                     break;
                 case ConnectorConstant.OPERTION_ALTER:
                     // 表结构变更事件
-                    processEvent(listenerConfig.isEnableDDL(),event);
+                    processEvent(listenerConfig.isEnableDDL(), event);
                     break;
                 default:
                     break;

+ 14 - 0
dbsyncer-sdk/src/main/java/org/dbsyncer/sdk/plugin/AbstractPluginContext.java

@@ -64,6 +64,11 @@ public abstract class AbstractPluginContext implements PluginContext {
      */
     private boolean forceUpdate;
 
+    /**
+     * 是否启用字段解析器
+     */
+    private boolean enableSchemaResolver;
+
     /**
      * 数据源数据集合
      */
@@ -162,6 +167,15 @@ public abstract class AbstractPluginContext implements PluginContext {
         this.forceUpdate = forceUpdate;
     }
 
+    @Override
+    public boolean isEnableSchemaResolver() {
+        return enableSchemaResolver;
+    }
+
+    public void setEnableSchemaResolver(boolean enableSchemaResolver) {
+        this.enableSchemaResolver = enableSchemaResolver;
+    }
+
     @Override
     public Map<String, String> getCommand() {
         return command;

+ 5 - 0
dbsyncer-sdk/src/main/java/org/dbsyncer/sdk/plugin/PluginContext.java

@@ -80,6 +80,11 @@ public interface PluginContext extends BaseContext {
      */
     boolean isForceUpdate();
 
+    /**
+     * 是否启用字段解析器
+     */
+    boolean isEnableSchemaResolver();
+
     /**
      * 数据源数据集合
      */