Browse Source

refine sliding window sender in #532

WeiminWangKolmostar 2 years ago
parent
commit
45eea483f2
1 changed files with 40 additions and 30 deletions
  1. 40 30
      webiojs/src/utils.ts

+ 40 - 30
webiojs/src/utils.ts

@@ -188,67 +188,76 @@ export function is_mobile() {
 // put send task to a queue and run it one by one
 export class ReliableSender {
     private seq = 0;
-    private queue: { enable_batch: boolean, param: any }[] = [];
-    private send_running = false
+    private queue: { enable_batch: boolean, task: any }[] = [];
     private _stop = false;
+    private ignore_interval_send = false;
+    private last_send_time = 0;
+    private interval_send_id = 0;
 
     constructor(
-        private readonly sender: (params: any[], seq: number) => Promise<void>,
+        private readonly sender: (tasks: any[], seq: number) => Promise<void>,
         private window_size: number = 8,
-        init_seq = 0, private timeout = 2000
+        init_seq = 0,
+        send_interval = 2000,
+        private min_send_interval = 1000,
     ) {
         this.sender = sender;
         this.window_size = window_size;
-        this.timeout = timeout;
         this.seq = init_seq;
         this.queue = [];
+        this.ignore_interval_send = false;
+        this.interval_send_id = setInterval(this.interval_send.bind(this), send_interval);
     }
 
     /*
-    * for continuous batch_send tasks in queue, they will be sent in one sender, the sending will retry when it finished or timeout.
-    * for non-batch task, each will be sent in a single sender, the sending will retry when it finished.
+    * for continuous batch_send tasks in queue, they will be sent in one sender,
+    * for non-batch task, each will be sent in a single sender,
+    * the sending will retry when there are unfinished task in queue.
     * */
-    add_send_task(param: any, allow_batch_send = true) {
+    add_send_task(task: any, allow_batch_send = true) {
         if (this._stop) return;
         this.queue.push({
             enable_batch: allow_batch_send,
-            param: param
+            task: task
         });
-        if (!this.send_running)
-            this.start_send();
+        this.do_send();
     }
 
-    private start_send() {
-        if (this._stop || this.queue.length === 0) {
-            this.send_running = false;
-            return;
-        }
-        this.send_running = true;
-        let params: any[] = [];
+    private get_tasks() {
+        let tasks: any[] = [];
         for (let item of this.queue) {
             if (!item.enable_batch)
                 break;
-            params.push(item.param);
+            tasks.push(item.task);
         }
         let batch_send = true;
-        if (params.length === 0 && !this.queue[0].enable_batch) {
+        if (tasks.length === 0 && this.queue.length > 0 && !this.queue[0].enable_batch) {
             batch_send = false;
-            params.push(this.queue[0].param);
+            tasks.push(this.queue[0].task);
         }
-        if (params.length === 0) {
-            this.send_running = false;
+        return {tasks, batch_send};
+    }
+
+    private do_send() {
+        const info = this.get_tasks();
+        const tasks = info.tasks, batch_send = info.batch_send;
+        if (tasks.length === 0) {
             return;
         }
-
-        let promises = [this.sender(params, this.seq)];
-        if (batch_send)
-            promises.push(new Promise((resolve) => setTimeout(resolve, this.timeout)));
-
-        Promise.race(promises).then(() => {
-            this.start_send();
+        this.last_send_time = Date.now();
+        if (!batch_send)  // for non-batch task, only retry after current request finished
+            this.ignore_interval_send = true;
+        this.sender(tasks, this.seq).then(() => {
+            this.ignore_interval_send = false;
         });
     }
 
+    private interval_send() {
+        if (this._stop || this.ignore_interval_send) return;
+        if (Date.now() - this.last_send_time < this.min_send_interval) return;
+        this.do_send();
+    }
+
     // seq for each ack call must be larger than the previous one, otherwise the ack will be ignored
     ack(seq: number) {
         if (seq < this.seq)
@@ -260,5 +269,6 @@ export class ReliableSender {
 
     stop() {
         this._stop = true;
+        clearInterval(this.interval_send_id);
     }
 }