1
0
AE86 2 жил өмнө
parent
commit
50fb198c69

+ 2 - 6
dbsyncer-biz/src/main/java/org/dbsyncer/biz/checker/impl/mapping/MappingChecker.java

@@ -3,11 +3,7 @@ package org.dbsyncer.biz.checker.impl.mapping;
 import org.dbsyncer.biz.checker.AbstractChecker;
 import org.dbsyncer.biz.checker.MappingConfigChecker;
 import org.dbsyncer.biz.checker.impl.tablegroup.TableGroupChecker;
-import org.dbsyncer.common.util.BooleanUtil;
-import org.dbsyncer.common.util.CollectionUtils;
-import org.dbsyncer.common.util.JsonUtil;
-import org.dbsyncer.common.util.NumberUtil;
-import org.dbsyncer.common.util.StringUtil;
+import org.dbsyncer.common.util.*;
 import org.dbsyncer.listener.config.ListenerConfig;
 import org.dbsyncer.listener.enums.ListenerTypeEnum;
 import org.dbsyncer.manager.Manager;
@@ -138,7 +134,7 @@ public class MappingChecker extends AbstractChecker {
         if (!CollectionUtils.isEmpty(groupAll)) {
             // 手动排序
             String[] sortedTableGroupIds = StringUtil.split(params.get("sortedTableGroupIds"), "|");
-            if(null != sortedTableGroupIds && sortedTableGroupIds.length>0){
+            if (null != sortedTableGroupIds && sortedTableGroupIds.length > 0) {
                 Map<String, TableGroup> tableGroupMap = groupAll.stream().collect(Collectors.toMap(TableGroup::getId, f -> f, (k1, k2) -> k1));
                 groupAll.clear();
                 int size = sortedTableGroupIds.length;

+ 8 - 0
dbsyncer-common/src/main/java/org/dbsyncer/common/model/ConvertContext.java

@@ -14,8 +14,16 @@ public class ConvertContext {
      */
     protected ProxyApplicationContext context;
 
+    /**
+     * 目标表
+     */
+    protected String targetTableName;
+
     public ProxyApplicationContext getContext() {
         return context;
     }
 
+    public String getTargetTableName() {
+        return targetTableName;
+    }
 }

+ 2 - 1
dbsyncer-common/src/main/java/org/dbsyncer/common/model/FullConvertContext.java

@@ -22,8 +22,9 @@ public class FullConvertContext extends ConvertContext {
      */
     private List<Map> targetList;
 
-    public FullConvertContext(ProxyApplicationContext context, List<Map> sourceList, List<Map> targetList) {
+    public FullConvertContext(ProxyApplicationContext context, String targetTableName, List<Map> sourceList, List<Map> targetList) {
         this.context = context;
+        this.targetTableName = targetTableName;
         this.sourceList = sourceList;
         this.targetList = targetList;
     }

+ 2 - 1
dbsyncer-common/src/main/java/org/dbsyncer/common/model/IncrementConvertContext.java

@@ -26,8 +26,9 @@ public class IncrementConvertContext extends ConvertContext {
      */
     private Map target;
 
-    public IncrementConvertContext(ProxyApplicationContext context, String event, Map source, Map target) {
+    public IncrementConvertContext(ProxyApplicationContext context, String targetTableName, String event, Map source, Map target) {
         this.context = context;
+        this.targetTableName = targetTableName;
         this.event = event;
         this.source = source;
         this.target = target;

+ 6 - 22
dbsyncer-common/src/main/java/org/dbsyncer/common/spi/ConvertService.java

@@ -3,8 +3,6 @@ package org.dbsyncer.common.spi;
 import org.dbsyncer.common.model.FullConvertContext;
 import org.dbsyncer.common.model.IncrementConvertContext;
 
-import java.sql.Connection;
-
 /**
  * 插件扩展服务接口
  * <p>全量同步/增量同步,扩展转换</p>
@@ -22,35 +20,22 @@ public interface ConvertService {
      */
     void convert(FullConvertContext context);
 
-    /**
-     * @author wangxiri
-     * @date 2022/10/25
-     * 数据插入后处理接口---全量更新未测试
-     * @param context 上下文
-     * @param connection 目标数据库连接
-     * @param targetTableName 数据更新表名
-     *
-
-     */
-    void AfterConvert(FullConvertContext context,Connection connection,String targetTableName);
-
     /**
      * 增量同步
+     *
      * @param context 上下文
      */
     void convert(IncrementConvertContext context);
 
-
     /**
-     *  @author wangxiri
      * 数据插入后处理接口
-     * @date 2022/10/25
-     * @param context 上下文
-     * @param connection 目标数据库连接
-     * @param targetTableName 数据更新表名
      *
+     * @param context 上下文
+     * @author wangxiri
+     * @date 2022/10/25
      */
-    void AfterConvert(IncrementConvertContext context, Connection connection,String targetTableName);
+    default void postProcessAfter(IncrementConvertContext context) {
+    }
 
     /**
      * 版本号
@@ -70,5 +55,4 @@ public interface ConvertService {
         return getClass().getSimpleName();
     }
 
-
 }

+ 4 - 4
dbsyncer-connector/pom.xml

@@ -40,10 +40,10 @@
             <artifactId>postgis-jdbc</artifactId>
             <version>2.5.1</version>
         </dependency>
-<!--        <dependency>-->
-<!--            <groupId>oracle</groupId>-->
-<!--            <artifactId>sdoapi</artifactId>-->
-<!--        </dependency>-->
+        <dependency>
+            <groupId>oracle</groupId>
+            <artifactId>sdoapi</artifactId>
+        </dependency>
 
         <!-- sqlserver-driver -->
         <dependency>

+ 0 - 32
dbsyncer-connector/src/main/java/org/dbsyncer/connector/database/DatabaseValueMapper.java

@@ -1,32 +0,0 @@
-package org.dbsyncer.connector.database;
-
-import com.microsoft.sqlserver.jdbc.Geometry;
-import oracle.jdbc.OracleConnection;
-//import oracle.spatial.geometry.JGeometry;
-import org.dbsyncer.connector.ConnectorException;
-import org.dbsyncer.connector.database.ds.SimpleConnection;
-
-import java.sql.SQLException;
-import java.sql.Struct;
-
-public class DatabaseValueMapper {
-
-    private SimpleConnection connection;
-
-    public DatabaseValueMapper(SimpleConnection connection) {
-        this.connection = connection;
-    }
-
-    public Struct getStruct(byte[] val) throws SQLException {
-        return null;
-//        if (connection.getConnection() instanceof OracleConnection) {
-//            OracleConnection conn = connection.unwrap(OracleConnection.class);
-//            Geometry geometry = Geometry.deserialize(val);
-//            Double x = geometry.getX();
-//            Double y = geometry.getY();
-//            JGeometry jGeometry = new JGeometry(x, y, 0);
-//            return JGeometry.store(jGeometry, conn);
-//        }
-//        throw new ConnectorException(String.format("%s can not get STRUCT [%s], val [%s]", getClass().getSimpleName(), val.getClass(), val));
-    }
-}

+ 12 - 3
dbsyncer-connector/src/main/java/org/dbsyncer/connector/schema/OtherValueMapper.java

@@ -1,9 +1,11 @@
 package org.dbsyncer.connector.schema;
 
+import com.microsoft.sqlserver.jdbc.Geometry;
+import oracle.jdbc.OracleConnection;
+import oracle.spatial.geometry.JGeometry;
 import org.dbsyncer.connector.AbstractValueMapper;
 import org.dbsyncer.connector.ConnectorException;
 import org.dbsyncer.connector.ConnectorMapper;
-import org.dbsyncer.connector.database.DatabaseValueMapper;
 import org.dbsyncer.connector.database.ds.SimpleConnection;
 
 import java.sql.Connection;
@@ -27,8 +29,15 @@ public class OtherValueMapper extends AbstractValueMapper<Struct> {
         if (val instanceof byte[]) {
             Object connection = connectorMapper.getConnection();
             if (connection instanceof Connection) {
-                final DatabaseValueMapper mapper = new DatabaseValueMapper((SimpleConnection) connection);
-                return mapper.getStruct((byte[]) val);
+                SimpleConnection simpleConnection = (SimpleConnection) connection;
+                if (simpleConnection instanceof OracleConnection) {
+                    OracleConnection conn = simpleConnection.unwrap(OracleConnection.class);
+                    Geometry geometry = Geometry.deserialize((byte[]) val);
+                    Double x = geometry.getX();
+                    Double y = geometry.getY();
+                    JGeometry jGeometry = new JGeometry(x, y, 0);
+                    return JGeometry.store(jGeometry, conn);
+                }
             }
         }
         throw new ConnectorException(String.format("%s can not find type [%s], val [%s]", getClass().getSimpleName(), val.getClass(), val));

+ 1 - 1
dbsyncer-parser/src/main/java/org/dbsyncer/parser/ParserFactory.java

@@ -273,7 +273,7 @@ public class ParserFactory implements Parser {
             ConvertUtil.convert(group.getConvert(), target);
 
             // 4、插件转换
-            pluginFactory.convert(group.getPlugin(), data, target);
+            pluginFactory.convert(group.getPlugin(), tTableName, data, target);
 
             // 5、写入目标源
             BatchWriter batchWriter = new BatchWriter(tConnectorMapper, command, tTableName, ConnectorConstant.OPERTION_INSERT, picker.getTargetFields(), target, batchSize);

+ 6 - 22
dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/impl/WriterBufferActuator.java

@@ -1,6 +1,5 @@
 package org.dbsyncer.parser.flush.impl;
 
-import org.apache.commons.logging.Log;
 import org.dbsyncer.cache.CacheService;
 import org.dbsyncer.common.config.BufferActuatorConfig;
 import org.dbsyncer.common.model.Result;
@@ -15,13 +14,10 @@ import org.dbsyncer.parser.strategy.FlushStrategy;
 import org.dbsyncer.parser.strategy.ParserStrategy;
 import org.dbsyncer.parser.util.ConvertUtil;
 import org.dbsyncer.plugin.PluginFactory;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
 import org.springframework.util.Assert;
 
-import java.sql.Connection;
 import java.util.List;
 import java.util.Map;
 
@@ -33,9 +29,6 @@ import java.util.Map;
 @Component
 public class WriterBufferActuator extends AbstractBufferActuator<WriterRequest, WriterResponse> {
 
-    private final Logger logger = LoggerFactory.getLogger(getClass());
-
-
     @Autowired
     private ConnectorFactory connectorFactory;
 
@@ -65,7 +58,7 @@ public class WriterBufferActuator extends AbstractBufferActuator<WriterRequest,
     @Override
     protected void partition(WriterRequest request, WriterResponse response) {
         response.getDataList().add(request.getRow());
-        if(StringUtil.isNotBlank(request.getMessageId())){
+        if (StringUtil.isNotBlank(request.getMessageId())) {
             response.getMessageIds().add(request.getMessageId());
         }
         if (response.isMerged()) {
@@ -93,29 +86,20 @@ public class WriterBufferActuator extends AbstractBufferActuator<WriterRequest,
         ConvertUtil.convert(tableGroup.getConvert(), targetDataList);
 
         // 4、插件转换
-        pluginFactory.convert(tableGroup.getPlugin(), event, sourceDataList, targetDataList);
+        pluginFactory.convert(tableGroup.getPlugin(), targetTableName, event, sourceDataList, targetDataList);
+
         // 5、批量执行同步
         ConnectorMapper targetConnectorMapper = connectorFactory.connect(getConnectorConfig(mapping.getTargetConnectorId()));
         BatchWriter batchWriter = new BatchWriter(targetConnectorMapper, tableGroup.getCommand(), targetTableName, event, picker.getTargetFields(), targetDataList, bufferActuatorConfig.getWriterBatchCount());
         Result result = parserFactory.writeBatch(batchWriter);
 
-
         // 6、持久化同步结果
         flushStrategy.flushIncrementData(mapping.getMetaId(), result, event);
 
-        //6.2、执行批量处理后的
-        //by wangxir @20221025
-        try {
-            if (targetConnectorMapper.getConnection() instanceof Connection)
-                pluginFactory.AfterConvert((Connection) targetConnectorMapper.getConnection(),targetTableName,mapping.getPlugin(), event, sourceDataList, targetDataList);
-
-        }
-        catch (Exception ex)
-        {
-            logger.error(ex.getMessage());
-        }
+        // 7、执行批量处理后的
+        pluginFactory.postProcessAfter(tableGroup.getPlugin(), targetTableName, event, sourceDataList, targetDataList);
 
-        // 7、完成处理
+        // 8、完成处理
         parserStrategy.complete(response.getMessageIds());
     }
 

+ 17 - 14
dbsyncer-plugin/src/main/java/org/dbsyncer/plugin/PluginFactory.java

@@ -7,7 +7,6 @@ import org.dbsyncer.common.spi.ConvertService;
 import org.dbsyncer.common.spi.ProxyApplicationContext;
 import org.dbsyncer.common.util.CollectionUtils;
 import org.dbsyncer.plugin.config.Plugin;
-//import org.postgresql.jdbc2.optional.SimpleDataSource;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -19,7 +18,6 @@ import java.io.IOException;
 import java.net.MalformedURLException;
 import java.net.URL;
 import java.net.URLClassLoader;
-import java.sql.Connection;
 import java.util.*;
 import java.util.stream.Collectors;
 
@@ -98,40 +96,45 @@ public class PluginFactory {
         return Collections.unmodifiableList(plugins);
     }
 
-    public void convert(Plugin plugin, List<Map> sourceList, List<Map> targetList) {
+    public void convert(Plugin plugin, String targetTableName, List<Map> sourceList, List<Map> targetList) {
         if (null != plugin && service.containsKey(plugin.getClassName())) {
-            service.get(plugin.getClassName()).convert(new FullConvertContext(applicationContextProxy, sourceList, targetList));
+            service.get(plugin.getClassName()).convert(new FullConvertContext(applicationContextProxy, targetTableName, sourceList, targetList));
         }
     }
 
-    public void convert(Plugin plugin, String event, List<Map> sourceList, List<Map> targetList) {
+    public void convert(Plugin plugin, String targetTableName, String event, List<Map> sourceList, List<Map> targetList) {
         if (null != plugin && service.containsKey(plugin.getClassName())) {
             ConvertService convertService = service.get(plugin.getClassName());
             int size = sourceList.size();
-            if(size == targetList.size()){
+            if (size == targetList.size()) {
                 for (int i = 0; i < size; i++) {
-                    convertService.convert(new IncrementConvertContext(applicationContextProxy, event, sourceList.get(i), targetList.get(i)));
+                    convertService.convert(new IncrementConvertContext(applicationContextProxy, targetTableName, event, sourceList.get(i), targetList.get(i)));
                 }
             }
         }
     }
 
-    public void AfterConvert(Connection connection, String targetTableName,Plugin plugin, String event, List<Map> sourceList, List<Map> targetList)
-    {
+    /**
+     * 完成同步后执行处理
+     *
+     * @param plugin
+     * @param targetTableName
+     * @param event
+     * @param sourceList
+     * @param targetList
+     */
+    public void postProcessAfter(Plugin plugin, String targetTableName, String event, List<Map> sourceList, List<Map> targetList) {
         if (null != plugin && service.containsKey(plugin.getClassName())) {
             ConvertService convertService = service.get(plugin.getClassName());
             int size = sourceList.size();
-            if(size == targetList.size()){
+            if (size == targetList.size()) {
                 for (int i = 0; i < size; i++) {
-//                    convertService.convert(new IncrementConvertContext(applicationContextProxy, event, sourceList.get(i), targetList.get(i)));
-                    convertService.AfterConvert(new IncrementConvertContext(applicationContextProxy, event, sourceList.get(i), targetList.get(i)),connection,targetTableName);
+                    convertService.postProcessAfter(new IncrementConvertContext(applicationContextProxy, targetTableName, event, sourceList.get(i), targetList.get(i)));
                 }
             }
         }
     }
 
-//    public void convert()
-
     /**
      * SPI, 扫描jar扩展接口实现,注册为本地服务
      *