Explorar o código

add mysql binlog

AE86 %!s(int64=5) %!d(string=hai) anos
pai
achega
8df14de24d

+ 11 - 5
dbsyncer-common/src/main/java/org/dbsyncer/common/event/Event.java

@@ -1,6 +1,6 @@
 package org.dbsyncer.common.event;
 
-import java.util.Map;
+import java.util.List;
 
 /**
  * @version 1.0.0
@@ -12,10 +12,16 @@ public interface Event {
     /**
      * 数据变更事件
      *
-     * @param event  事件
-     * @param before 变化前
-     * @param after  变化后
+     * @param tableName 表名
+     * @param event     事件
+     * @param before    变化前
+     * @param after     变化后
      */
-    void changedEvent(String event, Map<String, Object> before, Map<String, Object> after);
+    void changedEvent(String tableName, String event, List<Object> before, List<Object> after);
+
+    /**
+     * 写入增量点事件
+     */
+    void flushEvent();
 
 }

+ 8 - 2
dbsyncer-listener/src/main/java/org/dbsyncer/listener/DefaultExtractor.java

@@ -36,9 +36,15 @@ public abstract class DefaultExtractor implements Extractor {
         }
     }
 
-    public void changedEvent(String event, Map<String, Object> before, Map<String, Object> after) {
+    public void changedEvent(String tableName, String event, List<Object> before, List<Object> after) {
         if (!CollectionUtils.isEmpty(watcher)) {
-            watcher.forEach(w -> w.changedEvent(event, before, after));
+            watcher.forEach(w -> w.changedEvent(tableName, event, before, after));
+        }
+    }
+
+    public void flushEvent() {
+        if (!CollectionUtils.isEmpty(watcher)) {
+            watcher.forEach(w -> w.flushEvent());
         }
     }
 

+ 2 - 1
dbsyncer-listener/src/main/java/org/dbsyncer/listener/ListenerFactory.java

@@ -19,6 +19,7 @@ public class ListenerFactory implements Listener {
         extractor.setAction(ListenerTypeEnum.getAction(listenerConfig.getListenerType()));
         extractor.setConnectorConfig(config);
         extractor.setListenerConfig(listenerConfig);
+        extractor.setMap(map);
         return extractor;
     }
-}
+}

+ 19 - 0
dbsyncer-listener/src/main/java/org/dbsyncer/listener/config/Host.java

@@ -0,0 +1,19 @@
+package org.dbsyncer.listener.config;
+
+public class Host {
+    private String ip;
+    private int    port;
+
+    public Host(String ip, int port) {
+        this.ip = ip;
+        this.port = port;
+    }
+
+    public String getIp() {
+        return ip;
+    }
+
+    public int getPort() {
+        return port;
+    }
+}

+ 150 - 17
dbsyncer-listener/src/main/java/org/dbsyncer/listener/extractor/MysqlExtractor.java

@@ -1,12 +1,27 @@
 package org.dbsyncer.listener.extractor;
 
+import org.apache.commons.lang.StringUtils;
 import org.dbsyncer.connector.config.DatabaseConfig;
+import org.dbsyncer.connector.constant.ConnectorConstant;
 import org.dbsyncer.listener.DefaultExtractor;
+import org.dbsyncer.listener.ListenerException;
+import org.dbsyncer.listener.config.Host;
 import org.dbsyncer.listener.mysql.binlog.BinlogEventListener;
 import org.dbsyncer.listener.mysql.binlog.BinlogEventV4;
 import org.dbsyncer.listener.mysql.binlog.BinlogRemoteClient;
+import org.dbsyncer.listener.mysql.binlog.impl.event.*;
+import org.dbsyncer.listener.mysql.common.glossary.Column;
+import org.dbsyncer.listener.mysql.common.glossary.Pair;
+import org.dbsyncer.listener.mysql.common.glossary.Row;
+import org.dbsyncer.listener.mysql.common.glossary.column.StringColumn;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.springframework.util.Assert;
+
+import java.util.*;
+import java.util.regex.Matcher;
+
+import static java.util.regex.Pattern.compile;
 
 /**
  * @version 1.0.0
@@ -17,22 +32,33 @@ public class MysqlExtractor extends DefaultExtractor {
 
     private final Logger logger = LoggerFactory.getLogger(getClass());
 
+    private static final String BINLOG_FILENAME = "fileName";
+    private static final String BINLOG_POSITION = "position";
     private BinlogRemoteClient client;
+    private List<Host> cluster;
+    private int master = 0;
 
     @Override
     public void extract() {
         try {
-            init();
+            final DatabaseConfig config = (DatabaseConfig) connectorConfig;
+            cluster = readNodes(config.getUrl());
+            Assert.notEmpty(cluster, "Mysql连接地址有误.");
 
-            client.setBinlogEventListener(new BinlogEventListener() {
+            final Host host = cluster.get(master);
+            final String username = config.getUsername();
+            final String password = config.getPassword();
+            final String threadSuffixName = "mysql-binlog";
 
-                @Override
-                public void onEvents(BinlogEventV4 event) {
-                }
-            });
+            client = new BinlogRemoteClient(host.getIp(), host.getPort(), username, password, threadSuffixName);
+            client.setBinlogFileName(map.get(BINLOG_FILENAME));
+            String pos = map.get(BINLOG_POSITION);
+            client.setBinlogPosition(StringUtils.isBlank(pos) ? 0 : Long.parseLong(pos));
+            client.setBinlogEventListener(new MysqlEventListener());
             client.start();
         } catch (Exception e) {
             logger.error("启动失败:{}", e.getMessage());
+            throw new ListenerException(e);
         }
     }
 
@@ -50,16 +76,123 @@ public class MysqlExtractor extends DefaultExtractor {
         }
     }
 
-    private void init() {
-        final DatabaseConfig config = (DatabaseConfig) super.connectorConfig;
-        // TODO 支持解析集群地址 jdbc:mysql://127.0.0.1:3306/test?rewriteBatchedStatements=true&useUnicode=true&amp;characterEncoding=UTF8&amp;useSSL=true
-        String url = config.getUrl();
-        String username = config.getUsername();
-        String password = config.getPassword();
-        String threadSuffixName = "";
-
-        client = new BinlogRemoteClient("127.0.0.1", 3306, username, password, threadSuffixName);
-        client.setBinlogFileName("mysql-bin.000001");
-        client.setBinlogPosition(4);
+    private List<Host> readNodes(String url) {
+        if (StringUtils.isBlank(url)) {
+            return Collections.EMPTY_LIST;
+        }
+        Matcher matcher = compile("(//)(?!(/)).+?(/)").matcher(url);
+        while (matcher.find()) {
+            url = matcher.group(0);
+            break;
+        }
+        url = StringUtils.replace(url, "/", "");
+
+        List<Host> cluster = new ArrayList<>();
+        String[] arr = StringUtils.split(url, ",");
+        int size = arr.length;
+        for (int i = 0; i < size; i++) {
+            String[] host = StringUtils.split(arr[i], ":");
+            if (2 == host.length) {
+                cluster.add(new Host(host[0], Integer.parseInt(host[1])));
+            }
+        }
+        return cluster;
     }
+
+    /**
+     * 有变化触发刷新binlog增量事件
+     *
+     * @param event
+     */
+    private void refresh(AbstractBinlogEventV4 event) {
+        String binlogFilename = event.getBinlogFilename();
+        long nextPosition = event.getHeader().getNextPosition();
+        if (!StringUtils.equals(binlogFilename, client.getBinlogFileName()) || 0 != Long.compare(nextPosition,
+                client.getBinlogPosition())) {
+            client.setBinlogFileName(binlogFilename);
+            client.setBinlogPosition(nextPosition);
+            map.put(BINLOG_FILENAME, client.getBinlogFileName());
+            map.put(BINLOG_POSITION, String.valueOf(nextPosition));
+            flushEvent();
+        }
+    }
+
+    final class MysqlEventListener implements BinlogEventListener {
+
+        private Map<Long, String> table = new HashMap<>();
+
+        @Override
+        public void onEvents(BinlogEventV4 event) {
+            if (event == null) {
+                logger.error("binlog event is null");
+                return;
+            }
+
+            if (event instanceof TableMapEvent) {
+                TableMapEvent tableEvent = (TableMapEvent) event;
+                table.putIfAbsent(tableEvent.getTableId(), tableEvent.getTableName().toString());
+                return;
+            }
+
+            if (event instanceof UpdateRowsEventV2) {
+                UpdateRowsEventV2 e = (UpdateRowsEventV2) event;
+                final String tableName = table.get(e.getTableId());
+                List<Pair<Row>> rows = e.getRows();
+                for (Pair<Row> p : rows) {
+                    List<Object> before = new ArrayList<>();
+                    List<Object> after = new ArrayList<>();
+                    addAll(before, p.getBefore().getColumns());
+                    addAll(after, p.getAfter().getColumns());
+                    changedEvent(tableName, ConnectorConstant.OPERTION_UPDATE, before, after);
+                    break;
+                }
+                return;
+            }
+
+            if (event instanceof WriteRowsEventV2) {
+                WriteRowsEventV2 e = (WriteRowsEventV2) event;
+                final String tableName = table.get(e.getTableId());
+                List<Row> rows = e.getRows();
+                for (Row row : rows) {
+                    List<Object> after = new ArrayList<>();
+                    addAll(after, row.getColumns());
+                    changedEvent(tableName, ConnectorConstant.OPERTION_INSERT, Collections.EMPTY_LIST, after);
+                    break;
+                }
+                return;
+            }
+
+            if (event instanceof DeleteRowsEventV2) {
+                DeleteRowsEventV2 e = (DeleteRowsEventV2) event;
+                final String tableName = table.get(e.getTableId());
+                List<Row> rows = e.getRows();
+                for (Row row : rows) {
+                    List<Object> before = new ArrayList<>();
+                    addAll(before, row.getColumns());
+                    changedEvent(tableName, ConnectorConstant.OPERTION_DELETE, before, Collections.EMPTY_LIST);
+                    break;
+                }
+                return;
+            }
+
+            // 处理事件优先级:RotateEvent > FormatDescriptionEvent > TableMapEvent > RowsEvent > XidEvent
+            if (event instanceof XidEvent) {
+                refresh((XidEvent) event);
+                return;
+            }
+
+            // 切换binlog
+            if (event instanceof RotateEvent) {
+                refresh((RotateEvent) event);
+                return;
+            }
+
+        }
+
+        private void addAll(List<Object> before, List<Column> columns) {
+            columns.forEach(c -> before.add((c instanceof StringColumn) ? c.toString() : c.getValue()));
+        }
+
+    }
+
 }

+ 8 - 3
dbsyncer-manager/src/main/java/org/dbsyncer/manager/puller/impl/IncrementPuller.java

@@ -68,9 +68,8 @@ public class IncrementPuller extends AbstractPuller {
             logger.info("启动成功:{}", metaId);
             map.get(metaId).run();
         } catch (Exception e) {
-            logger.error("任务:{} 运行异常:{}", metaId, e.getMessage());
-        } finally {
             finished(metaId);
+            logger.error("运行异常,结束任务{}:{}", metaId, e.getMessage());
         }
     }
 
@@ -112,9 +111,15 @@ public class IncrementPuller extends AbstractPuller {
         }
 
         @Override
-        public void changedEvent(String event, Map<String, Object> before, Map<String, Object> after) {
+        public void changedEvent(String tableName, String event, List<Object> before, List<Object> after) {
+            logger.info("监听数据>tableName:{},event:{},after:{}, after:{}", tableName, event, before, after);
             // 处理过程有异常向上抛
             list.forEach(tableGroup -> parser.execute(mapping, tableGroup));
+        }
+
+        @Override
+        public void flushEvent() {
+            logger.info("flushEvent");
             flush(mapping.getMetaId());
         }