1
0
穿云 4 сар өмнө
parent
commit
59b8206d1a
28 өөрчлөгдсөн 199 нэмэгдсэн , 377 устгасан
  1. 8 3
      dbsyncer-biz/src/main/java/org/dbsyncer/biz/impl/DataSyncServiceImpl.java
  2. 6 25
      dbsyncer-biz/src/main/java/org/dbsyncer/biz/impl/MetricReporter.java
  3. 1 1
      dbsyncer-connector/dbsyncer-connector-mysql/src/main/java/org/dbsyncer/connector/mysql/cdc/MySQLListener.java
  4. 1 1
      dbsyncer-connector/dbsyncer-connector-oracle/src/main/java/org/dbsyncer/connector/oracle/cdc/OracleListener.java
  5. 16 23
      dbsyncer-manager/src/main/java/org/dbsyncer/manager/impl/IncrementPuller.java
  6. 13 37
      dbsyncer-parser/src/main/java/org/dbsyncer/parser/consumer/ParserConsumer.java
  7. 0 87
      dbsyncer-parser/src/main/java/org/dbsyncer/parser/consumer/impl/LogConsumer.java
  8. 0 42
      dbsyncer-parser/src/main/java/org/dbsyncer/parser/consumer/impl/QuartzConsumer.java
  9. 5 5
      dbsyncer-parser/src/main/java/org/dbsyncer/parser/event/RefreshOffsetEvent.java
  10. 1 1
      dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/AbstractBufferActuator.java
  11. 7 7
      dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/impl/BufferActuatorRouter.java
  12. 19 11
      dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/impl/GeneralBufferActuator.java
  13. 6 6
      dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/impl/TableGroupBufferActuator.java
  14. 1 2
      dbsyncer-parser/src/main/java/org/dbsyncer/parser/impl/ProfileComponentImpl.java
  15. 16 5
      dbsyncer-parser/src/main/java/org/dbsyncer/parser/model/AbstractWriter.java
  16. 13 10
      dbsyncer-parser/src/main/java/org/dbsyncer/parser/model/Picker.java
  17. 7 14
      dbsyncer-parser/src/main/java/org/dbsyncer/parser/model/WriterRequest.java
  18. 3 15
      dbsyncer-parser/src/main/java/org/dbsyncer/parser/model/WriterResponse.java
  19. 3 0
      dbsyncer-sdk/src/main/java/org/dbsyncer/sdk/enums/ChangedEventTypeEnum.java
  20. 1 1
      dbsyncer-sdk/src/main/java/org/dbsyncer/sdk/listener/AbstractDatabaseListener.java
  21. 10 10
      dbsyncer-sdk/src/main/java/org/dbsyncer/sdk/listener/AbstractQuartzListener.java
  22. 17 9
      dbsyncer-sdk/src/main/java/org/dbsyncer/sdk/listener/ChangedEvent.java
  23. 0 28
      dbsyncer-sdk/src/main/java/org/dbsyncer/sdk/listener/event/CommonChangedEvent.java
  24. 8 12
      dbsyncer-sdk/src/main/java/org/dbsyncer/sdk/listener/event/DDLChangedEvent.java
  25. 8 7
      dbsyncer-sdk/src/main/java/org/dbsyncer/sdk/listener/event/RowChangedEvent.java
  26. 10 10
      dbsyncer-sdk/src/main/java/org/dbsyncer/sdk/listener/event/ScanChangedEvent.java
  27. 1 1
      dbsyncer-sdk/src/main/java/org/dbsyncer/sdk/model/ChangedOffset.java
  28. 18 4
      dbsyncer-sdk/src/main/java/org/dbsyncer/sdk/model/TableGroupQuartzCommand.java

+ 8 - 3
dbsyncer-biz/src/main/java/org/dbsyncer/biz/impl/DataSyncServiceImpl.java

@@ -163,12 +163,17 @@ public class DataSyncServiceImpl implements DataSyncService {
         }
         }
         TableGroup tableGroup = profileComponent.getTableGroup(tableGroupId);
         TableGroup tableGroup = profileComponent.getTableGroup(tableGroupId);
         String sourceTableName = tableGroup.getSourceTable().getName();
         String sourceTableName = tableGroup.getSourceTable().getName();
-        RowChangedEvent changedEvent = new RowChangedEvent(sourceTableName, event, Collections.EMPTY_LIST);
+
         // 转换为源字段
         // 转换为源字段
         final Picker picker = new Picker(tableGroup.getFieldMapping());
         final Picker picker = new Picker(tableGroup.getFieldMapping());
-        changedEvent.setChangedRow(picker.pickSourceData(binlogData));
+        Map map = picker.pickSourceData(binlogData);
+        List<Field> sourceFields = picker.getSourceFields();
+        List<Object> changedRow = new ArrayList<>(sourceFields.size());
+        sourceFields.forEach(field -> changedRow.add(map.get(field.getName())));
+        RowChangedEvent changedEvent = new RowChangedEvent(sourceTableName, event, changedRow);
+
         // 执行同步是否成功
         // 执行同步是否成功
-        bufferActuatorRouter.execute(metaId, tableGroupId, changedEvent);
+        bufferActuatorRouter.execute(metaId, changedEvent);
         storageService.remove(StorageEnum.DATA, metaId, messageId);
         storageService.remove(StorageEnum.DATA, metaId, messageId);
         // 更新失败数
         // 更新失败数
         Meta meta = profileComponent.getMeta(metaId);
         Meta meta = profileComponent.getMeta(metaId);

+ 6 - 25
dbsyncer-biz/src/main/java/org/dbsyncer/biz/impl/MetricReporter.java

@@ -6,11 +6,7 @@ package org.dbsyncer.biz.impl;
 import org.dbsyncer.biz.enums.BufferActuatorMetricEnum;
 import org.dbsyncer.biz.enums.BufferActuatorMetricEnum;
 import org.dbsyncer.biz.enums.StatisticEnum;
 import org.dbsyncer.biz.enums.StatisticEnum;
 import org.dbsyncer.biz.enums.ThreadPoolMetricEnum;
 import org.dbsyncer.biz.enums.ThreadPoolMetricEnum;
-import org.dbsyncer.biz.model.AppReportMetric;
-import org.dbsyncer.biz.model.MappingReportMetric;
-import org.dbsyncer.biz.model.MetricResponse;
-import org.dbsyncer.biz.model.MetricResponseInfo;
-import org.dbsyncer.biz.model.Sample;
+import org.dbsyncer.biz.model.*;
 import org.dbsyncer.biz.vo.HistoryStackVo;
 import org.dbsyncer.biz.vo.HistoryStackVo;
 import org.dbsyncer.common.metric.Bucket;
 import org.dbsyncer.common.metric.Bucket;
 import org.dbsyncer.common.metric.TimeRegistry;
 import org.dbsyncer.common.metric.TimeRegistry;
@@ -23,10 +19,8 @@ import org.dbsyncer.common.util.StringUtil;
 import org.dbsyncer.parser.ProfileComponent;
 import org.dbsyncer.parser.ProfileComponent;
 import org.dbsyncer.parser.flush.BufferActuator;
 import org.dbsyncer.parser.flush.BufferActuator;
 import org.dbsyncer.parser.flush.impl.BufferActuatorRouter;
 import org.dbsyncer.parser.flush.impl.BufferActuatorRouter;
-import org.dbsyncer.parser.flush.impl.TableGroupBufferActuator;
 import org.dbsyncer.parser.model.Mapping;
 import org.dbsyncer.parser.model.Mapping;
 import org.dbsyncer.parser.model.Meta;
 import org.dbsyncer.parser.model.Meta;
-import org.dbsyncer.parser.model.TableGroup;
 import org.dbsyncer.sdk.constant.ConfigConstant;
 import org.dbsyncer.sdk.constant.ConfigConstant;
 import org.dbsyncer.sdk.constant.ConnectorConstant;
 import org.dbsyncer.sdk.constant.ConnectorConstant;
 import org.dbsyncer.sdk.enums.StorageEnum;
 import org.dbsyncer.sdk.enums.StorageEnum;
@@ -44,12 +38,7 @@ import java.sql.Timestamp;
 import java.time.Instant;
 import java.time.Instant;
 import java.time.LocalDateTime;
 import java.time.LocalDateTime;
 import java.time.temporal.ChronoUnit;
 import java.time.temporal.ChronoUnit;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Consumer;
 import java.util.function.Consumer;
@@ -114,24 +103,16 @@ public class MetricReporter implements ScheduledTaskJob {
             bufferActuatorRouter.getRouter().forEach((metaId, group) -> {
             bufferActuatorRouter.getRouter().forEach((metaId, group) -> {
                 Meta meta = profileComponent.getMeta(metaId);
                 Meta meta = profileComponent.getMeta(metaId);
                 Mapping mapping = profileComponent.getMapping(meta.getMappingId());
                 Mapping mapping = profileComponent.getMapping(meta.getMappingId());
-                group.forEach((k, bufferActuator) -> {
-                    if (bufferActuator instanceof TableGroupBufferActuator) {
-                        TableGroupBufferActuator actuator = bufferActuator;
-                        TableGroup tableGroup = profileComponent.getTableGroup(actuator.getTableGroupId());
-                        String metricName = new StringBuilder()
-                                .append(tableGroup.getSourceTable().getName())
-                                .append(" > ")
-                                .append(tableGroup.getTargetTable().getName()).toString();
-                        tableList.add(collect(bufferActuator, tableGroupCode, mapping.getName(), metricName));
-                    }
-                });
+                group.forEach((k, bufferActuator) ->
+                    tableList.add(collect(bufferActuator, tableGroupCode, mapping.getName(), bufferActuator.getTableName()))
+                );
             });
             });
             List<MetricResponseInfo> sortList = tableList.stream()
             List<MetricResponseInfo> sortList = tableList.stream()
                     .sorted(Comparator.comparing(MetricResponseInfo::getQueueUp).reversed())
                     .sorted(Comparator.comparing(MetricResponseInfo::getQueueUp).reversed())
                     .collect(Collectors.toList());
                     .collect(Collectors.toList());
             list.addAll(sortList.size() <= SHOW_BUFFER_ACTUATOR_SIZE ? sortList : sortList.subList(0, SHOW_BUFFER_ACTUATOR_SIZE));
             list.addAll(sortList.size() <= SHOW_BUFFER_ACTUATOR_SIZE ? sortList : sortList.subList(0, SHOW_BUFFER_ACTUATOR_SIZE));
         }
         }
-        return list.stream().map(info -> info.getResponse()).collect(Collectors.toList());
+        return list.stream().map(MetricResponseInfo::getResponse).collect(Collectors.toList());
     }
     }
 
 
     public AppReportMetric getAppReportMetric() {
     public AppReportMetric getAppReportMetric() {

+ 1 - 1
dbsyncer-connector/dbsyncer-connector-mysql/src/main/java/org/dbsyncer/connector/mysql/cdc/MySQLListener.java

@@ -349,7 +349,7 @@ public class MySQLListener extends AbstractDatabaseListener {
             }
             }
             if (isFilterTable(databaseName, tableName)) {
             if (isFilterTable(databaseName, tableName)) {
                 logger.info("sql:{}", data.getSql());
                 logger.info("sql:{}", data.getSql());
-                trySendEvent(new DDLChangedEvent(databaseName, tableName, ConnectorConstant.OPERTION_ALTER,
+                trySendEvent(new DDLChangedEvent(tableName, ConnectorConstant.OPERTION_ALTER,
                         data.getSql(), client.getBinlogFilename(), client.getBinlogPosition()));
                         data.getSql(), client.getBinlogFilename(), client.getBinlogPosition()));
             }
             }
         }
         }

+ 1 - 1
dbsyncer-connector/dbsyncer-connector-oracle/src/main/java/org/dbsyncer/connector/oracle/cdc/OracleListener.java

@@ -144,7 +144,7 @@ public class OracleListener extends AbstractDatabaseListener {
             String tableName = getTableName(alter.getTable());
             String tableName = getTableName(alter.getTable());
             if (tableFiledMap.containsKey(tableName)) {
             if (tableFiledMap.containsKey(tableName)) {
                 logger.info("sql:{}", event.getRedoSql());
                 logger.info("sql:{}", event.getRedoSql());
-                trySendEvent(new DDLChangedEvent(null, tableName, ConnectorConstant.OPERTION_ALTER, event.getRedoSql(), null, event.getScn()));
+                trySendEvent(new DDLChangedEvent(tableName, ConnectorConstant.OPERTION_ALTER, event.getRedoSql(), null, event.getScn()));
             }
             }
         }
         }
     }
     }

+ 16 - 23
dbsyncer-manager/src/main/java/org/dbsyncer/manager/impl/IncrementPuller.java

@@ -10,14 +10,11 @@ import org.dbsyncer.manager.ManagerException;
 import org.dbsyncer.parser.LogService;
 import org.dbsyncer.parser.LogService;
 import org.dbsyncer.parser.LogType;
 import org.dbsyncer.parser.LogType;
 import org.dbsyncer.parser.ProfileComponent;
 import org.dbsyncer.parser.ProfileComponent;
-import org.dbsyncer.parser.consumer.impl.LogConsumer;
-import org.dbsyncer.parser.consumer.impl.QuartzConsumer;
+import org.dbsyncer.parser.consumer.ParserConsumer;
 import org.dbsyncer.parser.event.RefreshOffsetEvent;
 import org.dbsyncer.parser.event.RefreshOffsetEvent;
 import org.dbsyncer.parser.flush.impl.BufferActuatorRouter;
 import org.dbsyncer.parser.flush.impl.BufferActuatorRouter;
-import org.dbsyncer.parser.model.Connector;
-import org.dbsyncer.parser.model.Mapping;
-import org.dbsyncer.parser.model.Meta;
-import org.dbsyncer.parser.model.TableGroup;
+import org.dbsyncer.parser.model.*;
+import org.dbsyncer.parser.util.PickerUtil;
 import org.dbsyncer.sdk.config.ListenerConfig;
 import org.dbsyncer.sdk.config.ListenerConfig;
 import org.dbsyncer.sdk.enums.ListenerTypeEnum;
 import org.dbsyncer.sdk.enums.ListenerTypeEnum;
 import org.dbsyncer.sdk.listener.AbstractListener;
 import org.dbsyncer.sdk.listener.AbstractListener;
@@ -106,7 +103,7 @@ public final class IncrementPuller extends AbstractPuller implements Application
                 logger.error("运行异常,结束增量同步{}:{}", metaId, e.getMessage());
                 logger.error("运行异常,结束增量同步{}:{}", metaId, e.getMessage());
             }
             }
         });
         });
-        worker.setName(new StringBuilder("increment-worker-").append(mapping.getId()).toString());
+        worker.setName("increment-worker-" + mapping.getId());
         worker.setDaemon(false);
         worker.setDaemon(false);
         worker.start();
         worker.start();
     }
     }
@@ -125,20 +122,16 @@ public final class IncrementPuller extends AbstractPuller implements Application
 
 
     @Override
     @Override
     public void onApplicationEvent(RefreshOffsetEvent event) {
     public void onApplicationEvent(RefreshOffsetEvent event) {
-        List<ChangedOffset> offsetList = event.getOffsetList();
-        if (!CollectionUtils.isEmpty(offsetList)) {
-            offsetList.forEach(offset -> {
-                if (offset.isRefreshOffset() && map.containsKey(offset.getMetaId())) {
-                    map.get(offset.getMetaId()).refreshEvent(offset);
-                }
-            });
+        ChangedOffset offset = event.getChangedOffset();
+        if (offset != null && offset.isRefreshOffset() && map.containsKey(offset.getMetaId())) {
+            map.get(offset.getMetaId()).refreshEvent(offset);
         }
         }
     }
     }
 
 
     @Override
     @Override
     public void run() {
     public void run() {
         // 定时同步增量信息
         // 定时同步增量信息
-        map.values().forEach(listener -> listener.flushEvent());
+        map.values().forEach(Listener::flushEvent);
     }
     }
 
 
     private Listener getListener(Mapping mapping, Connector connector, List<TableGroup> list, Meta meta) {
     private Listener getListener(Mapping mapping, Connector connector, List<TableGroup> list, Meta meta) {
@@ -150,18 +143,18 @@ public final class IncrementPuller extends AbstractPuller implements Application
         if (null == listener) {
         if (null == listener) {
             throw new ManagerException(String.format("Unsupported listener type \"%s\".", connectorConfig.getConnectorType()));
             throw new ManagerException(String.format("Unsupported listener type \"%s\".", connectorConfig.getConnectorType()));
         }
         }
+        listener.register(new ParserConsumer(bufferActuatorRouter, profileComponent, logService, meta.getId(), list));
 
 
         // 默认定时抽取
         // 默认定时抽取
         if (ListenerTypeEnum.isTiming(listenerType) && listener instanceof AbstractQuartzListener) {
         if (ListenerTypeEnum.isTiming(listenerType) && listener instanceof AbstractQuartzListener) {
             AbstractQuartzListener quartzListener = (AbstractQuartzListener) listener;
             AbstractQuartzListener quartzListener = (AbstractQuartzListener) listener;
-            quartzListener.setCommands(list.stream().map(t -> new TableGroupQuartzCommand(t.getSourceTable(), t.getCommand())).collect(Collectors.toList()));
-            quartzListener.register(new QuartzConsumer().init(bufferActuatorRouter, profileComponent, logService, meta.getId(), mapping, list));
-        }
-
-        // 基于日志抽取
-        if (ListenerTypeEnum.isLog(listenerType) && listener instanceof AbstractListener) {
-            AbstractListener abstractListener = (AbstractListener) listener;
-            abstractListener.register(new LogConsumer().init(bufferActuatorRouter, profileComponent, logService, meta.getId(), mapping, list));
+            List<TableGroupQuartzCommand> quartzCommands = list.stream().map(t -> {
+                final TableGroup group = PickerUtil.mergeTableGroupConfig(mapping, t);
+                final Picker picker = new Picker(group.getFieldMapping());
+                Assert.notEmpty(picker.getSourceFields(), "表字段映射关系不能为空:" + group.getSourceTable().getName() + " > " + group.getTargetTable().getName());
+                return new TableGroupQuartzCommand(t.getSourceTable(), picker.getSourceFields(), t.getCommand());
+            }).collect(Collectors.toList());
+            quartzListener.setCommands(quartzCommands);
         }
         }
 
 
         if (listener instanceof AbstractListener) {
         if (listener instanceof AbstractListener) {

+ 13 - 37
dbsyncer-parser/src/main/java/org/dbsyncer/parser/consumer/AbstractConsumer.java → dbsyncer-parser/src/main/java/org/dbsyncer/parser/consumer/ParserConsumer.java

@@ -7,12 +7,11 @@ import org.dbsyncer.parser.LogService;
 import org.dbsyncer.parser.LogType;
 import org.dbsyncer.parser.LogType;
 import org.dbsyncer.parser.ProfileComponent;
 import org.dbsyncer.parser.ProfileComponent;
 import org.dbsyncer.parser.flush.impl.BufferActuatorRouter;
 import org.dbsyncer.parser.flush.impl.BufferActuatorRouter;
-import org.dbsyncer.parser.model.Mapping;
 import org.dbsyncer.parser.model.Meta;
 import org.dbsyncer.parser.model.Meta;
 import org.dbsyncer.parser.model.TableGroup;
 import org.dbsyncer.parser.model.TableGroup;
+import org.dbsyncer.sdk.enums.ChangedEventTypeEnum;
 import org.dbsyncer.sdk.listener.ChangedEvent;
 import org.dbsyncer.sdk.listener.ChangedEvent;
 import org.dbsyncer.sdk.listener.Watcher;
 import org.dbsyncer.sdk.listener.Watcher;
-import org.dbsyncer.sdk.listener.event.DDLChangedEvent;
 
 
 import java.util.List;
 import java.util.List;
 import java.util.Map;
 import java.util.Map;
@@ -22,44 +21,29 @@ import java.util.Map;
  * @Author AE86
  * @Author AE86
  * @Date 2023-11-12 01:32
  * @Date 2023-11-12 01:32
  */
  */
-public abstract class AbstractConsumer<E extends ChangedEvent> implements Watcher {
-    private BufferActuatorRouter bufferActuatorRouter;
-    private ProfileComponent profileComponent;
-    private LogService logService;
-    private String metaId;
-    protected Mapping mapping;
-    protected List<TableGroup> tableGroups;
+public final class ParserConsumer implements Watcher {
+    private final BufferActuatorRouter bufferActuatorRouter;
+    private final ProfileComponent profileComponent;
+    private final LogService logService;
+    private final String metaId;
 
 
-    public AbstractConsumer init(BufferActuatorRouter bufferActuatorRouter, ProfileComponent profileComponent, LogService logService, String metaId, Mapping mapping, List<TableGroup> tableGroups) {
+    public ParserConsumer(BufferActuatorRouter bufferActuatorRouter, ProfileComponent profileComponent, LogService logService, String metaId, List<TableGroup> tableGroups) {
         this.bufferActuatorRouter = bufferActuatorRouter;
         this.bufferActuatorRouter = bufferActuatorRouter;
         this.profileComponent = profileComponent;
         this.profileComponent = profileComponent;
         this.logService = logService;
         this.logService = logService;
         this.metaId = metaId;
         this.metaId = metaId;
-        this.mapping = mapping;
-        this.tableGroups = tableGroups;
-        postProcessBeforeInitialization();
-        return this;
-    }
-
-    public abstract void postProcessBeforeInitialization();
-
-    public abstract void onChange(E e);
-
-    public void onDDLChanged(DDLChangedEvent event) {
+        // 注册到路由服务中
+        tableGroups.forEach(t -> bufferActuatorRouter.bind(metaId, t.getSourceTable().getName()));
     }
     }
 
 
     @Override
     @Override
     public void changeEvent(ChangedEvent event) {
     public void changeEvent(ChangedEvent event) {
         event.getChangedOffset().setMetaId(metaId);
         event.getChangedOffset().setMetaId(metaId);
-        switch (event.getType()){
-            case ROW:
-            case SCAN:
-                onChange((E) event);
-                break;
-            case DDL:
-                onDDLChanged((DDLChangedEvent) event);
-                break;
+        if (ChangedEventTypeEnum.isScan(event.getType())) {
+            // TODO 如果是DDL,阻塞等待队列消费完成
+            event.getChangedOffset().setRefreshOffset(false);
         }
         }
+        bufferActuatorRouter.execute(metaId, event);
     }
     }
 
 
     @Override
     @Override
@@ -81,12 +65,4 @@ public abstract class AbstractConsumer<E extends ChangedEvent> implements Watche
         Meta meta = profileComponent.getMeta(metaId);
         Meta meta = profileComponent.getMeta(metaId);
         return meta != null ? meta.getUpdateTime() : 0L;
         return meta != null ? meta.getUpdateTime() : 0L;
     }
     }
-
-    protected void bind(String tableGroupId) {
-        bufferActuatorRouter.bind(metaId, tableGroupId);
-    }
-
-    protected void execute(String tableGroupId, ChangedEvent event) {
-        bufferActuatorRouter.execute(metaId, tableGroupId, event);
-    }
 }
 }

+ 0 - 87
dbsyncer-parser/src/main/java/org/dbsyncer/parser/consumer/impl/LogConsumer.java

@@ -1,87 +0,0 @@
-/**
- * DBSyncer Copyright 2020-2023 All Rights Reserved.
- */
-package org.dbsyncer.parser.consumer.impl;
-
-import org.dbsyncer.sdk.listener.event.CommonChangedEvent;
-import org.dbsyncer.sdk.listener.event.DDLChangedEvent;
-import org.dbsyncer.sdk.listener.event.RowChangedEvent;
-import org.dbsyncer.common.util.CollectionUtils;
-import org.dbsyncer.parser.consumer.AbstractConsumer;
-import org.dbsyncer.parser.model.FieldPicker;
-import org.dbsyncer.parser.model.TableGroup;
-import org.dbsyncer.parser.util.PickerUtil;
-import org.dbsyncer.sdk.model.Table;
-
-import java.util.ArrayList;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.function.Consumer;
-
-/**
- * 日志消费
- *
- * @Version 1.0.0
- * @Author AE86
- * @Date 2023-11-12 02:25
- */
-public final class LogConsumer extends AbstractConsumer<RowChangedEvent> {
-    private final Map<String, List<FieldPicker>> tablePicker = new LinkedHashMap<>();
-
-    //判断上次是否为ddl,是ddl需要强制刷新下picker
-    private boolean ddlChanged;
-
-    @Override
-    public void postProcessBeforeInitialization() {
-        addTablePicker(true);
-    }
-
-    @Override
-    public void onChange(RowChangedEvent event) {
-        // 需要强制刷新 fix https://gitee.com/ghi/dbsyncer/issues/I8DJUR
-        if (ddlChanged) {
-            addTablePicker(false);
-            ddlChanged = false;
-        }
-        process(event, picker -> {
-            final Map<String, Object> changedRow = picker.getColumns(event.getDataList());
-            if (picker.filter(changedRow)) {
-                event.setChangedRow(changedRow);
-                execute(picker.getTableGroup().getId(), event);
-            }
-        });
-    }
-
-    @Override
-    public void onDDLChanged(DDLChangedEvent event) {
-        ddlChanged = true;
-        process(event, picker -> execute(picker.getTableGroup().getId(), event));
-    }
-
-    private void process(CommonChangedEvent event, Consumer<FieldPicker> consumer) {
-        // 处理过程有异常向上抛
-        List<FieldPicker> pickers = tablePicker.get(event.getSourceTableName());
-        if (!CollectionUtils.isEmpty(pickers)) {
-            // 触发刷新增量点事件
-            event.getChangedOffset().setRefreshOffset(true);
-            pickers.forEach(picker -> consumer.accept(picker));
-        }
-    }
-
-    private void addTablePicker(boolean bindBufferActuatorRouter) {
-        this.tablePicker.clear();
-        this.tableGroups.forEach(t -> {
-            final Table table = t.getSourceTable();
-            final String tableName = table.getName();
-            tablePicker.putIfAbsent(tableName, new ArrayList<>());
-            TableGroup group = PickerUtil.mergeTableGroupConfig(mapping, t);
-            tablePicker.get(tableName).add(new FieldPicker(group, group.getFilter(), table.getColumn(), group.getFieldMapping()));
-            // 是否注册到路由服务中
-            if (bindBufferActuatorRouter) {
-                bind(group.getId());
-            }
-        });
-    }
-
-}

+ 0 - 42
dbsyncer-parser/src/main/java/org/dbsyncer/parser/consumer/impl/QuartzConsumer.java

@@ -1,42 +0,0 @@
-/**
- * DBSyncer Copyright 2020-2023 All Rights Reserved.
- */
-package org.dbsyncer.parser.consumer.impl;
-
-import org.dbsyncer.sdk.listener.event.ScanChangedEvent;
-import org.dbsyncer.parser.consumer.AbstractConsumer;
-import org.dbsyncer.parser.model.FieldPicker;
-import org.dbsyncer.parser.model.TableGroup;
-import org.dbsyncer.parser.util.PickerUtil;
-
-import java.util.LinkedList;
-import java.util.List;
-
-/**
- * 定时消费
- *
- * @Version 1.0.0
- * @Author AE86
- * @Date 2023-11-12 02:18
- */
-public final class QuartzConsumer extends AbstractConsumer<ScanChangedEvent> {
-    private final List<FieldPicker> tablePicker = new LinkedList<>();
-
-    @Override
-    public void postProcessBeforeInitialization() {
-        tableGroups.forEach(t -> {
-            tablePicker.add(new FieldPicker(PickerUtil.mergeTableGroupConfig(mapping, t)));
-            bind(t.getId());
-        });
-    }
-
-    @Override
-    public void onChange(ScanChangedEvent event) {
-        final FieldPicker picker = tablePicker.get(event.getTableGroupIndex());
-        TableGroup tableGroup = picker.getTableGroup();
-        event.setSourceTableName(tableGroup.getSourceTable().getName());
-
-        // 定时暂不支持触发刷新增量点事件
-        execute(tableGroup.getId(), event);
-    }
-}

+ 5 - 5
dbsyncer-parser/src/main/java/org/dbsyncer/parser/event/RefreshOffsetEvent.java

@@ -15,7 +15,7 @@ import java.util.List;
  */
  */
 public final class RefreshOffsetEvent extends ApplicationContextEvent {
 public final class RefreshOffsetEvent extends ApplicationContextEvent {
 
 
-    private List<ChangedOffset> offsetList;
+    private final ChangedOffset changedOffset;
 
 
     /**
     /**
      * Create a new ContextStartedEvent.
      * Create a new ContextStartedEvent.
@@ -23,12 +23,12 @@ public final class RefreshOffsetEvent extends ApplicationContextEvent {
      * @param source the {@code ApplicationContext} that the event is raised for
      * @param source the {@code ApplicationContext} that the event is raised for
      *               (must not be {@code null})
      *               (must not be {@code null})
      */
      */
-    public RefreshOffsetEvent(ApplicationContext source, List<ChangedOffset> offsetList) {
+    public RefreshOffsetEvent(ApplicationContext source, ChangedOffset changedOffset) {
         super(source);
         super(source);
-        this.offsetList = offsetList;
+        this.changedOffset = changedOffset;
     }
     }
 
 
-    public List<ChangedOffset> getOffsetList() {
-        return offsetList;
+    public ChangedOffset getChangedOffset() {
+        return changedOffset;
     }
     }
 }
 }

+ 1 - 1
dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/AbstractBufferActuator.java

@@ -80,7 +80,7 @@ public abstract class AbstractBufferActuator<Request extends BufferRequest, Resp
      */
      */
     protected void buildQueueConfig() {
     protected void buildQueueConfig() {
         taskLock = new ReentrantLock();
         taskLock = new ReentrantLock();
-        this.queue = new LinkedBlockingQueue(config.getBufferQueueCapacity());
+        this.queue = new LinkedBlockingQueue<>(config.getBufferQueueCapacity());
         logger.info("{} initialized with queue capacity: {}", this.getClass().getSimpleName(), config.getBufferQueueCapacity());
         logger.info("{} initialized with queue capacity: {}", this.getClass().getSimpleName(), config.getBufferQueueCapacity());
     }
     }
 
 

+ 7 - 7
dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/impl/BufferActuatorRouter.java

@@ -44,15 +44,15 @@ public final class BufferActuatorRouter implements DisposableBean {
      */
      */
     private final Map<String, Map<String, TableGroupBufferActuator>> router = new ConcurrentHashMap<>();
     private final Map<String, Map<String, TableGroupBufferActuator>> router = new ConcurrentHashMap<>();
 
 
-    public void execute(String metaId, String tableGroupId, ChangedEvent event) {
-        if (router.containsKey(metaId) && router.get(metaId).containsKey(tableGroupId)) {
-            router.get(metaId).get(tableGroupId).offer(new WriterRequest(tableGroupId, event));
+    public void execute(String metaId, ChangedEvent event) {
+        if (router.containsKey(metaId) && router.get(metaId).containsKey(event.getSourceTableName())) {
+            router.get(metaId).get(event.getSourceTableName()).offer(new WriterRequest(event));
             return;
             return;
         }
         }
-        generalBufferActuator.offer(new WriterRequest(tableGroupId, event));
+        generalBufferActuator.offer(new WriterRequest(event));
     }
     }
 
 
-    public void bind(String metaId, String tableGroupId) {
+    public void bind(String metaId, String tableName) {
         router.computeIfAbsent(metaId, k -> new ConcurrentHashMap<>());
         router.computeIfAbsent(metaId, k -> new ConcurrentHashMap<>());
 
 
         // TODO 暂定执行器上限,待替换为LRU模型
         // TODO 暂定执行器上限,待替换为LRU模型
@@ -60,11 +60,11 @@ public final class BufferActuatorRouter implements DisposableBean {
             return;
             return;
         }
         }
 
 
-        router.get(metaId).computeIfAbsent(tableGroupId, k -> {
+        router.get(metaId).computeIfAbsent(tableName, k -> {
             TableGroupBufferActuator newBufferActuator = null;
             TableGroupBufferActuator newBufferActuator = null;
             try {
             try {
                 newBufferActuator = (TableGroupBufferActuator) tableGroupBufferActuator.clone();
                 newBufferActuator = (TableGroupBufferActuator) tableGroupBufferActuator.clone();
-                newBufferActuator.setTableGroupId(tableGroupId);
+                newBufferActuator.setTableName(tableName);
                 newBufferActuator.buildConfig();
                 newBufferActuator.buildConfig();
             } catch (CloneNotSupportedException ex) {
             } catch (CloneNotSupportedException ex) {
                 logger.error(ex.getMessage(), ex);
                 logger.error(ex.getMessage(), ex);

+ 19 - 11
dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/impl/GeneralBufferActuator.java

@@ -87,7 +87,7 @@ public class GeneralBufferActuator extends AbstractBufferActuator<WriterRequest,
 
 
     @Override
     @Override
     protected String getPartitionKey(WriterRequest request) {
     protected String getPartitionKey(WriterRequest request) {
-        return request.getTableGroupId();
+        return request.getTableName();
     }
     }
 
 
     @Override
     @Override
@@ -96,10 +96,10 @@ public class GeneralBufferActuator extends AbstractBufferActuator<WriterRequest,
             response.addData(request.getRow());
             response.addData(request.getRow());
         }
         }
         if (request.getChangedOffset() != null) {
         if (request.getChangedOffset() != null) {
-            response.addChangedOffset(request.getChangedOffset());
+            response.setChangedOffset(request.getChangedOffset());
         }
         }
         if (!response.isMerged()) {
         if (!response.isMerged()) {
-            response.setTableGroupId(request.getTableGroupId());
+            response.setTableName(request.getTableName());
             response.setEvent(request.getEvent());
             response.setEvent(request.getEvent());
             response.setTypeEnum(request.getTypeEnum());
             response.setTypeEnum(request.getTypeEnum());
             response.setSql(request.getSql());
             response.setSql(request.getSql());
@@ -116,8 +116,19 @@ public class GeneralBufferActuator extends AbstractBufferActuator<WriterRequest,
 
 
     @Override
     @Override
     public void pull(WriterResponse response) {
     public void pull(WriterResponse response) {
+        // TODO add cache
+        List<TableGroup> groupAll = profileComponent.getTableGroupAll(response.getChangedOffset().getMetaId());
+        if (!CollectionUtils.isEmpty(groupAll)) {
+            groupAll.forEach(tableGroup -> {
+                if (StringUtil.equals(tableGroup.getSourceTable().getName(), response.getTableName())) {
+                    distributeTableGroup(response, tableGroup);
+                }
+            });
+        }
+    }
+
+    private void distributeTableGroup(WriterResponse response, TableGroup tableGroup) {
         // 0、获取配置信息
         // 0、获取配置信息
-        final TableGroup tableGroup = getTableGroup(response.getTableGroupId());
         final Mapping mapping = profileComponent.getMapping(tableGroup.getMappingId());
         final Mapping mapping = profileComponent.getMapping(tableGroup.getMappingId());
         final TableGroup group = PickerUtil.mergeTableGroupConfig(mapping, tableGroup);
         final TableGroup group = PickerUtil.mergeTableGroupConfig(mapping, tableGroup);
 
 
@@ -128,7 +139,8 @@ public class GeneralBufferActuator extends AbstractBufferActuator<WriterRequest,
         }
         }
 
 
         final Picker picker = new Picker(group.getFieldMapping());
         final Picker picker = new Picker(group.getFieldMapping());
-        final List<Map> sourceDataList = response.getDataList();
+
+        final List<Map> sourceDataList = null; //response.getDataList();
         // 2、映射字段
         // 2、映射字段
         List<Map> targetDataList = picker.pickTargetData(sourceDataList);
         List<Map> targetDataList = picker.pickTargetData(sourceDataList);
 
 
@@ -155,7 +167,7 @@ public class GeneralBufferActuator extends AbstractBufferActuator<WriterRequest,
         Result result = parserComponent.writeBatch(context, getExecutor());
         Result result = parserComponent.writeBatch(context, getExecutor());
 
 
         // 6.发布刷新增量点事件
         // 6.发布刷新增量点事件
-        applicationContext.publishEvent(new RefreshOffsetEvent(applicationContext, response.getOffsetList()));
+        applicationContext.publishEvent(new RefreshOffsetEvent(applicationContext, response.getChangedOffset()));
 
 
         // 7、持久化同步结果
         // 7、持久化同步结果
         result.setTableGroupId(tableGroup.getId());
         result.setTableGroupId(tableGroup.getId());
@@ -182,10 +194,6 @@ public class GeneralBufferActuator extends AbstractBufferActuator<WriterRequest,
         return generalExecutor;
         return generalExecutor;
     }
     }
 
 
-    public TableGroup getTableGroup(String tableGroupId) {
-        return profileComponent.getTableGroup(tableGroupId);
-    }
-
     /**
     /**
      * 解析DDL
      * 解析DDL
      *
      *
@@ -227,7 +235,7 @@ public class GeneralBufferActuator extends AbstractBufferActuator<WriterRequest,
                 profileComponent.editTableGroup(tableGroup);
                 profileComponent.editTableGroup(tableGroup);
 
 
                 // 6.发布更新事件,持久化增量数据
                 // 6.发布更新事件,持久化增量数据
-                applicationContext.publishEvent(new RefreshOffsetEvent(applicationContext, response.getOffsetList()));
+                applicationContext.publishEvent(new RefreshOffsetEvent(applicationContext, response.getChangedOffset()));
                 flushStrategy.flushIncrementData(mapping.getMetaId(), result, response.getEvent());
                 flushStrategy.flushIncrementData(mapping.getMetaId(), result, response.getEvent());
                 return;
                 return;
             }
             }

+ 6 - 6
dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/impl/TableGroupBufferActuator.java

@@ -34,7 +34,7 @@ public class TableGroupBufferActuator extends GeneralBufferActuator implements C
 
 
     private String taskKey;
     private String taskKey;
 
 
-    private String tableGroupId;
+    private String tableName;
 
 
     private volatile boolean running;
     private volatile boolean running;
 
 
@@ -61,7 +61,7 @@ public class TableGroupBufferActuator extends GeneralBufferActuator implements C
         int coreSize = tableGroupBufferConfig.getThreadCoreSize();
         int coreSize = tableGroupBufferConfig.getThreadCoreSize();
         int maxSize = tableGroupBufferConfig.getMaxThreadSize();
         int maxSize = tableGroupBufferConfig.getMaxThreadSize();
         int queueCapacity = tableGroupBufferConfig.getThreadQueueCapacity();
         int queueCapacity = tableGroupBufferConfig.getThreadQueueCapacity();
-        String threadNamePrefix = "TableGroupExecutor-" + tableGroupId + StringUtil.SYMBOL;
+        String threadNamePrefix = "TableGroupExecutor-" + tableName + StringUtil.SYMBOL + tableName.hashCode() + StringUtil.SYMBOL;
         threadPoolTaskExecutor = ThreadPoolUtil.newThreadPoolTaskExecutor(coreSize, maxSize, queueCapacity, 30, threadNamePrefix);
         threadPoolTaskExecutor = ThreadPoolUtil.newThreadPoolTaskExecutor(coreSize, maxSize, queueCapacity, 30, threadNamePrefix);
         running = true;
         running = true;
         scheduledTaskService.start(taskKey, tableGroupBufferConfig.getBufferPeriodMillisecond(), this);
         scheduledTaskService.start(taskKey, tableGroupBufferConfig.getBufferPeriodMillisecond(), this);
@@ -80,11 +80,11 @@ public class TableGroupBufferActuator extends GeneralBufferActuator implements C
         scheduledTaskService.stop(taskKey);
         scheduledTaskService.stop(taskKey);
     }
     }
 
 
-    public String getTableGroupId() {
-        return tableGroupId;
+    public String getTableName() {
+        return tableName;
     }
     }
 
 
-    public void setTableGroupId(String tableGroupId) {
-        this.tableGroupId = tableGroupId;
+    public void setTableName(String tableName) {
+        this.tableName = tableName;
     }
     }
 }
 }

+ 1 - 2
dbsyncer-parser/src/main/java/org/dbsyncer/parser/impl/ProfileComponentImpl.java

@@ -154,11 +154,10 @@ public class ProfileComponentImpl implements ProfileComponent {
 
 
     @Override
     @Override
     public List<TableGroup> getSortedTableGroupAll(String mappingId) {
     public List<TableGroup> getSortedTableGroupAll(String mappingId) {
-        List<TableGroup> list = getTableGroupAll(mappingId)
+        return getTableGroupAll(mappingId)
                 .stream()
                 .stream()
                 .sorted(Comparator.comparing(TableGroup::getIndex).reversed())
                 .sorted(Comparator.comparing(TableGroup::getIndex).reversed())
                 .collect(Collectors.toList());
                 .collect(Collectors.toList());
-        return list;
     }
     }
 
 
     @Override
     @Override

+ 16 - 5
dbsyncer-parser/src/main/java/org/dbsyncer/parser/model/AbstractWriter.java

@@ -1,6 +1,7 @@
 package org.dbsyncer.parser.model;
 package org.dbsyncer.parser.model;
 
 
 import org.dbsyncer.sdk.enums.ChangedEventTypeEnum;
 import org.dbsyncer.sdk.enums.ChangedEventTypeEnum;
+import org.dbsyncer.sdk.model.ChangedOffset;
 
 
 /**
 /**
  * @author AE86
  * @author AE86
@@ -11,7 +12,9 @@ public abstract class AbstractWriter {
 
 
     private ChangedEventTypeEnum typeEnum;
     private ChangedEventTypeEnum typeEnum;
 
 
-    private String tableGroupId;
+    private ChangedOffset changedOffset;
+
+    private String tableName;
 
 
     private String event;
     private String event;
 
 
@@ -25,12 +28,20 @@ public abstract class AbstractWriter {
         this.typeEnum = typeEnum;
         this.typeEnum = typeEnum;
     }
     }
 
 
-    public String getTableGroupId() {
-        return tableGroupId;
+    public ChangedOffset getChangedOffset() {
+        return changedOffset;
+    }
+
+    public void setChangedOffset(ChangedOffset changedOffset) {
+        this.changedOffset = changedOffset;
+    }
+
+    public String getTableName() {
+        return tableName;
     }
     }
 
 
-    public void setTableGroupId(String tableGroupId) {
-        this.tableGroupId = tableGroupId;
+    public void setTableName(String tableName) {
+        this.tableName = tableName;
     }
     }
 
 
     public String getEvent() {
     public String getEvent() {

+ 13 - 10
dbsyncer-parser/src/main/java/org/dbsyncer/parser/model/Picker.java

@@ -4,13 +4,7 @@ import org.dbsyncer.common.util.CollectionUtils;
 import org.dbsyncer.common.util.StringUtil;
 import org.dbsyncer.common.util.StringUtil;
 import org.dbsyncer.sdk.model.Field;
 import org.dbsyncer.sdk.model.Field;
 
 
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
+import java.util.*;
 import java.util.stream.Collectors;
 import java.util.stream.Collectors;
 
 
 public class Picker {
 public class Picker {
@@ -80,10 +74,10 @@ public class Picker {
         }
         }
     }
     }
 
 
-    public List<Field> getTargetFields() {
+    private List<Field> getFields(List<Field> list) {
         List<Field> fields = new ArrayList<>();
         List<Field> fields = new ArrayList<>();
         Set<String> keys = new HashSet<>();
         Set<String> keys = new HashSet<>();
-        targetFields.forEach(f -> {
+        list.forEach(f -> {
             if (!keys.contains(f.getName())) {
             if (!keys.contains(f.getName())) {
                 fields.add(f);
                 fields.add(f);
                 keys.add(f.getName());
                 keys.add(f.getName());
@@ -92,7 +86,16 @@ public class Picker {
         return Collections.unmodifiableList(fields);
         return Collections.unmodifiableList(fields);
     }
     }
 
 
+    public List<Field> getSourceFields() {
+        return getFields(sourceFields);
+    }
+
+    public List<Field> getTargetFields() {
+        return getFields(targetFields);
+    }
+
     public Map<String, Field> getTargetFieldMap() {
     public Map<String, Field> getTargetFieldMap() {
-        return targetFields.stream().filter(f -> null != f).collect(Collectors.toMap(Field::getName, f -> f, (k1, k2) -> k1));
+        return targetFields.stream().filter(Objects::nonNull).collect(Collectors.toMap(Field::getName, f -> f, (k1, k2) -> k1));
     }
     }
+
 }
 }

+ 7 - 14
dbsyncer-parser/src/main/java/org/dbsyncer/parser/model/WriterRequest.java

@@ -2,9 +2,8 @@ package org.dbsyncer.parser.model;
 
 
 import org.dbsyncer.parser.flush.BufferRequest;
 import org.dbsyncer.parser.flush.BufferRequest;
 import org.dbsyncer.sdk.listener.ChangedEvent;
 import org.dbsyncer.sdk.listener.ChangedEvent;
-import org.dbsyncer.sdk.model.ChangedOffset;
 
 
-import java.util.Map;
+import java.util.List;
 
 
 /**
 /**
  * @author AE86
  * @author AE86
@@ -13,30 +12,24 @@ import java.util.Map;
  */
  */
 public class WriterRequest extends AbstractWriter implements BufferRequest {
 public class WriterRequest extends AbstractWriter implements BufferRequest {
 
 
-    private Map row;
+    private final List<Object> row;
 
 
-    private ChangedOffset changedOffset;
-
-    public WriterRequest(String tableGroupId, ChangedEvent event) {
+    public WriterRequest(ChangedEvent event) {
         setTypeEnum(event.getType());
         setTypeEnum(event.getType());
-        setTableGroupId(tableGroupId);
+        setChangedOffset(event.getChangedOffset());
+        setTableName(event.getSourceTableName());
         setEvent(event.getEvent());
         setEvent(event.getEvent());
         setSql(event.getSql());
         setSql(event.getSql());
         this.row = event.getChangedRow();
         this.row = event.getChangedRow();
-        this.changedOffset = event.getChangedOffset();
     }
     }
 
 
     @Override
     @Override
     public String getMetaId() {
     public String getMetaId() {
-        return changedOffset.getMetaId();
+        return getChangedOffset().getMetaId();
     }
     }
 
 
-    public Map getRow() {
+    public List<Object> getRow() {
         return row;
         return row;
     }
     }
 
 
-    public ChangedOffset getChangedOffset() {
-        return changedOffset;
-    }
-
 }
 }

+ 3 - 15
dbsyncer-parser/src/main/java/org/dbsyncer/parser/model/WriterResponse.java

@@ -2,11 +2,9 @@ package org.dbsyncer.parser.model;
 
 
 import org.dbsyncer.common.util.StringUtil;
 import org.dbsyncer.common.util.StringUtil;
 import org.dbsyncer.parser.flush.BufferResponse;
 import org.dbsyncer.parser.flush.BufferResponse;
-import org.dbsyncer.sdk.model.ChangedOffset;
 
 
 import java.util.LinkedList;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.List;
-import java.util.Map;
 
 
 /**
 /**
  * @author AE86
  * @author AE86
@@ -15,9 +13,7 @@ import java.util.Map;
  */
  */
 public class WriterResponse extends AbstractWriter implements BufferResponse {
 public class WriterResponse extends AbstractWriter implements BufferResponse {
 
 
-    private List<Map> dataList = new LinkedList<>();
-
-    private List<ChangedOffset> offsetList = new LinkedList<>();
+    private final List<List<Object>> dataList = new LinkedList<>();
 
 
     private transient boolean isMerged;
     private transient boolean isMerged;
 
 
@@ -31,22 +27,14 @@ public class WriterResponse extends AbstractWriter implements BufferResponse {
         return StringUtil.SYMBOL.concat(getEvent());
         return StringUtil.SYMBOL.concat(getEvent());
     }
     }
 
 
-    public void addData(Map data) {
+    public void addData(List<Object> data) {
         dataList.add(data);
         dataList.add(data);
     }
     }
 
 
-    public void addChangedOffset(ChangedOffset changedOffset) {
-        offsetList.add(changedOffset);
-    }
-
-    public List<Map> getDataList() {
+    public List<List<Object>> getDataList() {
         return dataList;
         return dataList;
     }
     }
 
 
-    public List<ChangedOffset> getOffsetList() {
-        return offsetList;
-    }
-
     public boolean isMerged() {
     public boolean isMerged() {
         return isMerged;
         return isMerged;
     }
     }

+ 3 - 0
dbsyncer-sdk/src/main/java/org/dbsyncer/sdk/enums/ChangedEventTypeEnum.java

@@ -25,4 +25,7 @@ public enum ChangedEventTypeEnum {
         return DDL == event;
         return DDL == event;
     }
     }
 
 
+    public static boolean isScan(ChangedEventTypeEnum type) {
+        return SCAN == type;
+    }
 }
 }

+ 1 - 1
dbsyncer-sdk/src/main/java/org/dbsyncer/sdk/listener/AbstractDatabaseListener.java

@@ -65,7 +65,7 @@ public abstract class AbstractDatabaseListener extends AbstractListener<Database
                     case ConnectorConstant.OPERTION_UPDATE:
                     case ConnectorConstant.OPERTION_UPDATE:
                     case ConnectorConstant.OPERTION_INSERT:
                     case ConnectorConstant.OPERTION_INSERT:
                         try {
                         try {
-                            queryDqlData(dqlMapper, changedEvent.getDataList());
+                            queryDqlData(dqlMapper, changedEvent.getChangedRow());
                         } catch (Exception e) {
                         } catch (Exception e) {
                             return;
                             return;
                         }
                         }

+ 10 - 10
dbsyncer-sdk/src/main/java/org/dbsyncer/sdk/listener/AbstractQuartzListener.java

@@ -109,10 +109,10 @@ public abstract class AbstractQuartzListener extends AbstractListener implements
         running = false;
         running = false;
     }
     }
 
 
-    private void execute(TableGroupQuartzCommand tableGroupQuartzCommand, int index) {
-        final Map<String, String> command = tableGroupQuartzCommand.getCommand();
-        final List<String> primaryKeys = tableGroupQuartzCommand.getPrimaryKeys();
-        final Table table = tableGroupQuartzCommand.getTable();
+    private void execute(TableGroupQuartzCommand cmd, int index) {
+        final Map<String, String> command = cmd.getCommand();
+        final List<String> primaryKeys = cmd.getPrimaryKeys();
+        final Table table = cmd.getTable();
 
 
         // 检查增量点
         // 检查增量点
         Point point = checkLastPoint(command, index);
         Point point = checkLastPoint(command, index);
@@ -137,21 +137,21 @@ public abstract class AbstractQuartzListener extends AbstractListener implements
 
 
             for (Map<String, Object> row : data) {
             for (Map<String, Object> row : data) {
                 if (customEvent) {
                 if (customEvent) {
-                    trySendEvent(new ScanChangedEvent(index, event, row));
+                    trySendEvent(new ScanChangedEvent(table.getName(), event, cmd.getChangedRow(row)));
                     continue;
                     continue;
                 }
                 }
 
 
                 Object eventValue = StringUtil.toString(row.get(eventFieldName));
                 Object eventValue = StringUtil.toString(row.get(eventFieldName));
                 if (update.contains(eventValue)) {
                 if (update.contains(eventValue)) {
-                    trySendEvent(new ScanChangedEvent(index, ConnectorConstant.OPERTION_UPDATE, row));
+                    trySendEvent(new ScanChangedEvent(table.getName(), ConnectorConstant.OPERTION_UPDATE, cmd.getChangedRow(row)));
                     continue;
                     continue;
                 }
                 }
                 if (insert.contains(eventValue)) {
                 if (insert.contains(eventValue)) {
-                    trySendEvent(new ScanChangedEvent(index, ConnectorConstant.OPERTION_INSERT, row));
+                    trySendEvent(new ScanChangedEvent(table.getName(), ConnectorConstant.OPERTION_INSERT, cmd.getChangedRow(row)));
                     continue;
                     continue;
                 }
                 }
                 if (delete.contains(eventValue)) {
                 if (delete.contains(eventValue)) {
-                    trySendEvent(new ScanChangedEvent(index, ConnectorConstant.OPERTION_DELETE, row));
+                    trySendEvent(new ScanChangedEvent(table.getName(), ConnectorConstant.OPERTION_DELETE, cmd.getChangedRow(row)));
                 }
                 }
             }
             }
             // 更新记录点
             // 更新记录点
@@ -174,9 +174,9 @@ public abstract class AbstractQuartzListener extends AbstractListener implements
 
 
     }
     }
 
 
-    private void trySendEvent(ChangedEvent event){
+    private void trySendEvent(ChangedEvent event) {
         // 如果消费事件失败,重试
         // 如果消费事件失败,重试
-        while (running){
+        while (running) {
             try {
             try {
                 changeEvent(event);
                 changeEvent(event);
                 break;
                 break;

+ 17 - 9
dbsyncer-sdk/src/main/java/org/dbsyncer/sdk/listener/ChangedEvent.java

@@ -4,9 +4,12 @@
 package org.dbsyncer.sdk.listener;
 package org.dbsyncer.sdk.listener;
 
 
 import org.dbsyncer.sdk.enums.ChangedEventTypeEnum;
 import org.dbsyncer.sdk.enums.ChangedEventTypeEnum;
+import org.dbsyncer.sdk.listener.event.DDLChangedEvent;
+import org.dbsyncer.sdk.listener.event.RowChangedEvent;
+import org.dbsyncer.sdk.listener.event.ScanChangedEvent;
 import org.dbsyncer.sdk.model.ChangedOffset;
 import org.dbsyncer.sdk.model.ChangedOffset;
 
 
-import java.util.Map;
+import java.util.List;
 
 
 /**
 /**
  * 变更事件
  * 变更事件
@@ -39,24 +42,29 @@ public interface ChangedEvent {
     String getEvent();
     String getEvent();
 
 
     /**
     /**
-     * 获取变更SQL
+     * 获取增量偏移量
      *
      *
      * @return
      * @return
      */
      */
-    String getSql();
+    ChangedOffset getChangedOffset();
 
 
     /**
     /**
-     * 获取变更行数据
+     * 获取变更SQL
      *
      *
-     * @return
+     * {@link DDLChangedEvent}
      */
      */
-    Map<String, Object> getChangedRow();
+    default String getSql() {
+        return null;
+    }
 
 
     /**
     /**
-     * 获取增量偏移量
+     * 获取变更行数据
      *
      *
-     * @return
+     * {@link RowChangedEvent}
+     * {@link ScanChangedEvent}
      */
      */
-    ChangedOffset getChangedOffset();
+    default List<Object> getChangedRow() {
+        return null;
+    }
 
 
 }
 }

+ 0 - 28
dbsyncer-sdk/src/main/java/org/dbsyncer/sdk/listener/event/CommonChangedEvent.java

@@ -6,8 +6,6 @@ package org.dbsyncer.sdk.listener.event;
 import org.dbsyncer.sdk.listener.ChangedEvent;
 import org.dbsyncer.sdk.listener.ChangedEvent;
 import org.dbsyncer.sdk.model.ChangedOffset;
 import org.dbsyncer.sdk.model.ChangedOffset;
 
 
-import java.util.Map;
-
 /**
 /**
  * 通用变更事件
  * 通用变更事件
  *
  *
@@ -25,14 +23,6 @@ public abstract class CommonChangedEvent implements ChangedEvent {
      * 变更事件
      * 变更事件
      */
      */
     private String event;
     private String event;
-    /**
-     * 变更sql
-     */
-    private String sql;
-    /**
-     * 变更行数据
-     */
-    private Map<String, Object> changedRow;
     /**
     /**
      * 增量偏移量
      * 增量偏移量
      */
      */
@@ -56,24 +46,6 @@ public abstract class CommonChangedEvent implements ChangedEvent {
         this.event = event;
         this.event = event;
     }
     }
 
 
-    @Override
-    public String getSql() {
-        return sql;
-    }
-
-    public void setSql(String sql) {
-        this.sql = sql;
-    }
-
-    @Override
-    public Map<String, Object> getChangedRow() {
-        return changedRow;
-    }
-
-    public void setChangedRow(Map<String, Object> changedRow) {
-        this.changedRow = changedRow;
-    }
-
     @Override
     @Override
     public ChangedOffset getChangedOffset() {
     public ChangedOffset getChangedOffset() {
         return changedOffset;
         return changedOffset;

+ 8 - 12
dbsyncer-sdk/src/main/java/org/dbsyncer/sdk/listener/event/DDLChangedEvent.java

@@ -10,27 +10,23 @@ import org.dbsyncer.sdk.enums.ChangedEventTypeEnum;
  * @Date 2023-09-18 23:00
  * @Date 2023-09-18 23:00
  */
  */
 public final class DDLChangedEvent extends CommonChangedEvent {
 public final class DDLChangedEvent extends CommonChangedEvent {
+    private final String sql;
 
 
-    /**
-     * 变更数据库
-      */
-    private String database;
-
-    public DDLChangedEvent(String database, String sourceTableName, String event, String sql, String nextFileName, Object position) {
+    public DDLChangedEvent(String sourceTableName, String event, String sql, String nextFileName, Object position) {
         setSourceTableName(sourceTableName);
         setSourceTableName(sourceTableName);
         setEvent(event);
         setEvent(event);
         setNextFileName(nextFileName);
         setNextFileName(nextFileName);
         setPosition(position);
         setPosition(position);
-        setSql(sql);
-        this.database = database;
-    }
-
-    public String getDatabase() {
-        return database;
+        this.sql = sql;
     }
     }
 
 
     @Override
     @Override
     public ChangedEventTypeEnum getType() {
     public ChangedEventTypeEnum getType() {
         return ChangedEventTypeEnum.DDL;
         return ChangedEventTypeEnum.DDL;
     }
     }
+
+    @Override
+    public String getSql() {
+        return sql;
+    }
 }
 }

+ 8 - 7
dbsyncer-sdk/src/main/java/org/dbsyncer/sdk/listener/event/RowChangedEvent.java

@@ -14,8 +14,8 @@ import java.util.List;
  * @Author AE86
  * @Author AE86
  * @Date 2020-06-15 20:00
  * @Date 2020-06-15 20:00
  */
  */
-public class RowChangedEvent extends CommonChangedEvent {
-    private List<Object> dataList;
+public final class RowChangedEvent extends CommonChangedEvent {
+    private final List<Object> changedRow;
 
 
     public RowChangedEvent(String sourceTableName, String event, List<Object> data) {
     public RowChangedEvent(String sourceTableName, String event, List<Object> data) {
         this(sourceTableName, event, data, null, null);
         this(sourceTableName, event, data, null, null);
@@ -26,15 +26,16 @@ public class RowChangedEvent extends CommonChangedEvent {
         setEvent(event);
         setEvent(event);
         setNextFileName(nextFileName);
         setNextFileName(nextFileName);
         setPosition(position);
         setPosition(position);
-        this.dataList = data;
-    }
-
-    public List<Object> getDataList() {
-        return dataList;
+        this.changedRow = data;
     }
     }
 
 
     @Override
     @Override
     public ChangedEventTypeEnum getType() {
     public ChangedEventTypeEnum getType() {
         return ChangedEventTypeEnum.ROW;
         return ChangedEventTypeEnum.ROW;
     }
     }
+
+    @Override
+    public List<Object> getChangedRow() {
+        return changedRow;
+    }
 }
 }

+ 10 - 10
dbsyncer-sdk/src/main/java/org/dbsyncer/sdk/listener/event/ScanChangedEvent.java

@@ -5,7 +5,7 @@ package org.dbsyncer.sdk.listener.event;
 
 
 import org.dbsyncer.sdk.enums.ChangedEventTypeEnum;
 import org.dbsyncer.sdk.enums.ChangedEventTypeEnum;
 
 
-import java.util.Map;
+import java.util.List;
 
 
 /**
 /**
  * 定时扫表变更事件
  * 定时扫表变更事件
@@ -15,21 +15,21 @@ import java.util.Map;
  * @Date 2023-08-20 20:00
  * @Date 2023-08-20 20:00
  */
  */
 public final class ScanChangedEvent extends CommonChangedEvent {
 public final class ScanChangedEvent extends CommonChangedEvent {
+    private final List<Object> changedRow;
 
 
-    private int tableGroupIndex;
-
-    public ScanChangedEvent(int index, String event, Map<String, Object> changedRow) {
-        this.tableGroupIndex = index;
+    public ScanChangedEvent(String sourceTableName, String event, List<Object> data) {
+        setSourceTableName(sourceTableName);
         setEvent(event);
         setEvent(event);
-        setChangedRow(changedRow);
-    }
-
-    public int getTableGroupIndex() {
-        return tableGroupIndex;
+        this.changedRow = data;
     }
     }
 
 
     @Override
     @Override
     public ChangedEventTypeEnum getType() {
     public ChangedEventTypeEnum getType() {
         return ChangedEventTypeEnum.SCAN;
         return ChangedEventTypeEnum.SCAN;
     }
     }
+
+    @Override
+    public List<Object> getChangedRow() {
+        return changedRow;
+    }
 }
 }

+ 1 - 1
dbsyncer-sdk/src/main/java/org/dbsyncer/sdk/model/ChangedOffset.java

@@ -23,7 +23,7 @@ public final class ChangedOffset {
     /**
     /**
      * 是否触发刷新增量点事件
      * 是否触发刷新增量点事件
      */
      */
-    private boolean refreshOffset;
+    private boolean refreshOffset = true;
 
 
     public String getMetaId() {
     public String getMetaId() {
         return metaId;
         return metaId;

+ 18 - 4
dbsyncer-sdk/src/main/java/org/dbsyncer/sdk/model/TableGroupQuartzCommand.java

@@ -2,19 +2,23 @@ package org.dbsyncer.sdk.model;
 
 
 import org.dbsyncer.sdk.util.PrimaryKeyUtil;
 import org.dbsyncer.sdk.util.PrimaryKeyUtil;
 
 
+import java.util.ArrayList;
 import java.util.List;
 import java.util.List;
 import java.util.Map;
 import java.util.Map;
 
 
 public class TableGroupQuartzCommand {
 public class TableGroupQuartzCommand {
 
 
-    private Table table;
+    private final Table table;
 
 
-    private List<String> primaryKeys;
+    private final List<Field> fields;
 
 
-    private Map<String, String> command;
+    private final Map<String, String> command;
 
 
-    public TableGroupQuartzCommand(Table table, Map<String, String> command) {
+    private final List<String> primaryKeys;
+
+    public TableGroupQuartzCommand(Table table, List<Field> fields, Map<String, String> command) {
         this.table = table;
         this.table = table;
+        this.fields = fields;
         this.command = command;
         this.command = command;
         this.primaryKeys = PrimaryKeyUtil.findTablePrimaryKeys(table);;
         this.primaryKeys = PrimaryKeyUtil.findTablePrimaryKeys(table);;
     }
     }
@@ -23,6 +27,10 @@ public class TableGroupQuartzCommand {
         return table;
         return table;
     }
     }
 
 
+    public List<Field> getFields() {
+        return fields;
+    }
+
     public List<String> getPrimaryKeys() {
     public List<String> getPrimaryKeys() {
         return primaryKeys;
         return primaryKeys;
     }
     }
@@ -30,4 +38,10 @@ public class TableGroupQuartzCommand {
     public Map<String, String> getCommand() {
     public Map<String, String> getCommand() {
         return command;
         return command;
     }
     }
+
+    public List<Object> getChangedRow(Map<String, Object> row) {
+        List<Object> changedRow = new ArrayList<>(fields.size());
+        fields.forEach(field -> changedRow.add(row.get(field.getName())));
+        return changedRow;
+    }
 }
 }