AE86 2 роки тому
батько
коміт
cec51032f4

+ 34 - 55
dbsyncer-storage/src/main/java/org/dbsyncer/storage/support/MysqlStorageServiceImpl.java

@@ -30,11 +30,7 @@ import org.springframework.stereotype.Component;
 import org.springframework.util.Assert;
 
 import javax.annotation.PostConstruct;
-import java.io.BufferedReader;
-import java.io.Closeable;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
+import java.io.*;
 import java.sql.SQLSyntaxErrorException;
 import java.sql.Types;
 import java.util.ArrayList;
@@ -125,7 +121,7 @@ public class MysqlStorageServiceImpl extends AbstractStorageService {
     public void update(StorageEnum type, String table, Map params) {
         Executor executor = getExecutor(type, table);
         String sql = executor.getUpdate();
-        List<Object> args = getParams(executor, params);
+        List<Object> args = getUpdateArgs(executor, params);
         int update = connectorMapper.execute(databaseTemplate -> databaseTemplate.update(sql, args.toArray()));
         Assert.isTrue(update > 0, "update failed");
     }
@@ -163,10 +159,7 @@ public class MysqlStorageServiceImpl extends AbstractStorageService {
     public void insertData(StorageEnum type, String table, List<Map> list) {
         if (!CollectionUtils.isEmpty(list)) {
             Executor executor = getExecutor(type, table);
-            List<Object[]> args = list.stream().map(row -> {
-                List<Object> params = getParams(executor, row);
-                return params.toArray();
-            }).collect(Collectors.toList());
+            List<Object[]> args = list.stream().map(row -> getArgs(executor, row).toArray()).collect(Collectors.toList());
             connectorMapper.execute(databaseTemplate -> databaseTemplate.batchUpdate(executor.getInsert(), args));
         }
     }
@@ -184,7 +177,7 @@ public class MysqlStorageServiceImpl extends AbstractStorageService {
     private int executeInsert(StorageEnum type, String table, Map params) {
         Executor executor = getExecutor(type, table);
         String sql = executor.getInsert();
-        List<Object> args = getParams(executor, params);
+        List<Object> args = getArgs(executor, params);
         int insert = connectorMapper.execute(databaseTemplate -> databaseTemplate.update(sql, args.toArray()));
         if (insert < 1) {
             logger.error("table:{}, params:{}");
@@ -193,10 +186,26 @@ public class MysqlStorageServiceImpl extends AbstractStorageService {
         return insert;
     }
 
-    private List<Object> getParams(Executor executor, Map params) {
+    private List<Object> getArgs(Executor executor, Map params) {
         return executor.getFields().stream().map(f -> params.get(f.getLabelName())).collect(Collectors.toList());
     }
 
+    private List<Object> getUpdateArgs(Executor executor, Map params) {
+        List<Object> args = new ArrayList<>();
+        Object pk = null;
+        for (Field f : executor.getFields()) {
+            if (f.isPk()) {
+                pk = params.get(f.getLabelName());
+                continue;
+            }
+            args.add(params.get(f.getLabelName()));
+        }
+
+        Assert.notNull(pk, "The primaryKey is null.");
+        args.add(pk);
+        return args;
+    }
+
     private Executor getExecutor(StorageEnum type, String table) {
         // 获取模板
         Executor executor = tables.get(type.getType());
@@ -235,9 +244,13 @@ public class MysqlStorageServiceImpl extends AbstractStorageService {
     }
 
     private String buildQueryCountSql(Query query, Executor executor, List<Object> args) {
-        StringBuilder sql = new StringBuilder("SELECT COUNT(1) FROM ").append(executor.getTable());
+        StringBuilder queryCount = new StringBuilder();
+        queryCount.append("SELECT COUNT(1) FROM (");
+        StringBuilder sql = new StringBuilder("SELECT 1 FROM `").append(executor.getTable()).append("`");
         buildQuerySqlWithParams(query, args, sql);
-        return sql.toString();
+        queryCount.append(sql);
+        queryCount.append(" GROUP BY `ID`) DBSYNCER_T");
+        return queryCount.toString();
     }
 
     private void buildQuerySqlWithParams(Query query, List<Object> args, StringBuilder sql) {
@@ -270,8 +283,8 @@ public class MysqlStorageServiceImpl extends AbstractStorageService {
             return;
         }
         tables.values().forEach(table -> {
-            String ddl = readSql(UPGRADE_SQL, true, table);
             try {
+                String ddl = readSql(UPGRADE_SQL, true, table);
                 executeSql(ddl);
                 logger.info(ddl);
             } catch (Exception e) {
@@ -290,19 +303,15 @@ public class MysqlStorageServiceImpl extends AbstractStorageService {
     private void initTable() throws InterruptedException {
         // 配置
         FieldBuilder builder = new FieldBuilder();
-        builder.build(ConfigConstant.CONFIG_MODEL_ID, ConfigConstant.CONFIG_MODEL_NAME, ConfigConstant.CONFIG_MODEL_TYPE,
-                ConfigConstant.CONFIG_MODEL_CREATE_TIME, ConfigConstant.CONFIG_MODEL_UPDATE_TIME, ConfigConstant.CONFIG_MODEL_JSON);
+        builder.build(ConfigConstant.CONFIG_MODEL_ID, ConfigConstant.CONFIG_MODEL_NAME, ConfigConstant.CONFIG_MODEL_TYPE, ConfigConstant.CONFIG_MODEL_CREATE_TIME, ConfigConstant.CONFIG_MODEL_UPDATE_TIME, ConfigConstant.CONFIG_MODEL_JSON);
         List<Field> configFields = builder.getFields();
 
         // 日志
-        builder.build(ConfigConstant.CONFIG_MODEL_ID, ConfigConstant.CONFIG_MODEL_TYPE, ConfigConstant.CONFIG_MODEL_CREATE_TIME,
-                ConfigConstant.CONFIG_MODEL_JSON);
+        builder.build(ConfigConstant.CONFIG_MODEL_ID, ConfigConstant.CONFIG_MODEL_TYPE, ConfigConstant.CONFIG_MODEL_CREATE_TIME, ConfigConstant.CONFIG_MODEL_JSON);
         List<Field> logFields = builder.getFields();
 
         // 数据
-        builder.build(ConfigConstant.CONFIG_MODEL_ID, ConfigConstant.DATA_SUCCESS, ConfigConstant.DATA_TABLE_GROUP_ID,
-                ConfigConstant.DATA_TARGET_TABLE_NAME, ConfigConstant.DATA_EVENT, ConfigConstant.DATA_ERROR,
-                ConfigConstant.CONFIG_MODEL_CREATE_TIME, ConfigConstant.CONFIG_MODEL_JSON);
+        builder.build(ConfigConstant.CONFIG_MODEL_ID, ConfigConstant.DATA_SUCCESS, ConfigConstant.DATA_TABLE_GROUP_ID, ConfigConstant.DATA_TARGET_TABLE_NAME, ConfigConstant.DATA_EVENT, ConfigConstant.DATA_ERROR, ConfigConstant.CONFIG_MODEL_CREATE_TIME, ConfigConstant.CONFIG_MODEL_JSON);
         List<Field> dataFields = builder.getFields();
 
         tables.putIfAbsent(StorageEnum.CONFIG.getType(), new Executor(StorageEnum.CONFIG, configFields, false, true, true));
@@ -408,34 +417,6 @@ public class MysqlStorageServiceImpl extends AbstractStorageService {
         this.config = config;
     }
 
-    final class FieldWrapper {
-        Field field;
-        String columnName;
-        String labelName;
-
-        public FieldWrapper(Field field) {
-            this(field, field.getName());
-        }
-
-        public FieldWrapper(Field field, String labelName) {
-            this.field = field;
-            this.columnName = field.getName();
-            this.labelName = labelName;
-        }
-
-        public Field getField() {
-            return field;
-        }
-
-        public String getColumnName() {
-            return columnName;
-        }
-
-        public String getLabelName() {
-            return labelName;
-        }
-    }
-
     final class FieldBuilder {
         Map<String, Field> fieldMap;
         List<Field> fields;
@@ -453,8 +434,7 @@ public class MysqlStorageServiceImpl extends AbstractStorageService {
                     new Field(ConfigConstant.DATA_TARGET_TABLE_NAME, "VARCHAR", Types.VARCHAR),
                     new Field(ConfigConstant.DATA_EVENT, "VARCHAR", Types.VARCHAR),
                     new Field(ConfigConstant.DATA_ERROR, "LONGVARCHAR", Types.LONGVARCHAR)
-            )
-            .map(field -> {
+            ).map(field -> {
                 field.setLabelName(field.getName());
                 // 转换列下划线
                 String labelName = UnderlineToCamelUtils.camelToUnderline(field.getName());
@@ -470,7 +450,7 @@ public class MysqlStorageServiceImpl extends AbstractStorageService {
         public void build(String... fieldNames) {
             fields = new ArrayList<>(fieldNames.length);
             Stream.of(fieldNames).parallel().forEach(k -> {
-                if(fieldMap.containsKey(k)){
+                if (fieldMap.containsKey(k)) {
                     Field field = fieldMap.get(k);
                     fields.add(field);
                 }
@@ -490,8 +470,7 @@ public class MysqlStorageServiceImpl extends AbstractStorageService {
         private boolean systemType;
         private boolean orderByUpdateTime;
 
-        public Executor(StorageEnum group, List<Field> fields, boolean dynamicTableName,
-                        boolean systemType, boolean orderByUpdateTime) {
+        public Executor(StorageEnum group, List<Field> fields, boolean dynamicTableName, boolean systemType, boolean orderByUpdateTime) {
             this.group = group;
             this.fields = fields;
             this.dynamicTableName = dynamicTableName;