瀏覽代碼

!83 merge
Merge pull request !83 from AE86/fuxinpeng-beta

AE86 2 年之前
父節點
當前提交
d093057769
共有 21 個文件被更改,包括 355 次插入64 次删除
  1. 1 1
      dbsyncer-common/src/main/java/org/dbsyncer/common/event/ClosedEvent.java
  2. 10 0
      dbsyncer-connector/pom.xml
  3. 2 1
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/AbstractConnector.java
  4. 10 3
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/ConnectorFactory.java
  5. 39 1
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/ConnectorMapper.java
  6. 4 1
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/database/AbstractDatabaseConnector.java
  7. 11 1
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/database/DatabaseConnectorMapper.java
  8. 18 4
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/database/DatabaseValueMapper.java
  9. 11 1
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/es/ESConnectorMapper.java
  10. 11 1
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/file/FileConnectorMapper.java
  11. 11 1
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/kafka/KafkaConnectorMapper.java
  12. 17 12
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/schema/BitValueMapper.java
  13. 35 0
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/schema/OtherValueMapper.java
  14. 103 0
      dbsyncer-listener/src/main/java/org/dbsyncer/listener/sqlserver/LsnPuller.java
  15. 29 4
      dbsyncer-listener/src/main/java/org/dbsyncer/listener/sqlserver/SqlServerExtractor.java
  16. 2 2
      dbsyncer-manager/src/main/java/org/dbsyncer/manager/ManagerFactory.java
  17. 1 1
      dbsyncer-manager/src/main/java/org/dbsyncer/manager/puller/AbstractPuller.java
  18. 14 3
      dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/impl/FlushServiceImpl.java
  19. 3 2
      dbsyncer-web/src/main/resources/static/js/common.js
  20. 3 4
      dbsyncer-web/src/main/resources/static/js/index/index.js
  21. 20 21
      pom.xml

+ 1 - 1
dbsyncer-manager/src/main/java/org/dbsyncer/manager/event/ClosedEvent.java → dbsyncer-common/src/main/java/org/dbsyncer/common/event/ClosedEvent.java

@@ -1,4 +1,4 @@
-package org.dbsyncer.manager.event;
+package org.dbsyncer.common.event;
 
 import org.springframework.context.ApplicationContext;
 import org.springframework.context.event.ApplicationContextEvent;

+ 10 - 0
dbsyncer-connector/pom.xml

@@ -36,12 +36,22 @@
             <artifactId>ojdbc6</artifactId>
         </dependency>
 
+        <dependency>
+            <groupId>oracle</groupId>
+            <artifactId>sdoapi</artifactId>
+        </dependency>
+
         <!-- sqlserver-driver -->
         <dependency>
             <groupId>com.microsoft.sqlserver</groupId>
             <artifactId>mssql-jdbc</artifactId>
         </dependency>
 
+        <dependency>
+            <groupId>org.antlr</groupId>
+            <artifactId>antlr4-runtime</artifactId>
+        </dependency>
+
         <!-- postgresql -->
         <dependency>
             <groupId>org.postgresql</groupId>

+ 2 - 1
dbsyncer-connector/src/main/java/org/dbsyncer/connector/AbstractConnector.java

@@ -14,7 +14,7 @@ import java.util.Map;
 
 public abstract class AbstractConnector {
 
-    private static Map<Integer, ValueMapper> values = new LinkedHashMap<>();
+    private static final Map<Integer, ValueMapper> values = new LinkedHashMap<>();
 
     static {
         // 常用类型
@@ -47,6 +47,7 @@ public abstract class AbstractConnector {
         values.putIfAbsent(Types.REAL, new RealValueMapper());
         values.putIfAbsent(Types.VARBINARY, new VarBinaryValueMapper());
         values.putIfAbsent(Types.LONGVARBINARY, new LongVarBinaryValueMapper());
+        values.putIfAbsent(Types.OTHER, new OtherValueMapper());
     }
 
     /**

+ 10 - 3
dbsyncer-connector/src/main/java/org/dbsyncer/connector/ConnectorFactory.java

@@ -50,7 +50,14 @@ public class ConnectorFactory implements DisposableBean {
                 }
             }
         }
-        return connectorCache.get(cacheKey);
+        ConnectorMapper connectorMapper = connectorCache.get(cacheKey);
+        try {
+            ConnectorMapper clone = (ConnectorMapper) connectorMapper.clone();
+            clone.setConfig(config);
+            return clone;
+        } catch (CloneNotSupportedException e) {
+            throw new ConnectorException(e);
+        }
     }
 
     /**
@@ -146,7 +153,7 @@ public class ConnectorFactory implements DisposableBean {
 
     public Result writer(ConnectorMapper connectorMapper, WriterBatchConfig config) {
         Connector connector = getConnector(connectorMapper);
-        if(connector instanceof AbstractConnector){
+        if (connector instanceof AbstractConnector) {
             AbstractConnector conn = (AbstractConnector) connector;
             try {
                 conn.convertProcessBeforeWriter(connectorMapper, config);
@@ -189,4 +196,4 @@ public class ConnectorFactory implements DisposableBean {
         getConnector(connectorMapper).disconnect(connectorMapper);
     }
 
-}
+}

+ 39 - 1
dbsyncer-connector/src/main/java/org/dbsyncer/connector/ConnectorMapper.java

@@ -11,19 +11,57 @@ import org.dbsyncer.connector.config.ConnectorConfig;
  * @version 1.0.0
  * @date 2022/3/20 23:00
  */
-public interface ConnectorMapper<K, V> {
+public interface ConnectorMapper<K, V> extends Cloneable {
 
+    /**
+     * 获取连接配置
+     *
+     * @return
+     */
     default ConnectorConfig getOriginalConfig() {
         return (ConnectorConfig) getConfig();
     }
 
+    /**
+     * 获取连接器类型
+     *
+     * @return
+     */
     default String getConnectorType() {
         return getOriginalConfig().getConnectorType();
     }
 
+    /**
+     * 获取连接配置
+     *
+     * @return
+     */
     K getConfig();
 
+    /**
+     * 设置
+     * @param k
+     */
+    void setConfig(K k);
+
+    /**
+     * 获取连接通道实例
+     *
+     * @return
+     * @throws Exception
+     */
     V getConnection() throws Exception;
 
+    /**
+     * 关闭连接器
+     */
     void close();
+
+    /**
+     * 浅拷贝连接器
+     *
+     * @return
+     * @throws CloneNotSupportedException
+     */
+    Object clone() throws CloneNotSupportedException;
 }

+ 4 - 1
dbsyncer-connector/src/main/java/org/dbsyncer/connector/database/AbstractDatabaseConnector.java

@@ -101,7 +101,10 @@ public abstract class AbstractDatabaseConnector extends AbstractConnector implem
         Assert.hasText(queryCountSql, "查询总数语句不能为空.");
 
         // 2、返回结果集
-        return connectorMapper.execute(databaseTemplate -> databaseTemplate.queryForObject(queryCountSql, Long.class));
+        return connectorMapper.execute(databaseTemplate -> {
+            Long count = databaseTemplate.queryForObject(queryCountSql, Long.class);
+            return count == null ? 0 : count;
+        });
     }
 
     @Override

+ 11 - 1
dbsyncer-connector/src/main/java/org/dbsyncer/connector/database/DatabaseConnectorMapper.java

@@ -41,6 +41,11 @@ public class DatabaseConnectorMapper implements ConnectorMapper<DatabaseConfig,
         return config;
     }
 
+    @Override
+    public void setConfig(DatabaseConfig config) {
+        this.config = config;
+    }
+
     @Override
     public Connection getConnection() throws Exception {
         return dataSource.getConnection();
@@ -51,4 +56,9 @@ public class DatabaseConnectorMapper implements ConnectorMapper<DatabaseConfig,
         dataSource.close();
     }
 
-}
+    @Override
+    public Object clone() throws CloneNotSupportedException {
+        return super.clone();
+    }
+
+}

+ 18 - 4
dbsyncer-connector/src/main/java/org/dbsyncer/connector/database/DatabaseValueMapper.java

@@ -1,13 +1,14 @@
 package org.dbsyncer.connector.database;
 
+import com.microsoft.sqlserver.jdbc.Geometry;
 import oracle.jdbc.OracleConnection;
+import oracle.spatial.geometry.JGeometry;
+import oracle.sql.STRUCT;
+import org.dbsyncer.connector.ConnectorException;
 import org.dbsyncer.connector.database.ds.SimpleConnection;
 
 import java.nio.charset.Charset;
-import java.sql.Blob;
-import java.sql.Clob;
-import java.sql.NClob;
-import java.sql.SQLException;
+import java.sql.*;
 
 public class DatabaseValueMapper {
 
@@ -46,4 +47,17 @@ public class DatabaseValueMapper {
         }
         return connection.createClob();
     }
+
+    public Struct getSTRUCT(byte[] val) throws SQLException {
+        if (connection.getConnection() instanceof OracleConnection) {
+            OracleConnection conn = connection.unwrap(OracleConnection.class);
+            Geometry geometry = Geometry.deserialize(val);
+            Double x = geometry.getX();
+            Double y = geometry.getY();
+            JGeometry jGeometry = new JGeometry(x, y, 0);
+            STRUCT struct = JGeometry.store(jGeometry, conn);
+            return struct;
+        }
+        throw new ConnectorException(String.format("%s can not get STRUCT [%s], val [%s]", getClass().getSimpleName(), val.getClass(), val));
+    }
 }

+ 11 - 1
dbsyncer-connector/src/main/java/org/dbsyncer/connector/es/ESConnectorMapper.java

@@ -19,6 +19,11 @@ public final class ESConnectorMapper implements ConnectorMapper<ESConfig, RestHi
         return config;
     }
 
+    @Override
+    public void setConfig(ESConfig config) {
+        this.config = config;
+    }
+
     @Override
     public RestHighLevelClient getConnection() {
         return client;
@@ -28,4 +33,9 @@ public final class ESConnectorMapper implements ConnectorMapper<ESConfig, RestHi
     public void close() {
         ESUtil.close(client);
     }
-}
+
+    @Override
+    public Object clone() throws CloneNotSupportedException {
+        return super.clone();
+    }
+}

+ 11 - 1
dbsyncer-connector/src/main/java/org/dbsyncer/connector/file/FileConnectorMapper.java

@@ -41,6 +41,11 @@ public final class FileConnectorMapper implements ConnectorMapper<FileConfig, St
         return config;
     }
 
+    @Override
+    public void setConfig(FileConfig config) {
+        this.config = config;
+    }
+
     @Override
     public String getConnection() {
         return config.getFileDir();
@@ -51,6 +56,11 @@ public final class FileConnectorMapper implements ConnectorMapper<FileConfig, St
         fileSchemaMap.clear();
     }
 
+    @Override
+    public Object clone() throws CloneNotSupportedException {
+        return super.clone();
+    }
+
     public List<FileSchema> getFileSchemaList() {
         return fileSchemaList;
     }
@@ -78,4 +88,4 @@ public final class FileConnectorMapper implements ConnectorMapper<FileConfig, St
             Assert.isTrue(file.exists(), String.format("found not file '%s'", filePath));
         }
     }
-}
+}

+ 11 - 1
dbsyncer-connector/src/main/java/org/dbsyncer/connector/kafka/KafkaConnectorMapper.java

@@ -18,6 +18,11 @@ public final class KafkaConnectorMapper implements ConnectorMapper<KafkaConfig,
         return config;
     }
 
+    @Override
+    public void setConfig(KafkaConfig config) {
+        this.config = config;
+    }
+
     @Override
     public KafkaClient getConnection() {
         return client;
@@ -27,4 +32,9 @@ public final class KafkaConnectorMapper implements ConnectorMapper<KafkaConfig,
     public void close() {
         KafkaUtil.close(client);
     }
-}
+
+    @Override
+    public Object clone() throws CloneNotSupportedException {
+        return super.clone();
+    }
+}

+ 17 - 12
dbsyncer-connector/src/main/java/org/dbsyncer/connector/schema/BitValueMapper.java

@@ -23,23 +23,28 @@ public class BitValueMapper extends AbstractValueMapper<byte[]> {
             return bitSet.toByteArray();
         }
         if (val instanceof Integer) {
-            buffer.clear();
-            buffer.putInt((Integer) val);
-            buffer.flip();
-            byte[] bytes = new byte[4];
-            buffer.get(bytes);
-            return bytes;
+            synchronized (this){
+                buffer.clear();
+                buffer.putInt((Integer) val);
+                buffer.flip();
+                byte[] bytes = new byte[4];
+                buffer.get(bytes);
+                return bytes;
+            }
         }
         if (val instanceof Boolean) {
             Boolean b = (Boolean) val;
-            buffer.clear();
-            buffer.putShort((short) (b ? 1 : 0));
-            buffer.flip();
-            byte[] bytes = new byte[2];
-            buffer.get(bytes);
-            return bytes;
+            synchronized (this){
+                buffer.clear();
+                buffer.putShort((short) (b ? 1 : 0));
+                buffer.flip();
+                byte[] bytes = new byte[2];
+                buffer.get(bytes);
+                return bytes;
+            }
         }
 
         throw new ConnectorException(String.format("%s can not find type [%s], val [%s]", getClass().getSimpleName(), val.getClass(), val));
     }
+
 }

+ 35 - 0
dbsyncer-connector/src/main/java/org/dbsyncer/connector/schema/OtherValueMapper.java

@@ -0,0 +1,35 @@
+package org.dbsyncer.connector.schema;
+
+import oracle.sql.STRUCT;
+import org.dbsyncer.connector.AbstractValueMapper;
+import org.dbsyncer.connector.ConnectorException;
+import org.dbsyncer.connector.ConnectorMapper;
+import org.dbsyncer.connector.database.DatabaseValueMapper;
+import org.dbsyncer.connector.database.ds.SimpleConnection;
+
+import java.sql.Connection;
+import java.sql.Struct;
+
+/**
+ * @author AE86
+ * @version 1.0.0
+ * @date 2022/9/16 16:54
+ */
+public class OtherValueMapper extends AbstractValueMapper<Struct> {
+
+    @Override
+    protected Struct convert(ConnectorMapper connectorMapper, Object val) throws Exception {
+        if (val instanceof STRUCT) {
+            return (Struct) val;
+        }
+        // SqlServer Geometry
+        if (val instanceof byte[]) {
+            Object connection = connectorMapper.getConnection();
+            if (connection instanceof Connection) {
+                final DatabaseValueMapper mapper = new DatabaseValueMapper((SimpleConnection) connection);
+                return mapper.getSTRUCT((byte[]) val);
+            }
+        }
+        throw new ConnectorException(String.format("%s can not find type [%s], val [%s]", getClass().getSimpleName(), val.getClass(), val));
+    }
+}

+ 103 - 0
dbsyncer-listener/src/main/java/org/dbsyncer/listener/sqlserver/LsnPuller.java

@@ -0,0 +1,103 @@
+package org.dbsyncer.listener.sqlserver;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.util.LinkedCaseInsensitiveMap;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * @author Xinpeng.Fu
+ * @version V1.0
+ * @description
+ * @date 2022/8/30 10:04
+ */
+public class LsnPuller {
+    private final Logger logger = LoggerFactory.getLogger(getClass());
+
+    /**
+     * 间隔拉取最新LSN时间(毫秒)
+     */
+    private static final long DEFAULT_POLL_INTERVAL_MILLIS = 100;
+    private static volatile LsnPuller instance = null;
+    private final Map<String, SqlServerExtractor> map = new ConcurrentHashMap<>();
+    private Worker worker;
+
+    private LsnPuller() {
+        initWorker();
+    }
+
+    private static LsnPuller getInstance() {
+        if (instance == null) {
+            synchronized (LsnPuller.class) {
+                if (instance == null) {
+                    instance = new LsnPuller();
+                }
+            }
+        }
+        return instance;
+    }
+
+    private void initWorker() {
+        worker = new Worker();
+        worker.setName("cdc-LsnPuller");
+        worker.setDaemon(false);
+        worker.start();
+    }
+
+    public static void addExtractor(String metaId, SqlServerExtractor extractor) {
+        getInstance().map.put(metaId, extractor);
+    }
+
+    public static void removeExtractor(String metaId) {
+        getInstance().map.remove(metaId);
+    }
+
+    final class Worker extends Thread {
+
+        private final Map<String, Lsn> maxLsnSnapshot = new LinkedCaseInsensitiveMap<>();
+
+        @Override
+        public void run() {
+            while (!isInterrupted()) {
+                try {
+                    if (map.isEmpty()) {
+                        TimeUnit.SECONDS.sleep(1);
+                        continue;
+                    }
+                    maxLsnSnapshot.clear();
+                    Lsn maxLsn = null;
+                    for (SqlServerExtractor extractor : map.values()) {
+                        maxLsn = getMaxLsn(maxLsnSnapshot, extractor);
+                        if (null != maxLsn && maxLsn.isAvailable() && maxLsn.compareTo(extractor.getLastLsn()) > 0) {
+                            extractor.pushStopLsn(maxLsn);
+                        }
+                    }
+                    TimeUnit.MILLISECONDS.sleep(DEFAULT_POLL_INTERVAL_MILLIS);
+                } catch (Exception e) {
+                    logger.error("异常", e);
+                    try {
+                        TimeUnit.SECONDS.sleep(1);
+                    } catch (InterruptedException ex) {
+                        logger.warn(ex.getMessage());
+                    }
+                }
+            }
+        }
+
+    }
+
+    private Lsn getMaxLsn(Map<String, Lsn> maxLsnSnapshot, SqlServerExtractor extractor) {
+        final String url = extractor.getDatabaseConfigUrl();
+        if (maxLsnSnapshot.containsKey(url)) {
+            return maxLsnSnapshot.get(url);
+        }
+
+        Lsn maxLsn = extractor.getMaxLsn();
+        maxLsnSnapshot.put(url, maxLsn);
+        return maxLsn;
+    }
+
+}

+ 29 - 4
dbsyncer-listener/src/main/java/org/dbsyncer/listener/sqlserver/SqlServerExtractor.java

@@ -18,6 +18,7 @@ import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.util.*;
+import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
@@ -55,6 +56,7 @@ public class SqlServerExtractor extends AbstractDatabaseExtractor {
     private Lsn lastLsn;
     private String serverName;
     private String schema;
+    private LinkedBlockingQueue<Lsn> stopLsnQueue = new LinkedBlockingQueue<>();
 
     @Override
     public void start() {
@@ -78,6 +80,7 @@ public class SqlServerExtractor extends AbstractDatabaseExtractor {
             worker.setName(new StringBuilder("cdc-parser-").append(serverName).append("_").append(RandomUtil.nextInt(1, 100)).toString());
             worker.setDaemon(false);
             worker.start();
+            LsnPuller.addExtractor(metaId, this);
         } catch (Exception e) {
             close();
             logger.error("启动失败:{}", e.getMessage());
@@ -90,6 +93,7 @@ public class SqlServerExtractor extends AbstractDatabaseExtractor {
     @Override
     public void close() {
         if (connected) {
+            LsnPuller.removeExtractor(metaId);
             if (null != worker && !worker.isInterrupted()) {
                 worker.interrupt();
                 worker = null;
@@ -308,15 +312,22 @@ public class SqlServerExtractor extends AbstractDatabaseExtractor {
         return (T) execute;
     }
 
+    public Lsn getMaxLsn(){
+        return queryAndMap(GET_MAX_LSN, rs -> new Lsn(rs.getBytes(1)));
+    }
+
     final class Worker extends Thread {
 
         @Override
         public void run() {
             while (!isInterrupted() && connected) {
                 try {
-                    Lsn stopLsn = queryAndMap(GET_MAX_LSN, rs -> new Lsn(rs.getBytes(1)));
-                    if (null == stopLsn || !stopLsn.isAvailable() || stopLsn.compareTo(lastLsn) <= 0) {
-                        sleepInMills(500L);
+                    Lsn stopLsn = stopLsnQueue.take();
+                    Lsn poll;
+                    while((poll = stopLsnQueue.poll()) != null){
+                        stopLsn = poll;
+                    }
+                    if (!stopLsn.isAvailable() || stopLsn.compareTo(lastLsn) <= 0) {
                         continue;
                     }
 
@@ -330,7 +341,21 @@ public class SqlServerExtractor extends AbstractDatabaseExtractor {
                 }
             }
         }
+    }
 
+    public String getDatabaseConfigUrl(){
+        DatabaseConfig config = (DatabaseConfig) connectorConfig;
+        return config.getUrl();
     }
 
-}
+    public Lsn getLastLsn() {
+        return lastLsn;
+    }
+
+    public void pushStopLsn(Lsn stopLsn) {
+        if(stopLsnQueue.contains(stopLsn)){
+            return;
+        }
+        stopLsnQueue.offer(stopLsn);
+    }
+}

+ 2 - 2
dbsyncer-manager/src/main/java/org/dbsyncer/manager/ManagerFactory.java

@@ -1,7 +1,7 @@
 package org.dbsyncer.manager;
 
+import org.dbsyncer.common.event.ClosedEvent;
 import org.dbsyncer.common.util.CollectionUtils;
-import org.dbsyncer.manager.event.ClosedEvent;
 import org.dbsyncer.common.model.Paging;
 import org.dbsyncer.connector.ConnectorMapper;
 import org.dbsyncer.connector.config.ConnectorConfig;
@@ -432,4 +432,4 @@ public class ManagerFactory implements Manager, ApplicationListener<ClosedEvent>
         return puller;
     }
 
-}
+}

+ 1 - 1
dbsyncer-manager/src/main/java/org/dbsyncer/manager/puller/AbstractPuller.java

@@ -1,6 +1,6 @@
 package org.dbsyncer.manager.puller;
 
-import org.dbsyncer.manager.event.ClosedEvent;
+import org.dbsyncer.common.event.ClosedEvent;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.context.ApplicationContext;
 

+ 14 - 3
dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/impl/FlushServiceImpl.java

@@ -1,11 +1,12 @@
 package org.dbsyncer.parser.flush.impl;
 
 import com.alibaba.fastjson.JSONException;
+import org.dbsyncer.common.snowflake.SnowflakeIdWorker;
 import org.dbsyncer.common.util.JsonUtil;
+import org.dbsyncer.common.util.StringUtil;
 import org.dbsyncer.parser.flush.BufferActuator;
 import org.dbsyncer.parser.flush.FlushService;
 import org.dbsyncer.parser.model.StorageRequest;
-import org.dbsyncer.common.snowflake.SnowflakeIdWorker;
 import org.dbsyncer.storage.StorageService;
 import org.dbsyncer.storage.constant.ConfigConstant;
 import org.dbsyncer.storage.enums.StorageDataStatusEnum;
@@ -48,7 +49,7 @@ public class FlushServiceImpl implements FlushService {
         Map<String, Object> params = new HashMap();
         params.put(ConfigConstant.CONFIG_MODEL_ID, String.valueOf(snowflakeIdWorker.nextId()));
         params.put(ConfigConstant.CONFIG_MODEL_TYPE, type);
-        params.put(ConfigConstant.CONFIG_MODEL_JSON, error);
+        params.put(ConfigConstant.CONFIG_MODEL_JSON, substring(error));
         params.put(ConfigConstant.CONFIG_MODEL_CREATE_TIME, Instant.now().toEpochMilli());
         storageService.addLog(StorageEnum.LOG, params);
     }
@@ -61,7 +62,7 @@ public class FlushServiceImpl implements FlushService {
             row.put(ConfigConstant.CONFIG_MODEL_ID, String.valueOf(snowflakeIdWorker.nextId()));
             row.put(ConfigConstant.DATA_SUCCESS, success ? StorageDataStatusEnum.SUCCESS.getValue() : StorageDataStatusEnum.FAIL.getValue());
             row.put(ConfigConstant.DATA_EVENT, event);
-            row.put(ConfigConstant.DATA_ERROR, error);
+            row.put(ConfigConstant.DATA_ERROR, substring(error));
             try {
                 row.put(ConfigConstant.CONFIG_MODEL_JSON, JsonUtil.objToJson(r));
             } catch (JSONException e) {
@@ -73,4 +74,14 @@ public class FlushServiceImpl implements FlushService {
         });
     }
 
+    /**
+     * 限制记录异常信息长度
+     *
+     * @param error
+     * @return
+     */
+    private String substring(String error){
+        return StringUtil.isNotBlank(error) ? StringUtil.substring(error, 0, 2048) : error;
+    }
+
 }

+ 3 - 2
dbsyncer-web/src/main/resources/static/js/common.js

@@ -19,9 +19,10 @@ function bootGrowl(data, type) {
 }
 
 // 跳转主页
-function backIndexPage() {
+function backIndexPage(projectGroupId) {
     // 加载页面
-    doLoader("/index?refresh=" + new Date().getTime());
+    projectGroupId = (typeof projectGroupId === 'string') ? projectGroupId : '';
+    doLoader("/index?projectGroupId="+ projectGroupId +"&refresh=" + new Date().getTime());
 }
 
 // 美化SQL

+ 3 - 4
dbsyncer-web/src/main/resources/static/js/index/index.js

@@ -180,11 +180,11 @@ function doPost(url) {
 }
 
 // 创建定时器
-function createTimer(){
+function createTimer($projectGroupSelect){
     doGetWithoutLoading("/monitor/getRefreshInterval",{}, function (data) {
         if (data.success == true) {
             timer = setInterval(function(){
-                backIndexPage();
+                backIndexPage($projectGroupSelect.selectpicker('val'));
             }, data.resultValue * 1000);
         } else {
             bootGrowl(data.resultValue, "danger");
@@ -200,6 +200,7 @@ $(function () {
     bindEditProjectGroup($projectGroupSelect);
     bindRemoveProjectGroup($projectGroupSelect);
     bindProjectGroupSelect($projectGroupSelect);
+    createTimer($projectGroupSelect);
 
     bindAddConnector();
     bindEditConnector();
@@ -210,6 +211,4 @@ $(function () {
 
     bindConnectorDropdownMenu();
     bindMappingDropdownMenu();
-
-    createTimer();
 });

+ 20 - 21
pom.xml

@@ -56,31 +56,17 @@
 
     <!-- 镜像仓库地址 -->
     <repositories>
-        <!-- 阿里云仓库 -->
+        <!-- DataNucleus仓库 -->
         <repository>
-            <id>ali</id>
-            <name>ali Repository</name>
-            <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
-            <snapshots>
-                <enabled>false</enabled>
-            </snapshots>
+            <id>datanucleus</id>
+            <url>http://www.datanucleus.org/downloads/maven2/</url>
         </repository>
 
-        <!-- 中央仓库1 -->
-        <repository>
-            <id>repo1</id>
-            <name>Central Repository</name>
-            <url>https://repo1.maven.org/maven2/</url>
-            <snapshots>
-                <enabled>false</enabled>
-            </snapshots>
-        </repository>
-
-        <!-- 中央仓库2 -->
+        <!-- 阿里云仓库 -->
         <repository>
-            <id>repo2</id>
-            <name>Central Repository</name>
-            <url>https://repo2.maven.org/maven2/</url>
+            <id>ali</id>
+            <name>ali Repository</name>
+            <url>https://maven.aliyun.com/repository/google</url>
             <snapshots>
                 <enabled>false</enabled>
             </snapshots>
@@ -147,6 +133,12 @@
                 <version>${ojdbc6.version}</version>
             </dependency>
 
+            <dependency>
+                <groupId>oracle</groupId>
+                <artifactId>sdoapi</artifactId>
+                <version>11.2.0</version>
+            </dependency>
+
             <!-- sqlserver-driver -->
             <dependency>
                 <groupId>com.microsoft.sqlserver</groupId>
@@ -154,6 +146,13 @@
                 <version>${mssql-jdbc.version}</version>
             </dependency>
 
+            <!-- antlr4-runtime -->
+            <dependency>
+                <groupId>org.antlr</groupId>
+                <artifactId>antlr4-runtime</artifactId>
+                <version>4.7.2</version>
+            </dependency>
+
             <!-- postgresql -->
             <dependency>
                 <groupId>org.postgresql</groupId>