Procházet zdrojové kódy

优化增量写入

AE86 před 3 roky
rodič
revize
72d57427f6

+ 6 - 6
dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/AbstractBufferActuator.java

@@ -71,19 +71,19 @@ public abstract class AbstractBufferActuator<Request, Response> implements Buffe
     protected abstract void partition(Request request, Response response);
 
     /**
-     * 异步批处理
+     * 批处理
      *
      * @param response
      */
-    protected abstract void flush(Response response);
+    protected abstract void pull(Response response);
 
     @Override
-    public void offer(AbstractRequest task) {
+    public void offer(AbstractRequest request) {
         if (running) {
-            temp.offer((Request) task);
+            temp.offer((Request) request);
             return;
         }
-        buffer.offer((Request) task);
+        buffer.offer((Request) request);
     }
 
     @Override
@@ -117,7 +117,7 @@ public abstract class AbstractBufferActuator<Request, Response> implements Buffe
             map.forEach((key, flushTask) -> {
                 long now = Instant.now().toEpochMilli();
                 try {
-                    flush((Response) flushTask);
+                    pull((Response) flushTask);
                 } catch (Exception e) {
                     logger.error("[{}]-flush异常{}", key);
                 }

+ 1 - 1
dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/BufferActuator.java

@@ -9,6 +9,6 @@ import org.dbsyncer.parser.flush.model.AbstractRequest;
  */
 public interface BufferActuator {
 
-    void offer(AbstractRequest task);
+    void offer(AbstractRequest request);
 
 }

+ 1 - 1
dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/impl/FlushBufferActuator.java

@@ -42,7 +42,7 @@ public class FlushBufferActuator extends AbstractBufferActuator<FlushRequest, Fl
     }
 
     @Override
-    protected void flush(FlushResponse response) {
+    protected void pull(FlushResponse response) {
         storageService.addData(StorageEnum.DATA, response.getMetaId(), response.getDataList());
     }
 }

+ 1 - 1
dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/impl/WriterBufferActuator.java

@@ -58,7 +58,7 @@ public class WriterBufferActuator extends AbstractBufferActuator<WriterRequest,
     }
 
     @Override
-    protected void flush(WriterResponse response) {
+    protected void pull(WriterResponse response) {
         Result result = parserFactory.writeBatch(response.getConnectorMapper(), response.getCommand(), response.getEvent(), response.getFields(), response.getDataList(), BATCH_SIZE);
         flushStrategy.flushIncrementData(response.getMetaId(), result, response.getEvent(), response.getDataList());
     }