
Merge remote-tracking branch 'origin/V_1.0.0_Beta' into yjwang

yjwang, 3 years ago
commit c4aa6edd82

+ 5 - 1
README.md

@@ -56,7 +56,7 @@ DBSyncer is an open-source data synchronization middleware providing Mysql, Oracle, SqlServ
             <tr>
                 <td>File</td>
                 <td>✔</td>
-                <td>In development</td>
+                <td></td>
                 <td>*.txt, *.unl</td>
             </tr>
             <tr>
@@ -148,6 +148,10 @@ grant change notification to your account
 * [JDK - 1.8.0_40](https://www.oracle.com/java/technologies/jdk8-downloads.html) (this version or above is recommended)
 * [Maven - 3.3.9](https://dlcdn.apache.org/maven/maven-3/) (this version or above is recommended)
 
+## 🎨Design
+#### Architecture diagram
+<img src="http://assets.processon.com/chart_image/5d63b0bce4b0ac2b61877037.png" />
+
## ⚙️Manual build
> Make sure JDK and Maven are installed first
 ```bash

+ 5 - 1
dbsyncer-connector/src/main/java/org/dbsyncer/connector/database/AbstractDatabaseConnector.java

@@ -151,7 +151,11 @@ public abstract class AbstractDatabaseConnector extends AbstractConnector
             int batchSize = execute.length;
             for (int i = 0; i < batchSize; i++) {
                 if (execute[i] == 0) {
-                    forceUpdate(result, connectorMapper, config, pkField, data.get(i));
+                    if (config.isForceUpdate()) {
+                        forceUpdate(result, connectorMapper, config, pkField, data.get(i));
+                    } else {
+                        result.getFailData().add(data.get(i));
+                    }
                     continue;
                 }
                 result.getSuccessData().add(data.get(i));
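
The hunk above changes how rows with a zero affected-row count from the batch execute are handled: they are only retried through forceUpdate when the connector config enables it, and are otherwise collected as failed data. A minimal, self-contained sketch of that partitioning step (the class and method names below are illustrative, not the project's API):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Illustrative sketch: partition batch results into success/fail lists,
// optionally retrying rows that reported 0 affected rows.
class BatchResultSketch {
    final List<Map<String, Object>> successData = new ArrayList<>();
    final List<Map<String, Object>> failData = new ArrayList<>();

    void collect(int[] execute, List<Map<String, Object>> data, boolean forceUpdate) {
        for (int i = 0; i < execute.length; i++) {
            if (execute[i] == 0) {                      // no row was inserted/updated
                if (forceUpdate) {
                    retryAsUpdateOrInsert(data.get(i)); // stands in for forceUpdate(...)
                } else {
                    failData.add(data.get(i));          // surface the row as failed instead
                }
                continue;
            }
            successData.add(data.get(i));
        }
    }

    private void retryAsUpdateOrInsert(Map<String, Object> row) {
        // placeholder for the connector-specific retry path
    }
}
```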

+ 54 - 26
dbsyncer-connector/src/main/java/org/dbsyncer/connector/file/FileConnector.java

@@ -3,7 +3,7 @@ package org.dbsyncer.connector.file;
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.io.LineIterator;
 import org.dbsyncer.common.model.Result;
-import org.dbsyncer.common.util.JsonUtil;
+import org.dbsyncer.common.util.CollectionUtils;
 import org.dbsyncer.common.util.StringUtil;
 import org.dbsyncer.connector.AbstractConnector;
 import org.dbsyncer.connector.Connector;
@@ -21,9 +21,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.util.Assert;
 
-import java.io.File;
-import java.io.FileReader;
-import java.io.IOException;
+import java.io.*;
 import java.net.InetAddress;
 import java.net.UnknownHostException;
 import java.util.*;
@@ -55,7 +53,20 @@ public final class FileConnector extends AbstractConnector implements Connector<
 
     @Override
     public boolean isAlive(FileConnectorMapper connectorMapper) {
-        return connectorMapper.getConnection().exists();
+        String fileDir = connectorMapper.getConnection();
+        boolean alive = new File(fileDir).exists();
+        if (!alive) {
+            logger.warn("can not find fileDir:{}", fileDir);
+            return false;
+        }
+        for (FileSchema fileSchema : connectorMapper.getFileSchemaList()) {
+            String filePath = connectorMapper.getFilePath(fileSchema.getFileName());
+            if (!new File(filePath).exists()) {
+                logger.warn("can not find file:{}", filePath);
+                alive = false;
+            }
+        }
+        return alive;
     }
 
     @Override
@@ -72,12 +83,12 @@ public final class FileConnector extends AbstractConnector implements Connector<
 
     @Override
     public List<Table> getTable(FileConnectorMapper connectorMapper) {
-        return getFileSchema(connectorMapper).stream().map(fileSchema -> new Table(fileSchema.getFileName())).collect(Collectors.toList());
+        return connectorMapper.getFileSchemaList().stream().map(fileSchema -> new Table(fileSchema.getFileName())).collect(Collectors.toList());
     }
 
     @Override
     public MetaInfo getMetaInfo(FileConnectorMapper connectorMapper, String tableName) {
-        FileSchema fileSchema = getFileSchema(connectorMapper, tableName);
+        FileSchema fileSchema = connectorMapper.getFileSchema(tableName);
         return new MetaInfo().setColumn(fileSchema.getFields());
     }
 
@@ -106,7 +117,7 @@ public final class FileConnector extends AbstractConnector implements Connector<
         FileReader reader = null;
         try {
             FileConfig fileConfig = connectorMapper.getConfig();
-            FileSchema fileSchema = getFileSchema(connectorMapper, config.getCommand().get(FILE_NAME));
+            FileSchema fileSchema = connectorMapper.getFileSchema(config.getCommand().get(FILE_NAME));
             final List<Field> fields = fileSchema.getFields();
             Assert.notEmpty(fields, "The fields of file schema is empty.");
             final char separator = fileConfig.getSeparator();
@@ -137,7 +148,37 @@ public final class FileConnector extends AbstractConnector implements Connector<
 
     @Override
     public Result writer(FileConnectorMapper connectorMapper, WriterBatchConfig config) {
-        return null;
+        List<Map> data = config.getData();
+        if (CollectionUtils.isEmpty(data)) {
+            logger.error("writer data can not be empty.");
+            throw new ConnectorException("writer data can not be empty.");
+        }
+
+        final List<Field> fields = config.getFields();
+        final String separator = new String(new char[] {connectorMapper.getConfig().getSeparator()});
+
+        Result result = new Result();
+        OutputStream output = null;
+        try {
+            final String filePath = connectorMapper.getFilePath(config.getCommand().get(FILE_NAME));
+            output = new FileOutputStream(filePath, true);
+            List<String> lines = data.stream().map(row -> {
+                List<String> array = new ArrayList<>();
+                fields.forEach(field -> {
+                    Object o = row.get(field.getName());
+                    array.add(null != o ? String.valueOf(o) : "");
+                });
+                return StringUtil.join(array.toArray(), separator);
+            }).collect(Collectors.toList());
+            IOUtils.writeLines(lines, null, output, "UTF-8");
+        } catch (Exception e) {
+            result.addFailData(data);
+            result.getError().append(e.getMessage()).append(System.lineSeparator());
+            logger.error(e.getMessage());
+        } finally {
+            IOUtils.closeQuietly(output);
+        }
+        return result;
     }
 
     @Override
@@ -157,22 +198,9 @@ public final class FileConnector extends AbstractConnector implements Connector<
 
     @Override
     public Map<String, String> getTargetCommand(CommandConfig commandConfig) {
-        return Collections.EMPTY_MAP;
-    }
-
-    private FileSchema getFileSchema(FileConnectorMapper connectorMapper, String tableName) {
-        List<FileSchema> fileSchemaList = getFileSchema(connectorMapper);
-        for (FileSchema fileSchema : fileSchemaList) {
-            if (StringUtil.equals(fileSchema.getFileName(), tableName)) {
-                return fileSchema;
-            }
-        }
-        throw new ConnectorException(String.format("can not find fileSchema by tableName '%s'", tableName));
+        Map<String, String> command = new HashMap<>();
+        command.put(FILE_NAME, commandConfig.getTable().getName());
+        return command;
     }
 
-    private List<FileSchema> getFileSchema(FileConnectorMapper connectorMapper) {
-        List<FileSchema> fileSchemas = JsonUtil.jsonToArray(connectorMapper.getConfig().getSchema(), FileSchema.class);
-        Assert.notEmpty(fileSchemas, "The schema is empty.");
-        return fileSchemas;
-    }
-}
+}
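
The new writer above builds one text line per row by reading the configured fields in order and joining their values with the connector's separator, then appends the lines to the target file. A rough JDK-only sketch of that row-to-line step (the committed code uses the project's StringUtil.join and commons-io IOUtils instead):

```java
import java.util.*;

class RowToLineSketch {
    // Convert one row into a separator-joined line, preserving field order
    // and writing an empty string for null/missing values.
    static String toLine(List<String> fieldNames, Map<String, Object> row, char separator) {
        StringBuilder line = new StringBuilder();
        for (int i = 0; i < fieldNames.size(); i++) {
            Object o = row.get(fieldNames.get(i));
            if (i > 0) {
                line.append(separator);
            }
            line.append(o != null ? String.valueOf(o) : "");
        }
        return line.toString();
    }

    public static void main(String[] args) {
        Map<String, Object> row = new HashMap<>();
        row.put("id", 1);
        row.put("name", "Alice");
        // fields [id, name, age] with no "age" value -> "1,Alice,"
        System.out.println(toLine(Arrays.asList("id", "name", "age"), row, ','));
    }
}
```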

+ 52 - 6
dbsyncer-connector/src/main/java/org/dbsyncer/connector/file/FileConnectorMapper.java

@@ -1,22 +1,40 @@
 package org.dbsyncer.connector.file;
 
+import org.dbsyncer.common.util.JsonUtil;
 import org.dbsyncer.connector.ConnectorMapper;
 import org.dbsyncer.connector.config.FileConfig;
+import org.dbsyncer.connector.model.Field;
+import org.dbsyncer.connector.model.FileSchema;
+import org.springframework.util.Assert;
 
 import java.io.File;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 
 /**
  * @author AE86
  * @version 1.0.0
  * @date 2022/5/5 23:19
  */
-public final class FileConnectorMapper implements ConnectorMapper<FileConfig, File> {
+public final class FileConnectorMapper implements ConnectorMapper<FileConfig, String> {
+
     private FileConfig config;
-    private File file;
+    private List<FileSchema> fileSchemaList;
+    private Map<String, FileResolver> fileSchemaMap = new ConcurrentHashMap<>();
 
     public FileConnectorMapper(FileConfig config) {
         this.config = config;
-        file = new File(config.getFileDir());
+        fileSchemaList = JsonUtil.jsonToArray(config.getSchema(), FileSchema.class);
+        Assert.notEmpty(fileSchemaList, "The schema is empty.");
+        for (FileSchema fileSchema : fileSchemaList) {
+            final List<Field> fields = fileSchema.getFields();
+            Assert.notEmpty(fields, "The fields of file schema is empty.");
+
+            if (!fileSchemaMap.containsKey(fileSchema.getFileName())) {
+                fileSchemaMap.put(fileSchema.getFileName(), new FileResolver(fileSchema));
+            }
+        }
     }
 
     public FileConfig getConfig() {
@@ -24,12 +42,40 @@ public final class FileConnectorMapper implements ConnectorMapper<FileConfig, Fi
     }
 
     @Override
-    public File getConnection() {
-        return file;
+    public String getConnection() {
+        return config.getFileDir();
     }
 
     @Override
     public void close() {
+        fileSchemaMap.clear();
+    }
+
+    public List<FileSchema> getFileSchemaList() {
+        return fileSchemaList;
+    }
+
+    public FileSchema getFileSchema(String tableName) {
+        FileResolver fileResolver = fileSchemaMap.get(tableName);
+        Assert.notNull(fileResolver, String.format("can not find fileSchema by tableName '%s'", tableName));
+        return fileResolver.fileSchema;
+    }
+
+    public String getFilePath(String tableName) {
+        FileResolver fileResolver = fileSchemaMap.get(tableName);
+        Assert.notNull(fileResolver, String.format("can not find fileSchema by tableName '%s'", tableName));
+        return fileResolver.filePath;
+    }
+
+    class FileResolver {
+        FileSchema fileSchema;
+        String filePath;
 
+        public FileResolver(FileSchema fileSchema) {
+            this.fileSchema = fileSchema;
+            this.filePath = config.getFileDir().concat(fileSchema.getFileName());
+            File file = new File(filePath);
+            Assert.isTrue(file.exists(), String.format("found not file '%s'", filePath));
+        }
     }
-}
+}
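
For reference, the reworked FileConnectorMapper now parses the JSON schema once in its constructor, validates it, and caches a resolver per file. A hedged usage sketch, assuming FileConfig exposes plain setters for the directory, separator, and schema (those setter names and the simplified schema JSON are assumptions, not taken from this diff):

```java
import org.dbsyncer.connector.config.FileConfig;
import org.dbsyncer.connector.file.FileConnectorMapper;
import org.dbsyncer.connector.model.FileSchema;

public class FileConnectorMapperUsage {
    public static void main(String[] args) {
        FileConfig config = new FileConfig();
        // Assumed setters and schema shape; the actual FileConfig API may differ.
        config.setFileDir("/data/files/");
        config.setSeparator(',');
        config.setSchema("[{\"fileName\":\"user.txt\",\"fields\":[{\"name\":\"id\"},{\"name\":\"name\"}]}]");

        // The constructor parses the schema and asserts each configured file exists under fileDir.
        FileConnectorMapper mapper = new FileConnectorMapper(config);

        for (FileSchema schema : mapper.getFileSchemaList()) {
            // getFilePath resolves fileDir + fileName, as cached by the internal FileResolver.
            System.out.println(schema.getFileName() + " -> " + mapper.getFilePath(schema.getFileName()));
        }
    }
}
```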

+ 69 - 0
dbsyncer-connector/src/main/test/FileConnectionTest.java

@@ -0,0 +1,69 @@
+import org.apache.commons.io.IOUtils;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.*;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.charset.Charset;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+
+/**
+ * @author AE86
+ * @version 1.0.0
+ * @date 2022/5/11 20:19
+ */
+public class FileConnectionTest {
+
+    private final Logger logger = LoggerFactory.getLogger(getClass());
+
+    private CountDownLatch latch;
+
+    @Test
+    public void testConnection() throws InterruptedException, IOException {
+        File f = new File("D:\\test\\abc.txt");
+        RandomAccessFile file = new RandomAccessFile(f, "rw");
+
+        int count = 10;
+        latch = new CountDownLatch(count);
+        for (int i = 0; i < count; i++) {
+            Thread t = new Thread(new WritingThread(i, file.getChannel()));
+            t.start();
+        }
+        latch.await();
+        file.close();
+        InputStream fileR = new FileInputStream(f);
+        List<String> strings = IOUtils.readLines(fileR, Charset.defaultCharset());
+        strings.forEach(line -> logger.info("{}", line));
+        fileR.close();
+    }
+
+    class WritingThread implements Runnable {
+
+        private FileChannel channel;
+
+        private int id;
+
+        public WritingThread(int id, FileChannel channel) {
+            this.channel = channel;
+            this.id = id;
+        }
+
+        @Override
+        public void run() {
+            logger.info("Thread {} is Writing", id);
+            try {
+                for (int i = 1; i <= 5; i++) {
+                    String msg = String.format("%s, %s, %s\n", Thread.currentThread().getName(), id, i);
+                    this.channel.write(ByteBuffer.wrap(msg.getBytes()));
+                }
+            } catch (IOException e) {
+                logger.error(e.getMessage());
+            }
+            latch.countDown();
+        }
+    }
+
+}

+ 4 - 6
dbsyncer-listener/src/main/java/org/dbsyncer/listener/file/FileExtractor.java

@@ -66,7 +66,7 @@ public class FileExtractor extends AbstractExtractor {
             connected = true;
 
             separator = config.getSeparator();
-            initPipeline(config.getFileDir(), config.getSchema());
+            initPipeline(config.getFileDir());
             watchService = FileSystems.getDefault().newWatchService();
             Path p = Paths.get(config.getFileDir());
             p.register(watchService, StandardWatchEventKinds.ENTRY_MODIFY);
@@ -89,10 +89,8 @@ public class FileExtractor extends AbstractExtractor {
         }
     }
 
-    private void initPipeline(String fileDir, String schema) throws IOException {
-        List<FileSchema> fileSchemas = JsonUtil.jsonToArray(schema, FileSchema.class);
-        Assert.notEmpty(fileSchemas, "found not file schema.");
-        for (FileSchema fileSchema : fileSchemas) {
+    private void initPipeline(String fileDir) throws IOException {
+        for (FileSchema fileSchema : connectorMapper.getFileSchemaList()) {
             String fileName = fileSchema.getFileName();
             String file = fileDir.concat(fileName);
             Assert.isTrue(new File(file).exists(), String.format("found not file '%s'", file));
@@ -223,4 +221,4 @@ public class FileExtractor extends AbstractExtractor {
 
     }
 
-}
+}