|
@@ -19,14 +19,15 @@ import org.dbsyncer.parser.flush.impl.BufferActuatorRouter;
|
|
import org.dbsyncer.parser.model.Meta;
|
|
import org.dbsyncer.parser.model.Meta;
|
|
import org.dbsyncer.parser.model.Picker;
|
|
import org.dbsyncer.parser.model.Picker;
|
|
import org.dbsyncer.parser.model.TableGroup;
|
|
import org.dbsyncer.parser.model.TableGroup;
|
|
|
|
+import org.dbsyncer.sdk.constant.ConfigConstant;
|
|
|
|
+import org.dbsyncer.sdk.constant.ConnectorConstant;
|
|
import org.dbsyncer.sdk.enums.StorageEnum;
|
|
import org.dbsyncer.sdk.enums.StorageEnum;
|
|
-import org.dbsyncer.sdk.listener.event.RowChangedEvent;
|
|
|
|
-import org.dbsyncer.sdk.model.Field;
|
|
|
|
import org.dbsyncer.sdk.filter.FieldResolver;
|
|
import org.dbsyncer.sdk.filter.FieldResolver;
|
|
import org.dbsyncer.sdk.filter.Query;
|
|
import org.dbsyncer.sdk.filter.Query;
|
|
|
|
+import org.dbsyncer.sdk.listener.event.RowChangedEvent;
|
|
|
|
+import org.dbsyncer.sdk.model.Field;
|
|
import org.dbsyncer.sdk.storage.StorageService;
|
|
import org.dbsyncer.sdk.storage.StorageService;
|
|
import org.dbsyncer.storage.binlog.proto.BinlogMap;
|
|
import org.dbsyncer.storage.binlog.proto.BinlogMap;
|
|
-import org.dbsyncer.sdk.constant.ConfigConstant;
|
|
|
|
import org.dbsyncer.storage.util.BinlogMessageUtil;
|
|
import org.dbsyncer.storage.util.BinlogMessageUtil;
|
|
import org.slf4j.Logger;
|
|
import org.slf4j.Logger;
|
|
import org.slf4j.LoggerFactory;
|
|
import org.slf4j.LoggerFactory;
|
|
@@ -111,11 +112,18 @@ public class DataSyncServiceImpl implements DataSyncService {
|
|
return Collections.EMPTY_MAP;
|
|
return Collections.EMPTY_MAP;
|
|
}
|
|
}
|
|
|
|
|
|
- // 3、反序列
|
|
|
|
|
|
+ // 3、获取DDL
|
|
Map<String, Object> target = new HashMap<>();
|
|
Map<String, Object> target = new HashMap<>();
|
|
|
|
+ BinlogMap message = BinlogMap.parseFrom(bytes);
|
|
|
|
+ String event = (String) row.get(ConfigConstant.DATA_EVENT);
|
|
|
|
+ if (StringUtil.equals(event, ConnectorConstant.OPERTION_ALTER)) {
|
|
|
|
+ message.getRowMap().forEach((k, v) -> target.put(k, v.toStringUtf8()));
|
|
|
|
+ return target;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ // 4、反序列
|
|
final Picker picker = new Picker(tableGroup);
|
|
final Picker picker = new Picker(tableGroup);
|
|
final Map<String, Field> fieldMap = picker.getTargetFieldMap();
|
|
final Map<String, Field> fieldMap = picker.getTargetFieldMap();
|
|
- BinlogMap message = BinlogMap.parseFrom(bytes);
|
|
|
|
message.getRowMap().forEach((k, v) -> {
|
|
message.getRowMap().forEach((k, v) -> {
|
|
if (fieldMap.containsKey(k)) {
|
|
if (fieldMap.containsKey(k)) {
|
|
try {
|
|
try {
|