Browse Source

实现目标源连接器kafka & 升级版本

AE86 3 years ago
parent
commit
eb3556a27b

+ 1 - 1
dbsyncer-biz/pom.xml

@@ -5,7 +5,7 @@
 	<parent>
 		<artifactId>dbsyncer</artifactId>
 		<groupId>org.ghi</groupId>
-		<version>1.1.4-Beta</version>
+		<version>1.1.5-Beta</version>
 	</parent>
 	<modelVersion>4.0.0</modelVersion>
 	<artifactId>dbsyncer-biz</artifactId>

+ 1 - 1
dbsyncer-cache/pom.xml

@@ -4,7 +4,7 @@
 	<parent>
 		<artifactId>dbsyncer</artifactId>
 		<groupId>org.ghi</groupId>
-		<version>1.1.4-Beta</version>
+		<version>1.1.5-Beta</version>
 	</parent>
 	<modelVersion>4.0.0</modelVersion>
 	<artifactId>dbsyncer-cache</artifactId>

+ 1 - 1
dbsyncer-cluster/pom.xml

@@ -5,7 +5,7 @@
     <parent>
         <artifactId>dbsyncer</artifactId>
         <groupId>org.ghi</groupId>
-		<version>1.1.4-Beta</version>
+		<version>1.1.5-Beta</version>
     </parent>
     <modelVersion>4.0.0</modelVersion>
     <artifactId>dbsyncer-cluster</artifactId>

+ 1 - 1
dbsyncer-common/pom.xml

@@ -5,7 +5,7 @@
     <parent>
         <artifactId>dbsyncer</artifactId>
         <groupId>org.ghi</groupId>
-		<version>1.1.4-Beta</version>
+		<version>1.1.5-Beta</version>
     </parent>
     <modelVersion>4.0.0</modelVersion>
     <artifactId>dbsyncer-common</artifactId>

+ 1 - 1
dbsyncer-connector/pom.xml

@@ -5,7 +5,7 @@
     <parent>
         <artifactId>dbsyncer</artifactId>
         <groupId>org.ghi</groupId>
-		<version>1.1.4-Beta</version>
+		<version>1.1.5-Beta</version>
     </parent>
     <modelVersion>4.0.0</modelVersion>
     <artifactId>dbsyncer-connector</artifactId>

+ 0 - 1
dbsyncer-connector/src/main/java/org/dbsyncer/connector/es/ESConnector.java

@@ -193,7 +193,6 @@ public final class ESConnector extends AbstractConnector implements Connector<ES
             }
         } catch (Exception e) {
             // 记录错误数据
-            result.getFailData().addAll(data);
             result.getFail().set(data.size());
             result.getError().append(e.getMessage()).append(System.lineSeparator());
             logger.error(e.getMessage());

+ 20 - 9
dbsyncer-connector/src/main/java/org/dbsyncer/connector/kafka/KafkaClient.java

@@ -11,8 +11,10 @@ import org.dbsyncer.connector.ConnectorException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
 import java.lang.reflect.Field;
 import java.net.InetSocketAddress;
+import java.net.Socket;
 import java.util.List;
 import java.util.Map;
 
@@ -33,15 +35,11 @@ public class KafkaClient {
     }
 
     public boolean ping() {
-        return ping(consumer);
-    }
-
-    private boolean ping(Object client) {
         if (null == networkClient) {
             synchronized (this) {
                 if (null == networkClient) {
                     try {
-                        networkClient = (NetworkClient) invoke(invoke(client, "client"), "client");
+                        networkClient = (NetworkClient) invoke(invoke(consumer, "client"), "client");
                     } catch (NoSuchFieldException e) {
                         logger.error(e.getMessage());
                     } catch (IllegalAccessException e) {
@@ -51,13 +49,26 @@ public class KafkaClient {
             }
         }
         final Node node = networkClient.leastLoadedNode(0);
-        InetSocketAddress address = new InetSocketAddress(node.host(), node.port());
-        if (address.isUnresolved()) {
-            throw new ConnectorException(String.format("DNS resolution failed for url in %s %s:%s", CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, node.host(), node.port()));
-        }
+        telnet(node.host(), node.port(), 5000);
         return true;
     }
 
+    private boolean telnet(String host, int port, int timeout) {
+        Socket socket = new Socket();
+        try {
+            socket.connect(new InetSocketAddress(host, port), timeout);
+            return socket.isConnected();
+        } catch (IOException e) {
+            throw new ConnectorException(String.format("DNS resolution failed for url in %s %s:%s", CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, host, port));
+        } finally {
+            try {
+                socket.close();
+            } catch (IOException e) {
+                // nothing to do
+            }
+        }
+    }
+
     private Object invoke(Object obj, String declaredFieldName) throws NoSuchFieldException, IllegalAccessException {
         final Field field = obj.getClass().getDeclaredField(declaredFieldName);
         field.setAccessible(true);

+ 38 - 4
dbsyncer-connector/src/main/java/org/dbsyncer/connector/kafka/KafkaConnector.java

@@ -1,7 +1,9 @@
 package org.dbsyncer.connector.kafka;
 
 import org.dbsyncer.common.model.Result;
+import org.dbsyncer.common.util.CollectionUtils;
 import org.dbsyncer.common.util.JsonUtil;
+import org.dbsyncer.connector.AbstractConnector;
 import org.dbsyncer.connector.Connector;
 import org.dbsyncer.connector.ConnectorException;
 import org.dbsyncer.connector.ConnectorMapper;
@@ -15,7 +17,7 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 
-public class KafkaConnector implements Connector<KafkaConnectorMapper, KafkaConfig> {
+public class KafkaConnector extends AbstractConnector implements Connector<KafkaConnectorMapper, KafkaConfig> {
 
     private final Logger logger = LoggerFactory.getLogger(getClass());
 
@@ -64,17 +66,49 @@ public class KafkaConnector implements Connector<KafkaConnectorMapper, KafkaConf
 
     @Override
     public Result reader(KafkaConnectorMapper connectorMapper, ReaderConfig config) {
-        return null;
+        throw new ConnectorException("Full synchronization is not supported");
     }
 
     @Override
     public Result writer(KafkaConnectorMapper connectorMapper, WriterBatchConfig config) {
-        return null;
+        List<Map> data = config.getData();
+        if (CollectionUtils.isEmpty(data) || CollectionUtils.isEmpty(config.getFields())) {
+            logger.error("writer data can not be empty.");
+            throw new ConnectorException("writer data can not be empty.");
+        }
+
+        Result result = new Result();
+        final KafkaConfig cfg = connectorMapper.getConfig();
+        Field pkField = getPrimaryKeyField(config.getFields());
+        try {
+            String topic = cfg.getTopic();
+            String pk = pkField.getName();
+            data.forEach(row -> connectorMapper.getConnection().send(topic, String.valueOf(row.get(pk)), row));
+        } catch (Exception e) {
+            // 记录错误数据
+            result.getFail().set(data.size());
+            result.getError().append(e.getMessage()).append(System.lineSeparator());
+            logger.error(e.getMessage());
+        }
+        return result;
     }
 
     @Override
     public Result writer(KafkaConnectorMapper connectorMapper, WriterSingleConfig config) {
-        return null;
+        Map<String, Object> data = config.getData();
+        Result result = new Result();
+        final KafkaConfig cfg = connectorMapper.getConfig();
+        Field pkField = getPrimaryKeyField(config.getFields());
+        try {
+            connectorMapper.getConnection().send(cfg.getTopic(), String.valueOf(data.get(pkField.getName())), data);
+        } catch (Exception e) {
+            // 记录错误数据
+            result.getFailData().add(data);
+            result.getFail().set(data.size());
+            result.getError().append(e.getMessage()).append(System.lineSeparator());
+            logger.error(e.getMessage());
+        }
+        return result;
     }
 
     @Override

+ 1 - 1
dbsyncer-listener/pom.xml

@@ -5,7 +5,7 @@
     <parent>
         <artifactId>dbsyncer</artifactId>
         <groupId>org.ghi</groupId>
-		<version>1.1.4-Beta</version>
+		<version>1.1.5-Beta</version>
     </parent>
     <modelVersion>4.0.0</modelVersion>
     <artifactId>dbsyncer-listener</artifactId>

+ 5 - 0
dbsyncer-listener/src/main/java/org/dbsyncer/listener/enums/ListenerEnum.java

@@ -3,6 +3,7 @@ package org.dbsyncer.listener.enums;
 import org.dbsyncer.common.util.StringUtil;
 import org.dbsyncer.connector.enums.ConnectorEnum;
 import org.dbsyncer.listener.ListenerException;
+import org.dbsyncer.listener.kafka.KafkaExtractor;
 import org.dbsyncer.listener.mysql.MysqlExtractor;
 import org.dbsyncer.listener.oracle.OracleExtractor;
 import org.dbsyncer.listener.quartz.DatabaseQuartzExtractor;
@@ -30,6 +31,10 @@ public enum ListenerEnum {
      * log_SqlServer
      */
     LOG_SQL_SERVER(ListenerTypeEnum.LOG.getType() + ConnectorEnum.SQL_SERVER.getType(), SqlServerExtractor.class),
+    /**
+     * log_Kafka
+     */
+    LOG_KAFKA(ListenerTypeEnum.LOG.getType() + ConnectorEnum.KAFKA.getType(), KafkaExtractor.class),
     /**
      * timing_Mysql
      */

+ 22 - 0
dbsyncer-listener/src/main/java/org/dbsyncer/listener/kafka/KafkaExtractor.java

@@ -0,0 +1,22 @@
+package org.dbsyncer.listener.kafka;
+
+import org.dbsyncer.listener.AbstractExtractor;
+import org.dbsyncer.listener.ListenerException;
+
+/**
+ * @author AE86
+ * @version 1.0.0
+ * @date 2021/12/18 22:19
+ */
+public class KafkaExtractor extends AbstractExtractor {
+
+    @Override
+    public void start() {
+        throw new ListenerException("抱歉,kafka监听功能正在开发中...");
+    }
+
+    @Override
+    public void close() {
+
+    }
+}

+ 8 - 8
dbsyncer-listener/src/main/test/KafkaClientTest.java

@@ -65,17 +65,17 @@ public class KafkaClientTest {
         client.subscribe(Arrays.asList(config.getTopic()));
 
         // 模拟生产者
-        for (int i = 0; i < 5; i++) {
-            Map<String, Object> map = new HashMap<>();
-            map.put("id", i);
-            map.put("name", "张三" + i);
-            map.put("update_time", new Timestamp(System.currentTimeMillis()));
-            client.send(config.getTopic(), map.get("id").toString(), map);
-        }
+//        for (int i = 0; i < 5; i++) {
+//            Map<String, Object> map = new HashMap<>();
+//            map.put("id", i);
+//            map.put("name", "张三" + i);
+//            map.put("update_time", new Timestamp(System.currentTimeMillis()));
+//            client.send(config.getTopic(), map.get("id").toString(), map);
+//        }
 
         new Consumer().start();
         new Heartbeat().start();
-        TimeUnit.SECONDS.sleep(600);
+        TimeUnit.SECONDS.sleep(6000);
         logger.info("test end");
     }
 

+ 1 - 1
dbsyncer-manager/pom.xml

@@ -5,7 +5,7 @@
     <parent>
         <artifactId>dbsyncer</artifactId>
         <groupId>org.ghi</groupId>
-		<version>1.1.4-Beta</version>
+		<version>1.1.5-Beta</version>
     </parent>
     <modelVersion>4.0.0</modelVersion>
     <artifactId>dbsyncer-manager</artifactId>

+ 1 - 1
dbsyncer-monitor/pom.xml

@@ -5,7 +5,7 @@
     <parent>
         <artifactId>dbsyncer</artifactId>
         <groupId>org.ghi</groupId>
-		<version>1.1.4-Beta</version>
+		<version>1.1.5-Beta</version>
     </parent>
     <modelVersion>4.0.0</modelVersion>
     <artifactId>dbsyncer-monitor</artifactId>

+ 1 - 1
dbsyncer-parser/pom.xml

@@ -5,7 +5,7 @@
     <parent>
         <artifactId>dbsyncer</artifactId>
         <groupId>org.ghi</groupId>
-		<version>1.1.4-Beta</version>
+		<version>1.1.5-Beta</version>
     </parent>
     <modelVersion>4.0.0</modelVersion>
     <artifactId>dbsyncer-parser</artifactId>

+ 1 - 1
dbsyncer-plugin/pom.xml

@@ -5,7 +5,7 @@
     <parent>
         <artifactId>dbsyncer</artifactId>
         <groupId>org.ghi</groupId>
-		<version>1.1.4-Beta</version>
+		<version>1.1.5-Beta</version>
     </parent>
     <modelVersion>4.0.0</modelVersion>
     <artifactId>dbsyncer-plugin</artifactId>

+ 1 - 1
dbsyncer-storage/pom.xml

@@ -5,7 +5,7 @@
     <parent>
         <artifactId>dbsyncer</artifactId>
         <groupId>org.ghi</groupId>
-		<version>1.1.4-Beta</version>
+		<version>1.1.5-Beta</version>
     </parent>
     <modelVersion>4.0.0</modelVersion>
     <artifactId>dbsyncer-storage</artifactId>

+ 1 - 1
dbsyncer-web/pom.xml

@@ -5,7 +5,7 @@
     <parent>
         <artifactId>dbsyncer</artifactId>
         <groupId>org.ghi</groupId>
-		<version>1.1.4-Beta</version>
+		<version>1.1.5-Beta</version>
     </parent>
     <modelVersion>4.0.0</modelVersion>
     <artifactId>dbsyncer-web</artifactId>

+ 1 - 1
dbsyncer-web/src/main/resources/application.properties

@@ -25,7 +25,7 @@ management.endpoints.web.exposure.include=*
 management.endpoint.health.show-details=always
 management.health.elasticsearch.enabled=false
 info.app.name=DBSyncer
-info.app.version=1.1.4-Beta
+info.app.version=1.1.5-Beta
 info.app.copyright=&copy;2021 ${info.app.name}(${info.app.version})<footer>Designed By <a href='https://gitee.com/ghi/dbsyncer' target='_blank' >AE86</a></footer>
 
 #All < Trace < Debug < Info < Warn < Error < Fatal < OFF

+ 1 - 5
dbsyncer-web/src/main/resources/public/connector/addKafka.html

@@ -5,17 +5,13 @@
 <div th:fragment="content">
     <div class="form-group">
         <label class="col-sm-2 control-label">地址<i class="fa fa-question-circle fa_gray" aria-hidden="true" title="多个使用英文逗号,例如:192.168.1.100:9092,192.168.1.200:9092"></i> <strong class="driverVerifcateRequired">*</strong></label>
-        <div class="col-sm-10">
+        <div class="col-sm-4">
             <textarea name="bootstrapServers" class="form-control dbsyncer_textarea_resize_none" maxlength="1024" dbsyncer-valid="require" rows="2" th:text="${connector?.config?.bootstrapServers}?:'127.0.0.1:9092'"></textarea>
         </div>
-    </div>
-
-    <div class="form-group">
         <label class="col-sm-2 control-label">Topic <strong class="driverVerifcateRequired">*</strong></label>
         <div class="col-sm-4">
             <input class="form-control" name="topic" type="text" maxlength="64" dbsyncer-valid="require" th:value="${connector?.config?.topic}"/>
         </div>
-        <div class="col-sm-6"></div>
     </div>
 
     <div class="form-group">

+ 1 - 1
pom.xml

@@ -6,7 +6,7 @@
 
     <groupId>org.ghi</groupId>
     <artifactId>dbsyncer</artifactId>
-	<version>1.1.4-Beta</version>
+	<version>1.1.5-Beta</version>
     <packaging>pom</packaging>
     <name>dbsyncer</name>
     <url>https://gitee.com/ghi/dbsyncer</url>

+ 1 - 1
version.cmd

@@ -1,6 +1,6 @@
 @echo off
 
-set APP_VERSION=1.1.4-Beta
+set APP_VERSION=1.1.5-Beta
 
 echo "Clean Project ..."
 call mvn clean -f pom.xml