AE86 1 gadu atpakaļ
vecāks
revīzija
be11697cbc

+ 6 - 26
dbsyncer-biz/src/main/java/org/dbsyncer/biz/checker/impl/connector/ConnectorChecker.java

@@ -6,11 +6,7 @@ import org.dbsyncer.biz.checker.ConnectorConfigChecker;
 import org.dbsyncer.common.util.StringUtil;
 import org.dbsyncer.connector.ConnectorFactory;
 import org.dbsyncer.connector.enums.ConnectorEnum;
-import org.dbsyncer.connector.model.Table;
-import org.dbsyncer.parser.ParserComponent;
 import org.dbsyncer.parser.ProfileComponent;
-import org.dbsyncer.parser.LogService;
-import org.dbsyncer.parser.LogType;
 import org.dbsyncer.parser.model.ConfigModel;
 import org.dbsyncer.parser.model.Connector;
 import org.dbsyncer.sdk.model.ConnectorConfig;
@@ -22,7 +18,6 @@ import org.springframework.stereotype.Component;
 import org.springframework.util.Assert;
 
 import javax.annotation.Resource;
-import java.util.List;
 import java.util.Map;
 
 /**
@@ -35,15 +30,9 @@ public class ConnectorChecker extends AbstractChecker {
 
     private final Logger logger = LoggerFactory.getLogger(getClass());
 
-    @Resource
-    private ParserComponent parserComponent;
-
     @Resource
     private ProfileComponent profileComponent;
 
-    @Resource
-    private LogService logService;
-
     @Resource
     private ConnectorFactory connectorFactory;
 
@@ -70,7 +59,8 @@ public class ConnectorChecker extends AbstractChecker {
         checker.modify(config, params);
 
         // 获取表
-        setTable(connector);
+        ConnectorMapper mapper = connectorFactory.connect(connector.getConfig());
+        connector.setTable(connectorFactory.getTable(mapper));
 
         // 修改基本配置
         this.modifyConfigModel(connector, params);
@@ -85,19 +75,21 @@ public class ConnectorChecker extends AbstractChecker {
         String id = params.get(ConfigConstant.CONFIG_MODEL_ID);
         Connector connector = profileComponent.getConnector(id);
         Assert.notNull(connector, "Can not find connector.");
+        ConnectorConfig config = connector.getConfig();
+        connectorFactory.disconnect(config);
 
         // 修改基本配置
         this.modifyConfigModel(connector, params);
 
         // 配置连接器配置
-        ConnectorConfig config = connector.getConfig();
         String type = StringUtil.toLowerCaseFirstOne(config.getConnectorType()).concat("ConfigChecker");
         ConnectorConfigChecker checker = map.get(type);
         Assert.notNull(checker, "Checker can not be null.");
         checker.modify(config, params);
 
         // 获取表
-        setTable(connector);
+        ConnectorMapper mapper = connectorFactory.connect(config);
+        connector.setTable(connectorFactory.getTable(mapper));
 
         return connector;
     }
@@ -113,16 +105,4 @@ public class ConnectorChecker extends AbstractChecker {
         }
     }
 
-    private void setTable(Connector connector) {
-        boolean isAlive = connectorFactory.isAlive(connector.getConfig());
-        if (!isAlive) {
-            logService.log(LogType.ConnectorLog.FAILED);
-        }
-        Assert.isTrue(isAlive, "无法连接.");
-        // 获取表信息
-        ConnectorMapper connectorMapper = connectorFactory.connect(connector.getConfig());
-        List<Table> table = parserComponent.getTable(connectorMapper);
-        connector.setTable(table);
-    }
-
 }

+ 10 - 20
dbsyncer-biz/src/main/java/org/dbsyncer/biz/impl/ConnectorServiceImpl.java

@@ -7,9 +7,9 @@ import org.dbsyncer.common.util.CollectionUtils;
 import org.dbsyncer.common.util.JsonUtil;
 import org.dbsyncer.common.util.StringUtil;
 import org.dbsyncer.connector.ConnectorFactory;
-import org.dbsyncer.parser.ProfileComponent;
 import org.dbsyncer.parser.LogService;
 import org.dbsyncer.parser.LogType;
+import org.dbsyncer.parser.ProfileComponent;
 import org.dbsyncer.parser.model.ConfigModel;
 import org.dbsyncer.parser.model.Connector;
 import org.dbsyncer.parser.model.Mapping;
@@ -98,9 +98,11 @@ public class ConnectorServiceImpl extends BaseServiceImpl implements ConnectorSe
         }
 
         Connector connector = profileComponent.getConnector(id);
-        log(LogType.ConnectorLog.DELETE, connector);
-
-        profileComponent.removeConfigModel(id);
+        if (connector != null) {
+            connectorFactory.disconnect(connector.getConfig());
+            log(LogType.ConnectorLog.DELETE, connector);
+            profileComponent.removeConfigModel(id);
+        }
         return "删除连接器成功!";
     }
 
@@ -138,7 +140,7 @@ public class ConnectorServiceImpl extends BaseServiceImpl implements ConnectorSe
         // 更新连接器状态
         Set<String> exist = new HashSet<>();
         list.forEach(c -> {
-            health.put(c.getId(), isAliveConnectorConfig(c.getConfig()));
+            health.put(c.getId(), isAlive(c.getConfig()));
             exist.add(c.getId());
         });
 
@@ -160,26 +162,14 @@ public class ConnectorServiceImpl extends BaseServiceImpl implements ConnectorSe
         return health.containsKey(id) && health.get(id);
     }
 
-    private boolean isAliveConnectorConfig(ConnectorConfig config) {
-        boolean alive = false;
+    private boolean isAlive(ConnectorConfig config) {
         try {
-            alive = connectorFactory.isAlive(config);
+            return connectorFactory.isAlive(config);
         } catch (Exception e) {
             LogType.ConnectorLog logType = LogType.ConnectorLog.FAILED;
             logService.log(logType, "%s%s", logType.getName(), e.getMessage());
+            return false;
         }
-        // 断线重连
-        if (!alive) {
-            try {
-                alive = connectorFactory.refresh(config);
-            } catch (Exception e) {
-                logger.error(e.getMessage());
-            }
-            if (alive) {
-                logger.info(LogType.ConnectorLog.RECONNECT_SUCCESS.getMessage());
-            }
-        }
-        return alive;
     }
 
 }

+ 4 - 4
dbsyncer-biz/src/main/java/org/dbsyncer/biz/impl/DataSyncServiceImpl.java

@@ -4,7 +4,6 @@ import com.google.protobuf.InvalidProtocolBufferException;
 import org.dbsyncer.biz.DataSyncService;
 import org.dbsyncer.biz.vo.BinlogColumnVo;
 import org.dbsyncer.biz.vo.MessageVo;
-import org.dbsyncer.listener.event.RowChangedEvent;
 import org.dbsyncer.common.model.Paging;
 import org.dbsyncer.common.util.CollectionUtils;
 import org.dbsyncer.common.util.DateFormatUtil;
@@ -12,8 +11,9 @@ import org.dbsyncer.common.util.JsonUtil;
 import org.dbsyncer.common.util.NumberUtil;
 import org.dbsyncer.common.util.StringUtil;
 import org.dbsyncer.connector.model.Field;
-import org.dbsyncer.parser.ParserComponent;
+import org.dbsyncer.listener.event.RowChangedEvent;
 import org.dbsyncer.parser.ProfileComponent;
+import org.dbsyncer.parser.flush.impl.BufferActuatorRouter;
 import org.dbsyncer.parser.model.Meta;
 import org.dbsyncer.parser.model.Picker;
 import org.dbsyncer.parser.model.TableGroup;
@@ -53,7 +53,7 @@ public class DataSyncServiceImpl implements DataSyncService {
     private final Logger logger = LoggerFactory.getLogger(getClass());
 
     @Resource
-    private ParserComponent parserComponent;
+    private BufferActuatorRouter bufferActuatorRouter;
 
     @Resource
     private ProfileComponent profileComponent;
@@ -164,7 +164,7 @@ public class DataSyncServiceImpl implements DataSyncService {
             // 转换为源字段
             final Picker picker = new Picker(tableGroup.getFieldMapping());
             changedEvent.setChangedRow(picker.pickSourceData(binlogData));
-            parserComponent.execute(tableGroupId, changedEvent);
+            bufferActuatorRouter.execute(metaId, tableGroupId, changedEvent);
             storageService.remove(StorageEnum.DATA, metaId, messageId);
             // 更新失败数
             Meta meta = profileComponent.getMeta(metaId);

+ 9 - 14
dbsyncer-connector/src/main/java/org/dbsyncer/connector/ConnectorFactory.java

@@ -48,12 +48,14 @@ public class ConnectorFactory implements DisposableBean {
         if (!connectorCache.containsKey(cacheKey)) {
             synchronized (connectorCache) {
                 if (!connectorCache.containsKey(cacheKey)) {
-                    connectorCache.putIfAbsent(cacheKey, connector.connect(config));
+                    ConnectorMapper mapper = connector.connect(config);
+                    Assert.isTrue(connector.isAlive(mapper), "连接配置异常");
+                    connectorCache.putIfAbsent(cacheKey, mapper);
                 }
             }
         }
-        ConnectorMapper connectorMapper = connectorCache.get(cacheKey);
         try {
+            ConnectorMapper connectorMapper = connectorCache.get(cacheKey);
             ConnectorMapper clone = (ConnectorMapper) connectorMapper.clone();
             clone.setConfig(config);
             return clone;
@@ -63,21 +65,20 @@ public class ConnectorFactory implements DisposableBean {
     }
 
     /**
-     * 刷新连接配置
+     * 断开连接
      *
      * @param config
      * @return
      */
-    public boolean refresh(ConnectorConfig config) {
+    public void disconnect(ConnectorConfig config) {
         Assert.notNull(config, "ConnectorConfig can not be null.");
         Connector connector = getConnector(config.getConnectorType());
         String cacheKey = connector.getConnectorMapperCacheKey(config);
-        if (connectorCache.containsKey(cacheKey)) {
-            disconnect(connectorCache.get(cacheKey));
+        ConnectorMapper connectorMapper = connectorCache.get(cacheKey);
+        if (connectorMapper != null) {
+            disconnect(connectorMapper);
             connectorCache.remove(cacheKey);
         }
-        connect(config);
-        return connector.isAlive(connectorCache.get(cacheKey));
     }
 
     /**
@@ -184,12 +185,6 @@ public class ConnectorFactory implements DisposableBean {
         return getConnector(connectorConfig.getConnectorType());
     }
 
-    /**
-     * 获取连接器
-     *
-     * @param connectorType
-     * @return
-     */
     public Connector getConnector(String connectorType) {
         return ConnectorEnum.getConnectorEnum(connectorType).getConnector();
     }

+ 1 - 6
dbsyncer-connector/src/main/java/org/dbsyncer/connector/database/AbstractDatabaseConnector.java

@@ -59,12 +59,7 @@ public abstract class AbstractDatabaseConnector extends AbstractConnector implem
 
     @Override
     public ConnectorMapper connect(DatabaseConfig config) {
-        try {
-            return new DatabaseConnectorMapper(config);
-        } catch (Exception e) {
-            logger.error("Failed to connect:{}, message:{}", config.getUrl(), e.getMessage());
-        }
-        throw new ConnectorException(String.format("Failed to connect:%s", config.getUrl()));
+        return new DatabaseConnectorMapper(config);
     }
 
     @Override

+ 1 - 5
dbsyncer-connector/src/main/java/org/dbsyncer/connector/kafka/KafkaConnector.java

@@ -29,11 +29,7 @@ public class KafkaConnector extends AbstractConnector implements Connector<Kafka
 
     @Override
     public ConnectorMapper connect(KafkaConfig config) {
-        try {
-            return new KafkaConnectorMapper(config);
-        } catch (Exception e) {
-            throw new ConnectorException("无法连接, 请检查配置:" + e.getMessage());
-        }
+        return new KafkaConnectorMapper(config);
     }
 
     @Override

+ 0 - 1
dbsyncer-parser/src/main/java/org/dbsyncer/parser/LogType.java

@@ -79,7 +79,6 @@ public interface LogType {
         UPDATE("21", "修改"),
         DELETE("22", "删除"),
         FAILED("23", "连接失败"),
-        RECONNECT_SUCCESS("24", "重连成功"),
         COPY("25", "复制");
 
         private String type;

+ 1 - 12
dbsyncer-parser/src/main/java/org/dbsyncer/parser/ParserComponent.java

@@ -1,17 +1,14 @@
 package org.dbsyncer.parser;
 
-import org.dbsyncer.listener.ChangedEvent;
 import org.dbsyncer.common.model.Result;
 import org.dbsyncer.connector.model.MetaInfo;
-import org.dbsyncer.connector.model.Table;
+import org.dbsyncer.listener.ChangedEvent;
 import org.dbsyncer.parser.model.BatchWriter;
 import org.dbsyncer.parser.model.Mapping;
 import org.dbsyncer.parser.model.TableGroup;
 import org.dbsyncer.parser.model.Task;
-import org.dbsyncer.sdk.spi.ConnectorMapper;
 import org.dbsyncer.sdk.spi.ConvertContext;
 
-import java.util.List;
 import java.util.Map;
 import java.util.concurrent.Executor;
 
@@ -26,14 +23,6 @@ import java.util.concurrent.Executor;
  */
 public interface ParserComponent {
 
-    /**
-     * 获取连接器表
-     *
-     * @param config
-     * @return
-     */
-    List<Table> getTable(ConnectorMapper config);
-
     /**
      * 获取表元信息
      *

+ 0 - 5
dbsyncer-parser/src/main/java/org/dbsyncer/parser/impl/ParserComponentImpl.java

@@ -75,11 +75,6 @@ public class ParserComponentImpl implements ParserComponent {
     @Resource
     private BufferActuator generalBufferActuator;
 
-    @Override
-    public List<Table> getTable(ConnectorMapper config) {
-        return connectorFactory.getTable(config);
-    }
-
     @Override
     public MetaInfo getMetaInfo(String connectorId, String tableName) {
         Connector connector = profileComponent.getConnector(connectorId);

+ 0 - 1
dbsyncer-web/src/main/java/org/dbsyncer/web/controller/openapi/OpenApiController.java

@@ -13,7 +13,6 @@ import org.springframework.context.ApplicationContext;
 import org.springframework.core.DefaultParameterNameDiscoverer;
 import org.springframework.core.ParameterNameDiscoverer;
 import org.springframework.stereotype.Controller;
-import org.springframework.ui.ModelMap;
 import org.springframework.util.ObjectUtils;
 import org.springframework.web.bind.annotation.RequestMapping;
 import org.springframework.web.bind.annotation.ResponseBody;