AE86 4 年之前
父節點
當前提交
888daba025
共有 1 個文件被更改,包括 188 次插入57 次删除
  1. 188 57
      dbsyncer-storage/src/main/java/org/dbsyncer/storage/support/MysqlStorageServiceImpl.java

+ 188 - 57
dbsyncer-storage/src/main/java/org/dbsyncer/storage/support/MysqlStorageServiceImpl.java

@@ -2,28 +2,35 @@ package org.dbsyncer.storage.support;
 
 
 import org.apache.commons.dbcp.DelegatingDatabaseMetaData;
 import org.apache.commons.dbcp.DelegatingDatabaseMetaData;
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.lang.StringUtils;
-import org.dbsyncer.common.util.CollectionUtils;
 import org.dbsyncer.connector.config.DatabaseConfig;
 import org.dbsyncer.connector.config.DatabaseConfig;
+import org.dbsyncer.connector.database.Database;
+import org.dbsyncer.connector.enums.SqlBuilderEnum;
+import org.dbsyncer.connector.mysql.MysqlConnector;
 import org.dbsyncer.connector.util.DatabaseUtil;
 import org.dbsyncer.connector.util.DatabaseUtil;
 import org.dbsyncer.connector.util.JDBCUtil;
 import org.dbsyncer.connector.util.JDBCUtil;
 import org.dbsyncer.storage.AbstractStorageService;
 import org.dbsyncer.storage.AbstractStorageService;
+import org.dbsyncer.storage.constant.ConfigConstant;
+import org.dbsyncer.storage.enums.StorageEnum;
 import org.dbsyncer.storage.query.Query;
 import org.dbsyncer.storage.query.Query;
 import org.slf4j.Logger;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.slf4j.LoggerFactory;
+import org.springframework.beans.BeanUtils;
 import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
 import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
 import org.springframework.boot.context.properties.ConfigurationProperties;
 import org.springframework.boot.context.properties.ConfigurationProperties;
+import org.springframework.dao.EmptyResultDataAccessException;
 import org.springframework.jdbc.core.JdbcTemplate;
 import org.springframework.jdbc.core.JdbcTemplate;
 import org.springframework.stereotype.Component;
 import org.springframework.stereotype.Component;
+import org.springframework.util.Assert;
 
 
 import javax.annotation.PostConstruct;
 import javax.annotation.PostConstruct;
 import java.io.*;
 import java.io.*;
 import java.lang.reflect.Field;
 import java.lang.reflect.Field;
 import java.sql.Connection;
 import java.sql.Connection;
 import java.sql.DatabaseMetaData;
 import java.sql.DatabaseMetaData;
-import java.util.HashSet;
+import java.util.Arrays;
 import java.util.List;
 import java.util.List;
 import java.util.Map;
 import java.util.Map;
-import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 
 
 /**
 /**
  * @author AE86
  * @author AE86
@@ -37,16 +44,19 @@ public class MysqlStorageServiceImpl extends AbstractStorageService {
 
 
     private final Logger logger = LoggerFactory.getLogger(getClass());
     private final Logger logger = LoggerFactory.getLogger(getClass());
 
 
-    private JdbcTemplate jdbcTemplate;
+    private static final String PREFIX_TABLE = "dbsyncer_";
+    private static final String SHOW_TABLE = "show tables where Tables_in_%s = \"%s\"";
+    private static final String PK = ConfigConstant.CONFIG_MODEL_ID;
 
 
-    private DatabaseConfig config;
+    private Map<String, Executor> tables = new ConcurrentHashMap<>();
 
 
-    private Set<String> tables = new HashSet();
+    private Database connector = new MysqlConnector();
 
 
-    private final Object createTableLock = new Object();
+    private JdbcTemplate jdbcTemplate;
 
 
-    private static final String PREFIX_TABLE = "dbsyncer_";
-    private static final String QUERY_TABLE  = "show tables where Tables_in_%s = \"%s\"";
+    private DatabaseConfig config;
+
+    private String database;
 
 
     @PostConstruct
     @PostConstruct
     private void init() {
     private void init() {
@@ -63,51 +73,66 @@ public class MysqlStorageServiceImpl extends AbstractStorageService {
         logger.info("password:{}", config.getPassword());
         logger.info("password:{}", config.getPassword());
         jdbcTemplate = DatabaseUtil.getJdbcTemplate(config);
         jdbcTemplate = DatabaseUtil.getJdbcTemplate(config);
 
 
-        // 创建配置和日志表
-        //createTableIfNotExist(StorageEnum.CONFIG.getType());
-        //createTableIfNotExist(StorageEnum.LOG.getType());
+        // 获取数据库名称
+        Connection conn = null;
+        try {
+            conn = jdbcTemplate.getDataSource().getConnection();
+            DelegatingDatabaseMetaData md = (DelegatingDatabaseMetaData) conn.getMetaData();
+            DatabaseMetaData delegate = md.getDelegate();
+            Class clazz = delegate.getClass().getSuperclass();
+            Field field = clazz.getDeclaredField("database");
+            field.setAccessible(true);
+            database = (String) field.get(delegate);
+        } catch (Exception e) {
+            logger.error(e.getMessage());
+        } finally {
+            JDBCUtil.close(conn);
+        }
+
+        // 初始化表
+        initTable();
     }
     }
 
 
     @Override
     @Override
     public List<Map> select(String table, Query query) {
     public List<Map> select(String table, Query query) {
-        createTableIfNotExist(table);
+        getExecutor(table, StorageEnum.CONFIG.getType());
 
 
         return null;
         return null;
     }
     }
 
 
     @Override
     @Override
     public void insert(String table, Map params) {
     public void insert(String table, Map params) {
-        createTableIfNotExist(table);
+        getExecutor(table, StorageEnum.CONFIG.getType());
 
 
     }
     }
 
 
     @Override
     @Override
     public void update(String table, Map params) {
     public void update(String table, Map params) {
-        createTableIfNotExist(table);
+        getExecutor(table, StorageEnum.CONFIG.getType());
 
 
     }
     }
 
 
     @Override
     @Override
     public void delete(String table, String id) {
     public void delete(String table, String id) {
-        createTableIfNotExist(table);
+        getExecutor(table, StorageEnum.CONFIG.getType());
 
 
     }
     }
 
 
     @Override
     @Override
     public void deleteAll(String table) {
     public void deleteAll(String table) {
-        createTableIfNotExist(table);
+        getExecutor(table, StorageEnum.CONFIG.getType());
 
 
     }
     }
 
 
     @Override
     @Override
     public void insertLog(String table, Map<String, Object> params) {
     public void insertLog(String table, Map<String, Object> params) {
-        createTableIfNotExist(table);
+        getExecutor(table, StorageEnum.LOG.getType());
 
 
     }
     }
 
 
     @Override
     @Override
     public void insertData(String table, List<Map> list) {
     public void insertData(String table, List<Map> list) {
-        createTableIfNotExist(table);
+        getExecutor(table, StorageEnum.DATA.getType());
 
 
     }
     }
 
 
@@ -116,52 +141,86 @@ public class MysqlStorageServiceImpl extends AbstractStorageService {
         DatabaseUtil.close(jdbcTemplate);
         DatabaseUtil.close(jdbcTemplate);
     }
     }
 
 
-    private void createTableIfNotExist(String table) {
-        // 前缀标识
-        table = PREFIX_TABLE.concat(table);
-        synchronized (createTableLock) {
-            // 1、检查本地缓存
-            if (tables.contains(table)) {
-                return;
-            }
+    private Executor getExecutor(String table, String group) {
+        // 获取模板
+        Executor executor = tables.get(group);
+        Assert.notNull(executor, "未知的存储类型");
 
 
-            // 2、检查DB中是否创建
-            Connection connection = null;
-            Map<String, Object> map = null;
-            try {
-                connection = jdbcTemplate.getDataSource().getConnection();
-                DelegatingDatabaseMetaData md = (DelegatingDatabaseMetaData) connection.getMetaData();
-                DatabaseMetaData delegate = md.getDelegate();
-                String databaseName = getDatabaseName(delegate);
-                // show tables where Tables_in_dbsyncer = "dbsyncer_config"
-                String sql = String.format(QUERY_TABLE, databaseName, table);
-                logger.info(sql);
-                map = jdbcTemplate.queryForMap(sql);
-            } catch (Exception e) {
-                logger.error(e.getMessage());
-            } finally {
-                JDBCUtil.close(connection);
+        synchronized (tables) {
+            // 检查本地缓存
+            Executor e = tables.get(table);
+            if (tables.containsKey(table)) {
+                return e;
             }
             }
+            // 不存在
+            Executor newExecutor = new Executor();
+            BeanUtils.copyProperties(executor, newExecutor);
+            createTableIfNotExist(table, newExecutor);
 
 
-            // 不存在表,则创建
-            if (CollectionUtils.isEmpty(map)) {
-                createTable(table);
-            }
+            tables.putIfAbsent(table, newExecutor);
+            return newExecutor;
         }
         }
     }
     }
 
 
-    private void createTable(String table) {
-        // /dbsyncer_config.sql
-        String sql = readSql(PREFIX_TABLE.concat(table).concat(".sql"));
-        jdbcTemplate.execute(sql);
+    private void initTable() {
+        // 配置
+        List<String> configFields = Arrays.asList(
+                ConfigConstant.CONFIG_MODEL_ID,
+                ConfigConstant.CONFIG_MODEL_NAME,
+                ConfigConstant.CONFIG_MODEL_TYPE,
+                ConfigConstant.CONFIG_MODEL_CREATE_TIME,
+                ConfigConstant.CONFIG_MODEL_UPDATE_TIME,
+                ConfigConstant.CONFIG_MODEL_JSON);
+        // 日志
+        List<String> logFields = Arrays.asList(
+                ConfigConstant.CONFIG_MODEL_ID,
+                ConfigConstant.CONFIG_MODEL_NAME,
+                ConfigConstant.CONFIG_MODEL_TYPE,
+                ConfigConstant.CONFIG_MODEL_CREATE_TIME,
+                ConfigConstant.CONFIG_MODEL_JSON);
+        // 数据
+        List<String> dataFields = Arrays.asList(
+                ConfigConstant.CONFIG_MODEL_ID,
+                ConfigConstant.CONFIG_MODEL_NAME,
+                ConfigConstant.DATA_SUCCESS,
+                ConfigConstant.DATA_EVENT,
+                ConfigConstant.DATA_ERROR,
+                ConfigConstant.CONFIG_MODEL_CREATE_TIME,
+                ConfigConstant.CONFIG_MODEL_JSON);
+        tables.putIfAbsent(StorageEnum.CONFIG.getType(), new Executor(StorageEnum.CONFIG, configFields));
+        tables.putIfAbsent(StorageEnum.LOG.getType(), new Executor(StorageEnum.LOG, logFields));
+        tables.putIfAbsent(StorageEnum.DATA.getType(), new Executor(StorageEnum.DATA, dataFields, true));
+        // 创建表
+        tables.forEach((tableName, e) -> {
+            if (!e.isDynamicTableName()) {
+                createTableIfNotExist(tableName, e);
+            }
+        });
     }
     }
 
 
-    private String getDatabaseName(DatabaseMetaData delegate) throws NoSuchFieldException, IllegalAccessException {
-        Class clazz = delegate.getClass().getSuperclass();
-        Field field = clazz.getDeclaredField("database");
-        field.setAccessible(true);
-        Object value = field.get(delegate);
-        return (String) value;
+    private void createTableIfNotExist(String table, Executor executor) {
+        table = PREFIX_TABLE.concat(table);
+        // show tables where Tables_in_dbsyncer = "dbsyncer_config"
+        String sql = String.format(SHOW_TABLE, database, table);
+        try {
+            jdbcTemplate.queryForMap(sql);
+        } catch (EmptyResultDataAccessException e) {
+            // 不存在表
+            String type = executor.getGroup().getType();
+            String template = PREFIX_TABLE.concat(type);
+            String ddl = readSql("/".concat(template).concat(".sql"));
+            // 动态替换表名
+            ddl = executor.isDynamicTableName() ? StringUtils.replaceOnce(ddl, template, table) : ddl;
+            logger.info(ddl);
+            jdbcTemplate.execute(ddl);
+        }
+
+        List<String> fieldName = executor.getFieldName();
+        String query = SqlBuilderEnum.QUERY.getSqlBuilder().buildSql(table, PK, fieldName, "", "", connector);
+        String insert = SqlBuilderEnum.INSERT.getSqlBuilder().buildSql(table, PK, fieldName, "", "", connector);
+        String update = SqlBuilderEnum.UPDATE.getSqlBuilder().buildSql(table, PK, fieldName, "", "", connector);
+        String delete = SqlBuilderEnum.DELETE.getSqlBuilder().buildSql(table, PK, fieldName, "", "", connector);
+        executor.setQuery(query).setInsert(insert).setUpdate(update).setDelete(delete);
     }
     }
 
 
     private String readSql(String filePath) {
     private String readSql(String filePath) {
@@ -200,4 +259,76 @@ public class MysqlStorageServiceImpl extends AbstractStorageService {
     public void setConfig(DatabaseConfig config) {
     public void setConfig(DatabaseConfig config) {
         this.config = config;
         this.config = config;
     }
     }
+
+    class Executor {
+        private String       query;
+        private String       insert;
+        private String       update;
+        private String       delete;
+        private StorageEnum  group;
+        private List<String> fieldName;
+        private boolean      dynamicTableName;
+
+        public Executor() {
+        }
+
+        public Executor(StorageEnum group, List<String> fieldName) {
+            this.group = group;
+            this.fieldName = fieldName;
+        }
+
+        public Executor(StorageEnum group, List<String> fieldName, boolean dynamicTableName) {
+            this.group = group;
+            this.fieldName = fieldName;
+            this.dynamicTableName = dynamicTableName;
+        }
+
+        public String getQuery() {
+            return query;
+        }
+
+        public Executor setQuery(String query) {
+            this.query = query;
+            return this;
+        }
+
+        public String getInsert() {
+            return insert;
+        }
+
+        public Executor setInsert(String insert) {
+            this.insert = insert;
+            return this;
+        }
+
+        public String getUpdate() {
+            return update;
+        }
+
+        public Executor setUpdate(String update) {
+            this.update = update;
+            return this;
+        }
+
+        public String getDelete() {
+            return delete;
+        }
+
+        public Executor setDelete(String delete) {
+            this.delete = delete;
+            return this;
+        }
+
+        public StorageEnum getGroup() {
+            return group;
+        }
+
+        public List<String> getFieldName() {
+            return fieldName;
+        }
+
+        public boolean isDynamicTableName() {
+            return dynamicTableName;
+        }
+    }
 }
 }