Bladeren bron

!67 merge
Merge pull request !67 from AE86/V_1.0.0_Beta

AE86 2 jaren geleden
bovenliggende commit fba3b3c132
39 gewijzigde bestanden met toevoegingen van 1023 en 1213 verwijderingen
  1. 7 2
      README.md
  2. 79 0
      dbsyncer-common/src/main/java/org/dbsyncer/common/config/BinlogRecorderConfig.java
  3. 66 0
      dbsyncer-common/src/main/java/org/dbsyncer/common/config/BufferActuatorConfig.java
  4. 1 11
      dbsyncer-common/src/main/java/org/dbsyncer/common/util/DateFormatUtil.java
  5. 4 2
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/database/setter/BlobSetter.java
  6. 8 4
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/database/setter/ClobSetter.java
  7. 1 3
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/database/setter/NClobSetter.java
  8. 30 10
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/database/setter/PreparedFieldMapper.java
  9. 11 0
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/database/setter/SmallintSetter.java
  10. 2 2
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/sqlserver/SqlServerConnector.java
  11. 45 1
      dbsyncer-connector/src/main/test/SqlServerConnectionTest.java
  12. 10 10
      dbsyncer-listener/src/main/java/org/dbsyncer/listener/sqlserver/SqlServerExtractor.java
  13. 98 0
      dbsyncer-parser/src/main/java/org/dbsyncer/parser/AbstractWriterBinlog.java
  14. 67 0
      dbsyncer-parser/src/main/java/org/dbsyncer/parser/convert/handler/StringToTimestampHandler.java
  15. 4 0
      dbsyncer-parser/src/main/java/org/dbsyncer/parser/enums/ConvertEnum.java
  16. 7 10
      dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/AbstractBufferActuator.java
  17. 15 2
      dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/impl/WriterBufferActuator.java
  18. 2 2
      dbsyncer-parser/src/main/java/org/dbsyncer/parser/model/Picker.java
  19. 11 0
      dbsyncer-parser/src/main/java/org/dbsyncer/parser/model/WriterRequest.java
  20. 3 2
      dbsyncer-parser/src/main/java/org/dbsyncer/parser/model/WriterResponse.java
  21. 14 0
      dbsyncer-parser/src/main/java/org/dbsyncer/parser/strategy/ParserStrategy.java
  22. 6 87
      dbsyncer-parser/src/main/java/org/dbsyncer/parser/strategy/impl/DisableWriterBufferActuatorStrategy.java
  23. 13 19
      dbsyncer-parser/src/main/java/org/dbsyncer/parser/strategy/impl/EnableWriterBufferActuatorStrategy.java
  24. 95 242
      dbsyncer-storage/src/main/java/org/dbsyncer/storage/binlog/AbstractBinlogRecorder.java
  25. 1 2
      dbsyncer-storage/src/main/java/org/dbsyncer/storage/binlog/BinlogColumnValue.java
  26. 0 361
      dbsyncer-storage/src/main/java/org/dbsyncer/storage/binlog/BinlogContext.java
  27. 17 0
      dbsyncer-storage/src/main/java/org/dbsyncer/storage/binlog/BinlogRecorder.java
  28. 0 63
      dbsyncer-storage/src/main/java/org/dbsyncer/storage/binlog/impl/BinlogPipeline.java
  29. 0 53
      dbsyncer-storage/src/main/java/org/dbsyncer/storage/binlog/impl/BinlogReader.java
  30. 0 45
      dbsyncer-storage/src/main/java/org/dbsyncer/storage/binlog/impl/BinlogWriter.java
  31. 24 0
      dbsyncer-storage/src/main/java/org/dbsyncer/storage/constant/BinlogConstant.java
  32. 4 0
      dbsyncer-storage/src/main/java/org/dbsyncer/storage/enums/IndexFieldResolverEnum.java
  33. 5 5
      dbsyncer-storage/src/main/java/org/dbsyncer/storage/support/DiskStorageServiceImpl.java
  34. 239 0
      dbsyncer-storage/src/main/java/org/dbsyncer/storage/util/BinlogMessageUtil.java
  35. 21 2
      dbsyncer-storage/src/main/java/org/dbsyncer/storage/util/DocumentUtil.java
  36. 0 224
      dbsyncer-storage/src/main/test/BinlogMessageTest.java
  37. 2 2
      dbsyncer-storage/src/main/test/LuceneFactoryTest.java
  38. 92 42
      dbsyncer-storage/src/main/test/ShardBinlogTest.java
  39. 19 5
      dbsyncer-web/src/main/resources/application.properties

+ 7 - 2
README.md

@@ -27,7 +27,7 @@ DBSyncer是一款开源的数据同步中间件,提供Mysql、Oracle、SqlServ
                 <td>Oracle</td>
                 <td>Oracle</td>
                 <td>✔</td>
                 <td>✔</td>
                 <td>✔</td>
                 <td>✔</td>
-                <td>10g以上</td>
+                <td>Oracle 10gR2 -11g</td>
             </tr>
             </tr>
             <tr>
             <tr>
                 <td>SqlServer</td>
                 <td>SqlServer</td>
@@ -100,10 +100,15 @@ replicate-do-db=test
 
 
 ##### Oracle
 ##### Oracle
 * DCN注册订阅。监听增删改事件,得到rowid,根据rowid执行SQL查询,得到变化数据。
 * DCN注册订阅。监听增删改事件,得到rowid,根据rowid执行SQL查询,得到变化数据。
-> 授予账号监听权限, 同时要求目标源表必须定义一个长度为18的varchar字段,通过接收rowid值实现增删改操作。
+> 1、授予账号监听权限, 同时要求目标源表必须定义一个长度为18的varchar字段,通过接收rowid值实现增删改操作。
 ```roomsql
 ```roomsql
 grant change notification to 你的账号
 grant change notification to 你的账号
 ```
 ```
+> 2、账号必须是监听表的OWNER
+```roomsql
+SELECT OBJECT_ID, OBJECT_NAME, OWNER FROM ALL_OBJECTS WHERE OBJECT_TYPE = 'TABLE' AND OWNER='你的账号';
+```
+![DCN账号](https://images.gitee.com/uploads/images/2022/0717/001127_fb4049b6_376718.png "DCN账号.png")
 
 
 ##### SqlServer
 ##### SqlServer
 * SQL Server 2008提供了内建的方法变更数据捕获(Change Data Capture 即CDC)以实现异步跟踪用户表的数据修改。
 * SQL Server 2008提供了内建的方法变更数据捕获(Change Data Capture 即CDC)以实现异步跟踪用户表的数据修改。

+ 79 - 0
dbsyncer-common/src/main/java/org/dbsyncer/common/config/BinlogRecorderConfig.java

@@ -0,0 +1,79 @@
+package org.dbsyncer.common.config;
+
+import org.springframework.boot.context.properties.ConfigurationProperties;
+import org.springframework.context.annotation.Configuration;
+
+/**
+ * @author AE86
+ * @version 1.0.0
+ * @date 2022/7/14 23:50
+ */
+@Configuration
+@ConfigurationProperties(prefix = "dbsyncer.storage.binlog.recorder")
+public class BinlogRecorderConfig {
+
+    /**
+     * 批量同步数
+     */
+    private int batchCount = 1000;
+
+    /**
+     * 最长任务处理耗时(秒)
+     */
+    private int maxProcessingSeconds = 120;
+
+    /**
+     * 工作线任务队列
+     */
+    private int queueCapacity = 10000;
+
+    /**
+     * 写磁盘间隔(毫秒)
+     */
+    private int writerPeriodMillisecond = 500;
+
+    /**
+     * 读磁盘间隔(毫秒)
+     */
+    private int readerPeriodMillisecond = 2000;
+
+    public int getBatchCount() {
+        return batchCount;
+    }
+
+    public void setBatchCount(int batchCount) {
+        this.batchCount = batchCount;
+    }
+
+    public int getMaxProcessingSeconds() {
+        return maxProcessingSeconds;
+    }
+
+    public void setMaxProcessingSeconds(int maxProcessingSeconds) {
+        this.maxProcessingSeconds = maxProcessingSeconds;
+    }
+
+    public int getQueueCapacity() {
+        return queueCapacity;
+    }
+
+    public void setQueueCapacity(int queueCapacity) {
+        this.queueCapacity = queueCapacity;
+    }
+
+    public int getWriterPeriodMillisecond() {
+        return writerPeriodMillisecond;
+    }
+
+    public void setWriterPeriodMillisecond(int writerPeriodMillisecond) {
+        this.writerPeriodMillisecond = writerPeriodMillisecond;
+    }
+
+    public int getReaderPeriodMillisecond() {
+        return readerPeriodMillisecond;
+    }
+
+    public void setReaderPeriodMillisecond(int readerPeriodMillisecond) {
+        this.readerPeriodMillisecond = readerPeriodMillisecond;
+    }
+}

+ 66 - 0
dbsyncer-common/src/main/java/org/dbsyncer/common/config/BufferActuatorConfig.java

@@ -0,0 +1,66 @@
+package org.dbsyncer.common.config;
+
+import org.springframework.boot.context.properties.ConfigurationProperties;
+import org.springframework.context.annotation.Configuration;
+
+/**
+ * @author AE86
+ * @version 1.0.0
+ * @date 2022/7/14 23:50
+ */
+@Configuration
+@ConfigurationProperties(prefix = "dbsyncer.parser.flush.buffer.actuator")
+public class BufferActuatorConfig {
+
+    /**
+     * 写批量数
+     */
+    private int writerBatchCount = 100;
+
+    /**
+     * 批量同步数
+     */
+    private int batchCount = 2000;
+
+    /**
+     * 工作线任务队列
+     */
+    private int queueCapacity = 5_0000;
+
+    /**
+     * 同步间隔(毫秒)
+     */
+    private int periodMillisecond = 300;
+
+    public int getWriterBatchCount() {
+        return writerBatchCount;
+    }
+
+    public void setWriterBatchCount(int writerBatchCount) {
+        this.writerBatchCount = writerBatchCount;
+    }
+
+    public int getBatchCount() {
+        return batchCount;
+    }
+
+    public void setBatchCount(int batchCount) {
+        this.batchCount = batchCount;
+    }
+
+    public int getQueueCapacity() {
+        return queueCapacity;
+    }
+
+    public void setQueueCapacity(int queueCapacity) {
+        this.queueCapacity = queueCapacity;
+    }
+
+    public int getPeriodMillisecond() {
+        return periodMillisecond;
+    }
+
+    public void setPeriodMillisecond(int periodMillisecond) {
+        this.periodMillisecond = periodMillisecond;
+    }
+}

+ 1 - 11
dbsyncer-common/src/main/java/org/dbsyncer/common/util/DateFormatUtil.java

@@ -55,15 +55,6 @@ public abstract class DateFormatUtil {
             .appendText(ChronoField.ERA, TextStyle.SHORT)
             .appendText(ChronoField.ERA, TextStyle.SHORT)
             .optionalEnd()
             .optionalEnd()
             .toFormatter();
             .toFormatter();
-    private static final DateTimeFormatter TS_FORMAT = new DateTimeFormatterBuilder()
-            .append(NON_ISO_LOCAL_DATE)
-            .appendLiteral(' ')
-            .append(DateTimeFormatter.ISO_LOCAL_TIME)
-            .optionalStart()
-            .appendLiteral(" ")
-            .appendText(ChronoField.ERA, TextStyle.SHORT)
-            .optionalEnd()
-            .toFormatter();
 
 
     public static String getCurrentTime() {
     public static String getCurrentTime() {
         return LocalDateTime.now().format(TIME_FORMATTER);
         return LocalDateTime.now().format(TIME_FORMATTER);
@@ -86,9 +77,8 @@ public abstract class DateFormatUtil {
     }
     }
 
 
     public static Timestamp stringToTimestamp(String s) {
     public static Timestamp stringToTimestamp(String s) {
-        return Timestamp.valueOf(LocalDateTime.from(TS_FORMAT.parse(s)));
+        return Timestamp.valueOf(LocalDateTime.from(CHINESE_STANDARD_TIME_FORMATTER.parse(s)));
     }
     }
-
     public static OffsetTime timeWithTimeZone(String s) {
     public static OffsetTime timeWithTimeZone(String s) {
         return OffsetTime.parse(s, TIME_TZ_FORMAT).withOffsetSameInstant(ZoneOffset.UTC);
         return OffsetTime.parse(s, TIME_TZ_FORMAT).withOffsetSameInstant(ZoneOffset.UTC);
     }
     }

+ 4 - 2
dbsyncer-connector/src/main/java/org/dbsyncer/connector/database/setter/BlobSetter.java

@@ -16,13 +16,15 @@ public class BlobSetter extends AbstractSetter<Blob> {
 
 
     @Override
     @Override
     protected void setIfValueTypeNotMatch(PreparedFieldMapper mapper, PreparedStatement ps, int i, int type, Object val) throws SQLException {
     protected void setIfValueTypeNotMatch(PreparedFieldMapper mapper, PreparedStatement ps, int i, int type, Object val) throws SQLException {
-        // 存放jpg等文件
         if (val instanceof Blob) {
         if (val instanceof Blob) {
             Blob blob = (Blob) val;
             Blob blob = (Blob) val;
             ps.setBlob(i, blob);
             ps.setBlob(i, blob);
             return;
             return;
         }
         }
+        if (val instanceof byte[]) {
+            ps.setBlob(i, mapper.getBlob((byte[]) val));
+            return;
+        }
         throw new ConnectorException(String.format("BlobSetter can not find type [%s], val [%s]", type, val));
         throw new ConnectorException(String.format("BlobSetter can not find type [%s], val [%s]", type, val));
     }
     }
-
 }
 }

+ 8 - 4
dbsyncer-connector/src/main/java/org/dbsyncer/connector/database/setter/ClobSetter.java

@@ -15,10 +15,14 @@ public class ClobSetter extends AbstractSetter<Clob> {
     }
     }
 
 
     @Override
     @Override
-    protected void setIfValueTypeNotMatch(PreparedFieldMapper mapper, PreparedStatement ps, int i, int type, Object val) throws SQLException {
-        if(val instanceof Clob) {
-            Clob clob = (Clob) val;
-            ps.setClob(i, clob);
+    protected void setIfValueTypeNotMatch(PreparedFieldMapper mapper, PreparedStatement ps, int i, int type, Object val)
+            throws SQLException {
+        if (val instanceof Clob) {
+            ps.setClob(i, (Clob) val);
+            return;
+        }
+        if (val instanceof byte[]) {
+            ps.setClob(i, mapper.getClob((byte[]) val));
             return;
             return;
         }
         }
         throw new ConnectorException(String.format("ClobSetter can not find type [%s], val [%s]", type, val));
         throw new ConnectorException(String.format("ClobSetter can not find type [%s], val [%s]", type, val));

+ 1 - 3
dbsyncer-connector/src/main/java/org/dbsyncer/connector/database/setter/NClobSetter.java

@@ -17,9 +17,7 @@ public class NClobSetter extends AbstractSetter<NClob> {
     @Override
     @Override
     protected void setIfValueTypeNotMatch(PreparedFieldMapper mapper, PreparedStatement ps, int i, int type, Object val) throws SQLException {
     protected void setIfValueTypeNotMatch(PreparedFieldMapper mapper, PreparedStatement ps, int i, int type, Object val) throws SQLException {
         if (val instanceof byte[]) {
         if (val instanceof byte[]) {
-            byte[] bytes = (byte[]) val;
-            NClob nClob = mapper.getNClob(bytes);
-            ps.setNClob(i, nClob);
+            ps.setNClob(i, mapper.getNClob((byte[]) val));
             return;
             return;
         }
         }
         throw new ConnectorException(String.format("NClobSetter can not find type [%s], val [%s]", type, val));
         throw new ConnectorException(String.format("NClobSetter can not find type [%s], val [%s]", type, val));

+ 30 - 10
dbsyncer-connector/src/main/java/org/dbsyncer/connector/database/setter/PreparedFieldMapper.java

@@ -1,26 +1,46 @@
 package org.dbsyncer.connector.database.setter;
 package org.dbsyncer.connector.database.setter;
 
 
 import oracle.jdbc.OracleConnection;
 import oracle.jdbc.OracleConnection;
-import oracle.sql.NCLOB;
+import org.dbsyncer.connector.database.ds.SimpleConnection;
 
 
-import java.sql.Connection;
-import java.sql.NClob;
-import java.sql.SQLException;
+import java.nio.charset.Charset;
+import java.sql.*;
 
 
 public class PreparedFieldMapper {
 public class PreparedFieldMapper {
 
 
-    private Connection connection;
+    private SimpleConnection connection;
 
 
     public PreparedFieldMapper(Connection connection) {
     public PreparedFieldMapper(Connection connection) {
-        this.connection = connection;
+        this.connection = (SimpleConnection) connection;
     }
     }
 
 
     public NClob getNClob(byte[] bytes) throws SQLException {
     public NClob getNClob(byte[] bytes) throws SQLException {
-        if (connection instanceof OracleConnection) {
-            OracleConnection conn = (OracleConnection) connection;
-            return new NCLOB(conn, bytes);
+        if (connection.getConnection() instanceof OracleConnection) {
+            OracleConnection conn = (OracleConnection) connection.getConnection();
+            NClob nClob = conn.createNClob();
+            nClob.setString(1, new String(bytes, Charset.defaultCharset()));
+            return nClob;
         }
         }
         return connection.createNClob();
         return connection.createNClob();
     }
     }
 
 
-}
+    public Blob getBlob(byte[] bytes) throws SQLException {
+        if (connection.getConnection() instanceof OracleConnection) {
+            OracleConnection conn = (OracleConnection) connection.getConnection();
+            Blob blob = conn.createBlob();
+            blob.setBytes(1, bytes);
+            return blob;
+        }
+        return connection.createBlob();
+    }
+
+    public Clob getClob(byte[] bytes) throws SQLException {
+        if (connection.getConnection() instanceof OracleConnection) {
+            OracleConnection conn = (OracleConnection) connection.getConnection();
+            Clob clob = conn.createClob();
+            clob.setString(1, new String(bytes, Charset.defaultCharset()));
+            return clob;
+        }
+        return connection.createClob();
+    }
+}

+ 11 - 0
dbsyncer-connector/src/main/java/org/dbsyncer/connector/database/setter/SmallintSetter.java

@@ -1,5 +1,6 @@
 package org.dbsyncer.connector.database.setter;
 package org.dbsyncer.connector.database.setter;
 
 
+import org.dbsyncer.connector.ConnectorException;
 import org.dbsyncer.connector.database.AbstractSetter;
 import org.dbsyncer.connector.database.AbstractSetter;
 
 
 import java.sql.PreparedStatement;
 import java.sql.PreparedStatement;
@@ -12,4 +13,14 @@ public class SmallintSetter extends AbstractSetter<Integer> {
         ps.setInt(i, val);
         ps.setInt(i, val);
     }
     }
 
 
+    @Override
+    protected void setIfValueTypeNotMatch(PreparedFieldMapper mapper, PreparedStatement ps, int i, int type, Object val)
+            throws SQLException {
+        if(val instanceof Short){
+            Short s = (Short) val;
+            ps.setShort(i, s);
+            return;
+        }
+        throw new ConnectorException(String.format("SmallintSetter can not find type [%s], val [%s]", type, val));
+    }
 }
 }

+ 2 - 2
dbsyncer-connector/src/main/java/org/dbsyncer/connector/sqlserver/SqlServerConnector.java

@@ -16,7 +16,7 @@ public final class SqlServerConnector extends AbstractDatabaseConnector {
     @Override
     @Override
     public List<Table> getTable(DatabaseConnectorMapper connectorMapper) {
     public List<Table> getTable(DatabaseConnectorMapper connectorMapper) {
         DatabaseConfig config = connectorMapper.getConfig();
         DatabaseConfig config = connectorMapper.getConfig();
-        return super.getTable(connectorMapper, String.format("SELECT NAME FROM SYS.TABLES WHERE SCHEMA_ID = SCHEMA_ID('%s') AND IS_MS_SHIPPED = 0", config.getSchema()));
+        return super.getTable(connectorMapper, String.format("select name from sys.tables where schema_id = schema_id('%s') and is_ms_shipped = 0", config.getSchema()));
     }
     }
 
 
     @Override
     @Override
@@ -39,6 +39,6 @@ public final class SqlServerConnector extends AbstractDatabaseConnector {
         String table = commandConfig.getTable().getName();
         String table = commandConfig.getTable().getName();
         DatabaseConfig cfg = (DatabaseConfig) commandConfig.getConnectorConfig();
         DatabaseConfig cfg = (DatabaseConfig) commandConfig.getConnectorConfig();
         // 从存储过程查询(定时更新总数,可能存在误差)
         // 从存储过程查询(定时更新总数,可能存在误差)
-        return String.format("SELECT ROWS FROM SYSINDEXES WHERE ID = OBJECT_ID('%s.%s') AND INDID IN (0, 1)", cfg.getSchema(), table);
+        return String.format("select rows from sysindexes where id = object_id('%s.%s') and indid in (0, 1)", cfg.getSchema(), table);
     }
     }
 }
 }

+ 45 - 1
dbsyncer-connector/src/main/test/SqlServerConnectionTest.java

@@ -1,9 +1,16 @@
+import oracle.jdbc.OracleConnection;
 import org.dbsyncer.connector.config.DatabaseConfig;
 import org.dbsyncer.connector.config.DatabaseConfig;
 import org.dbsyncer.connector.database.DatabaseConnectorMapper;
 import org.dbsyncer.connector.database.DatabaseConnectorMapper;
+import org.dbsyncer.connector.database.ds.SimpleConnection;
 import org.junit.Test;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.slf4j.LoggerFactory;
+import org.springframework.jdbc.core.BatchPreparedStatementSetter;
 
 
+import java.nio.charset.Charset;
+import java.sql.Clob;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
 import java.time.LocalDateTime;
 import java.time.LocalDateTime;
 import java.util.concurrent.*;
 import java.util.concurrent.*;
 
 
@@ -16,6 +23,43 @@ public class SqlServerConnectionTest {
 
 
     private final Logger logger = LoggerFactory.getLogger(getClass());
     private final Logger logger = LoggerFactory.getLogger(getClass());
 
 
+    @Test
+    public void testByte() {
+        DatabaseConfig config = new DatabaseConfig();
+        config.setUrl("jdbc:oracle:thin:@127.0.0.1:1521:XE");
+        config.setUsername("ae86");
+        config.setPassword("123");
+        config.setDriverClassName("oracle.jdbc.OracleDriver");
+        final DatabaseConnectorMapper connectorMapper = new DatabaseConnectorMapper(config);
+
+        String executeSql="UPDATE \"my_user\" SET \"name\"=?,\"clo\"=? WHERE \"id\"=?";
+        int[] execute = connectorMapper.execute(databaseTemplate ->
+                databaseTemplate.batchUpdate(executeSql, new BatchPreparedStatementSetter() {
+                    @Override
+                    public void setValues(PreparedStatement ps, int i) {
+                        try {
+                            SimpleConnection connection = (SimpleConnection) databaseTemplate.getConnection();
+                            OracleConnection conn = (OracleConnection) connection.getConnection();
+                            Clob clob = conn.createClob();
+                            clob.setString(1, new String("中文888".getBytes(Charset.defaultCharset())));
+
+                            ps.setString(1, "hello888");
+                            ps.setClob(2, clob);
+                            ps.setInt(3, 2);
+                        } catch (SQLException e) {
+                            e.printStackTrace();
+                        }
+                    }
+
+                    @Override
+                    public int getBatchSize() {
+                        return 1;
+                    }
+                })
+        );
+        logger.info("execute:{}", execute);
+    }
+
     @Test
     @Test
     public void testConnection() throws InterruptedException {
     public void testConnection() throws InterruptedException {
         DatabaseConfig config = new DatabaseConfig();
         DatabaseConfig config = new DatabaseConfig();
@@ -65,4 +109,4 @@ public class SqlServerConnectionTest {
         TimeUnit.SECONDS.sleep(3);
         TimeUnit.SECONDS.sleep(3);
         logger.info("test end");
         logger.info("test end");
     }
     }
-}
+}

+ 10 - 10
dbsyncer-listener/src/main/java/org/dbsyncer/listener/sqlserver/SqlServerExtractor.java

@@ -32,17 +32,17 @@ public class SqlServerExtractor extends AbstractDatabaseExtractor {
     private final Logger logger = LoggerFactory.getLogger(getClass());
     private final Logger logger = LoggerFactory.getLogger(getClass());
 
 
     private static final String STATEMENTS_PLACEHOLDER = "#";
     private static final String STATEMENTS_PLACEHOLDER = "#";
-    private static final String GET_DATABASE_NAME = "SELECT db_name()";
-    private static final String GET_TABLE_LIST = "SELECT NAME FROM SYS.TABLES WHERE SCHEMA_ID = SCHEMA_ID('#') AND IS_MS_SHIPPED = 0";
-    private static final String IS_DB_CDC_ENABLED = "SELECT is_cdc_enabled FROM sys.databases WHERE name = '#'";
-    private static final String IS_TABLE_CDC_ENABLED = "SELECT COUNT(*) FROM sys.tables tb WHERE tb.is_tracked_by_cdc = 1 AND tb.name='#'";
-    private static final String ENABLE_DB_CDC = "IF EXISTS(select 1 from sys.databases where name = '#' AND is_cdc_enabled=0) EXEC sys.sp_cdc_enable_db";
-    private static final String ENABLE_TABLE_CDC = "IF EXISTS(select 1 from sys.tables where name = '#' AND is_tracked_by_cdc=0) EXEC sys.sp_cdc_enable_table @source_schema = N'%s', @source_name = N'#', @role_name = NULL, @supports_net_changes = 0";
+    private static final String GET_DATABASE_NAME = "select db_name()";
+    private static final String GET_TABLE_LIST = "select name from sys.tables where schema_id = schema_id('#') and is_ms_shipped = 0";
+    private static final String IS_DB_CDC_ENABLED = "select is_cdc_enabled from sys.databases where name = '#'";
+    private static final String IS_TABLE_CDC_ENABLED = "select count(*) from sys.tables tb where tb.is_tracked_by_cdc = 1 and tb.name='#'";
+    private static final String ENABLE_DB_CDC = "IF EXISTS(select 1 from sys.databases where name = '#' and is_cdc_enabled=0) EXEC sys.sp_cdc_enable_db";
+    private static final String ENABLE_TABLE_CDC = "IF EXISTS(select 1 from sys.tables where name = '#' and is_tracked_by_cdc=0) EXEC sys.sp_cdc_enable_table @source_schema = N'%s', @source_name = N'#', @role_name = NULL, @supports_net_changes = 0";
     private static final String GET_TABLES_CDC_ENABLED = "EXEC sys.sp_cdc_help_change_data_capture";
     private static final String GET_TABLES_CDC_ENABLED = "EXEC sys.sp_cdc_help_change_data_capture";
-    private static final String GET_MAX_LSN = "SELECT sys.fn_cdc_get_max_lsn()";
-    private static final String GET_MIN_LSN = "SELECT sys.fn_cdc_get_min_lsn('#')";
-    private static final String GET_INCREMENT_LSN = "SELECT sys.fn_cdc_increment_lsn(?)";
-    private static final String GET_ALL_CHANGES_FOR_TABLE = "SELECT * FROM cdc.[fn_cdc_get_all_changes_#](?, ?, N'all update old') order by [__$start_lsn] ASC, [__$seqval] ASC, [__$operation] ASC";
+    private static final String GET_MAX_LSN = "select sys.fn_cdc_get_max_lsn()";
+    private static final String GET_MIN_LSN = "select sys.fn_cdc_get_min_lsn('#')";
+    private static final String GET_INCREMENT_LSN = "select sys.fn_cdc_increment_lsn(?)";
+    private static final String GET_ALL_CHANGES_FOR_TABLE = "select * from cdc.[fn_cdc_get_all_changes_#](?, ?, N'all update old') order by [__$start_lsn] ASC, [__$seqval] ASC, [__$operation] ASC";
 
 
     private static final String LSN_POSITION = "position";
     private static final String LSN_POSITION = "position";
     private static final int OFFSET_COLUMNS = 4;
     private static final int OFFSET_COLUMNS = 4;

+ 98 - 0
dbsyncer-parser/src/main/java/org/dbsyncer/parser/AbstractWriterBinlog.java

@@ -0,0 +1,98 @@
+package org.dbsyncer.parser;
+
+import com.google.protobuf.ByteString;
+import org.dbsyncer.cache.CacheService;
+import org.dbsyncer.common.util.CollectionUtils;
+import org.dbsyncer.connector.model.Field;
+import org.dbsyncer.parser.flush.BufferActuator;
+import org.dbsyncer.parser.model.Picker;
+import org.dbsyncer.parser.model.TableGroup;
+import org.dbsyncer.parser.model.WriterRequest;
+import org.dbsyncer.storage.binlog.AbstractBinlogRecorder;
+import org.dbsyncer.storage.binlog.proto.BinlogMap;
+import org.dbsyncer.storage.binlog.proto.BinlogMessage;
+import org.dbsyncer.storage.binlog.proto.EventEnum;
+import org.dbsyncer.storage.util.BinlogMessageUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Queue;
+
+public abstract class AbstractWriterBinlog extends AbstractBinlogRecorder<WriterRequest> {
+
+    private final Logger logger = LoggerFactory.getLogger(getClass());
+
+    @Autowired
+    private BufferActuator writerBufferActuator;
+
+    @Autowired
+    private CacheService cacheService;
+
+    protected void flush(String tableGroupId, String event, Map<String, Object> data) {
+        try {
+            BinlogMap.Builder dataBuilder = BinlogMap.newBuilder();
+            data.forEach((k, v) -> {
+                if (null != v) {
+                    ByteString bytes = BinlogMessageUtil.serializeValue(v);
+                    if (null != bytes) {
+                        dataBuilder.putRow(k, bytes);
+                    }
+                }
+            });
+
+            BinlogMessage builder = BinlogMessage.newBuilder()
+                    .setTableGroupId(tableGroupId)
+                    .setEvent(EventEnum.valueOf(event))
+                    .setData(dataBuilder.build())
+                    .build();
+            super.flush(builder);
+        } catch (Exception e) {
+            logger.error(e.getMessage());
+        }
+    }
+
+    @Override
+    protected WriterRequest deserialize(String messageId, BinlogMessage message) {
+        if (CollectionUtils.isEmpty(message.getData().getRowMap())) {
+            return null;
+        }
+
+        // 1、获取配置信息
+        final TableGroup tableGroup = cacheService.get(message.getTableGroupId(), TableGroup.class);
+
+        // 2、反序列数据
+        try {
+            final Picker picker = new Picker(tableGroup.getFieldMapping());
+            final Map<String, Field> fieldMap = picker.getTargetFieldMap();
+            Map<String, Object> data = new HashMap<>();
+            message.getData().getRowMap().forEach((k, v) -> {
+                if (fieldMap.containsKey(k)) {
+                    data.put(k, BinlogMessageUtil.deserializeValue(fieldMap.get(k).getType(), v));
+                }
+            });
+            return new WriterRequest(messageId, message.getTableGroupId(), message.getEvent().name(), data);
+        } catch (Exception e) {
+            logger.error(e.getMessage());
+        }
+        return null;
+    }
+
+    @Override
+    public String getTaskName() {
+        return "WriterBinlog";
+    }
+
+    @Override
+    public Queue getQueue() {
+        return writerBufferActuator.getQueue();
+    }
+
+    @Override
+    public int getQueueCapacity() {
+        return writerBufferActuator.getQueueCapacity();
+    }
+
+}

+ 67 - 0
dbsyncer-parser/src/main/java/org/dbsyncer/parser/convert/handler/StringToTimestampHandler.java

@@ -0,0 +1,67 @@
+package org.dbsyncer.parser.convert.handler;
+
+import org.dbsyncer.common.column.Lexer;
+import org.dbsyncer.common.util.DateFormatUtil;
+import org.dbsyncer.parser.convert.AbstractHandler;
+
+/**
+ * 字符串转Timestamp
+ *
+ * @author AE86
+ * @version 1.0.0
+ * @date 2022/7/12 23:04
+ */
+public class StringToTimestampHandler extends AbstractHandler {
+
+    @Override
+    public Object convert(String args, Object value) {
+        if (value instanceof String) {
+            String s = (String) value;
+            // 2020-7-12 00:00:00
+            if(s.length() < 19){
+                s = format(s);
+            }
+            value = DateFormatUtil.stringToTimestamp(s);
+        }
+        return value;
+    }
+
+    private String format(String s){
+        StringBuilder buf = new StringBuilder();
+        Lexer lexer = new Lexer(s);
+        char comma = '-';
+        // 年
+        nextToken(lexer, buf, comma);
+        // 月
+        nextToken(lexer, buf, comma);
+        // 日
+        comma = ' ';
+        nextToken(lexer, buf, comma);
+        // 时
+        comma = ':';
+        nextToken(lexer, buf, comma);
+        // 分
+        nextToken(lexer, buf, comma);
+        // 秒
+        nextToken(lexer, buf, comma, false);
+        return buf.toString();
+    }
+
+    private void nextToken(Lexer lexer, StringBuilder buf, char comma) {
+        nextToken(lexer, buf, comma, true);
+    }
+
+    private void nextToken(Lexer lexer, StringBuilder buf, char comma, boolean appendComma) {
+        buf.append(fillZero(lexer.nextToken(comma)));
+        if(appendComma){
+            buf.append(comma);
+        }
+    }
+
+    private String fillZero(String s){
+        if(s.length() < 2){
+            return String.format("%02d", Integer.parseInt(s));
+        }
+        return s;
+    }
+}

+ 4 - 0
dbsyncer-parser/src/main/java/org/dbsyncer/parser/enums/ConvertEnum.java

@@ -42,6 +42,10 @@ public enum ConvertEnum {
      * Long转Timestamp
      * Long转Timestamp
      */
      */
     LONG_TO_TIMESTAMP("LONG_TO_TIMESTAMP", "Long转Timestamp", 0, new LongToTimestampHandler()),
     LONG_TO_TIMESTAMP("LONG_TO_TIMESTAMP", "Long转Timestamp", 0, new LongToTimestampHandler()),
+    /**
+     * String转Timestamp
+     */
+    STRING_TO_TIMESTAMP("STRING_TO_TIMESTAMP", "String转Timestamp", 0, new StringToTimestampHandler()),
     /**
     /**
      * Byte[]转String
      * Byte[]转String
      */
      */

+ 7 - 10
dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/AbstractBufferActuator.java

@@ -1,5 +1,6 @@
 package org.dbsyncer.parser.flush;
 package org.dbsyncer.parser.flush;
 
 
+import org.dbsyncer.common.config.BufferActuatorConfig;
 import org.dbsyncer.common.scheduled.ScheduledTaskJob;
 import org.dbsyncer.common.scheduled.ScheduledTaskJob;
 import org.dbsyncer.common.scheduled.ScheduledTaskService;
 import org.dbsyncer.common.scheduled.ScheduledTaskService;
 import org.slf4j.Logger;
 import org.slf4j.Logger;
@@ -33,11 +34,8 @@ public abstract class AbstractBufferActuator<Request, Response> implements Buffe
     @Autowired
     @Autowired
     private ScheduledTaskService scheduledTaskService;
     private ScheduledTaskService scheduledTaskService;
 
 
-    private static final int CAPACITY = 10_0000;
-
-    private static final int MAX_BATCH_COUNT = 2000;
-
-    private static final int PERIOD = 300;
+    @Autowired
+    private BufferActuatorConfig bufferActuatorConfig;
 
 
     private Queue<Request> buffer;
     private Queue<Request> buffer;
 
 
@@ -45,13 +43,12 @@ public abstract class AbstractBufferActuator<Request, Response> implements Buffe
 
 
     private volatile boolean running;
     private volatile boolean running;
 
 
-    private Class<Response> responseClazz;
+    private final Class<Response> responseClazz = (Class<Response>) ((ParameterizedType) getClass().getGenericSuperclass()).getActualTypeArguments()[1];
 
 
     @PostConstruct
     @PostConstruct
     private void init() {
     private void init() {
-        responseClazz = (Class<Response>) ((ParameterizedType) getClass().getGenericSuperclass()).getActualTypeArguments()[1];
         buffer = new LinkedBlockingQueue(getQueueCapacity());
         buffer = new LinkedBlockingQueue(getQueueCapacity());
-        scheduledTaskService.start(PERIOD, this);
+        scheduledTaskService.start(bufferActuatorConfig.getPeriodMillisecond(), this);
     }
     }
 
 
     /**
     /**
@@ -84,7 +81,7 @@ public abstract class AbstractBufferActuator<Request, Response> implements Buffe
 
 
     @Override
     @Override
     public int getQueueCapacity() {
     public int getQueueCapacity() {
-        return CAPACITY;
+        return bufferActuatorConfig.getQueueCapacity();
     }
     }
 
 
     @Override
     @Override
@@ -120,7 +117,7 @@ public abstract class AbstractBufferActuator<Request, Response> implements Buffe
         if (!queue.isEmpty()) {
         if (!queue.isEmpty()) {
             AtomicLong batchCounter = new AtomicLong();
             AtomicLong batchCounter = new AtomicLong();
             final Map<String, BufferResponse> map = new LinkedHashMap<>();
             final Map<String, BufferResponse> map = new LinkedHashMap<>();
-            while (!queue.isEmpty() && batchCounter.get() < MAX_BATCH_COUNT) {
+            while (!queue.isEmpty() && batchCounter.get() < bufferActuatorConfig.getBatchCount()) {
                 Request poll = queue.poll();
                 Request poll = queue.poll();
                 String key = getPartitionKey(poll);
                 String key = getPartitionKey(poll);
                 if (!map.containsKey(key)) {
                 if (!map.containsKey(key)) {

+ 15 - 2
dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/impl/WriterBufferActuator.java

@@ -1,7 +1,9 @@
 package org.dbsyncer.parser.flush.impl;
 package org.dbsyncer.parser.flush.impl;
 
 
 import org.dbsyncer.cache.CacheService;
 import org.dbsyncer.cache.CacheService;
+import org.dbsyncer.common.config.BufferActuatorConfig;
 import org.dbsyncer.common.model.Result;
 import org.dbsyncer.common.model.Result;
+import org.dbsyncer.common.util.StringUtil;
 import org.dbsyncer.connector.ConnectorFactory;
 import org.dbsyncer.connector.ConnectorFactory;
 import org.dbsyncer.connector.ConnectorMapper;
 import org.dbsyncer.connector.ConnectorMapper;
 import org.dbsyncer.connector.config.ConnectorConfig;
 import org.dbsyncer.connector.config.ConnectorConfig;
@@ -9,6 +11,7 @@ import org.dbsyncer.parser.ParserFactory;
 import org.dbsyncer.parser.flush.AbstractBufferActuator;
 import org.dbsyncer.parser.flush.AbstractBufferActuator;
 import org.dbsyncer.parser.model.*;
 import org.dbsyncer.parser.model.*;
 import org.dbsyncer.parser.strategy.FlushStrategy;
 import org.dbsyncer.parser.strategy.FlushStrategy;
+import org.dbsyncer.parser.strategy.ParserStrategy;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
 import org.springframework.stereotype.Component;
 import org.springframework.util.Assert;
 import org.springframework.util.Assert;
@@ -30,10 +33,14 @@ public class WriterBufferActuator extends AbstractBufferActuator<WriterRequest,
     @Autowired
     @Autowired
     private FlushStrategy flushStrategy;
     private FlushStrategy flushStrategy;
 
 
+    @Autowired
+    private ParserStrategy parserStrategy;
+
     @Autowired
     @Autowired
     private CacheService cacheService;
     private CacheService cacheService;
 
 
-    private final static int BATCH_SIZE = 100;
+    @Autowired
+    private BufferActuatorConfig bufferActuatorConfig;
 
 
     @Override
     @Override
     protected String getPartitionKey(WriterRequest request) {
     protected String getPartitionKey(WriterRequest request) {
@@ -43,6 +50,9 @@ public class WriterBufferActuator extends AbstractBufferActuator<WriterRequest,
     @Override
     @Override
     protected void partition(WriterRequest request, WriterResponse response) {
     protected void partition(WriterRequest request, WriterResponse response) {
         response.getDataList().add(request.getRow());
         response.getDataList().add(request.getRow());
+        if(StringUtil.isNotBlank(request.getMessageId())){
+            response.getMessageIds().add(request.getMessageId());
+        }
         if (response.isMerged()) {
         if (response.isMerged()) {
             return;
             return;
         }
         }
@@ -62,10 +72,13 @@ public class WriterBufferActuator extends AbstractBufferActuator<WriterRequest,
         // 2、批量执行同步
         // 2、批量执行同步
         ConnectorMapper targetConnectorMapper = connectorFactory.connect(getConnectorConfig(mapping.getTargetConnectorId()));
         ConnectorMapper targetConnectorMapper = connectorFactory.connect(getConnectorConfig(mapping.getTargetConnectorId()));
         Result result = parserFactory.writeBatch(new BatchWriter(targetConnectorMapper, tableGroup.getCommand(), targetTableName, response.getEvent(),
         Result result = parserFactory.writeBatch(new BatchWriter(targetConnectorMapper, tableGroup.getCommand(), targetTableName, response.getEvent(),
-                picker.getTargetFields(), response.getDataList(), BATCH_SIZE));
+                picker.getTargetFields(), response.getDataList(), bufferActuatorConfig.getWriterBatchCount()));
 
 
         // 3、持久化同步结果
         // 3、持久化同步结果
         flushStrategy.flushIncrementData(mapping.getMetaId(), result, response.getEvent());
         flushStrategy.flushIncrementData(mapping.getMetaId(), result, response.getEvent());
+
+        // 4、消息处理完成
+        parserStrategy.complete(response.getMessageIds());
     }
     }
 
 
     /**
     /**

+ 2 - 2
dbsyncer-parser/src/main/java/org/dbsyncer/parser/model/Picker.java

@@ -67,7 +67,7 @@ public class Picker {
         return targetFields;
         return targetFields;
     }
     }
 
 
-    public Map<String, Field> getSourceFieldMap() {
-        return sourceFields.stream().collect(Collectors.toMap(Field::getName, f -> f, (k1, k2) -> k1));
+    public Map<String, Field> getTargetFieldMap() {
+        return targetFields.stream().collect(Collectors.toMap(Field::getName, f -> f, (k1, k2) -> k1));
     }
     }
 }
 }

+ 11 - 0
dbsyncer-parser/src/main/java/org/dbsyncer/parser/model/WriterRequest.java

@@ -11,14 +11,25 @@ import java.util.Map;
  */
  */
 public class WriterRequest extends AbstractWriter implements BufferRequest {
 public class WriterRequest extends AbstractWriter implements BufferRequest {
 
 
+    private String messageId;
+
     private Map row;
     private Map row;
 
 
     public WriterRequest(String tableGroupId, String event, Map row) {
     public WriterRequest(String tableGroupId, String event, Map row) {
+        this(null, tableGroupId, event, row);
+    }
+
+    public WriterRequest(String messageId, String tableGroupId, String event, Map row) {
         setTableGroupId(tableGroupId);
         setTableGroupId(tableGroupId);
         setEvent(event);
         setEvent(event);
+        this.messageId = messageId;
         this.row = row;
         this.row = row;
     }
     }
 
 
+    public String getMessageId() {
+        return messageId;
+    }
+
     public Map getRow() {
     public Map getRow() {
         return row;
         return row;
     }
     }

+ 3 - 2
dbsyncer-parser/src/main/java/org/dbsyncer/parser/model/WriterResponse.java

@@ -14,6 +14,7 @@ import java.util.Map;
 public class WriterResponse extends AbstractWriter implements BufferResponse {
 public class WriterResponse extends AbstractWriter implements BufferResponse {
 
 
     private List<Map> dataList = new LinkedList<>();
     private List<Map> dataList = new LinkedList<>();
+    private List<String> messageIds = new LinkedList<>();
 
 
     private boolean isMerged;
     private boolean isMerged;
 
 
@@ -26,8 +27,8 @@ public class WriterResponse extends AbstractWriter implements BufferResponse {
         return dataList;
         return dataList;
     }
     }
 
 
-    public void setDataList(List<Map> dataList) {
-        this.dataList = dataList;
+    public List<String> getMessageIds() {
+        return messageIds;
     }
     }
 
 
     public boolean isMerged() {
     public boolean isMerged() {

+ 14 - 0
dbsyncer-parser/src/main/java/org/dbsyncer/parser/strategy/ParserStrategy.java

@@ -1,9 +1,23 @@
 package org.dbsyncer.parser.strategy;
 package org.dbsyncer.parser.strategy;
 
 
+import java.util.List;
 import java.util.Map;
 import java.util.Map;
 
 
 public interface ParserStrategy {
 public interface ParserStrategy {
 
 
+    /**
+     * 同步消息
+     *
+     * @param tableGroupId
+     * @param event
+     * @param data
+     */
     void execute(String tableGroupId, String event, Map<String, Object> data);
     void execute(String tableGroupId, String event, Map<String, Object> data);
 
 
+    /**
+     * 完成同步后,执行回调删除消息
+     *
+     * @param messageIds
+     */
+    void complete(List<String> messageIds);
 }
 }

+ 6 - 87
dbsyncer-parser/src/main/java/org/dbsyncer/parser/strategy/impl/DisableWriterBufferActuatorStrategy.java

@@ -1,102 +1,21 @@
 package org.dbsyncer.parser.strategy.impl;
 package org.dbsyncer.parser.strategy.impl;
 
 
-import com.google.protobuf.ByteString;
-import org.dbsyncer.cache.CacheService;
-import org.dbsyncer.common.event.RowChangedEvent;
-import org.dbsyncer.common.util.CollectionUtils;
-import org.dbsyncer.common.util.StringUtil;
-import org.dbsyncer.connector.constant.ConnectorConstant;
-import org.dbsyncer.connector.model.Field;
-import org.dbsyncer.parser.flush.BufferActuator;
-import org.dbsyncer.parser.model.Mapping;
-import org.dbsyncer.parser.model.Picker;
-import org.dbsyncer.parser.model.TableGroup;
-import org.dbsyncer.parser.model.WriterRequest;
+import org.dbsyncer.parser.AbstractWriterBinlog;
 import org.dbsyncer.parser.strategy.ParserStrategy;
 import org.dbsyncer.parser.strategy.ParserStrategy;
-import org.dbsyncer.parser.util.ConvertUtil;
-import org.dbsyncer.plugin.PluginFactory;
-import org.dbsyncer.storage.binlog.AbstractBinlogRecorder;
-import org.dbsyncer.storage.binlog.proto.BinlogMap;
-import org.dbsyncer.storage.binlog.proto.BinlogMessage;
-import org.dbsyncer.storage.binlog.proto.EventEnum;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
 
 
-import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Map;
-import java.util.Queue;
 
 
-public final class DisableWriterBufferActuatorStrategy extends AbstractBinlogRecorder<WriterRequest> implements ParserStrategy {
-
-    private final Logger logger = LoggerFactory.getLogger(getClass());
-
-    @Autowired
-    private BufferActuator writerBufferActuator;
-
-    @Autowired
-    private CacheService cacheService;
+public final class DisableWriterBufferActuatorStrategy extends AbstractWriterBinlog implements ParserStrategy {
 
 
     @Override
     @Override
     public void execute(String tableGroupId, String event, Map<String, Object> data) {
     public void execute(String tableGroupId, String event, Map<String, Object> data) {
-        try {
-            BinlogMap.Builder dataBuilder = BinlogMap.newBuilder();
-            data.forEach((k, v) -> {
-                if (null != v) {
-                    ByteString bytes = serializeValue(v);
-                    if (null != bytes) {
-                        dataBuilder.putRow(k, bytes);
-                    }
-                }
-            });
-
-            BinlogMessage builder = BinlogMessage.newBuilder()
-                    .setTableGroupId(tableGroupId)
-                    .setEvent(EventEnum.valueOf(event))
-                    .setData(dataBuilder.build())
-                    .build();
-            flush(builder);
-        } catch (Exception e) {
-            logger.error(e.getMessage());
-        }
-    }
-
-    @Override
-    public Queue getQueue() {
-        return writerBufferActuator.getQueue();
-    }
-
-    @Override
-    public int getQueueCapacity() {
-        return writerBufferActuator.getQueueCapacity();
-    }
-
-    @Override
-    protected String getTaskName() {
-        return "WriterBinlog";
+        super.flush(tableGroupId, event, data);
     }
     }
 
 
     @Override
     @Override
-    protected WriterRequest deserialize(BinlogMessage message) {
-        if (CollectionUtils.isEmpty(message.getData().getRowMap())) {
-            return null;
-        }
-
-        // 1、获取配置信息
-        final String tableGroupId = message.getTableGroupId();
-        final TableGroup tableGroup = cacheService.get(tableGroupId, TableGroup.class);
-
-        // 2、反序列数据
-        final Picker picker = new Picker(tableGroup.getFieldMapping());
-        final Map<String, Field> fieldMap = picker.getSourceFieldMap();
-        Map<String, Object> data = new HashMap<>();
-        message.getData().getRowMap().forEach((k, v) -> {
-            if (fieldMap.containsKey(k)) {
-                data.put(k, resolveValue(fieldMap.get(k).getType(), v));
-            }
-        });
-
-        return new WriterRequest(message.getTableGroupId(), message.getEvent().name(), data);
+    public void complete(List<String> messageIds) {
+        super.complete(messageIds);
     }
     }
 
 
 }
 }

+ 13 - 19
dbsyncer-parser/src/main/java/org/dbsyncer/parser/strategy/impl/EnableWriterBufferActuatorStrategy.java

@@ -1,50 +1,44 @@
 package org.dbsyncer.parser.strategy.impl;
 package org.dbsyncer.parser.strategy.impl;
 
 
+import org.dbsyncer.parser.AbstractWriterBinlog;
 import org.dbsyncer.parser.flush.BufferActuator;
 import org.dbsyncer.parser.flush.BufferActuator;
 import org.dbsyncer.parser.model.WriterRequest;
 import org.dbsyncer.parser.model.WriterRequest;
 import org.dbsyncer.parser.strategy.ParserStrategy;
 import org.dbsyncer.parser.strategy.ParserStrategy;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
 import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
 import org.springframework.stereotype.Component;
 import org.springframework.stereotype.Component;
 
 
 import javax.annotation.PostConstruct;
 import javax.annotation.PostConstruct;
+import java.util.List;
 import java.util.Map;
 import java.util.Map;
-import java.util.concurrent.TimeUnit;
 
 
 @Component
 @Component
-@ConditionalOnProperty(value = "dbsyncer.parser.writer.buffer.actuator.enabled", havingValue = "true")
-public final class EnableWriterBufferActuatorStrategy implements ParserStrategy {
-
-    private final Logger logger = LoggerFactory.getLogger(getClass());
+@ConditionalOnProperty(value = "dbsyncer.parser.flush.buffer.actuator.speed.enabled", havingValue = "true")
+public final class EnableWriterBufferActuatorStrategy extends AbstractWriterBinlog implements ParserStrategy {
 
 
     private static final double BUFFER_THRESHOLD = 0.8;
     private static final double BUFFER_THRESHOLD = 0.8;
 
 
     @Autowired
     @Autowired
     private BufferActuator writerBufferActuator;
     private BufferActuator writerBufferActuator;
 
 
-    private double limit;
+    private static double limit;
 
 
     @PostConstruct
     @PostConstruct
     private void init() {
     private void init() {
-        limit = Math.ceil(writerBufferActuator.getQueueCapacity() * BUFFER_THRESHOLD);
+        limit = Math.ceil(getQueueCapacity() * BUFFER_THRESHOLD);
     }
     }
 
 
     @Override
     @Override
     public void execute(String tableGroupId, String event, Map<String, Object> data) {
     public void execute(String tableGroupId, String event, Map<String, Object> data) {
+        if (getQueue().size() >= limit) {
+            super.flush(tableGroupId, event, data);
+        }
         writerBufferActuator.offer(new WriterRequest(tableGroupId, event, data));
         writerBufferActuator.offer(new WriterRequest(tableGroupId, event, data));
+    }
 
 
-        // 超过容量限制,限制生产速度
-        final int size = writerBufferActuator.getQueue().size();
-        if (size >= limit) {
-            try {
-                TimeUnit.SECONDS.sleep(30);
-                logger.warn("当前任务队列大小{}已达上限{},请稍等{}秒", size, limit, 30);
-            } catch (InterruptedException e) {
-                logger.error(e.getMessage());
-            }
-        }
+    @Override
+    public void complete(List<String> messageIds) {
+        super.complete(messageIds);
     }
     }
 
 
 }
 }

+ 95 - 242
dbsyncer-storage/src/main/java/org/dbsyncer/storage/binlog/AbstractBinlogRecorder.java

@@ -1,24 +1,27 @@
 package org.dbsyncer.storage.binlog;
 package org.dbsyncer.storage.binlog;
 
 
-import com.google.protobuf.ByteString;
 import com.google.protobuf.InvalidProtocolBufferException;
 import com.google.protobuf.InvalidProtocolBufferException;
 import org.apache.lucene.document.Document;
 import org.apache.lucene.document.Document;
-import org.apache.lucene.document.Field;
-import org.apache.lucene.document.StoredField;
-import org.apache.lucene.document.StringField;
+import org.apache.lucene.document.IntPoint;
+import org.apache.lucene.document.LongPoint;
 import org.apache.lucene.index.Term;
 import org.apache.lucene.index.Term;
-import org.apache.lucene.search.MatchAllDocsQuery;
+import org.apache.lucene.search.BooleanClause;
+import org.apache.lucene.search.BooleanQuery;
+import org.apache.lucene.search.Sort;
+import org.apache.lucene.search.SortField;
 import org.apache.lucene.util.BytesRef;
 import org.apache.lucene.util.BytesRef;
+import org.dbsyncer.common.config.BinlogRecorderConfig;
 import org.dbsyncer.common.model.Paging;
 import org.dbsyncer.common.model.Paging;
 import org.dbsyncer.common.scheduled.ScheduledTaskJob;
 import org.dbsyncer.common.scheduled.ScheduledTaskJob;
 import org.dbsyncer.common.scheduled.ScheduledTaskService;
 import org.dbsyncer.common.scheduled.ScheduledTaskService;
 import org.dbsyncer.common.snowflake.SnowflakeIdWorker;
 import org.dbsyncer.common.snowflake.SnowflakeIdWorker;
 import org.dbsyncer.common.util.CollectionUtils;
 import org.dbsyncer.common.util.CollectionUtils;
-import org.dbsyncer.storage.binlog.impl.BinlogColumnValue;
 import org.dbsyncer.storage.binlog.proto.BinlogMessage;
 import org.dbsyncer.storage.binlog.proto.BinlogMessage;
+import org.dbsyncer.storage.constant.BinlogConstant;
 import org.dbsyncer.storage.enums.IndexFieldResolverEnum;
 import org.dbsyncer.storage.enums.IndexFieldResolverEnum;
 import org.dbsyncer.storage.lucene.Shard;
 import org.dbsyncer.storage.lucene.Shard;
 import org.dbsyncer.storage.query.Option;
 import org.dbsyncer.storage.query.Option;
+import org.dbsyncer.storage.util.DocumentUtil;
 import org.slf4j.Logger;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.DisposableBean;
 import org.springframework.beans.factory.DisposableBean;
@@ -27,13 +30,13 @@ import org.springframework.beans.factory.annotation.Autowired;
 import javax.annotation.PostConstruct;
 import javax.annotation.PostConstruct;
 import java.io.File;
 import java.io.File;
 import java.io.IOException;
 import java.io.IOException;
-import java.math.BigDecimal;
-import java.nio.ByteBuffer;
-import java.sql.Date;
-import java.sql.Time;
 import java.sql.Timestamp;
 import java.sql.Timestamp;
-import java.sql.Types;
-import java.util.*;
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.concurrent.locks.ReentrantLock;
@@ -47,28 +50,20 @@ public abstract class AbstractBinlogRecorder<Message> implements BinlogRecorder,
 
 
     private final Logger logger = LoggerFactory.getLogger(getClass());
     private final Logger logger = LoggerFactory.getLogger(getClass());
 
 
+    private static final String PATH = new StringBuilder(System.getProperty("user.dir")).append(File.separatorChar).append("data").append(File.separatorChar).append("data").append(File.separatorChar).toString();
+
     @Autowired
     @Autowired
     private ScheduledTaskService scheduledTaskService;
     private ScheduledTaskService scheduledTaskService;
 
 
     @Autowired
     @Autowired
     private SnowflakeIdWorker snowflakeIdWorker;
     private SnowflakeIdWorker snowflakeIdWorker;
 
 
-    private static final String BINLOG_ID = "id";
-
-    private static final String BINLOG_CONTENT = "c";
-
-    private static final int SUBMIT_COUNT = 1000;
-
-    private static final Queue<BinlogMessage> queue = new LinkedBlockingQueue(10000);
-
-    private static final ByteBuffer buffer = ByteBuffer.allocate(8);
-
-    private static final BinlogColumnValue value = new BinlogColumnValue();
+    @Autowired
+    private BinlogRecorderConfig binlogRecorderConfig;
 
 
-    private static final String PATH = new StringBuilder(System.getProperty("user.dir")).append(File.separatorChar)
-            .append("data").append(File.separatorChar).append("data").append(File.separatorChar).toString();
+    private static Queue<BinlogMessage> queue;
 
 
-    private Shard shard;
+    private static Shard shard;
 
 
     private WriterTask writerTask = new WriterTask();
     private WriterTask writerTask = new WriterTask();
 
 
@@ -76,28 +71,19 @@ public abstract class AbstractBinlogRecorder<Message> implements BinlogRecorder,
 
 
     @PostConstruct
     @PostConstruct
     private void init() throws IOException {
     private void init() throws IOException {
-        // /data/data/WriterBinlog/
+        queue = new LinkedBlockingQueue(binlogRecorderConfig.getQueueCapacity());
         shard = new Shard(PATH + getTaskName());
         shard = new Shard(PATH + getTaskName());
-        scheduledTaskService.start(500, writerTask);
-        scheduledTaskService.start(2000, readerTask);
+        scheduledTaskService.start(binlogRecorderConfig.getWriterPeriodMillisecond(), writerTask);
+        scheduledTaskService.start(binlogRecorderConfig.getReaderPeriodMillisecond(), readerTask);
     }
     }
 
 
     /**
     /**
-     * 获取任务名称
-     *
-     * @return
-     */
-    protected String getTaskName() {
-        return getClass().getSimpleName();
-    }
-
-    /**
-     * 反序列化任务
+     * 反序列化消息
      *
      *
      * @param message
      * @param message
      * @return
      * @return
      */
      */
-    protected abstract Message deserialize(BinlogMessage message);
+    protected abstract Message deserialize(String messageId, BinlogMessage message);
 
 
     @Override
     @Override
     public void flush(BinlogMessage message) {
     public void flush(BinlogMessage message) {
@@ -109,206 +95,20 @@ public abstract class AbstractBinlogRecorder<Message> implements BinlogRecorder,
         shard.close();
         shard.close();
     }
     }
 
 
-    private void doParse() throws IOException {
-        Option option = new Option(new MatchAllDocsQuery());
-        option.addIndexFieldResolverEnum(BINLOG_CONTENT, IndexFieldResolverEnum.BINARY);
-        Paging paging = shard.query(option, 1, SUBMIT_COUNT, null);
-        if (CollectionUtils.isEmpty(paging.getData())) {
-            return;
-        }
-
-        List<Map> list = (List<Map>) paging.getData();
-        int size = list.size();
-        Term[] terms = new Term[size];
-        for (int i = 0; i < size; i++) {
+    @Override
+    public void complete(List<String> messageIds) {
+        if (!CollectionUtils.isEmpty(messageIds)) {
             try {
             try {
-                BytesRef ref = (BytesRef) list.get(i).get(BINLOG_CONTENT);
-                Message message = deserialize(BinlogMessage.parseFrom(ref.bytes));
-                if (null != message) {
-                    getQueue().offer(message);
+                int size = messageIds.size();
+                Term[] terms = new Term[size];
+                for (int i = 0; i < size; i++) {
+                    terms[i] = new Term(BinlogConstant.BINLOG_ID, messageIds.get(i));
                 }
                 }
-                terms[i] = new Term(BINLOG_ID, (String) list.get(i).get(BINLOG_ID));
-            } catch (InvalidProtocolBufferException e) {
+                shard.deleteBatch(terms);
+            } catch (IOException e) {
                 logger.error(e.getMessage());
                 logger.error(e.getMessage());
             }
             }
         }
         }
-        shard.deleteBatch(terms);
-    }
-
-    /**
-     * Java语言提供了八种基本类型,六种数字类型(四个整数型,两个浮点型),一种字符类型,一种布尔型。
-     * <p>
-     * <ol>
-     * <li>整数:包括int,short,byte,long</li>
-     * <li>浮点型:float,double</li>
-     * <li>字符:char</li>
-     * <li>布尔:boolean</li>
-     * </ol>
-     *
-     * <pre>
-     * 类型     长度     大小      最小值     最大值
-     * byte     1Byte    8-bit     -128       +127
-     * short    2Byte    16-bit    -2^15      +2^15-1
-     * int      4Byte    32-bit    -2^31      +2^31-1
-     * long     8Byte    64-bit    -2^63      +2^63-1
-     * float    4Byte    32-bit    IEEE754    IEEE754
-     * double   8Byte    64-bit    IEEE754    IEEE754
-     * char     2Byte    16-bit    Unicode 0  Unicode 2^16-1
-     * boolean  8Byte    64-bit
-     * </pre>
-     *
-     * @param v
-     * @return
-     */
-    protected ByteString serializeValue(Object v) {
-        String type = v.getClass().getName();
-        switch (type) {
-            // 字节
-            case "[B":
-                return ByteString.copyFrom((byte[]) v);
-
-            // 字符串
-            case "java.lang.String":
-                return ByteString.copyFromUtf8((String) v);
-
-            // 时间
-            case "java.sql.Timestamp":
-                buffer.clear();
-                Timestamp timestamp = (Timestamp) v;
-                buffer.putLong(timestamp.getTime());
-                buffer.flip();
-                return ByteString.copyFrom(buffer, 8);
-            case "java.sql.Date":
-                buffer.clear();
-                Date date = (Date) v;
-                buffer.putLong(date.getTime());
-                buffer.flip();
-                return ByteString.copyFrom(buffer, 8);
-            case "java.sql.Time":
-                buffer.clear();
-                Time time = (Time) v;
-                buffer.putLong(time.getTime());
-                buffer.flip();
-                return ByteString.copyFrom(buffer, 8);
-
-            // 数字
-            case "java.lang.Integer":
-                buffer.clear();
-                buffer.putInt((Integer) v);
-                buffer.flip();
-                return ByteString.copyFrom(buffer, 4);
-            case "java.lang.Long":
-                buffer.clear();
-                buffer.putLong((Long) v);
-                buffer.flip();
-                return ByteString.copyFrom(buffer, 8);
-            case "java.lang.Short":
-                buffer.clear();
-                buffer.putShort((Short) v);
-                buffer.flip();
-                return ByteString.copyFrom(buffer, 2);
-            case "java.lang.Float":
-                buffer.clear();
-                buffer.putFloat((Float) v);
-                buffer.flip();
-                return ByteString.copyFrom(buffer, 4);
-            case "java.lang.Double":
-                buffer.clear();
-                buffer.putDouble((Double) v);
-                buffer.flip();
-                return ByteString.copyFrom(buffer, 8);
-            case "java.math.BigDecimal":
-                BigDecimal bigDecimal = (BigDecimal) v;
-                return ByteString.copyFromUtf8(bigDecimal.toString());
-            case "java.util.BitSet":
-                BitSet bitSet = (BitSet) v;
-                return ByteString.copyFrom(bitSet.toByteArray());
-
-            // 布尔(1为true;0为false)
-            case "java.lang.Boolean":
-                buffer.clear();
-                Boolean b = (Boolean) v;
-                buffer.putShort((short) (b ? 1 : 0));
-                buffer.flip();
-                return ByteString.copyFrom(buffer, 2);
-
-            default:
-                logger.error("Unsupported serialize value type:{}", type);
-                return null;
-        }
-    }
-
-    /**
-     * Resolve value
-     *
-     * @param type
-     * @param v
-     * @return
-     */
-    protected Object resolveValue(int type, ByteString v) {
-        value.setValue(v);
-
-        if (value.isNull()) {
-            return null;
-        }
-
-        switch (type) {
-            // 字符串
-            case Types.VARCHAR:
-            case Types.LONGVARCHAR:
-            case Types.NVARCHAR:
-            case Types.NCHAR:
-            case Types.CHAR:
-                return value.asString();
-
-            // 时间
-            case Types.TIMESTAMP:
-                return value.asTimestamp();
-            case Types.TIME:
-                return value.asTime();
-            case Types.DATE:
-                return value.asDate();
-
-            // 数字
-            case Types.INTEGER:
-            case Types.TINYINT:
-            case Types.SMALLINT:
-                return value.asInteger();
-            case Types.BIGINT:
-                return value.asLong();
-            case Types.FLOAT:
-            case Types.REAL:
-                return value.asFloat();
-            case Types.DOUBLE:
-                return value.asDouble();
-            case Types.DECIMAL:
-            case Types.NUMERIC:
-                return value.asBigDecimal();
-
-            // 布尔
-            case Types.BOOLEAN:
-                return value.asBoolean();
-
-            // 字节
-            case Types.BIT:
-            case Types.BINARY:
-            case Types.VARBINARY:
-            case Types.LONGVARBINARY:
-                return value.asByteArray();
-
-            // TODO 待实现
-            case Types.NCLOB:
-            case Types.CLOB:
-            case Types.BLOB:
-                return null;
-
-            // 暂不支持
-            case Types.ROWID:
-                return null;
-
-            default:
-                return null;
-        }
     }
     }
 
 
     /**
     /**
@@ -324,14 +124,11 @@ public abstract class AbstractBinlogRecorder<Message> implements BinlogRecorder,
 
 
             List<Document> tasks = new ArrayList<>();
             List<Document> tasks = new ArrayList<>();
             int count = 0;
             int count = 0;
-            Document doc;
-            while (!queue.isEmpty() && count < SUBMIT_COUNT) {
+            long now = Instant.now().toEpochMilli();
+            while (!queue.isEmpty() && count < binlogRecorderConfig.getBatchCount()) {
                 BinlogMessage message = queue.poll();
                 BinlogMessage message = queue.poll();
                 if (null != message) {
                 if (null != message) {
-                    doc = new Document();
-                    doc.add(new StringField(BINLOG_ID, String.valueOf(snowflakeIdWorker.nextId()), Field.Store.YES));
-                    doc.add(new StoredField(BINLOG_CONTENT, new BytesRef(message.toByteArray())));
-                    tasks.add(doc);
+                    tasks.add(DocumentUtil.convertBinlog2Doc(String.valueOf(snowflakeIdWorker.nextId()), BinlogConstant.READY, new BytesRef(message.toByteArray()), now));
                 }
                 }
                 count++;
                 count++;
             }
             }
@@ -357,7 +154,7 @@ public abstract class AbstractBinlogRecorder<Message> implements BinlogRecorder,
 
 
         @Override
         @Override
         public void run() {
         public void run() {
-            if (running || (SUBMIT_COUNT * 2) + getQueue().size() >= getQueueCapacity()) {
+            if (running || (binlogRecorderConfig.getBatchCount() * 2) + getQueue().size() >= getQueueCapacity()) {
                 return;
                 return;
             }
             }
 
 
@@ -378,6 +175,62 @@ public abstract class AbstractBinlogRecorder<Message> implements BinlogRecorder,
                 }
                 }
             }
             }
         }
         }
+
+        private void doParse() throws IOException {
+            //  查询[待处理] 或 [处理中 & 处理超时]
+            long maxProcessingSeconds = Timestamp.valueOf(LocalDateTime.now().minusSeconds(binlogRecorderConfig.getMaxProcessingSeconds())).getTime();
+            BooleanQuery query = new BooleanQuery.Builder()
+                    .add(new BooleanQuery.Builder()
+                            .add(IntPoint.newExactQuery(BinlogConstant.BINLOG_STATUS, BinlogConstant.READY), BooleanClause.Occur.MUST)
+                            .build(), BooleanClause.Occur.SHOULD)
+                    .add(new BooleanQuery.Builder()
+                            .add(IntPoint.newExactQuery(BinlogConstant.BINLOG_STATUS, BinlogConstant.PROCESSING), BooleanClause.Occur.MUST)
+                            .add(LongPoint.newRangeQuery(BinlogConstant.BINLOG_TIME, Long.MIN_VALUE, maxProcessingSeconds), BooleanClause.Occur.MUST)
+                            .build(), BooleanClause.Occur.SHOULD)
+                    .build();
+            Option option = new Option(query);
+            option.addIndexFieldResolverEnum(BinlogConstant.BINLOG_STATUS, IndexFieldResolverEnum.INT);
+            option.addIndexFieldResolverEnum(BinlogConstant.BINLOG_CONTENT, IndexFieldResolverEnum.BINARY);
+            option.addIndexFieldResolverEnum(BinlogConstant.BINLOG_TIME, IndexFieldResolverEnum.LONG);
+
+            // 优先处理最早记录
+            Sort sort = new Sort(new SortField(BinlogConstant.BINLOG_TIME, SortField.Type.LONG));
+            Paging paging = shard.query(option, 1, binlogRecorderConfig.getBatchCount(), sort);
+            if (CollectionUtils.isEmpty(paging.getData())) {
+                return;
+            }
+
+            List<Map> list = (List<Map>) paging.getData();
+            final int size = list.size();
+            final List<Message> messages = new ArrayList<>(size);
+            final List<Document> updateDocs = new ArrayList<>(size);
+            final Term[] deleteIds = new Term[size];
+            for (int i = 0; i < size; i++) {
+                Map row = list.get(i);
+                String id = (String) row.get(BinlogConstant.BINLOG_ID);
+                Integer status = (Integer) row.get(BinlogConstant.BINLOG_STATUS);
+                BytesRef ref = (BytesRef) row.get(BinlogConstant.BINLOG_CONTENT);
+                if (BinlogConstant.PROCESSING == status) {
+                    logger.warn("存在超时未处理数据,正在重试,建议优化配置参数[max-processing-seconds={}].", binlogRecorderConfig.getMaxProcessingSeconds());
+                }
+                deleteIds[i] = new Term(BinlogConstant.BINLOG_ID, id);
+                String newId = String.valueOf(snowflakeIdWorker.nextId());
+                try {
+                    Message message = deserialize(newId, BinlogMessage.parseFrom(ref.bytes));
+                    if (null != message) {
+                        messages.add(message);
+                        updateDocs.add(DocumentUtil.convertBinlog2Doc(newId, BinlogConstant.PROCESSING, ref, Instant.now().toEpochMilli()));
+                    }
+                } catch (InvalidProtocolBufferException e) {
+                    logger.error(e.getMessage());
+                }
+            }
+
+            // 如果在更新消息状态的过程中服务被中止,为保证数据的安全性,重启后消息可能会同步2次)
+            shard.insertBatch(updateDocs);
+            shard.deleteBatch(deleteIds);
+            getQueue().addAll(messages);
+        }
     }
     }
 
 
 }
 }

+ 1 - 2
dbsyncer-storage/src/main/java/org/dbsyncer/storage/binlog/impl/BinlogColumnValue.java → dbsyncer-storage/src/main/java/org/dbsyncer/storage/binlog/BinlogColumnValue.java

@@ -1,8 +1,7 @@
-package org.dbsyncer.storage.binlog.impl;
+package org.dbsyncer.storage.binlog;
 
 
 import com.google.protobuf.ByteString;
 import com.google.protobuf.ByteString;
 import org.dbsyncer.common.column.AbstractColumnValue;
 import org.dbsyncer.common.column.AbstractColumnValue;
-import org.dbsyncer.common.util.DateFormatUtil;
 
 
 import java.math.BigDecimal;
 import java.math.BigDecimal;
 import java.nio.ByteBuffer;
 import java.nio.ByteBuffer;

+ 0 - 361
dbsyncer-storage/src/main/java/org/dbsyncer/storage/binlog/BinlogContext.java

@@ -1,361 +0,0 @@
-package org.dbsyncer.storage.binlog;
-
-import org.apache.commons.io.FileUtils;
-import org.dbsyncer.common.scheduled.ScheduledTaskJob;
-import org.dbsyncer.common.util.CollectionUtils;
-import org.dbsyncer.common.util.JsonUtil;
-import org.dbsyncer.common.util.NumberUtil;
-import org.dbsyncer.common.util.StringUtil;
-import org.dbsyncer.storage.binlog.impl.BinlogPipeline;
-import org.dbsyncer.storage.binlog.impl.BinlogReader;
-import org.dbsyncer.storage.binlog.impl.BinlogWriter;
-import org.dbsyncer.storage.binlog.proto.BinlogMessage;
-import org.dbsyncer.storage.model.BinlogConfig;
-import org.dbsyncer.storage.model.BinlogIndex;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.util.Assert;
-
-import java.io.Closeable;
-import java.io.File;
-import java.io.IOException;
-import java.nio.charset.Charset;
-import java.nio.file.Files;
-import java.nio.file.attribute.BasicFileAttributes;
-import java.time.LocalDateTime;
-import java.time.ZoneId;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-
-/**
- * <p>组件介绍</p>
- * <ol>
- *     <li>BinlogPipeline(提供文件流读写)</li>
- *     <li>定时器(维护索引文件状态,回收文件流)</li>
- *     <li>索引</li>
- * </ol>
- * <p>定时器</p>
- * <ol>
- *     <li>生成新索引(超过限制大小|过期)</li>
- *     <li>关闭索引流(有锁 & 读写状态关闭 & 30s未用)</li>
- *     <li>删除旧索引(无锁 & 过期)</li>
- * </ol>
- *
- * @author AE86
- * @version 1.0.0
- * @date 2022/6/29 1:28
- */
-public class BinlogContext implements ScheduledTaskJob, Closeable {
-
-    private final Logger logger = LoggerFactory.getLogger(getClass());
-
-    private static final long BINLOG_MAX_SIZE = 256 * 1024 * 1024;
-
-    private static final int BINLOG_EXPIRE_DAYS = 7;
-
-    private static final int BINLOG_ACTUATOR_CLOSE_DELAYED_SECONDS = 30;
-
-    private static final String LINE_SEPARATOR = System.lineSeparator();
-
-    private static final Charset DEFAULT_CHARSET = Charset.defaultCharset();
-
-    private static final String BINLOG = "binlog";
-
-    private static final String BINLOG_INDEX = BINLOG + ".index";
-
-    private static final String BINLOG_CONFIG = BINLOG + ".config";
-
-    private final List<BinlogIndex> indexList = new LinkedList<>();
-
-    private final String path;
-
-    private final File configFile;
-
-    private final File indexFile;
-
-    private final BinlogPipeline pipeline;
-
-    private final Lock readerLock = new ReentrantLock(true);
-
-    private final Lock lock = new ReentrantLock(true);
-
-    private volatile boolean running;
-
-    private BinlogConfig config;
-
-    public BinlogContext(String taskName) throws IOException {
-        path = new StringBuilder(System.getProperty("user.dir")).append(File.separatorChar)
-                .append("data").append(File.separatorChar)
-                .append("binlog").append(File.separatorChar)
-                .append(taskName).append(File.separatorChar)
-                .toString();
-        File dir = new File(path);
-        if (!dir.exists()) {
-            FileUtils.forceMkdir(dir);
-        }
-
-        // binlog.index
-        indexFile = new File(path + BINLOG_INDEX);
-        // binlog.config
-        configFile = new File(path + BINLOG_CONFIG);
-        if (!configFile.exists()) {
-            // binlog.000001
-            config = initBinlogConfigAndIndex(createNewBinlogName(0));
-        }
-
-        // read index
-        Assert.isTrue(indexFile.exists(), String.format("The index file '%s' is not exist.", indexFile.getName()));
-        readIndexFromDisk();
-
-        // delete index file
-        deleteExpiredIndexFile();
-
-        // {"binlog":"binlog.000001","pos":0}
-        if (null == config) {
-            config = JsonUtil.jsonToObj(FileUtils.readFileToString(configFile, DEFAULT_CHARSET), BinlogConfig.class);
-        }
-
-        // no index
-        if (CollectionUtils.isEmpty(indexList)) {
-            // binlog.000002
-            config = initBinlogConfigAndIndex(createNewBinlogName(config.getFileName()));
-            readIndexFromDisk();
-        }
-
-        // 配置文件已失效,取最早的索引文件
-        BinlogIndex startBinlogIndex = getBinlogIndexByFileName(config.getFileName());
-        if (null == startBinlogIndex) {
-            logger.warn("The binlog file '{}' is expired.", config.getFileName());
-            startBinlogIndex = indexList.get(0);
-            config = new BinlogConfig().setFileName(startBinlogIndex.getFileName());
-            write(configFile, JsonUtil.objToJson(config), false);
-        }
-
-        final BinlogWriter binlogWriter = new BinlogWriter(path, indexList.get(indexList.size() - 1));
-        final BinlogReader binlogReader = new BinlogReader(path, startBinlogIndex, config.getPosition());
-        pipeline = new BinlogPipeline(binlogWriter, binlogReader);
-        logger.info("BinlogContext initialized with config:{}", JsonUtil.objToJson(config));
-    }
-
-    @Override
-    public void run() {
-        if (running) {
-            return;
-        }
-
-        final Lock binlogLock = lock;
-        boolean locked = false;
-        try {
-            locked = binlogLock.tryLock();
-            if (locked) {
-                running = true;
-                createNewBinlogIndex();
-                closeFreeBinlogIndex();
-                deleteOldBinlogIndex();
-            }
-        } catch (Exception e) {
-            logger.error(e.getMessage());
-        } finally {
-            if (locked) {
-                running = false;
-                binlogLock.unlock();
-            }
-        }
-    }
-
-    @Override
-    public void close() {
-        pipeline.close();
-    }
-
-    public void flush() throws IOException {
-        config.setFileName(pipeline.getReaderFileName());
-        config.setPosition(pipeline.getReaderOffset());
-        write(configFile, JsonUtil.objToJson(config), false);
-    }
-
-    public byte[] readLine() throws IOException {
-        byte[] line = pipeline.readLine();
-        if(null == line){
-            switchNextBinlogIndex();
-        }
-        return line;
-    }
-
-    public void write(BinlogMessage message) throws IOException {
-        pipeline.write(message);
-    }
-
-    public BinlogIndex getBinlogIndexByFileName(String fileName) {
-        BinlogIndex index = null;
-        for (BinlogIndex binlogIndex : indexList) {
-            if (StringUtil.equals(binlogIndex.getFileName(), fileName)) {
-                index = binlogIndex;
-                break;
-            }
-        }
-        return index;
-    }
-
-    private void switchNextBinlogIndex() {
-        // 有新索引文件
-        if(!isCreatedNewBinlogIndex()){
-            return;
-        }
-        boolean locked = false;
-        try {
-            locked = readerLock.tryLock();
-            if (locked) {
-                // 有新索引文件
-                if(isCreatedNewBinlogIndex()){
-                    String newBinlogName = createNewBinlogName(pipeline.getReaderFileName());
-                    BinlogIndex startBinlogIndex = getBinlogIndexByFileName(newBinlogName);
-                    final BinlogReader binlogReader = pipeline.getBinlogReader();
-                    config = new BinlogConfig().setFileName(startBinlogIndex.getFileName());
-                    write(configFile, JsonUtil.objToJson(config), false);
-                    pipeline.setBinlogReader(new BinlogReader(path, startBinlogIndex, config.getPosition()));
-                    binlogReader.stop();
-                    logger.info("Switch to new index file '{}' for binlog reader.", newBinlogName);
-                }
-            }
-        } catch (Exception e) {
-            logger.error(e.getMessage());
-        } finally {
-            if (locked) {
-                readerLock.unlock();
-            }
-        }
-    }
-
-    private boolean isCreatedNewBinlogIndex() {
-        return !StringUtil.equals(pipeline.getReaderFileName(), pipeline.getWriterFileName());
-    }
-
-    private void createNewBinlogIndex() throws IOException {
-        final String writerFileName = pipeline.getWriterFileName();
-        File file = new File(path + writerFileName);
-        // 超过限制大小|过期
-        if (file.length() > BINLOG_MAX_SIZE || isExpiredFile(file)) {
-            final BinlogWriter binlogWriter = pipeline.getBinlogWriter();
-            String newBinlogName = createNewBinlogName(writerFileName);
-            logger.info("The size of index file '{}' has reached {}MB, exceeding the limit of {}MB, switching to a new index file '{}' for index writer.", writerFileName, getMB(file.length()), getMB(BINLOG_MAX_SIZE), newBinlogName);
-            write(indexFile, newBinlogName + LINE_SEPARATOR, true);
-            File binlogIndexFile = new File(path + newBinlogName);
-            write(binlogIndexFile, "", false);
-            BinlogIndex newBinlogIndex = new BinlogIndex(newBinlogName, getFileCreateDateTime(binlogIndexFile));
-            indexList.add(newBinlogIndex);
-            pipeline.setBinlogWriter(new BinlogWriter(path, newBinlogIndex));
-            binlogWriter.stop();
-        }
-    }
-
-    private void closeFreeBinlogIndex() throws IOException {
-        Iterator<BinlogIndex> iterator = indexList.iterator();
-        while (iterator.hasNext()) {
-            BinlogIndex next = iterator.next();
-            // 有锁 & 读写状态关闭 & 30s未用
-            if (!next.isFreeLock() && !next.isRunning() && next.getUpdateTime().isBefore(LocalDateTime.now().minusSeconds(BINLOG_ACTUATOR_CLOSE_DELAYED_SECONDS))) {
-                next.removeAllLock();
-                logger.info("Close free index file '{}', the last update time is {}", next.getFileName(), next.getUpdateTime());
-            }
-        }
-    }
-
-    private void deleteOldBinlogIndex() throws IOException {
-        Iterator<BinlogIndex> iterator = indexList.iterator();
-        while (iterator.hasNext()) {
-            BinlogIndex next = iterator.next();
-            // 无锁 & 过期
-            if (next.isFreeLock()) {
-                File file = new File(path + next.getFileName());
-                if(isExpiredFile(file)){
-                    FileUtils.forceDelete(file);
-                    iterator.remove();
-                }
-            }
-        }
-    }
-
-    private long getMB(long size) {
-        return size / 1024 / 1024;
-    }
-
-    private void readIndexFromDisk() throws IOException {
-        indexList.clear();
-        List<String> indexNames = FileUtils.readLines(indexFile, DEFAULT_CHARSET);
-        if (!CollectionUtils.isEmpty(indexNames)) {
-            for (String indexName : indexNames) {
-                File file = new File(path + indexName);
-                if(file.exists()){
-                    indexList.add(new BinlogIndex(indexName, getFileCreateDateTime(file)));
-                }
-            }
-        }
-    }
-
-    private BinlogConfig initBinlogConfigAndIndex(String binlogName) throws IOException {
-        BinlogConfig config = new BinlogConfig().setFileName(binlogName);
-        write(configFile, JsonUtil.objToJson(config), false);
-        write(indexFile, binlogName + LINE_SEPARATOR, false);
-        write(new File(path + binlogName), "", false);
-        return config;
-    }
-
-    private void deleteExpiredIndexFile() throws IOException {
-        if (CollectionUtils.isEmpty(indexList)) {
-            return;
-        }
-        boolean delete = false;
-        Iterator<BinlogIndex> iterator = indexList.iterator();
-        while (iterator.hasNext()) {
-            BinlogIndex next = iterator.next();
-            if (null == next) {
-                continue;
-            }
-            File file = new File(path + next.getFileName());
-            if (!file.exists()) {
-                logger.info("Delete invalid binlog file '{}'.", next.getFileName());
-                iterator.remove();
-                delete = true;
-                continue;
-            }
-            if (isExpiredFile(file)) {
-                FileUtils.forceDelete(file);
-                iterator.remove();
-                delete = true;
-                logger.info("Delete expired binlog file '{}'.", next.getFileName());
-            }
-        }
-
-        if (delete) {
-            StringBuilder indexBuilder = new StringBuilder();
-            indexList.forEach(i -> indexBuilder.append(i.getFileName()).append(LINE_SEPARATOR));
-            write(indexFile, indexBuilder.toString(), false);
-        }
-    }
-
-    private boolean isExpiredFile(File file) throws IOException {
-        final LocalDateTime createTime = getFileCreateDateTime(file);
-        return createTime.isBefore(LocalDateTime.now().minusDays(BINLOG_EXPIRE_DAYS));
-    }
-
-    private LocalDateTime getFileCreateDateTime(File file) throws IOException {
-        BasicFileAttributes attr = Files.readAttributes(file.toPath(), BasicFileAttributes.class);
-        return attr.creationTime().toInstant().atZone(ZoneId.systemDefault()).toLocalDateTime();
-    }
-
-    private String createNewBinlogName(int index) {
-        return String.format("%s.%06d", BINLOG, index % 999999 + 1);
-    }
-
-    private String createNewBinlogName(String binlogName) {
-        return createNewBinlogName(NumberUtil.toInt(StringUtil.substring(binlogName, BINLOG.length() + 1)));
-    }
-
-    private void write(File file, String line, boolean append) throws IOException {
-        FileUtils.write(file, line, DEFAULT_CHARSET, append);
-    }
-}

+ 17 - 0
dbsyncer-storage/src/main/java/org/dbsyncer/storage/binlog/BinlogRecorder.java

@@ -3,6 +3,7 @@ package org.dbsyncer.storage.binlog;
 import org.dbsyncer.storage.binlog.proto.BinlogMessage;
 import org.dbsyncer.storage.binlog.proto.BinlogMessage;
 
 
 import java.io.IOException;
 import java.io.IOException;
+import java.util.List;
 import java.util.Queue;
 import java.util.Queue;
 
 
 /**
 /**
@@ -12,6 +13,15 @@ import java.util.Queue;
  */
  */
 public interface BinlogRecorder {
 public interface BinlogRecorder {
 
 
+    /**
+     * 获取任务名称
+     *
+     * @return
+     */
+    default String getTaskName() {
+        return getClass().getSimpleName();
+    }
+
     /**
     /**
      * 将任务序列化刷入磁盘
      * 将任务序列化刷入磁盘
      *
      *
@@ -19,6 +29,13 @@ public interface BinlogRecorder {
      */
      */
     void flush(BinlogMessage message) throws IOException;
     void flush(BinlogMessage message) throws IOException;
 
 
+    /**
+     * 消息同步完成后,删除消息记录
+     *
+     * @param messageIds
+     */
+    void complete(List<String> messageIds);
+
     /**
     /**
      * 获取缓存队列
      * 获取缓存队列
      *
      *

+ 0 - 63
dbsyncer-storage/src/main/java/org/dbsyncer/storage/binlog/impl/BinlogPipeline.java

@@ -1,63 +0,0 @@
-package org.dbsyncer.storage.binlog.impl;
-
-import org.dbsyncer.storage.binlog.proto.BinlogMessage;
-
-import java.io.Closeable;
-import java.io.IOException;
-
-/**
- * @author AE86
- * @version 1.0.0
- * @date 2022/6/19 23:36
- */
-public class BinlogPipeline implements Closeable {
-    private BinlogWriter binlogWriter;
-    private BinlogReader binlogReader;
-
-    public BinlogPipeline(BinlogWriter binlogWriter, BinlogReader binlogReader) {
-        this.binlogWriter = binlogWriter;
-        this.binlogReader = binlogReader;
-    }
-
-    public void write(BinlogMessage message) throws IOException {
-        binlogWriter.write(message);
-    }
-
-    public byte[] readLine() throws IOException {
-        return binlogReader.readLine();
-    }
-
-    public long getReaderOffset() {
-        return binlogReader.getOffset();
-    }
-
-    public String getReaderFileName() {
-        return binlogReader.getFileName();
-    }
-
-    public String getWriterFileName() {
-        return binlogWriter.getFileName();
-    }
-
-    @Override
-    public void close() {
-        binlogWriter.close();
-        binlogReader.close();
-    }
-
-    public BinlogWriter getBinlogWriter() {
-        return binlogWriter;
-    }
-
-    public void setBinlogWriter(BinlogWriter binlogWriter) {
-        this.binlogWriter = binlogWriter;
-    }
-
-    public BinlogReader getBinlogReader() {
-        return binlogReader;
-    }
-
-    public void setBinlogReader(BinlogReader binlogReader) {
-        this.binlogReader = binlogReader;
-    }
-}

+ 0 - 53
dbsyncer-storage/src/main/java/org/dbsyncer/storage/binlog/impl/BinlogReader.java

@@ -1,53 +0,0 @@
-package org.dbsyncer.storage.binlog.impl;
-
-import com.google.protobuf.CodedInputStream;
-import org.apache.commons.io.IOUtils;
-import org.dbsyncer.common.file.BufferedRandomAccessFile;
-import org.dbsyncer.storage.binlog.AbstractBinlogActuator;
-import org.dbsyncer.storage.model.BinlogIndex;
-
-import java.io.File;
-import java.io.IOException;
-import java.io.RandomAccessFile;
-
-/**
- * @author AE86
- * @version 1.0.0
- * @date 2022/6/26 23:25
- */
-public class BinlogReader extends AbstractBinlogActuator {
-    private final RandomAccessFile raf;
-    private final byte[] h = new byte[4];
-    private byte[] b;
-    private long offset;
-    private CodedInputStream cis;
-
-    public BinlogReader(String path, BinlogIndex binlogIndex, long position) throws IOException {
-        initBinlogIndex(binlogIndex);
-        this.raf = new BufferedRandomAccessFile(new File(path + binlogIndex.getFileName()), "r");
-        raf.seek(position);
-    }
-
-    public byte[] readLine() throws IOException {
-        this.offset = raf.getFilePointer();
-        if (offset >= raf.length()) {
-            return null;
-        }
-        raf.read(h);
-        cis = CodedInputStream.newInstance(h);
-        b = new byte[cis.readFixed32()];
-        raf.read(b);
-        raf.seek(this.offset + (h.length + b.length));
-        refreshBinlogIndexUpdateTime();
-        return b;
-    }
-
-    public long getOffset() {
-        return offset;
-    }
-
-    @Override
-    public void close() {
-        IOUtils.closeQuietly(raf);
-    }
-}

+ 0 - 45
dbsyncer-storage/src/main/java/org/dbsyncer/storage/binlog/impl/BinlogWriter.java

@@ -1,45 +0,0 @@
-package org.dbsyncer.storage.binlog.impl;
-
-import com.google.protobuf.CodedOutputStream;
-import org.apache.commons.io.IOUtils;
-import org.dbsyncer.storage.binlog.AbstractBinlogActuator;
-import org.dbsyncer.storage.binlog.proto.BinlogMessage;
-import org.dbsyncer.storage.model.BinlogIndex;
-
-import java.io.FileNotFoundException;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.OutputStream;
-
-/**
- * @author AE86
- * @version 1.0.0
- * @date 2022/6/26 23:28
- */
-public class BinlogWriter extends AbstractBinlogActuator {
-
-    private final OutputStream out;
-
-    public BinlogWriter(String path, BinlogIndex binlogIndex) throws FileNotFoundException {
-        initBinlogIndex(binlogIndex);
-        this.out = new FileOutputStream(path + binlogIndex.getFileName(), true);
-    }
-
-    public void write(BinlogMessage message) throws IOException {
-        if(null != message){
-            // 选择固定长度int32作为tag标志位,4bytes, 最多可容纳2^31-1字节(2047MB左右, 建议上限64~128M内最佳),
-            final int serialized = message.getSerializedSize();
-            final int bufferSize = CodedOutputStream.computeFixed32SizeNoTag(serialized) + serialized;
-            final CodedOutputStream codedOutput = CodedOutputStream.newInstance(out, bufferSize);
-            codedOutput.writeFixed32NoTag(serialized);
-            message.writeTo(codedOutput);
-            codedOutput.flush();
-            refreshBinlogIndexUpdateTime();
-        }
-    }
-
-    @Override
-    public void close() {
-        IOUtils.closeQuietly(out);
-    }
-}

+ 24 - 0
dbsyncer-storage/src/main/java/org/dbsyncer/storage/constant/BinlogConstant.java

@@ -0,0 +1,24 @@
+package org.dbsyncer.storage.constant;
+
+/**
+ * @author AE86
+ * @version 1.0.0
+ * @date 2022/7/13 22:14
+ */
+public class BinlogConstant {
+
+    /**
+     * 属性
+     */
+    public static final String BINLOG_ID = "id";
+    public static final String BINLOG_STATUS = "s";
+    public static final String BINLOG_CONTENT = "c";
+    public static final String BINLOG_TIME = "t";
+
+    /**
+     * 状态类型
+     */
+    public static final int READY = 0;
+    public static final int PROCESSING = 1;
+
+}

+ 4 - 0
dbsyncer-storage/src/main/java/org/dbsyncer/storage/enums/IndexFieldResolverEnum.java

@@ -4,6 +4,10 @@ import org.dbsyncer.storage.lucene.IndexFieldResolver;
 
 
 public enum IndexFieldResolverEnum {
 public enum IndexFieldResolverEnum {
 
 
+    LONG((f) -> f.numericValue().longValue()),
+
+    INT((f) -> f.numericValue().intValue()),
+
     STRING((f) -> f.stringValue()),
     STRING((f) -> f.stringValue()),
 
 
     BINARY((f) -> f.binaryValue());
     BINARY((f) -> f.binaryValue());

+ 5 - 5
dbsyncer-storage/src/main/java/org/dbsyncer/storage/support/DiskStorageServiceImpl.java

@@ -16,7 +16,7 @@ import org.dbsyncer.storage.lucene.Shard;
 import org.dbsyncer.storage.query.Option;
 import org.dbsyncer.storage.query.Option;
 import org.dbsyncer.storage.query.Param;
 import org.dbsyncer.storage.query.Param;
 import org.dbsyncer.storage.query.Query;
 import org.dbsyncer.storage.query.Query;
-import org.dbsyncer.storage.util.ParamsUtil;
+import org.dbsyncer.storage.util.DocumentUtil;
 import org.slf4j.Logger;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.slf4j.LoggerFactory;
 
 
@@ -99,14 +99,14 @@ public class DiskStorageServiceImpl extends AbstractStorageService {
     @Override
     @Override
     public void insert(StorageEnum type, String collection, Map params) throws IOException {
     public void insert(StorageEnum type, String collection, Map params) throws IOException {
         createShardIfNotExist(collection);
         createShardIfNotExist(collection);
-        Document doc = ParamsUtil.convertConfig2Doc(params);
+        Document doc = DocumentUtil.convertConfig2Doc(params);
         map.get(collection).insert(doc);
         map.get(collection).insert(doc);
     }
     }
 
 
     @Override
     @Override
     public void update(StorageEnum type, String collection, Map params) throws IOException {
     public void update(StorageEnum type, String collection, Map params) throws IOException {
         createShardIfNotExist(collection);
         createShardIfNotExist(collection);
-        Document doc = ParamsUtil.convertConfig2Doc(params);
+        Document doc = DocumentUtil.convertConfig2Doc(params);
         IndexableField field = doc.getField(ConfigConstant.CONFIG_MODEL_ID);
         IndexableField field = doc.getField(ConfigConstant.CONFIG_MODEL_ID);
         map.get(collection).update(new Term(ConfigConstant.CONFIG_MODEL_ID, field.stringValue()), doc);
         map.get(collection).update(new Term(ConfigConstant.CONFIG_MODEL_ID, field.stringValue()), doc);
     }
     }
@@ -131,14 +131,14 @@ public class DiskStorageServiceImpl extends AbstractStorageService {
     @Override
     @Override
     public void insertLog(StorageEnum type, String collection, Map<String, Object> params) throws IOException {
     public void insertLog(StorageEnum type, String collection, Map<String, Object> params) throws IOException {
         createShardIfNotExist(collection);
         createShardIfNotExist(collection);
-        Document doc = ParamsUtil.convertLog2Doc(params);
+        Document doc = DocumentUtil.convertLog2Doc(params);
         map.get(collection).insert(doc);
         map.get(collection).insert(doc);
     }
     }
 
 
     @Override
     @Override
     public void insertData(StorageEnum type, String collection, List<Map> list) throws IOException {
     public void insertData(StorageEnum type, String collection, List<Map> list) throws IOException {
         createShardIfNotExist(collection);
         createShardIfNotExist(collection);
-        List<Document> docs = list.stream().map(r -> ParamsUtil.convertData2Doc(r)).collect(Collectors.toList());
+        List<Document> docs = list.stream().map(r -> DocumentUtil.convertData2Doc(r)).collect(Collectors.toList());
         map.get(collection).insertBatch(docs);
         map.get(collection).insertBatch(docs);
     }
     }
 
 

+ 239 - 0
dbsyncer-storage/src/main/java/org/dbsyncer/storage/util/BinlogMessageUtil.java

@@ -0,0 +1,239 @@
+package org.dbsyncer.storage.util;
+
+import com.google.protobuf.ByteString;
+import oracle.sql.BLOB;
+import oracle.sql.CLOB;
+import oracle.sql.TIMESTAMP;
+import org.apache.commons.io.IOUtils;
+import org.dbsyncer.storage.binlog.BinlogColumnValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.InputStream;
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.nio.charset.Charset;
+import java.sql.*;
+import java.util.BitSet;
+
+/**
+ * Java语言提供了八种基本类型,六种数字类型(四个整数型,两个浮点型),一种字符类型,一种布尔型。
+ * <p>
+ * <ol>
+ * <li>整数:包括int,short,byte,long</li>
+ * <li>浮点型:float,double</li>
+ * <li>字符:char</li>
+ * <li>布尔:boolean</li>
+ * </ol>
+ *
+ * <pre>
+ * 类型     长度     大小      最小值     最大值
+ * byte     1Byte    8-bit     -128       +127
+ * short    2Byte    16-bit    -2^15      +2^15-1
+ * int      4Byte    32-bit    -2^31      +2^31-1
+ * long     8Byte    64-bit    -2^63      +2^63-1
+ * float    4Byte    32-bit    IEEE754    IEEE754
+ * double   8Byte    64-bit    IEEE754    IEEE754
+ * char     2Byte    16-bit    Unicode 0  Unicode 2^16-1
+ * boolean  8Byte    64-bit
+ * </pre>
+ *
+ * @author AE86
+ * @version 1.0.0
+ * @date 2022/7/14 22:07
+ */
+public abstract class BinlogMessageUtil {
+
+    private static final Logger logger = LoggerFactory.getLogger(BinlogMessageUtil.class);
+
+    private static final ByteBuffer buffer = ByteBuffer.allocate(8);
+
+    private static final BinlogColumnValue value = new BinlogColumnValue();
+
+    public static ByteString serializeValue(Object v) {
+        String type = v.getClass().getName();
+        switch (type) {
+            // 字节
+            case "[B":
+                return ByteString.copyFrom((byte[]) v);
+
+            // 字符串
+            case "java.lang.String":
+                return ByteString.copyFromUtf8((String) v);
+
+            // 时间
+            case "java.sql.Timestamp":
+                buffer.clear();
+                Timestamp timestamp = (Timestamp) v;
+                buffer.putLong(timestamp.getTime());
+                buffer.flip();
+                return ByteString.copyFrom(buffer, 8);
+            case "java.sql.Date":
+                buffer.clear();
+                Date date = (Date) v;
+                buffer.putLong(date.getTime());
+                buffer.flip();
+                return ByteString.copyFrom(buffer, 8);
+            case "java.sql.Time":
+                buffer.clear();
+                Time time = (Time) v;
+                buffer.putLong(time.getTime());
+                buffer.flip();
+                return ByteString.copyFrom(buffer, 8);
+
+            // 数字
+            case "java.lang.Integer":
+                buffer.clear();
+                buffer.putInt((Integer) v);
+                buffer.flip();
+                return ByteString.copyFrom(buffer, 4);
+            case "java.lang.Long":
+                buffer.clear();
+                buffer.putLong((Long) v);
+                buffer.flip();
+                return ByteString.copyFrom(buffer, 8);
+            case "java.lang.Short":
+                buffer.clear();
+                buffer.putShort((Short) v);
+                buffer.flip();
+                return ByteString.copyFrom(buffer, 2);
+            case "java.lang.Float":
+                buffer.clear();
+                buffer.putFloat((Float) v);
+                buffer.flip();
+                return ByteString.copyFrom(buffer, 4);
+            case "java.lang.Double":
+                buffer.clear();
+                buffer.putDouble((Double) v);
+                buffer.flip();
+                return ByteString.copyFrom(buffer, 8);
+            case "java.math.BigDecimal":
+                BigDecimal bigDecimal = (BigDecimal) v;
+                return ByteString.copyFromUtf8(bigDecimal.toString());
+            case "java.util.BitSet":
+                BitSet bitSet = (BitSet) v;
+                return ByteString.copyFrom(bitSet.toByteArray());
+
+            // 布尔(1为true;0为false)
+            case "java.lang.Boolean":
+                buffer.clear();
+                Boolean b = (Boolean) v;
+                buffer.putShort((short) (b ? 1 : 0));
+                buffer.flip();
+                return ByteString.copyFrom(buffer, 2);
+            case "oracle.sql.TIMESTAMP":
+                buffer.clear();
+                TIMESTAMP timeStamp = (TIMESTAMP) v;
+                try {
+                    buffer.putLong(timeStamp.timestampValue().getTime());
+                } catch (SQLException e) {
+                    logger.error(e.getMessage());
+                }
+                buffer.flip();
+                return ByteString.copyFrom(buffer, 8);
+            case "oracle.sql.BLOB":
+                return ByteString.copyFrom(getBytes((BLOB) v));
+            case "oracle.sql.CLOB":
+                return ByteString.copyFrom(getBytes((CLOB) v));
+            default:
+                logger.error("Unsupported serialize value type:{}", type);
+                return null;
+        }
+    }
+
+    public static Object deserializeValue(int type, ByteString v) {
+        value.setValue(v);
+
+        if (value.isNull()) {
+            return null;
+        }
+
+        switch (type) {
+            // 字符串
+            case Types.VARCHAR:
+            case Types.LONGVARCHAR:
+            case Types.NVARCHAR:
+            case Types.NCHAR:
+            case Types.CHAR:
+                return value.asString();
+
+            // 时间
+            case Types.TIMESTAMP:
+                return value.asTimestamp();
+            case Types.TIME:
+                return value.asTime();
+            case Types.DATE:
+                return value.asDate();
+
+            // 数字
+            case Types.INTEGER:
+            case Types.TINYINT:
+                return value.asInteger();
+            case Types.SMALLINT:
+                return value.asShort();
+            case Types.BIGINT:
+                return value.asLong();
+            case Types.FLOAT:
+            case Types.REAL:
+                return value.asFloat();
+            case Types.DOUBLE:
+                return value.asDouble();
+            case Types.DECIMAL:
+            case Types.NUMERIC:
+                return value.asBigDecimal();
+
+            // 布尔
+            case Types.BOOLEAN:
+                return value.asBoolean();
+
+            // 字节
+            case Types.BIT:
+            case Types.BINARY:
+            case Types.VARBINARY:
+            case Types.LONGVARBINARY:
+            // 二进制对象
+            case Types.NCLOB:
+            case Types.CLOB:
+            case Types.BLOB:
+                return value.asByteArray();
+
+            // 暂不支持
+            case Types.ROWID:
+                return null;
+
+            default:
+                return null;
+        }
+    }
+
+    private static byte[] getBytes(BLOB blob) {
+        InputStream is = null;
+        byte[] b = null;
+        try {
+            is = blob.getBinaryStream();
+            b = new byte[(int) blob.length()];
+            int read = is.read(b);
+            if(-1 == read){
+                return b;
+            }
+        } catch (Exception e) {
+            logger.error(e.getMessage());
+        } finally {
+            IOUtils.closeQuietly(is);
+        }
+        return b;
+    }
+
+    private static byte[] getBytes(CLOB clob) {
+        try {
+            long length = clob.length();
+            if (length > 0) {
+                return clob.getSubString(1, (int) length).getBytes(Charset.defaultCharset());
+            }
+        } catch (SQLException e) {
+            logger.error(e.getMessage());
+        }
+        return new byte[0];
+    }
+
+}

+ 21 - 2
dbsyncer-storage/src/main/java/org/dbsyncer/storage/util/ParamsUtil.java → dbsyncer-storage/src/main/java/org/dbsyncer/storage/util/DocumentUtil.java

@@ -1,6 +1,8 @@
 package org.dbsyncer.storage.util;
 package org.dbsyncer.storage.util;
 
 
 import org.apache.lucene.document.*;
 import org.apache.lucene.document.*;
+import org.apache.lucene.util.BytesRef;
+import org.dbsyncer.storage.constant.BinlogConstant;
 import org.dbsyncer.storage.constant.ConfigConstant;
 import org.dbsyncer.storage.constant.ConfigConstant;
 import org.springframework.util.Assert;
 import org.springframework.util.Assert;
 
 
@@ -40,8 +42,9 @@ import java.util.Map;
  * @version 1.0.0
  * @version 1.0.0
  * @date 2019/11/19 22:07
  * @date 2019/11/19 22:07
  */
  */
-public abstract class ParamsUtil {
-    private ParamsUtil(){}
+public abstract class DocumentUtil {
+    private DocumentUtil() {
+    }
 
 
     public static Document convertConfig2Doc(Map params) {
     public static Document convertConfig2Doc(Map params) {
         Assert.notNull(params, "Params can not be null.");
         Assert.notNull(params, "Params can not be null.");
@@ -109,4 +112,20 @@ public abstract class ParamsUtil {
         return doc;
         return doc;
     }
     }
 
 
+    /**
+     * Builds a Lucene {@link Document} representing one binlog entry.
+     *
+     * @param messageId  unique entry id, indexed as an exact-match {@link StringField}
+     *                   and stored so it can be read back from query results
+     * @param status     processing status code; indexed as an {@link IntPoint} for
+     *                   exact/range queries and duplicated as a {@link StoredField}
+     *                   so the value is retrievable
+     * @param bytes      serialized binlog payload; stored, plus a
+     *                   {@link BinaryDocValuesField} copy
+     * @param updateTime epoch-millisecond update time; indexed as a {@link LongPoint}
+     *                   for range queries, stored for retrieval, and added as a
+     *                   {@link NumericDocValuesField} so results can be sorted by time
+     */
+    public static Document convertBinlog2Doc(String messageId, int status, BytesRef bytes, long updateTime) {
+        Document doc = new Document();
+        doc.add(new StringField(BinlogConstant.BINLOG_ID, messageId, Field.Store.YES));
+
+        doc.add(new IntPoint(BinlogConstant.BINLOG_STATUS, status));
+        doc.add(new StoredField(BinlogConstant.BINLOG_STATUS, status));
+
+        doc.add(new BinaryDocValuesField(BinlogConstant.BINLOG_CONTENT, bytes));
+        doc.add(new StoredField(BinlogConstant.BINLOG_CONTENT, bytes));
+
+        doc.add(new LongPoint(BinlogConstant.BINLOG_TIME, updateTime));
+        doc.add(new StoredField(BinlogConstant.BINLOG_TIME, updateTime));
+        doc.add(new NumericDocValuesField(BinlogConstant.BINLOG_TIME, updateTime));
+        return doc;
+    }
+
+
 }
 }

+ 0 - 224
dbsyncer-storage/src/main/test/BinlogMessageTest.java

@@ -1,224 +0,0 @@
-import com.google.protobuf.ByteString;
-import com.google.protobuf.InvalidProtocolBufferException;
-import org.apache.commons.io.IOUtils;
-import org.dbsyncer.storage.binlog.AbstractBinlogRecorder;
-import org.dbsyncer.storage.binlog.BinlogContext;
-import org.dbsyncer.storage.binlog.impl.BinlogColumnValue;
-import org.dbsyncer.storage.binlog.proto.BinlogMap;
-import org.dbsyncer.storage.binlog.proto.BinlogMessage;
-import org.dbsyncer.storage.binlog.proto.EventEnum;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.math.BigDecimal;
-import java.nio.charset.Charset;
-import java.sql.Date;
-import java.sql.Time;
-import java.sql.Timestamp;
-import java.time.LocalDateTime;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Queue;
-
-/**
- * @author AE86
- * @version 1.0.0
- * @date 2022/6/18 23:46
- */
-public class BinlogMessageTest extends AbstractBinlogRecorder {
-
-    private final Logger logger = LoggerFactory.getLogger(getClass());
-
-    private BinlogContext context;
-
-    private BinlogColumnValue value = new BinlogColumnValue();
-
-    @Before
-    public void init() throws IOException {
-        context = new BinlogContext("WriterBinlog");
-    }
-
-    @After
-    public void close() {
-        IOUtils.closeQuietly(context);
-    }
-
-    @Test
-    public void testBinlogMessage() throws IOException {
-        for (int i = 0; i < 10000; i++) {
-            write("123456", i+"");
-        }
-        //write("000111", "xyz");
-        //write("888999", "jkl");
-
-        byte[] line;
-        int count = 0;
-        while (null != (line = context.readLine())) {
-            //logger.info("size:{}, {}", line.length, line);
-            try {
-                BinlogMessage message = BinlogMessage.parseFrom(line);
-                if(null != message){
-                    count ++;
-                    message.getData();
-                }
-            } catch (InvalidProtocolBufferException e) {
-                logger.info("{} : {}", line.length, line);
-            }
-        }
-        logger.info("总条数:{}", count);
-        context.flush();
-    }
-
-    private void write(String tableGroupId, String key) throws IOException {
-        Map<String, Object> data = new HashMap<>();
-        data.put("id", 1L);
-        data.put("name", key + "中文");
-        data.put("age", 88);
-        data.put("bd", new BigDecimal(88));
-        data.put("sex", 1);
-        data.put("f", 88.88f);
-        data.put("d", 999.99d);
-        data.put("b", true);
-        short ss = 32767;
-        data.put("ss", ss);
-        data.put("bytes", "中文666".getBytes(Charset.defaultCharset()));
-        data.put("create_date", new Date(Timestamp.valueOf(LocalDateTime.now()).getTime()));
-        data.put("update_time", Timestamp.valueOf(LocalDateTime.now()).getTime());
-
-        BinlogMap.Builder builder = BinlogMap.newBuilder();
-        data.forEach((k, v) -> {
-            if (null != v) {
-                ByteString bytes = serializeValue(v);
-                if (null != bytes) {
-                    builder.putRow(k, bytes);
-                }
-            }
-        });
-
-        BinlogMessage build = BinlogMessage.newBuilder()
-                .setTableGroupId(tableGroupId)
-                .setEvent(EventEnum.UPDATE)
-                .setData(builder.build())
-                .build();
-        //byte[] bytes = build.toByteArray();
-        //logger.info("序列化长度:{}", bytes.length);
-        //logger.info("{}", bytes);
-        context.write(build);
-    }
-
-    @Test
-    public void testMessageNumber() {
-        // short
-        short s = 32767;
-        logger.info("short1:{}", s);
-        ByteString shortBytes = serializeValue(s);
-        logger.info("bytes:{}", shortBytes.toByteArray());
-        value.setValue(shortBytes);
-        short s2 = value.asShort();
-        logger.info("short2:{}", s2);
-
-        // int
-        int i = 1999999999;
-        logger.info("int1:{}", i);
-        ByteString intBytes = serializeValue(i);
-        logger.info("bytes:{}", intBytes.toByteArray());
-        value.setValue(intBytes);
-        int i2 = value.asInteger();
-        logger.info("int2:{}", i2);
-
-        // long
-        long l = 8999999999999999999L;
-        logger.info("long1:{}", l);
-        ByteString longBytes = serializeValue(l);
-        logger.info("bytes:{}", longBytes.toByteArray());
-        value.setValue(longBytes);
-        long l2 = value.asLong();
-        logger.info("long2:{}", l2);
-
-        // float
-        float f = 99999999999999999999999999999999999.99999999999999999999999999999999999f;
-        logger.info("float1:{}", f);
-        ByteString floatBytes = serializeValue(f);
-        logger.info("bytes:{}", floatBytes.toByteArray());
-        value.setValue(floatBytes);
-        float f2 = value.asFloat();
-        logger.info("float2:{}", f2);
-
-        // double
-        double d = 999999.9999999999999999999999999d;
-        logger.info("double1:{}", d);
-        ByteString doubleBytes = serializeValue(d);
-        logger.info("bytes:{}", doubleBytes.toByteArray());
-        value.setValue(doubleBytes);
-        double d2 = value.asDouble();
-        logger.info("double2:{}", d2);
-
-        // double
-        BigDecimal b = new BigDecimal(8888888.888888888888888f);
-        logger.info("bigDecimal1:{}", b);
-        ByteString bigDecimalBytes = serializeValue(b);
-        logger.info("bytes:{}", bigDecimalBytes.toByteArray());
-        value.setValue(bigDecimalBytes);
-        BigDecimal b2 = value.asBigDecimal();
-        logger.info("bigDecimal2:{}", b2);
-
-        // boolean
-        boolean bool = true;
-        logger.info("bool1:{}", bool);
-        ByteString boolBytes = serializeValue(bool);
-        logger.info("bytes:{}", boolBytes.toByteArray());
-        value.setValue(boolBytes);
-        Boolean bool2 = value.asBoolean();
-        logger.info("bool2:{}", bool2);
-    }
-
-    @Test
-    public void testMessageDate() {
-        // timestamp
-        Timestamp timestamp = Timestamp.valueOf(LocalDateTime.now());
-        logger.info("timestamp1:{}, l:{}", timestamp, timestamp.getTime());
-        ByteString timestampBytes = serializeValue(timestamp);
-        logger.info("bytes:{}", timestampBytes.toByteArray());
-        value.setValue(timestampBytes);
-        Timestamp timestamp2 = value.asTimestamp();
-        logger.info("timestamp2:{}, l:{}", timestamp2, timestamp2.getTime());
-
-        // date
-        Date date = new Date(timestamp.getTime());
-        logger.info("date1:{}, l:{}", date, date.getTime());
-        ByteString dateBytes = serializeValue(date);
-        logger.info("bytes:{}", dateBytes.toByteArray());
-        value.setValue(dateBytes);
-        Date date2 = value.asDate();
-        logger.info("date2:{}, l:{}", date2, date2.getTime());
-
-        // time
-        Time time = new Time(timestamp.getTime());
-        logger.info("time1:{}, l:{}", time, time.getTime());
-        ByteString timeBytes = serializeValue(time);
-        logger.info("bytes:{}", timeBytes.toByteArray());
-        value.setValue(timeBytes);
-        Time time2 = value.asTime();
-        logger.info("time2:{}, l:{}", time2, time2.getTime());
-    }
-
-    @Override
-    public Queue getQueue() {
-        return null;
-    }
-
-    @Override
-    public int getQueueCapacity() {
-        return 0;
-    }
-
-    @Override
-    protected Object deserialize(BinlogMessage message) {
-        return null;
-    }
-
-}

+ 2 - 2
dbsyncer-storage/src/main/test/LuceneFactoryTest.java

@@ -22,7 +22,7 @@ import org.dbsyncer.storage.constant.ConfigConstant;
 import org.dbsyncer.storage.enums.StorageDataStatusEnum;
 import org.dbsyncer.storage.enums.StorageDataStatusEnum;
 import org.dbsyncer.storage.lucene.Shard;
 import org.dbsyncer.storage.lucene.Shard;
 import org.dbsyncer.storage.query.Option;
 import org.dbsyncer.storage.query.Option;
-import org.dbsyncer.storage.util.ParamsUtil;
+import org.dbsyncer.storage.util.DocumentUtil;
 import org.junit.After;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.Test;
@@ -68,7 +68,7 @@ public class LuceneFactoryTest {
                     // 模拟操作
                     // 模拟操作
                     System.out.println(String.format("%s:%s", Thread.currentThread().getName(), k));
                     System.out.println(String.format("%s:%s", Thread.currentThread().getName(), k));
 
 
-                    Document data = ParamsUtil.convertData2Doc(createMap(k));
+                    Document data = DocumentUtil.convertData2Doc(createMap(k));
                     //IndexableField field = data.getField(ConfigConstant.CONFIG_MODEL_ID);
                     //IndexableField field = data.getField(ConfigConstant.CONFIG_MODEL_ID);
                     //shard.update(new Term(ConfigConstant.CONFIG_MODEL_ID, field.stringValue()), data);
                     //shard.update(new Term(ConfigConstant.CONFIG_MODEL_ID, field.stringValue()), data);
                     shard.insert(data);
                     shard.insert(data);

+ 92 - 42
dbsyncer-storage/src/main/test/ShardBinlogTest.java

@@ -1,20 +1,22 @@
 import com.google.protobuf.ByteString;
 import com.google.protobuf.ByteString;
 import org.apache.lucene.document.Document;
 import org.apache.lucene.document.Document;
-import org.apache.lucene.document.Field;
-import org.apache.lucene.document.StoredField;
-import org.apache.lucene.document.StringField;
+import org.apache.lucene.document.IntPoint;
+import org.apache.lucene.document.LongPoint;
 import org.apache.lucene.index.IndexReader;
 import org.apache.lucene.index.IndexReader;
-import org.apache.lucene.search.IndexSearcher;
-import org.apache.lucene.search.MatchAllDocsQuery;
+import org.apache.lucene.index.Term;
+import org.apache.lucene.search.*;
 import org.apache.lucene.util.BytesRef;
 import org.apache.lucene.util.BytesRef;
 import org.dbsyncer.common.model.Paging;
 import org.dbsyncer.common.model.Paging;
-import org.dbsyncer.storage.binlog.AbstractBinlogRecorder;
+import org.dbsyncer.common.util.DateFormatUtil;
 import org.dbsyncer.storage.binlog.proto.BinlogMap;
 import org.dbsyncer.storage.binlog.proto.BinlogMap;
 import org.dbsyncer.storage.binlog.proto.BinlogMessage;
 import org.dbsyncer.storage.binlog.proto.BinlogMessage;
 import org.dbsyncer.storage.binlog.proto.EventEnum;
 import org.dbsyncer.storage.binlog.proto.EventEnum;
+import org.dbsyncer.storage.constant.BinlogConstant;
 import org.dbsyncer.storage.enums.IndexFieldResolverEnum;
 import org.dbsyncer.storage.enums.IndexFieldResolverEnum;
 import org.dbsyncer.storage.lucene.Shard;
 import org.dbsyncer.storage.lucene.Shard;
 import org.dbsyncer.storage.query.Option;
 import org.dbsyncer.storage.query.Option;
+import org.dbsyncer.storage.util.BinlogMessageUtil;
+import org.dbsyncer.storage.util.DocumentUtil;
 import org.junit.After;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.Test;
@@ -26,15 +28,20 @@ import java.math.BigDecimal;
 import java.nio.charset.Charset;
 import java.nio.charset.Charset;
 import java.sql.Date;
 import java.sql.Date;
 import java.sql.Timestamp;
 import java.sql.Timestamp;
+import java.time.Instant;
 import java.time.LocalDateTime;
 import java.time.LocalDateTime;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
 
 
 /**
 /**
  * @author AE86
  * @author AE86
  * @version 1.0.0
  * @version 1.0.0
  * @date 2022/6/18 23:46
  * @date 2022/6/18 23:46
  */
  */
-public class ShardBinlogTest extends AbstractBinlogRecorder {
+public class ShardBinlogTest {
 
 
     private final Logger logger = LoggerFactory.getLogger(getClass());
     private final Logger logger = LoggerFactory.getLogger(getClass());
 
 
@@ -51,38 +58,100 @@ public class ShardBinlogTest extends AbstractBinlogRecorder {
     }
     }
 
 
     @Test
     @Test
-    public void testBinlogMessage() throws IOException {
+    public void testBinlogMessage() throws IOException, InterruptedException {
+        mockData(1);
+
+        // 查询[待处理] 或 [处理中 且 处理超时]
+        List<Map> maps = queryReadyAndProcess();
+        logger.info("总条数:{}", maps.size());
+        TimeUnit.SECONDS.sleep(1);
+        markProcessing(maps);
+        logger.info("标记处理中");
+
+        // 模拟新记录
+        TimeUnit.SECONDS.sleep(1);
+        mockData(6);
+
+        maps = queryReadyAndProcess();
+        logger.info("【待处理】和【处理中 且 处理超时】总条数:{}", maps.size());
+
+        logger.info("模拟处理超时,等待10s");
+        TimeUnit.SECONDS.sleep(10);
+
+        maps = queryReadyAndProcess();
+        logger.info("【待处理】和【处理中 且 处理超时】总条数:{}", maps.size());
+    }
+
+    private void markProcessing(List<Map> maps) {
+        long updateTime = Instant.now().toEpochMilli();
+        maps.forEach(row -> {
+            String id = (String) row.get(BinlogConstant.BINLOG_ID);
+            BytesRef ref = (BytesRef) row.get(BinlogConstant.BINLOG_CONTENT);
+            try {
+                shard.update(new Term(BinlogConstant.BINLOG_ID, String.valueOf(id)), DocumentUtil.convertBinlog2Doc(id, BinlogConstant.PROCESSING, ref, updateTime));
+            } catch (IOException e) {
+                throw new RuntimeException(e);
+            }
+        });
+    }
+
+    private void mockData(int i) throws IOException {
         List<Document> list = new ArrayList<>();
         List<Document> list = new ArrayList<>();
-        for (int i = 1; i <= 10000; i++) {
+        long now = Instant.now().toEpochMilli();
+        int size = i + 5;
+        while (i < size) {
             BinlogMessage message = genMessage("123456", i + "");
             BinlogMessage message = genMessage("123456", i + "");
-            Document doc = new Document();
-            doc.add(new StringField("id", String.valueOf(i), Field.Store.YES));
-            BytesRef bytesRef = new BytesRef(message.toByteArray());
-            doc.add(new StoredField("content", bytesRef));
-            list.add(doc);
+            BytesRef bytes = new BytesRef(message.toByteArray());
+            list.add(DocumentUtil.convertBinlog2Doc(String.valueOf(i), BinlogConstant.READY, bytes, now));
 
 
             if (i % 1000 == 0) {
             if (i % 1000 == 0) {
                 shard.insertBatch(list);
                 shard.insertBatch(list);
                 list.clear();
                 list.clear();
             }
             }
+            i++;
         }
         }
 
 
         if (!list.isEmpty()) {
         if (!list.isEmpty()) {
             shard.insertBatch(list);
             shard.insertBatch(list);
         }
         }
         check();
         check();
+    }
 
 
-        Option option = new Option(new MatchAllDocsQuery());
-        option.addIndexFieldResolverEnum("content", IndexFieldResolverEnum.BINARY);
-        Paging paging = shard.query(option, 1, 10001, null);
+    private List<Map> queryReadyAndProcess() throws IOException {
+        long lastTime = Timestamp.valueOf(LocalDateTime.now().minusSeconds(5)).getTime();
+        BooleanQuery filter1 = new BooleanQuery.Builder()
+                .add(IntPoint.newExactQuery(BinlogConstant.BINLOG_STATUS, BinlogConstant.READY), BooleanClause.Occur.MUST)
+                .build();
+        BooleanQuery filter2 = new BooleanQuery.Builder()
+                .add(IntPoint.newExactQuery(BinlogConstant.BINLOG_STATUS, BinlogConstant.PROCESSING), BooleanClause.Occur.MUST)
+                .add(LongPoint.newRangeQuery(BinlogConstant.BINLOG_TIME, Long.MIN_VALUE, lastTime), BooleanClause.Occur.MUST)
+                .build();
+        BooleanQuery query = new BooleanQuery.Builder()
+                .add(filter1, BooleanClause.Occur.SHOULD)
+                .add(filter2, BooleanClause.Occur.SHOULD)
+                .build();
+        return query(new Option(query));
+    }
+
+    private List<Map> query(Option option) throws IOException {
+        option.addIndexFieldResolverEnum(BinlogConstant.BINLOG_ID, IndexFieldResolverEnum.STRING);
+        option.addIndexFieldResolverEnum(BinlogConstant.BINLOG_STATUS, IndexFieldResolverEnum.INT);
+        option.addIndexFieldResolverEnum(BinlogConstant.BINLOG_CONTENT, IndexFieldResolverEnum.BINARY);
+        option.addIndexFieldResolverEnum(BinlogConstant.BINLOG_TIME, IndexFieldResolverEnum.LONG);
+        Sort sort = new Sort(new SortField(BinlogConstant.BINLOG_TIME, SortField.Type.LONG));
+        Paging paging = shard.query(option, 1, 10001, sort);
         List<Map> maps = (List<Map>) paging.getData();
         List<Map> maps = (List<Map>) paging.getData();
         for (Map m : maps) {
         for (Map m : maps) {
-            BytesRef ref = (BytesRef) m.get("content");
+            String id = (String) m.get(BinlogConstant.BINLOG_ID);
+            Integer s = (Integer) m.get(BinlogConstant.BINLOG_STATUS);
+            BytesRef ref = (BytesRef) m.get(BinlogConstant.BINLOG_CONTENT);
+            Long t = (Long) m.get(BinlogConstant.BINLOG_TIME);
             BinlogMessage message = BinlogMessage.parseFrom(ref.bytes);
             BinlogMessage message = BinlogMessage.parseFrom(ref.bytes);
             Map<String, ByteString> rowMap = message.getData().getRowMap();
             Map<String, ByteString> rowMap = message.getData().getRowMap();
-            logger.info(rowMap.get("name").toStringUtf8());
+            String timestamp = DateFormatUtil.timestampToString(new Timestamp(t));
+            logger.info("t:{}, id:{}, s:{}, message:{}", timestamp, id, s, rowMap.get("name").toStringUtf8());
         }
         }
-        logger.info("总条数:{}", paging.getTotal());
+        return maps;
     }
     }
 
 
     private BinlogMessage genMessage(String tableGroupId, String key) {
     private BinlogMessage genMessage(String tableGroupId, String key) {
@@ -104,36 +173,17 @@ public class ShardBinlogTest extends AbstractBinlogRecorder {
         BinlogMap.Builder builder = BinlogMap.newBuilder();
         BinlogMap.Builder builder = BinlogMap.newBuilder();
         data.forEach((k, v) -> {
         data.forEach((k, v) -> {
             if (null != v) {
             if (null != v) {
-                ByteString bytes = serializeValue(v);
+                ByteString bytes = BinlogMessageUtil.serializeValue(v);
                 if (null != bytes) {
                 if (null != bytes) {
                     builder.putRow(k, bytes);
                     builder.putRow(k, bytes);
                 }
                 }
             }
             }
         });
         });
 
 
-        BinlogMessage build = BinlogMessage.newBuilder()
-                .setTableGroupId(tableGroupId)
-                .setEvent(EventEnum.UPDATE)
-                .setData(builder.build())
-                .build();
+        BinlogMessage build = BinlogMessage.newBuilder().setTableGroupId(tableGroupId).setEvent(EventEnum.UPDATE).setData(builder.build()).build();
         return build;
         return build;
     }
     }
 
 
-    @Override
-    public Queue getQueue() {
-        return null;
-    }
-
-    @Override
-    public int getQueueCapacity() {
-        return 0;
-    }
-
-    @Override
-    protected Object deserialize(BinlogMessage message) {
-        return null;
-    }
-
     private void check() throws IOException {
     private void check() throws IOException {
         final IndexSearcher searcher = shard.getSearcher();
         final IndexSearcher searcher = shard.getSearcher();
         IndexReader reader = searcher.getIndexReader();
         IndexReader reader = searcher.getIndexReader();

+ 19 - 5
dbsyncer-web/src/main/resources/application.properties

@@ -6,15 +6,29 @@ dbsyncer.web.login.password=0DPiKuNIrrVmD8IUCuw1hQxNqZc=
 server.servlet.session.timeout=1800
 server.servlet.session.timeout=1800
 server.servlet.context-path=/
 server.servlet.context-path=/
 #dbsyncer.common.worker.id=1
 #dbsyncer.common.worker.id=1
-#dbsyncer.web.thread.pool.coreSize=8
-#dbsyncer.web.thread.pool.maxSize=64
-#dbsyncer.web.thread.pool.queueCapacity=1000
+#dbsyncer.web.thread.pool.core-size=8
+#dbsyncer.web.thread.pool.max-size=64
+#dbsyncer.web.thread.pool.queue-capacity=1000
+
+#parser
+dbsyncer.parser.flush.buffer.actuator.speed.enabled=true
+#dbsyncer.parser.flush.buffer.actuator.writer-batch-count=100
+#dbsyncer.parser.flush.buffer.actuator.batch-count=1000
+#dbsyncer.parser.flush.buffer.actuator.queue-capacity=50000
+#dbsyncer.parser.flush.buffer.actuator.period-millisecond=300
+#dbsyncer.parser.flush.full.enabled=true
+
+#storage
 #dbsyncer.storage.support.mysql.enabled=true
 #dbsyncer.storage.support.mysql.enabled=true
#dbsyncer.storage.support.mysql.config.url=jdbc:mysql://127.0.0.1:3306/dbsyncer?rewriteBatchedStatements=true&useUnicode=true&characterEncoding=UTF8&serverTimezone=Asia/Shanghai&useSSL=false&verifyServerCertificate=false
#dbsyncer.storage.support.mysql.config.url=jdbc:mysql://127.0.0.1:3306/dbsyncer?rewriteBatchedStatements=true&useUnicode=true&characterEncoding=UTF8&serverTimezone=Asia/Shanghai&useSSL=false&verifyServerCertificate=false
 #dbsyncer.storage.support.mysql.config.username=root
 #dbsyncer.storage.support.mysql.config.username=root
 #dbsyncer.storage.support.mysql.config.password=123
 #dbsyncer.storage.support.mysql.config.password=123
-dbsyncer.parser.writer.buffer.actuator.enabled=true
-#dbsyncer.parser.flush.full.enabled=true
+#dbsyncer.storage.binlog.recorder.batch-count=1000
+#dbsyncer.storage.binlog.recorder.max-processing-seconds=120
+#dbsyncer.storage.binlog.recorder.queue-capacity=10000
+#dbsyncer.storage.binlog.recorder.writer-period-millisecond=500
+#dbsyncer.storage.binlog.recorder.reader-period-millisecond=2000
+
 #monitor
 #monitor
 management.endpoints.web.base-path=/app
 management.endpoints.web.base-path=/app
 management.endpoints.web.exposure.include=*
 management.endpoints.web.exposure.include=*