1
0
AE86 3 жил өмнө
parent
commit
8ab817c695

+ 16 - 16
dbsyncer-listener/src/main/java/org/dbsyncer/listener/oracle/dcn/DBChangeNotification.java

@@ -9,6 +9,7 @@ import oracle.jdbc.dcn.*;
 import oracle.jdbc.driver.OracleConnection;
 import org.dbsyncer.common.event.RowChangedEvent;
 import org.dbsyncer.connector.constant.ConnectorConstant;
+import org.dbsyncer.listener.ListenerException;
 import org.dbsyncer.listener.oracle.event.DCNEvent;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -82,7 +83,7 @@ public class DBChangeNotification {
             dcr.addListener(new DCNListener());
 
             final long regId = dcr.getRegId();
-            final String host = getHost();
+            final String host = getHost(dcr);
             final int port = getPort(dcr);
             final String callback = String.format(CALLBACK, host, port);
             logger.info("regId:{}, callback:{}", regId, callback);
@@ -212,23 +213,12 @@ public class DBChangeNotification {
         return apply;
     }
 
-    private String getHost() {
-        if (url != null) {
-            String host = url.substring(url.indexOf("@") + 1);
-            host = host.substring(0, host.indexOf(":"));
-            return host;
-        }
-        return "127.0.0.1";
-    }
-
-    private int getPort(DatabaseChangeRegistration dcr) {
-        Object obj = null;
+    private Object invokeDCR(DatabaseChangeRegistration dcr, String declaredMethod) {
         try {
-            // 反射获取抽象属性 NTFRegistration
             Class clazz = dcr.getClass().getSuperclass();
-            Method method = clazz.getDeclaredMethod("getClientTCPPort");
+            Method method = clazz.getDeclaredMethod(declaredMethod);
             method.setAccessible(true);
-            obj = method.invoke(dcr, new Object[]{});
+            return method.invoke(dcr, new Object[]{});
         } catch (NoSuchMethodException e) {
             logger.error(e.getMessage());
         } catch (IllegalAccessException e) {
@@ -236,7 +226,17 @@ public class DBChangeNotification {
         } catch (InvocationTargetException e) {
             logger.error(e.getMessage());
         }
-        return null == obj ? 0 : Integer.parseInt(String.valueOf(obj));
+        throw new ListenerException(String.format("Can't invoke '%s'.", declaredMethod));
+    }
+
+    private String getHost(DatabaseChangeRegistration dcr) {
+        Object obj = invokeDCR(dcr, "getClientHost");
+        return String.valueOf(obj);
+    }
+
+    private int getPort(DatabaseChangeRegistration dcr) {
+        Object obj = invokeDCR(dcr, "getClientTCPPort");
+        return Integer.parseInt(String.valueOf(obj));
     }
 
     private void clean(OracleStatement statement, long excludeRegId, String excludeCallback) {

+ 22 - 22
dbsyncer-web/src/main/resources/public/connector/addKafka.html

@@ -36,12 +36,12 @@
                 </div>
                 <div class="panel-body">
                     <div class="form-group">
-                        <label class="col-sm-2 control-label">group.id <strong class="driverVerifcateRequired">*</strong></label>
-                        <div class="col-sm-4">
+                        <label class="col-sm-3 control-label">group.id <strong class="driverVerifcateRequired">*</strong></label>
+                        <div class="col-sm-3">
                             <input class="form-control" name="groupId" type="text" maxlength="32" dbsyncer-valid="require" th:value="${connector?.config?.groupId}"/>
                         </div>
-                        <label class="col-sm-2 control-label">session.timeout.ms <i class="fa fa-question-circle fa_gray" aria-hidden="true" title="在使用Kafka的组管理时,用于检测消费者故障超时"></i></label>
-                        <div class="col-sm-4">
+                        <label class="col-sm-3 control-label">session.timeout.ms <i class="fa fa-question-circle fa_gray" aria-hidden="true" title="在使用Kafka的组管理时,用于检测消费者故障超时"></i></label>
+                        <div class="col-sm-3">
                             <select class="form-control select-control" name="autoCommitIntervalMs">
                                 <option value="10000" th:selected="${connector?.config?.autoCommitIntervalMs eq '10000'}">10秒</option>
                                 <option value="15000" th:selected="${connector?.config?.autoCommitIntervalMs eq '15000'}">15秒</option>
@@ -50,15 +50,15 @@
                         </div>
                     </div>
                     <div class="form-group">
-                        <label class="col-sm-2 control-label">enable.auto.commit <i class="fa fa-question-circle fa_gray" aria-hidden="true" title="如果为true,消费者的偏移量将在后台定期提交"></i></label>
-                        <div class="col-sm-4">
+                        <label class="col-sm-3 control-label">enable.auto.commit <i class="fa fa-question-circle fa_gray" aria-hidden="true" title="如果为true,消费者的偏移量将在后台定期提交"></i></label>
+                        <div class="col-sm-3">
                             <select class="form-control select-control" name="enableAutoCommit">
                                 <option value="false" th:selected="${connector?.config?.enableAutoCommit eq 'true'}">false</option>
                                 <option value="true" th:selected="${connector?.config?.enableAutoCommit eq 'false'}">true</option>
                             </select>
                         </div>
-                        <label class="col-sm-2 control-label">auto.commit.interval.ms <i class="fa fa-question-circle fa_gray" aria-hidden="true" title="enable.auto.commit为true才生效"></i></label>
-                        <div class="col-sm-4">
+                        <label class="col-sm-3 control-label">auto.commit.interval.ms <i class="fa fa-question-circle fa_gray" aria-hidden="true" title="enable.auto.commit为true才生效"></i></label>
+                        <div class="col-sm-3">
                             <select class="form-control select-control" name="autoCommitIntervalMs">
                                 <option value="1000" th:selected="${connector?.config?.autoCommitIntervalMs eq '1000'}">1秒</option>
                                 <option value="1500" th:selected="${connector?.config?.autoCommitIntervalMs eq '1500'}">1.5秒</option>
@@ -82,12 +82,12 @@
                 </div>
                 <div class="panel-body">
                     <div class="form-group">
-                        <label class="col-sm-2 control-label">topic <strong class="driverVerifcateRequired">*</strong></label>
-                        <div class="col-sm-4">
+                        <label class="col-sm-3 control-label">topic <strong class="driverVerifcateRequired">*</strong></label>
+                        <div class="col-sm-3">
                             <input class="form-control" name="topic" type="text" maxlength="32" dbsyncer-valid="require" th:value="${connector?.config?.topic}"/>
                         </div>
-                        <label class="col-sm-2 control-label">acks <i class="fa fa-question-circle fa_gray" aria-hidden="true" title="0:客户端发送出去为成功;1:写入磁盘为成功;all:Leader保持同步Follower结束为成功"></i></label>
-                        <div class="col-sm-4">
+                        <label class="col-sm-3 control-label">acks <i class="fa fa-question-circle fa_gray" aria-hidden="true" title="0:客户端发送出去为成功;1:写入磁盘为成功;all:Leader保持同步Follower结束为成功"></i></label>
+                        <div class="col-sm-3">
                             <select class="form-control select-control" name="acks">
                                 <option value="1" th:selected="${connector?.config?.acks eq '1'}">1</option>
                                 <option value="0" th:selected="${connector?.config?.acks eq '0'}">0</option>
@@ -96,16 +96,16 @@
                         </div>
                     </div>
                     <div class="form-group">
-                        <label class="col-sm-2 control-label">buffer.memory <i class="fa fa-question-circle fa_gray" aria-hidden="true" title="批处理缓冲区"></i></label>
-                        <div class="col-sm-4">
+                        <label class="col-sm-3 control-label">buffer.memory <i class="fa fa-question-circle fa_gray" aria-hidden="true" title="批处理缓冲区"></i></label>
+                        <div class="col-sm-3">
                             <select class="form-control select-control" name="bufferMemory">
                                 <option value="33554432" th:selected="${connector?.config?.bufferMemory eq '33554432'}">32MB</option>
                                 <option value="67108864" th:selected="${connector?.config?.bufferMemory eq '67108864'}">64MB</option>
                                 <option value="134217728" th:selected="${connector?.config?.bufferMemory eq '134217728'}">128MB</option>
                             </select>
                         </div>
-                        <label class="col-sm-2 control-label">retries <i class="fa fa-question-circle fa_gray" aria-hidden="true" title="发送失败重试次数"></i></label>
-                        <div class="col-sm-4">
+                        <label class="col-sm-3 control-label">retries <i class="fa fa-question-circle fa_gray" aria-hidden="true" title="发送失败重试次数"></i></label>
+                        <div class="col-sm-3">
                             <select class="form-control select-control" name="retries">
                                 <option value="1" th:selected="${connector?.config?.retries eq '1'}">1</option>
                                 <option value="2" th:selected="${connector?.config?.retries eq '2'}">2</option>
@@ -114,16 +114,16 @@
                         </div>
                     </div>
                     <div class="form-group">
-                        <label class="col-sm-2 control-label">batch.size <i class="fa fa-question-circle fa_gray" aria-hidden="true" title="批处理条数:当多个记录被发送到同一个分区时,生产者会尝试将记录合并到更少的请求中。这有助于客户端和服务器的性能"></i></label>
-                        <div class="col-sm-4">
+                        <label class="col-sm-3 control-label">batch.size <i class="fa fa-question-circle fa_gray" aria-hidden="true" title="批处理条数:当多个记录被发送到同一个分区时,生产者会尝试将记录合并到更少的请求中。这有助于客户端和服务器的性能"></i></label>
+                        <div class="col-sm-3">
                             <select class="form-control select-control" name="batchSize">
                                 <option value="32768" th:selected="${connector?.config?.batchSize eq '32768'}">32768</option>
                                 <option value="65536" th:selected="${connector?.config?.batchSize eq '65536'}">65536</option>
                                 <option value="131072" th:selected="${connector?.config?.batchSize eq '131072'}">131072</option>
                             </select>
                         </div>
-                        <label class="col-sm-2 control-label">linger.ms <i class="fa fa-question-circle fa_gray" aria-hidden="true" title="同时设置batch.size和 linger.ms,就是哪个条件先满足就都会将消息发送出去"></i></label>
-                        <div class="col-sm-4">
+                        <label class="col-sm-3 control-label">linger.ms <i class="fa fa-question-circle fa_gray" aria-hidden="true" title="同时设置batch.size和 linger.ms,就是哪个条件先满足就都会将消息发送出去"></i></label>
+                        <div class="col-sm-3">
                             <select class="form-control select-control" name="retries">
                                 <option value="10000" th:selected="${connector?.config?.lingerMms eq '10000'}">10秒</option>
                                 <option value="15000" th:selected="${connector?.config?.lingerMms eq '15000'}">15秒</option>
@@ -132,8 +132,8 @@
                         </div>
                     </div>
                     <div class="form-group">
-                        <label class="col-sm-2 control-label">max.block.ms <i class="fa fa-question-circle fa_gray" aria-hidden="true" title="该配置控制 KafkaProducer.send() 和 KafkaProducer.partitionsFor() 将阻塞多长时间。此外这些方法被阻止,也可能是因为缓冲区已满或元数据不可用。在用户提供的序列化程序或分区器中的锁定不会计入此超时。"></i></label>
-                        <div class="col-sm-4">
+                        <label class="col-sm-3 control-label">max.block.ms <i class="fa fa-question-circle fa_gray" aria-hidden="true" title="该配置控制 KafkaProducer.send() 和 KafkaProducer.partitionsFor() 将阻塞多长时间。此外这些方法被阻止,也可能是因为缓冲区已满或元数据不可用。在用户提供的序列化程序或分区器中的锁定不会计入此超时。"></i></label>
+                        <div class="col-sm-3">
                             <select class="form-control select-control" name="maxBlockMs">
                                 <option value="10000" th:selected="${connector?.config?.maxBlockMs eq '10000'}">10秒</option>
                                 <option value="30000" th:selected="${connector?.config?.maxBlockMs eq '30000'}">30秒</option>