AE86 5 سال پیش
والد
کامیت
74a8b0f90f

+ 12 - 0
dbsyncer-biz/src/main/java/org/dbsyncer/biz/checker/impl/tablegroup/TableGroupChecker.java

@@ -8,6 +8,7 @@ import org.dbsyncer.connector.config.Field;
 import org.dbsyncer.connector.config.Filter;
 import org.dbsyncer.connector.config.Filter;
 import org.dbsyncer.connector.config.MetaInfo;
 import org.dbsyncer.connector.config.MetaInfo;
 import org.dbsyncer.connector.config.Table;
 import org.dbsyncer.connector.config.Table;
+import org.dbsyncer.connector.enums.ConnectorEnum;
 import org.dbsyncer.manager.Manager;
 import org.dbsyncer.manager.Manager;
 import org.dbsyncer.parser.model.ConfigModel;
 import org.dbsyncer.parser.model.ConfigModel;
 import org.dbsyncer.parser.model.FieldMapping;
 import org.dbsyncer.parser.model.FieldMapping;
@@ -122,6 +123,17 @@ public class TableGroupChecker extends AbstractChecker {
     private Table getTable(String connectorId, String tableName) {
     private Table getTable(String connectorId, String tableName) {
         MetaInfo metaInfo = manager.getMetaInfo(connectorId, tableName);
         MetaInfo metaInfo = manager.getMetaInfo(connectorId, tableName);
         Assert.notNull(metaInfo, "无法获取连接器表信息.");
         Assert.notNull(metaInfo, "无法获取连接器表信息.");
+        String connectorType = manager.getConnector(connectorId).getConfig().getConnectorType();
+        // Oralce 监听需要ROWID字段
+        if(ConnectorEnum.isOracle(connectorType)){
+            List<Field> column = metaInfo.getColumn();
+            List<Field> list = new ArrayList<>();
+            list.add(new Field("ROWID", "VARCHAR2",12));
+            list.addAll(column);
+
+            column.clear();
+            column.addAll(list);
+        }
         return new Table().setName(tableName).setColumn(metaInfo.getColumn());
         return new Table().setName(tableName).setColumn(metaInfo.getColumn());
     }
     }
 
 

+ 16 - 8
dbsyncer-connector/src/main/java/org/dbsyncer/connector/database/AbstractDatabaseConnector.java

@@ -231,12 +231,19 @@ public abstract class AbstractDatabaseConnector implements Database {
             logger.error("writer data can not be empty.");
             logger.error("writer data can not be empty.");
             throw new ConnectorException("writer data can not be empty.");
             throw new ConnectorException("writer data can not be empty.");
         }
         }
-        List<Object> args = new ArrayList<>();
-        fields.forEach(f -> args.add(data.get(f.getName())));
-        if (!StringUtils.equals(ConnectorConstant.OPERTION_INSERT, event)) {
+
+        // Update / Delete
+        if (StringUtils.equals(ConnectorConstant.OPERTION_UPDATE, event)) {
+            // update attrs by id
+            List<Field> pkList = fields.stream().filter(f -> f.isPk()).collect(Collectors.toList());
+            fields.add(pkList.get(0));
+        } else if (StringUtils.equals(ConnectorConstant.OPERTION_DELETE, event)) {
+            // delete by id
             List<Field> pkList = fields.stream().filter(f -> f.isPk()).collect(Collectors.toList());
             List<Field> pkList = fields.stream().filter(f -> f.isPk()).collect(Collectors.toList());
+            fields.clear();
             fields.add(pkList.get(0));
             fields.add(pkList.get(0));
         }
         }
+
         int size = fields.size();
         int size = fields.size();
 
 
         DatabaseConfig cfg = (DatabaseConfig) config;
         DatabaseConfig cfg = (DatabaseConfig) config;
@@ -247,7 +254,7 @@ public abstract class AbstractDatabaseConnector implements Database {
             jdbcTemplate = getJdbcTemplate(cfg);
             jdbcTemplate = getJdbcTemplate(cfg);
 
 
             // 3、设置参数
             // 3、设置参数
-            int update = jdbcTemplate.update(sql, (ps)-> {
+            int update = jdbcTemplate.update(sql, (ps) -> {
                 Field f = null;
                 Field f = null;
                 for (int i = 0; i < size; i++) {
                 for (int i = 0; i < size; i++) {
                     f = fields.get(i);
                     f = fields.get(i);
@@ -326,7 +333,7 @@ public abstract class AbstractDatabaseConnector implements Database {
      * @param tableLabel
      * @param tableLabel
      * @return
      * @return
      */
      */
-    protected Map<String, String> getDqlSourceCommand(CommandConfig commandConfig, String tableLabel){
+    protected Map<String, String> getDqlSourceCommand(CommandConfig commandConfig, String tableLabel) {
         // 获取过滤SQL
         // 获取过滤SQL
         List<Filter> filter = commandConfig.getFilter();
         List<Filter> filter = commandConfig.getFilter();
         String queryFilterSql = getQueryFilterSql(filter);
         String queryFilterSql = getQueryFilterSql(filter);
@@ -337,7 +344,7 @@ public abstract class AbstractDatabaseConnector implements Database {
         String querySql = table.getName();
         String querySql = table.getName();
 
 
         // 存在条件
         // 存在条件
-        if(StringUtils.isNotBlank(queryFilterSql)){
+        if (StringUtils.isNotBlank(queryFilterSql)) {
             querySql += queryFilterSql;
             querySql += queryFilterSql;
         }
         }
         map.put(SqlBuilderEnum.QUERY.getName(), querySql);
         map.put(SqlBuilderEnum.QUERY.getName(), querySql);
@@ -354,9 +361,10 @@ public abstract class AbstractDatabaseConnector implements Database {
 
 
     /**
     /**
      * 查询语句表名和字段带上引号(默认不加)
      * 查询语句表名和字段带上引号(默认不加)
+     *
      * @return
      * @return
      */
      */
-    protected String buildSqlWithQuotation(){
+    protected String buildSqlWithQuotation() {
         return "";
         return "";
     }
     }
 
 
@@ -430,7 +438,7 @@ public abstract class AbstractDatabaseConnector implements Database {
     /**
     /**
      * 获取查询SQL
      * 获取查询SQL
      *
      *
-     * @param type {@link SqlBuilderEnum}
+     * @param type           {@link SqlBuilderEnum}
      * @param table
      * @param table
      * @param queryFilterSQL
      * @param queryFilterSQL
      * @return
      * @return

+ 4 - 0
dbsyncer-connector/src/main/java/org/dbsyncer/connector/enums/ConnectorEnum.java

@@ -93,6 +93,10 @@ public enum ConnectorEnum {
         throw new ConnectorException(String.format("Connector type \"%s\" does not exist.", type));
         throw new ConnectorException(String.format("Connector type \"%s\" does not exist.", type));
     }
     }
 
 
+    public static boolean isOracle(String connectorType) {
+        return StringUtils.equals(ORACLE.getType(), connectorType) || StringUtils.equals(DQL_ORACLE.getType(), connectorType);
+    }
+
     public String getType() {
     public String getType() {
         return type;
         return type;
     }
     }

+ 6 - 1
dbsyncer-listener/src/main/java/org/dbsyncer/listener/enums/ListenerEnum.java

@@ -2,6 +2,7 @@ package org.dbsyncer.listener.enums;
 
 
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.lang.StringUtils;
 import org.dbsyncer.connector.enums.ConnectorEnum;
 import org.dbsyncer.connector.enums.ConnectorEnum;
+import org.dbsyncer.listener.oracle.OracleExtractor;
 import org.dbsyncer.listener.quartz.QuartzExtractor;
 import org.dbsyncer.listener.quartz.QuartzExtractor;
 import org.dbsyncer.listener.ListenerException;
 import org.dbsyncer.listener.ListenerException;
 import org.dbsyncer.listener.mysql.MysqlExtractor;
 import org.dbsyncer.listener.mysql.MysqlExtractor;
@@ -20,7 +21,11 @@ public enum ListenerEnum {
     /**
     /**
      * Mysql
      * Mysql
      */
      */
-    MYSQL(ConnectorEnum.MYSQL.getType(), MysqlExtractor.class);
+    MYSQL(ConnectorEnum.MYSQL.getType(), MysqlExtractor.class),
+    /**
+     * Oracle
+     */
+    ORACLE(ConnectorEnum.ORACLE.getType(), OracleExtractor.class);
 
 
     private String type;
     private String type;
     private Class<?> clazz;
     private Class<?> clazz;

+ 0 - 11
dbsyncer-listener/src/main/java/org/dbsyncer/listener/oracle/OracleExtractor.java

@@ -10,9 +10,7 @@ import org.dbsyncer.listener.oracle.dcn.RowChangeEvent;
 import org.slf4j.Logger;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.slf4j.LoggerFactory;
 
 
-import java.sql.SQLException;
 import java.util.Collections;
 import java.util.Collections;
-import java.util.concurrent.TimeUnit;
 
 
 /**
 /**
  * @version 1.0.0
  * @version 1.0.0
@@ -49,7 +47,6 @@ public class OracleExtractor extends AbstractExtractor {
     }
     }
 
 
     private void onEvent(RowChangeEvent event){
     private void onEvent(RowChangeEvent event){
-        logger.info(event.toString());
         if(event.getEvent() == TableChangeDescription.TableOperation.UPDATE.getCode()){
         if(event.getEvent() == TableChangeDescription.TableOperation.UPDATE.getCode()){
             changedLogEvent(event.getTableName(), ConnectorConstant.OPERTION_UPDATE, Collections.EMPTY_LIST, event.getData());
             changedLogEvent(event.getTableName(), ConnectorConstant.OPERTION_UPDATE, Collections.EMPTY_LIST, event.getData());
             return;
             return;
@@ -66,12 +63,4 @@ public class OracleExtractor extends AbstractExtractor {
         }
         }
     }
     }
 
 
-    public static void main(String[] args) throws SQLException, InterruptedException {
-        DBChangeNotification client = new DBChangeNotification("ae86", "123", "jdbc:oracle:thin:@127.0.0.1:1521:xe");
-        client.addRowEventListener((event) -> System.out.println(event));
-        client.start();
-        TimeUnit.SECONDS.sleep(120);
-        client.close();
-    }
-
 }
 }

+ 10 - 9
dbsyncer-manager/src/main/java/org/dbsyncer/manager/config/FieldPicker.java

@@ -36,7 +36,7 @@ public class FieldPicker {
     public Map<String, Object> getColumns(List<Object> list) {
     public Map<String, Object> getColumns(List<Object> list) {
         if (!CollectionUtils.isEmpty(list)) {
         if (!CollectionUtils.isEmpty(list)) {
             Map<String, Object> data = new HashMap<>(indexSize);
             Map<String, Object> data = new HashMap<>(indexSize);
-            int size = list.size();
+            final int size = list.size() - 1;
             index.parallelStream().forEach(node -> {
             index.parallelStream().forEach(node -> {
                 if (node.i <= size) {
                 if (node.i <= size) {
                     data.put(node.name, list.get(node.i));
                     data.put(node.name, list.get(node.i));
@@ -66,26 +66,26 @@ public class FieldPicker {
         // 或 关系(成立任意条件)
         // 或 关系(成立任意条件)
         CompareFilter filter = null;
         CompareFilter filter = null;
         Object value = null;
         Object value = null;
-        for (Filter f: or) {
+        for (Filter f : or) {
             value = row.get(f.getName());
             value = row.get(f.getName());
-            if(null == value){
+            if (null == value) {
                 continue;
                 continue;
             }
             }
             filter = FilterEnum.getCompareFilter(f.getFilter());
             filter = FilterEnum.getCompareFilter(f.getFilter());
-            if(filter.compare(String.valueOf(value), f.getValue())){
+            if (filter.compare(String.valueOf(value), f.getValue())) {
                 return true;
                 return true;
             }
             }
         }
         }
 
 
         boolean pass = false;
         boolean pass = false;
         // 并 关系(成立所有条件)
         // 并 关系(成立所有条件)
-        for (Filter f: add) {
+        for (Filter f : add) {
             value = row.get(f.getName());
             value = row.get(f.getName());
-            if(null == value){
+            if (null == value) {
                 continue;
                 continue;
             }
             }
             filter = FilterEnum.getCompareFilter(f.getFilter());
             filter = FilterEnum.getCompareFilter(f.getFilter());
-            if(!filter.compare(String.valueOf(value), f.getValue())){
+            if (!filter.compare(String.valueOf(value), f.getValue())) {
                 return false;
                 return false;
             }
             }
             pass = true;
             pass = true;
@@ -97,7 +97,8 @@ public class FieldPicker {
     private void init(List<Filter> filter, List<Field> column, List<FieldMapping> fieldMapping) {
     private void init(List<Filter> filter, List<Field> column, List<FieldMapping> fieldMapping) {
         // 解析过滤条件
         // 解析过滤条件
         if ((filterSwitch = !CollectionUtils.isEmpty(filter))) {
         if ((filterSwitch = !CollectionUtils.isEmpty(filter))) {
-            add = filter.stream().filter(f -> StringUtils.equals(f.getOperation(), OperationEnum.AND.getName())).collect(Collectors.toList());
+            add = filter.stream().filter(f -> StringUtils.equals(f.getOperation(), OperationEnum.AND.getName())).collect(
+                    Collectors.toList());
             or = filter.stream().filter(f -> StringUtils.equals(f.getOperation(), OperationEnum.OR.getName())).collect(Collectors.toList());
             or = filter.stream().filter(f -> StringUtils.equals(f.getOperation(), OperationEnum.OR.getName())).collect(Collectors.toList());
         }
         }
 
 
@@ -126,7 +127,7 @@ public class FieldPicker {
         // 属性
         // 属性
         String name;
         String name;
         // 索引
         // 索引
-        int i;
+        int    i;
 
 
         public Node(String name, int i) {
         public Node(String name, int i) {
             this.name = name;
             this.name = name;

+ 3 - 4
dbsyncer-manager/src/main/java/org/dbsyncer/manager/puller/impl/IncrementPuller.java

@@ -106,10 +106,10 @@ public class IncrementPuller extends AbstractPuller implements ScheduledTaskJob,
         if (null != extractor) {
         if (null != extractor) {
             extractor.clearAllListener();
             extractor.clearAllListener();
             extractor.close();
             extractor.close();
-            map.remove(metaId);
-            publishClosedEvent(metaId);
-            logger.info("关闭成功:{}", metaId);
         }
         }
+        map.remove(metaId);
+        publishClosedEvent(metaId);
+        logger.info("关闭成功:{}", metaId);
     }
     }
 
 
     @Override
     @Override
@@ -190,7 +190,6 @@ public class IncrementPuller extends AbstractPuller implements ScheduledTaskJob,
             if (changed.compareAndSet(true, false)) {
             if (changed.compareAndSet(true, false)) {
                 Meta meta = manager.getMeta(metaId);
                 Meta meta = manager.getMeta(metaId);
                 if (null != meta) {
                 if (null != meta) {
-                    logger.info("同步增量信息:{}>>{}", metaId, map);
                     meta.setMap(map);
                     meta.setMap(map);
                     manager.editMeta(meta);
                     manager.editMeta(meta);
                 }
                 }