AE86 4 лет назад
Родитель
Сommit
a4ebe55e02

+ 2 - 2
dbsyncer-storage/src/main/java/org/dbsyncer/storage/support/DiskStorageServiceImpl.java

@@ -90,14 +90,14 @@ public class DiskStorageServiceImpl extends AbstractStorageService {
     @Override
     public void insert(String collectionId, Map params) throws IOException {
         createShardIfNotExist(collectionId);
-        Document doc = ParamsUtil.convertParams2Doc(params);
+        Document doc = ParamsUtil.convertConfig2Doc(params);
         map.get(collectionId).insert(doc);
     }
 
     @Override
     public void update(String collectionId, Map params) throws IOException {
         createShardIfNotExist(collectionId);
-        Document doc = ParamsUtil.convertParams2Doc(params);
+        Document doc = ParamsUtil.convertConfig2Doc(params);
         IndexableField field = doc.getField(ConfigConstant.CONFIG_MODEL_ID);
         map.get(collectionId).update(new Term(ConfigConstant.CONFIG_MODEL_ID, field.stringValue()), doc);
     }

+ 82 - 4
dbsyncer-storage/src/main/java/org/dbsyncer/storage/support/MysqlStorageServiceImpl.java

@@ -1,10 +1,12 @@
 package org.dbsyncer.storage.support;
 
+import org.apache.commons.dbcp.DelegatingDatabaseMetaData;
 import org.apache.commons.lang.StringUtils;
+import org.dbsyncer.common.util.CollectionUtils;
 import org.dbsyncer.connector.config.DatabaseConfig;
 import org.dbsyncer.connector.util.DatabaseUtil;
+import org.dbsyncer.connector.util.JDBCUtil;
 import org.dbsyncer.storage.AbstractStorageService;
-import org.dbsyncer.storage.enums.StorageEnum;
 import org.dbsyncer.storage.query.Query;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -14,6 +16,10 @@ import org.springframework.jdbc.core.JdbcTemplate;
 import org.springframework.stereotype.Component;
 
 import javax.annotation.PostConstruct;
+import java.io.*;
+import java.lang.reflect.Field;
+import java.sql.Connection;
+import java.sql.DatabaseMetaData;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -39,6 +45,9 @@ public class MysqlStorageServiceImpl extends AbstractStorageService {
 
     private final Object createTableLock = new Object();
 
+    private static final String PREFIX_TABLE = "dbsyncer_";
+    private static final String QUERY_TABLE  = "show tables where Tables_in_%s = \"%s\"";
+
     @PostConstruct
     private void init() {
         config = null == config ? new DatabaseConfig() : config;
@@ -55,8 +64,8 @@ public class MysqlStorageServiceImpl extends AbstractStorageService {
         jdbcTemplate = DatabaseUtil.getJdbcTemplate(config);
 
         // 创建配置和日志表
-        createTableIfNotExist(StorageEnum.CONFIG.getType());
-        createTableIfNotExist(StorageEnum.LOG.getType());
+        //createTableIfNotExist(StorageEnum.CONFIG.getType());
+        //createTableIfNotExist(StorageEnum.LOG.getType());
     }
 
     @Override
@@ -108,6 +117,8 @@ public class MysqlStorageServiceImpl extends AbstractStorageService {
     }
 
     private void createTableIfNotExist(String table) {
+        // 前缀标识
+        table = PREFIX_TABLE.concat(table);
         synchronized (createTableLock) {
             // 1、检查本地缓存
             if (tables.contains(table)) {
@@ -115,7 +126,74 @@ public class MysqlStorageServiceImpl extends AbstractStorageService {
             }
 
             // 2、检查DB中是否创建
-            // show tables;
+            Connection connection = null;
+            Map<String, Object> map = null;
+            try {
+                connection = jdbcTemplate.getDataSource().getConnection();
+                DelegatingDatabaseMetaData md = (DelegatingDatabaseMetaData) connection.getMetaData();
+                DatabaseMetaData delegate = md.getDelegate();
+                String databaseName = getDatabaseName(delegate);
+                // show tables where Tables_in_dbsyncer = "dbsyncer_config"
+                String sql = String.format(QUERY_TABLE, databaseName, table);
+                logger.info(sql);
+                map = jdbcTemplate.queryForMap(sql);
+            } catch (Exception e) {
+                logger.error(e.getMessage());
+            } finally {
+                JDBCUtil.close(connection);
+            }
+
+            // 不存在表,则创建
+            if (CollectionUtils.isEmpty(map)) {
+                createTable(table);
+            }
+        }
+    }
+
+    private void createTable(String table) {
+        // /dbsyncer_config.sql
+        String sql = readSql(PREFIX_TABLE.concat(table).concat(".sql"));
+        jdbcTemplate.execute(sql);
+    }
+
+    private String getDatabaseName(DatabaseMetaData delegate) throws NoSuchFieldException, IllegalAccessException {
+        Class clazz = delegate.getClass().getSuperclass();
+        Field field = clazz.getDeclaredField("database");
+        field.setAccessible(true);
+        Object value = field.get(delegate);
+        return (String) value;
+    }
+
+    private String readSql(String filePath) {
+        StringBuilder res = new StringBuilder();
+        InputStream in = null;
+        InputStreamReader isr = null;
+        BufferedReader bf = null;
+        try {
+            in = this.getClass().getResourceAsStream(filePath);
+            isr = new InputStreamReader(in, "UTF-8");
+            bf = new BufferedReader(isr);
+            String newLine;
+            while ((newLine = bf.readLine()) != null) {
+                res.append(newLine);
+            }
+        } catch (IOException e) {
+            logger.error("failed read file:{}", filePath);
+        } finally {
+            close(bf);
+            close(isr);
+            close(in);
+        }
+        return res.toString();
+    }
+
+    private void close(Closeable closeable) {
+        if (null != closeable) {
+            try {
+                closeable.close();
+            } catch (IOException e) {
+                logger.error(e.getMessage());
+            }
         }
     }
 

+ 2 - 1
dbsyncer-storage/src/main/java/org/dbsyncer/storage/util/ParamsUtil.java

@@ -43,7 +43,7 @@ import java.util.Map;
 public abstract class ParamsUtil {
     private ParamsUtil(){}
 
-    public static Document convertParams2Doc(Map params) {
+    public static Document convertConfig2Doc(Map params) {
         Assert.notNull(params, "Params can not be null.");
         Document doc = new Document();
         String id = (String) params.get(ConfigConstant.CONFIG_MODEL_ID);
@@ -56,6 +56,7 @@ public abstract class ParamsUtil {
         doc.add(new StringField(ConfigConstant.CONFIG_MODEL_ID, id, Field.Store.YES));
         doc.add(new StringField(ConfigConstant.CONFIG_MODEL_TYPE, type, Field.Store.YES));
         doc.add(new TextField(ConfigConstant.CONFIG_MODEL_NAME, name, Field.Store.YES));
+        // 配置信息
         doc.add(new StoredField(ConfigConstant.CONFIG_MODEL_JSON, json));
         // 创建时间(不需要存储)
         doc.add(new LongPoint(ConfigConstant.CONFIG_MODEL_CREATE_TIME, createTime));

+ 9 - 0
dbsyncer-storage/src/main/resources/dbsyncer_config.sql

@@ -0,0 +1,9 @@
+CREATE TABLE `dbsyncer_config` (
+  `ID` varchar(64) COLLATE utf8_bin NOT NULL COMMENT '唯一ID',
+  `NAME` varchar(50) COLLATE utf8_bin NOT NULL COMMENT '名称',
+  `TYPE` varchar(24) COLLATE utf8_bin NOT NULL COMMENT 'connector、mapping、tableGroup、meta、config',
+  `CREATE_TIME` datetime NOT NULL COMMENT '创建时间',
+  `UPDATE_TIME` datetime NOT NULL COMMENT '修改时间',
+  `JSON` text COLLATE utf8_bin NOT NULL COMMENT '配置信息',
+  PRIMARY KEY (`ID`)
+) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin COMMENT='配置信息表';

+ 10 - 0
dbsyncer-storage/src/main/resources/dbsyncer_data.sql

@@ -0,0 +1,10 @@
+CREATE TABLE `dbsyncer_data` (
+  `ID` varchar(64) COLLATE utf8_bin NOT NULL COMMENT '唯一ID',
+  `NAME` varchar(50) COLLATE utf8_bin NOT NULL COMMENT '名称',
+  `SUCCESS` varchar(6) COLLATE utf8_bin NOT NULL COMMENT '是否成功:true/false',
+  `EVENT` varchar(255) COLLATE utf8_bin NOT NULL COMMENT '事件',
+  `ERROR` varchar(1024) COLLATE utf8_bin DEFAULT NULL COMMENT '异常信息',
+  `CREATE_TIME` datetime NOT NULL COMMENT '创建时间',
+  `JSON` text COLLATE utf8_bin NOT NULL COMMENT '同步数据',
+  PRIMARY KEY (`ID`)
+) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin COMMENT='同步数据表';

+ 8 - 0
dbsyncer-storage/src/main/resources/dbsyncer_log.sql

@@ -0,0 +1,8 @@
+CREATE TABLE `dbsyncer_log` (
+  `ID` varchar(64) COLLATE utf8_bin NOT NULL COMMENT '唯一ID',
+  `NAME` varchar(50) COLLATE utf8_bin NOT NULL COMMENT '名称',
+  `TYPE` varchar(24) COLLATE utf8_bin NOT NULL COMMENT '连接器、映射配置、表映射、元信息、系统日志',
+  `CREATE_TIME` datetime NOT NULL COMMENT '创建时间',
+  `JSON` text COLLATE utf8_bin NOT NULL COMMENT '日志信息',
+  PRIMARY KEY (`ID`)
+) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin COMMENT='操作日志表';