Ver Fonte

add mysql binlog client

AE86 há 5 anos atrás
pai
commit
5ad3dc29ed

+ 1 - 1
dbsyncer-connector/src/main/java/org/dbsyncer/connector/config/DatabaseConfig.java

@@ -11,7 +11,7 @@ public class DatabaseConfig extends ConnectorConfig {
     // 驱动com.mysql.jdbc.Driver
     private String driverClassName;
 
-    // 连接地址jdbc:mysql://127.0.0.1:3306/test?seUnicode=true&characterEncoding=UTF8&useSSL=true
+    // 连接地址
     private String url;
 
     // 帐号

+ 4 - 3
dbsyncer-listener/src/main/java/org/dbsyncer/listener/DefaultExtractor.java

@@ -16,9 +16,10 @@ import java.util.concurrent.CopyOnWriteArrayList;
  */
 public abstract class DefaultExtractor implements Extractor {
 
-    private ConnectorConfig connectorConfig;
-    private ListenerConfig listenerConfig;
-    private Map<String, String> map;
+    protected ConnectorConfig connectorConfig;
+    protected ListenerConfig listenerConfig;
+    protected Map<String, String> map;
+
     private List<Event> watcher;
     private Action action;
 

+ 8 - 0
dbsyncer-listener/src/main/java/org/dbsyncer/listener/Listener.java

@@ -7,5 +7,13 @@ import java.util.Map;
 
 public interface Listener {
 
+    /**
+     * 创建抽取器
+     *
+     * @param connectorConfig 连接器配置
+     * @param listenerConfig  监听器配置
+     * @param map             增量参数
+     * @return
+     */
     DefaultExtractor createExtractor(ConnectorConfig connectorConfig, ListenerConfig listenerConfig, Map<String, String> map);
 }

+ 35 - 16
dbsyncer-listener/src/main/java/org/dbsyncer/listener/extractor/MysqlExtractor.java

@@ -1,11 +1,13 @@
 package org.dbsyncer.listener.extractor;
 
+import org.dbsyncer.connector.config.DatabaseConfig;
 import org.dbsyncer.listener.DefaultExtractor;
+import org.dbsyncer.listener.mysql.binlog.BinlogEventListener;
+import org.dbsyncer.listener.mysql.binlog.BinlogEventV4;
+import org.dbsyncer.listener.mysql.binlog.BinlogRemoteClient;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.concurrent.TimeUnit;
-
 /**
  * @version 1.0.0
  * @Author AE86
@@ -15,22 +17,22 @@ public class MysqlExtractor extends DefaultExtractor {
 
     private final Logger logger = LoggerFactory.getLogger(getClass());
 
-    boolean running;
+    private BinlogRemoteClient client;
 
     @Override
     public void extract() {
-        running = true;
-        for (int i = 0; i < 20; i++) {
-            if(!running){
-                logger.info("中止监听任务");
-                break;
-            }
-            logger.info("模拟监听任务");
-            try {
-                TimeUnit.SECONDS.sleep(3);
-            } catch (InterruptedException e) {
-                e.printStackTrace();
-            }
+        try {
+            init();
+
+            client.setBinlogEventListener(new BinlogEventListener() {
+
+                @Override
+                public void onEvents(BinlogEventV4 event) {
+                }
+            });
+            client.start();
+        } catch (Exception e) {
+            logger.error("启动失败:{}", e.getMessage());
         }
     }
 
@@ -41,6 +43,23 @@ public class MysqlExtractor extends DefaultExtractor {
 
     @Override
     public void close() {
-        running = false;
+        try {
+            client.stopQuietly();
+        } catch (Exception e) {
+            logger.error("关闭失败:{}", e.getMessage());
+        }
+    }
+
+    private void init() {
+        final DatabaseConfig config = (DatabaseConfig) super.connectorConfig;
+        // TODO 支持解析集群地址 jdbc:mysql://127.0.0.1:3306/test?rewriteBatchedStatements=true&useUnicode=true&amp;characterEncoding=UTF8&amp;useSSL=true
+        String url = config.getUrl();
+        String username = config.getUsername();
+        String password = config.getPassword();
+        String threadSuffixName = "";
+
+        client = new BinlogRemoteClient("127.0.0.1", 3306, username, password, threadSuffixName);
+        client.setBinlogFileName("mysql-bin.000001");
+        client.setBinlogPosition(4);
     }
 }