AE86 пре 2 година
родитељ
комит
d22a1eb4e2

+ 4 - 0
dbsyncer-connector/src/main/java/org/dbsyncer/connector/config/ConnectorConfig.java

@@ -26,4 +26,8 @@ public abstract class ConnectorConfig {
         return this;
         return this;
     }
     }
 
 
+    public String getPrimaryKey() {
+        return "";
+    }
+
 }
 }

+ 1 - 1
dbsyncer-listener/src/main/java/org/dbsyncer/listener/AbstractDatabaseExtractor.java

@@ -70,7 +70,7 @@ public abstract class AbstractDatabaseExtractor extends AbstractExtractor {
             }
             }
         }
         }
         Assert.isTrue(findPkIndex, "The primaryKey is invalid.");
         Assert.isTrue(findPkIndex, "The primaryKey is invalid.");
-        String sql = new StringBuilder(cfg.getSql()).append(" AND ").append(cfg.getPrimaryKey()).append("=?").toString();
+        String sql = new StringBuilder(cfg.getSql()).append(" AND ").append(primaryKey).append("=?").toString();
 
 
         dqlMapper = new DqlMapper(mapper, tableName, column, pkIndex, sql);
         dqlMapper = new DqlMapper(mapper, tableName, column, pkIndex, sql);
     }
     }

+ 2 - 0
dbsyncer-listener/src/main/java/org/dbsyncer/listener/oracle/dcn/DBChangeNotification.java

@@ -345,6 +345,8 @@ public class DBChangeNotification {
                     }
                     }
                 } catch (InterruptedException e) {
                 } catch (InterruptedException e) {
                     break;
                     break;
+                } catch (Exception e) {
+                    logger.error(e.getMessage());
                 }
                 }
             }
             }
         }
         }

+ 2 - 6
dbsyncer-manager/src/main/java/org/dbsyncer/manager/puller/IncrementPuller.java

@@ -22,11 +22,7 @@ import org.dbsyncer.manager.config.FieldPicker;
 import org.dbsyncer.parser.Parser;
 import org.dbsyncer.parser.Parser;
 import org.dbsyncer.parser.logger.LogService;
 import org.dbsyncer.parser.logger.LogService;
 import org.dbsyncer.parser.logger.LogType;
 import org.dbsyncer.parser.logger.LogType;
-import org.dbsyncer.parser.model.Connector;
-import org.dbsyncer.parser.model.Mapping;
-import org.dbsyncer.parser.model.Meta;
-import org.dbsyncer.parser.model.Picker;
-import org.dbsyncer.parser.model.TableGroup;
+import org.dbsyncer.parser.model.*;
 import org.dbsyncer.parser.util.PickerUtil;
 import org.dbsyncer.parser.util.PickerUtil;
 import org.slf4j.Logger;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.slf4j.LoggerFactory;
@@ -147,7 +143,7 @@ public class IncrementPuller extends AbstractPuller implements ScheduledTaskJob
             AbstractQuartzExtractor extractor = listener.getExtractor(ListenerTypeEnum.TIMING, connectorConfig.getConnectorType(), AbstractQuartzExtractor.class);
             AbstractQuartzExtractor extractor = listener.getExtractor(ListenerTypeEnum.TIMING, connectorConfig.getConnectorType(), AbstractQuartzExtractor.class);
             extractor.setCommands(list.stream().map(t -> {
             extractor.setCommands(list.stream().map(t -> {
                 Picker picker = new Picker(t.getFieldMapping());
                 Picker picker = new Picker(t.getFieldMapping());
-                return new TableGroupCommand(picker.getSourcePrimaryKeyName(), t.getCommand());
+                return new TableGroupCommand(picker.getSourcePrimaryKeyName(connectorConfig), t.getCommand());
             }).collect(Collectors.toList()));
             }).collect(Collectors.toList()));
             setExtractorConfig(extractor, connectorConfig, listenerConfig, meta.getSnapshot(), new QuartzListener(mapping, list));
             setExtractorConfig(extractor, connectorConfig, listenerConfig, meta.getSnapshot(), new QuartzListener(mapping, list));
             return extractor;
             return extractor;

+ 1 - 1
dbsyncer-parser/src/main/java/org/dbsyncer/parser/ParserFactory.java

@@ -245,7 +245,7 @@ public class ParserFactory implements Parser {
         Assert.notEmpty(fieldMapping, String.format("数据源表[%s]同步到目标源表[%s], 映射关系不能为空.", sTableName, tTableName));
         Assert.notEmpty(fieldMapping, String.format("数据源表[%s]同步到目标源表[%s], 映射关系不能为空.", sTableName, tTableName));
         // 获取同步字段
         // 获取同步字段
         Picker picker = new Picker(fieldMapping);
         Picker picker = new Picker(fieldMapping);
-        String pk = picker.getSourcePrimaryKeyName();
+        String pk = picker.getSourcePrimaryKeyName(sConfig);
 
 
         int pageSize = mapping.getReadNum();
         int pageSize = mapping.getReadNum();
         int batchSize = mapping.getBatchNum();
         int batchSize = mapping.getBatchNum();

+ 7 - 3
dbsyncer-parser/src/main/java/org/dbsyncer/parser/model/Picker.java

@@ -1,8 +1,9 @@
 package org.dbsyncer.parser.model;
 package org.dbsyncer.parser.model;
 
 
 import org.dbsyncer.common.util.CollectionUtils;
 import org.dbsyncer.common.util.CollectionUtils;
+import org.dbsyncer.connector.config.ConnectorConfig;
 import org.dbsyncer.connector.model.Field;
 import org.dbsyncer.connector.model.Field;
-import org.dbsyncer.parser.ParserException;
+import org.springframework.util.Assert;
 
 
 import java.util.ArrayList;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashMap;
@@ -64,13 +65,16 @@ public class Picker {
         }
         }
     }
     }
 
 
-    public String getSourcePrimaryKeyName() {
+    public String getSourcePrimaryKeyName(ConnectorConfig config) {
         for (Field f : sourceFields) {
         for (Field f : sourceFields) {
             if (f.isPk()) {
             if (f.isPk()) {
                 return f.getName();
                 return f.getName();
             }
             }
         }
         }
-        throw new ParserException("主键为空");
+
+        String primaryKey = config.getPrimaryKey();
+        Assert.hasText(primaryKey, "主键为空");
+        return primaryKey;
     }
     }
 
 
     public List<Field> getTargetFields() {
     public List<Field> getTargetFields() {