瀏覽代碼

fix config

AE86 3 年之前
父節點
當前提交
7ba6e73058

+ 10 - 8
dbsyncer-connector/src/main/java/org/dbsyncer/connector/ConnectorFactory.java

@@ -38,7 +38,11 @@ public class ConnectorFactory implements DisposableBean {
         Connector connector = getConnector(config.getConnectorType());
         Connector connector = getConnector(config.getConnectorType());
         String cacheKey = connector.getConnectorMapperCacheKey(config);
         String cacheKey = connector.getConnectorMapperCacheKey(config);
         if (!connectorCache.containsKey(cacheKey)) {
         if (!connectorCache.containsKey(cacheKey)) {
-            connectorCache.putIfAbsent(cacheKey, connector.connect(config));
+            synchronized (connectorCache) {
+                if (!connectorCache.containsKey(cacheKey)) {
+                    connectorCache.putIfAbsent(cacheKey, connector.connect(config));
+                }
+            }
         }
         }
         return connectorCache.get(cacheKey);
         return connectorCache.get(cacheKey);
     }
     }
@@ -54,8 +58,7 @@ public class ConnectorFactory implements DisposableBean {
         Connector connector = getConnector(config.getConnectorType());
         Connector connector = getConnector(config.getConnectorType());
         String cacheKey = connector.getConnectorMapperCacheKey(config);
         String cacheKey = connector.getConnectorMapperCacheKey(config);
         if (connectorCache.containsKey(cacheKey)) {
         if (connectorCache.containsKey(cacheKey)) {
-            ConnectorMapper mapper = connectorCache.get(cacheKey);
-            connector.disconnect(mapper);
+            disconnect(connectorCache.get(cacheKey));
             connectorCache.remove(cacheKey);
             connectorCache.remove(cacheKey);
         }
         }
         connect(config);
         connect(config);
@@ -70,13 +73,12 @@ public class ConnectorFactory implements DisposableBean {
      */
      */
     public boolean isAlive(ConnectorConfig config) {
     public boolean isAlive(ConnectorConfig config) {
         Assert.notNull(config, "ConnectorConfig can not be null.");
         Assert.notNull(config, "ConnectorConfig can not be null.");
-        String type = config.getConnectorType();
-        Connector connector = getConnector(type);
+        Connector connector = getConnector(config.getConnectorType());
         String cacheKey = connector.getConnectorMapperCacheKey(config);
         String cacheKey = connector.getConnectorMapperCacheKey(config);
-        if (!connectorCache.containsKey(cacheKey)) {
-            connect(config);
+        if (connectorCache.containsKey(cacheKey)) {
+            return connector.isAlive(connectorCache.get(cacheKey));
         }
         }
-        return connector.isAlive(connectorCache.get(cacheKey));
+        return false;
     }
     }
 
 
     /**
     /**

+ 6 - 1
dbsyncer-connector/src/main/java/org/dbsyncer/connector/kafka/KafkaConnector.java

@@ -2,6 +2,7 @@ package org.dbsyncer.connector.kafka;
 
 
 import org.dbsyncer.common.model.Result;
 import org.dbsyncer.common.model.Result;
 import org.dbsyncer.connector.Connector;
 import org.dbsyncer.connector.Connector;
+import org.dbsyncer.connector.ConnectorException;
 import org.dbsyncer.connector.ConnectorMapper;
 import org.dbsyncer.connector.ConnectorMapper;
 import org.dbsyncer.connector.config.*;
 import org.dbsyncer.connector.config.*;
 import org.dbsyncer.connector.util.KafkaUtil;
 import org.dbsyncer.connector.util.KafkaUtil;
@@ -12,7 +13,11 @@ import java.util.Map;
 public class KafkaConnector implements Connector<KafkaConnectorMapper, KafkaConfig> {
 public class KafkaConnector implements Connector<KafkaConnectorMapper, KafkaConfig> {
     @Override
     @Override
     public ConnectorMapper connect(KafkaConfig config) {
     public ConnectorMapper connect(KafkaConfig config) {
-        return new KafkaConnectorMapper(config, KafkaUtil.getConnection(config));
+        try {
+            return new KafkaConnectorMapper(config, KafkaUtil.getConnection(config));
+        } catch (Exception e) {
+            throw new ConnectorException("无法连接, 请检查配置:" + e.getMessage());
+        }
     }
     }
 
 
     @Override
     @Override

+ 1 - 1
dbsyncer-parser/src/main/java/org/dbsyncer/parser/ParserFactory.java

@@ -100,7 +100,7 @@ public class ParserFactory implements Parser {
             try {
             try {
                 alive = connectorFactory.refresh(config);
                 alive = connectorFactory.refresh(config);
             } catch (Exception e) {
             } catch (Exception e) {
-                // nothing to do
+                logger.error(e.getMessage());
             }
             }
             if (alive) {
             if (alive) {
                 logger.info(LogType.ConnectorLog.RECONNECT_SUCCESS.getMessage());
                 logger.info(LogType.ConnectorLog.RECONNECT_SUCCESS.getMessage());

+ 7 - 6
dbsyncer-web/src/main/resources/public/connector/addDqlMysql.html

@@ -43,12 +43,13 @@
             </select>
             </select>
         </div>
         </div>
     </div>
     </div>
+
+    <script type="text/javascript">
+        $(function () {
+            // 初始化select插件
+            initSelectIndex($(".select-control"), 1);
+        })
+    </script>
 </div>
 </div>
 
 
-<script type="text/javascript">
-$(function () {
-    // 初始化select插件
-    initSelectIndex($(".select-control"), 1);
-})
-</script>
 </html>
 </html>

+ 7 - 6
dbsyncer-web/src/main/resources/public/connector/addElasticsearch.html

@@ -43,12 +43,13 @@
         </div>
         </div>
         <div class="col-sm-6"></div>
         <div class="col-sm-6"></div>
     </div>
     </div>
+
+    <script type="text/javascript">
+        $(function () {
+            // 初始化select插件
+            initSelectIndex($(".select-control"), 1);
+        })
+    </script>
 </div>
 </div>
 
 
-<script type="text/javascript">
-$(function () {
-    // 初始化select插件
-    initSelectIndex($(".select-control"), 1);
-})
-</script>
 </html>
 </html>

+ 33 - 32
dbsyncer-web/src/main/resources/public/connector/addKafka.html

@@ -41,17 +41,17 @@
                         <label class="col-sm-3 control-label">session.timeout.ms <i class="fa fa-question-circle fa_gray" aria-hidden="true" title="在使用Kafka的组管理时,用于检测消费者故障超时"></i></label>
                         <label class="col-sm-3 control-label">session.timeout.ms <i class="fa fa-question-circle fa_gray" aria-hidden="true" title="在使用Kafka的组管理时,用于检测消费者故障超时"></i></label>
                         <div class="col-sm-3">
                         <div class="col-sm-3">
                             <select class="form-control select-control" name="sessionTimeoutMs">
                             <select class="form-control select-control" name="sessionTimeoutMs">
-                                <option value="10000" th:selected="${connector?.config?.sessionTimeoutMs eq '10000'}">10秒</option>
-                                <option value="15000" th:selected="${connector?.config?.sessionTimeoutMs eq '15000'}">15秒</option>
-                                <option value="20000" th:selected="${connector?.config?.sessionTimeoutMs eq '20000'}">20秒</option>
+                                <option value="10000" th:selected="${connector?.config?.sessionTimeoutMs eq 10000}">10秒</option>
+                                <option value="15000" th:selected="${connector?.config?.sessionTimeoutMs eq 15000}">15秒</option>
+                                <option value="20000" th:selected="${connector?.config?.sessionTimeoutMs eq 20000}">20秒</option>
                             </select>
                             </select>
                         </div>
                         </div>
                         <label class="col-sm-3 control-label">max.partition.fetch.bytes <i class="fa fa-question-circle fa_gray" aria-hidden="true" title="每次接收Kafka服务器订阅消息的最大大小,与max.request.size参数保持一致"></i></label>
                         <label class="col-sm-3 control-label">max.partition.fetch.bytes <i class="fa fa-question-circle fa_gray" aria-hidden="true" title="每次接收Kafka服务器订阅消息的最大大小,与max.request.size参数保持一致"></i></label>
                         <div class="col-sm-3">
                         <div class="col-sm-3">
                             <select id="maxPartitionFetchBytesSelect" class="form-control select-control" name="maxPartitionFetchBytes">
                             <select id="maxPartitionFetchBytesSelect" class="form-control select-control" name="maxPartitionFetchBytes">
-                                <option value="1048576" th:selected="${connector?.config?.maxPartitionFetchBytes eq '1048576'}">1MB</option>
-                                <option value="2097152" th:selected="${connector?.config?.maxPartitionFetchBytes eq '2097152'}">2MB</option>
-                                <option value="3145728" th:selected="${connector?.config?.maxPartitionFetchBytes eq '3145728'}">3MB</option>
+                                <option value="1048576" th:selected="${connector?.config?.maxPartitionFetchBytes eq 1048576}">1MB</option>
+                                <option value="2097152" th:selected="${connector?.config?.maxPartitionFetchBytes eq 2097152}">2MB</option>
+                                <option value="3145728" th:selected="${connector?.config?.maxPartitionFetchBytes eq 3145728}">3MB</option>
                             </select>
                             </select>
                         </div>
                         </div>
                     </div>
                     </div>
@@ -88,9 +88,9 @@
 所以“buffer.memory”参数需要结合实际业务情况压测,需要测算在生产环境中用户线程会以每秒多少消息的频率来写入内存缓冲。经过压测,调试出来一个合理值。"></i></label>
 所以“buffer.memory”参数需要结合实际业务情况压测,需要测算在生产环境中用户线程会以每秒多少消息的频率来写入内存缓冲。经过压测,调试出来一个合理值。"></i></label>
                         <div class="col-sm-3">
                         <div class="col-sm-3">
                             <select class="form-control select-control" name="bufferMemory">
                             <select class="form-control select-control" name="bufferMemory">
-                                <option value="33554432" th:selected="${connector?.config?.bufferMemory eq '33554432'}">32MB</option>
-                                <option value="67108864" th:selected="${connector?.config?.bufferMemory eq '67108864'}">64MB</option>
-                                <option value="134217728" th:selected="${connector?.config?.bufferMemory eq '134217728'}">128MB</option>
+                                <option value="33554432" th:selected="${connector?.config?.bufferMemory eq 33554432}">32MB</option>
+                                <option value="67108864" th:selected="${connector?.config?.bufferMemory eq 67108864}">64MB</option>
+                                <option value="134217728" th:selected="${connector?.config?.bufferMemory eq 134217728}">128MB</option>
                             </select>
                             </select>
                         </div>
                         </div>
                         <label class="col-sm-3 control-label">batch.size <i class="fa fa-question-circle fa_gray" aria-hidden="true" title="每个Batch要存放batch.size大小的数据后,才可以发送出去.
                         <label class="col-sm-3 control-label">batch.size <i class="fa fa-question-circle fa_gray" aria-hidden="true" title="每个Batch要存放batch.size大小的数据后,才可以发送出去.
@@ -98,9 +98,9 @@
 但是batch.size也不能过大,要是数据老是缓冲在Batch里迟迟不发送出去,那么发送消息的延迟就会很高。一般可以尝试把这个参数调节大些,利用生产环境发消息负载测试一下。"></i></label>
 但是batch.size也不能过大,要是数据老是缓冲在Batch里迟迟不发送出去,那么发送消息的延迟就会很高。一般可以尝试把这个参数调节大些,利用生产环境发消息负载测试一下。"></i></label>
                         <div class="col-sm-3">
                         <div class="col-sm-3">
                             <select class="form-control select-control" name="batchSize">
                             <select class="form-control select-control" name="batchSize">
-                                <option value="32768" th:selected="${connector?.config?.batchSize eq '32768'}">32768</option>
-                                <option value="65536" th:selected="${connector?.config?.batchSize eq '65536'}">65536</option>
-                                <option value="131072" th:selected="${connector?.config?.batchSize eq '131072'}">131072</option>
+                                <option value="32768" th:selected="${connector?.config?.batchSize eq 32768}">32768</option>
+                                <option value="65536" th:selected="${connector?.config?.batchSize eq 65536}">65536</option>
+                                <option value="131072" th:selected="${connector?.config?.batchSize eq 131072}">131072</option>
                             </select>
                             </select>
                         </div>
                         </div>
                     </div>
                     </div>
@@ -112,9 +112,9 @@
 linger.ms配合batch.size一起来设置,可避免一个Batch迟迟凑不满,导致消息一直积压在内存里发送不出去的情况。"></i></label>
 linger.ms配合batch.size一起来设置,可避免一个Batch迟迟凑不满,导致消息一直积压在内存里发送不出去的情况。"></i></label>
                         <div class="col-sm-3">
                         <div class="col-sm-3">
                             <select class="form-control select-control" name="lingerMs">
                             <select class="form-control select-control" name="lingerMs">
-                                <option value="10" th:selected="${connector?.config?.lingerMs eq '10'}">10毫秒</option>
-                                <option value="20" th:selected="${connector?.config?.lingerMs eq '15'}">15毫秒</option>
-                                <option value="30" th:selected="${connector?.config?.lingerMs eq '20'}">20毫秒</option>
+                                <option value="10" th:selected="${connector?.config?.lingerMs eq 10}">10毫秒</option>
+                                <option value="20" th:selected="${connector?.config?.lingerMs eq 15}">15毫秒</option>
+                                <option value="30" th:selected="${connector?.config?.lingerMs eq 20}">20毫秒</option>
                             </select>
                             </select>
                         </div>
                         </div>
                         <label class="col-sm-3 control-label">acks <i class="fa fa-question-circle fa_gray" aria-hidden="true" title="0:客户端发送出去为成功;1:写入磁盘为成功;all:Leader保持同步Follower结束为成功"></i></label>
                         <label class="col-sm-3 control-label">acks <i class="fa fa-question-circle fa_gray" aria-hidden="true" title="0:客户端发送出去为成功;1:写入磁盘为成功;all:Leader保持同步Follower结束为成功"></i></label>
@@ -130,17 +130,17 @@ linger.ms配合batch.size一起来设置,可避免一个Batch迟迟凑不满
                         <label class="col-sm-3 control-label">retries <i class="fa fa-question-circle fa_gray" aria-hidden="true" title="发送失败重试次数"></i></label>
                         <label class="col-sm-3 control-label">retries <i class="fa fa-question-circle fa_gray" aria-hidden="true" title="发送失败重试次数"></i></label>
                         <div class="col-sm-3">
                         <div class="col-sm-3">
                             <select class="form-control select-control" name="retries">
                             <select class="form-control select-control" name="retries">
-                                <option value="1" th:selected="${connector?.config?.retries eq '1'}">1</option>
-                                <option value="2" th:selected="${connector?.config?.retries eq '2'}">2</option>
-                                <option value="3" th:selected="${connector?.config?.retries eq '3'}">3</option>
+                                <option value="1" th:selected="${connector?.config?.retries eq 1}">1</option>
+                                <option value="2" th:selected="${connector?.config?.retries eq 2}">2</option>
+                                <option value="3" th:selected="${connector?.config?.retries eq 3}">3</option>
                             </select>
                             </select>
                         </div>
                         </div>
                         <label class="col-sm-3 control-label">max.request.size <i class="fa fa-question-circle fa_gray" aria-hidden="true" title="每次发送给Kafka服务器请求消息的最大大小,与max.partition.fetch.bytes保持一致"></i></label>
                         <label class="col-sm-3 control-label">max.request.size <i class="fa fa-question-circle fa_gray" aria-hidden="true" title="每次发送给Kafka服务器请求消息的最大大小,与max.partition.fetch.bytes保持一致"></i></label>
                         <div class="col-sm-3">
                         <div class="col-sm-3">
                             <select id="maxRequestSizeSelect" class="form-control select-control" name="maxRequestSize">
                             <select id="maxRequestSizeSelect" class="form-control select-control" name="maxRequestSize">
-                                <option value="1048576" th:selected="${connector?.config?.maxRequestSize eq '1048576'}">1MB</option>
-                                <option value="2097152" th:selected="${connector?.config?.maxRequestSize eq '2097152'}">2MB</option>
-                                <option value="3145728" th:selected="${connector?.config?.maxRequestSize eq '3145728'}">3MB</option>
+                                <option value="1048576" th:selected="${connector?.config?.maxRequestSize eq 1048576}">1MB</option>
+                                <option value="2097152" th:selected="${connector?.config?.maxRequestSize eq 2097152}">2MB</option>
+                                <option value="3145728" th:selected="${connector?.config?.maxRequestSize eq 3145728}">3MB</option>
                             </select>
                             </select>
                         </div>
                         </div>
                     </div>
                     </div>
@@ -149,17 +149,18 @@ linger.ms配合batch.size一起来设置,可避免一个Batch迟迟凑不满
             </div>
             </div>
         </div>
         </div>
     </div>
     </div>
-</div>
 
 
-<script type="text/javascript">
-    $(function () {
-        // 初始化select插件
-        initSelectIndex($(".select-control"), 1);
+    <script type="text/javascript">
+        $(function () {
+            // 初始化select插件
+            initSelectIndex($(".select-control"), 1);
+
+            // 绑定下拉联动切换事件
+            $("#maxPartitionFetchBytesSelect").on('changed.bs.select',function(e){
+                $("#maxRequestSizeSelect").selectpicker('val', $(this).selectpicker('val'));
+            });
+        })
+    </script>
+</div>
 
 
-        // 绑定下拉联动切换事件
-        $("#maxPartitionFetchBytesSelect").on('changed.bs.select',function(e){
-            $("#maxRequestSizeSelect").selectpicker('val', $(this).selectpicker('val'));
-        });
-    })
-</script>
 </html>
 </html>

+ 7 - 6
dbsyncer-web/src/main/resources/public/connector/addMysql.html

@@ -28,12 +28,13 @@
             </select>
             </select>
         </div>
         </div>
     </div>
     </div>
+
+    <script type="text/javascript">
+        $(function () {
+            // 初始化select插件
+            initSelectIndex($(".select-control"), 1);
+        })
+    </script>
 </div>
 </div>
 
 
-<script type="text/javascript">
-$(function () {
-    // 初始化select插件
-    initSelectIndex($(".select-control"), 1);
-})
-</script>
 </html>
 </html>