浏览代码

maint: send binary data to server when upload files

wangweimin 4 年之前
父节点
当前提交
417a990058

+ 1 - 8
pywebio/input.py

@@ -546,14 +546,7 @@ def file_upload(label='', accept=None, name=None, placeholder='Choose file', mul
     item_spec['max_size'] = parse_file_size(max_size)
     item_spec['max_total_size'] = parse_file_size(max_total_size)
 
-    def read_file(data):  # data: None or [{'filename':, 'dataurl', 'mime_type', 'last_modified'}, ...]
-        for d in data:
-            try:
-                _, encoded = d['dataurl'].split(",", 1)
-            except ValueError:
-                encoded = ''
-            d['content'] = b64decode(encoded)
-
+    def read_file(data):
         if not multiple:
             return data[0] if len(data) >= 1 else None
         return data

+ 5 - 4
pywebio/platform/aiohttp.py

@@ -9,7 +9,7 @@ from urllib.parse import urlparse
 from aiohttp import web
 
 from .tornado import open_webbrowser_on_server_started
-from .utils import make_applications, render_page, cdn_validation
+from .utils import make_applications, render_page, cdn_validation, deserialize_binary_event
 from ..session import CoroutineBasedSession, ThreadBasedSession, register_session_implement_for_target, Session
 from ..session.base import get_session_info_from_headers
 from ..utils import get_free_port, STATIC_PATH, iscoroutinefunction, isgeneratorfunction
@@ -100,12 +100,13 @@ def _webio_handler(applications, cdn, websocket_settings, check_origin_func=_is_
             async for msg in ws:
                 if msg.type == web.WSMsgType.text:
                     data = msg.json()
-                    if data is not None:
-                        session.send_client_event(data)
                 elif msg.type == web.WSMsgType.binary:
-                    pass
+                    data = deserialize_binary_event(msg.data)
                 elif msg.type == web.WSMsgType.close:
                     raise asyncio.CancelledError()
+
+                if data is not None:
+                    session.send_client_event(data)
         finally:
             if not close_from_session_tag:
                 # close session because client disconnected to server

+ 2 - 6
pywebio/platform/django.py

@@ -35,12 +35,8 @@ class DjangoHttpContext(HttpContext):
         """返回当前请求的URL参数"""
         return self.request.GET.get(name, default=default)
 
-    def request_json(self):
-        """返回当前请求的json反序列化后的内容,若请求数据不为json格式,返回None"""
-        try:
-            return json.loads(self.request.body.decode('utf8'))
-        except Exception:
-            return None
+    def request_body(self):
+        return self.request.body
 
     def set_header(self, name, value):
         """为当前响应设置header"""

+ 11 - 5
pywebio/platform/fastapi.py

@@ -1,7 +1,7 @@
 import asyncio
 import logging
 from functools import partial
-
+import json
 import uvicorn
 from starlette.applications import Starlette
 from starlette.requests import Request
@@ -12,7 +12,7 @@ from starlette.websockets import WebSocket
 from starlette.websockets import WebSocketDisconnect
 
 from .tornado import open_webbrowser_on_server_started
-from .utils import make_applications, render_page, cdn_validation, OriginChecker
+from .utils import make_applications, render_page, cdn_validation, OriginChecker, deserialize_binary_event
 from ..session import CoroutineBasedSession, ThreadBasedSession, register_session_implement_for_target, Session
 from ..session.base import get_session_info_from_headers
 from ..utils import get_free_port, STATIC_PATH, iscoroutinefunction, isgeneratorfunction, strip_space
@@ -77,7 +77,13 @@ def _webio_routes(applications, cdn, check_origin_func):
 
         while True:
             try:
-                msg = await websocket.receive_json()
+                msg = await websocket.receive()
+                text, binary = msg.get('text'), msg.get('bytes')
+                event = None
+                if text:
+                    event = json.loads(text)
+                elif binary:
+                    event = deserialize_binary_event(binary)
             except WebSocketDisconnect:
                 if not close_from_session_tag:
                     # close session because client disconnected to server
@@ -85,8 +91,8 @@ def _webio_routes(applications, cdn, check_origin_func):
                     logger.debug("WebSocket closed from client")
                 break
 
-            if msg is not None:
-                session.send_client_event(msg)
+            if event is not None:
+                session.send_client_event(event)
 
     return [
         Route("/", http_endpoint),

+ 2 - 6
pywebio/platform/flask.py

@@ -38,12 +38,8 @@ class FlaskHttpContext(HttpContext):
         """返回当前请求的URL参数"""
         return request.args.get(name, default=default)
 
-    def request_json(self):
-        """返回当前请求的json反序列化后的内容,若请求数据不为json格式,返回None"""
-        try:
-            return json.loads(self.request_data)
-        except Exception:
-            return None
+    def request_body(self):
+        return self.request_data
 
     def set_header(self, name, value):
         """为当前响应设置header"""

+ 17 - 2
pywebio/platform/httpbased.py

@@ -9,13 +9,14 @@
 """
 import asyncio
 import fnmatch
+import json
 import logging
 import threading
 import time
 from contextlib import contextmanager
 from typing import Dict
 
-from .utils import make_applications, render_page
+from .utils import make_applications, render_page, deserialize_binary_event
 from ..session import CoroutineBasedSession, Session, ThreadBasedSession, register_session_implement_for_target
 from ..session.base import get_session_info_from_headers
 from ..utils import random_str, LRUDict, isgeneratorfunction, iscoroutinefunction, check_webio_js
@@ -48,10 +49,23 @@ class HttpContext:
         Returns the value of the given URL parameter of the current request"""
         pass
 
+    def request_body(self):
+        """返回当前请求的body数据
+        Returns the data of the current request body
+
+        :return: bytes/bytearray
+        """
+        return b''
+
     def request_json(self) -> dict:
         """返回当前请求的json反序列化后的内容,若请求数据不为json格式,返回None
         Return the data (json deserialization) of the currently requested, if the data is not in json format, return None"""
-        pass
+        try:
+            if self.request_headers().get('content-type') == 'application/octet-stream':
+                return deserialize_binary_event(self.request_body())
+            return json.loads(self.request_body())
+        except Exception:
+            return None
 
     def set_header(self, name, value):
         """为当前响应设置header
@@ -288,6 +302,7 @@ class HttpHandler:
         def get_app(context):
             app_name = context.request_url_parameter('app', 'index')
             return applications.get(app_name) or applications['index']
+
         self.app_loader = app_loader or get_app
 
         for target in (applications or {}).values():

+ 8 - 4
pywebio/platform/tornado.py

@@ -16,7 +16,7 @@ import tornado.ioloop
 from tornado.web import StaticFileHandler
 from tornado.websocket import WebSocketHandler
 
-from .utils import make_applications, render_page, cdn_validation
+from .utils import make_applications, render_page, cdn_validation, deserialize_binary_event
 from ..session import CoroutineBasedSession, ThreadBasedSession, ScriptModeSession, \
     register_session_implement_for_target, Session
 from ..session.base import get_session_info_from_headers
@@ -201,9 +201,13 @@ def _webio_handler(applications=None, cdn=True, reconnect_timeout=0, check_origi
             logger.debug('session id: %s' % self.session_id)
 
         def on_message(self, message):
-            data = json.loads(message)
-            if data is not None:
-                self.session.send_client_event(data)
+            if isinstance(message, bytes):
+                event = deserialize_binary_event(message)
+            else:
+                event = json.loads(message)
+            if event is None:
+                return
+            self.session.send_client_event(event)
 
         def on_close(self):
             cls = type(self)

+ 2 - 6
pywebio/platform/tornado_http.py

@@ -35,12 +35,8 @@ class TornadoHttpContext(HttpContext):
         """返回当前请求的URL参数"""
         return self.handler.get_query_argument(name, default=default)
 
-    def request_json(self):
-        """返回当前请求的json反序列化后的内容,若请求数据不为json格式,返回None"""
-        try:
-            return json.loads(self.handler.request.body.decode('utf8'))
-        except Exception:
-            return None
+    def request_body(self):
+        return self.handler.request.body
 
     def set_header(self, name, value):
         """为当前响应设置header"""

+ 54 - 0
pywebio/platform/utils.py

@@ -6,6 +6,8 @@ from os import path
 import fnmatch
 from urllib.parse import urlparse
 from tornado import template
+import json
+from collections import defaultdict
 
 from ..__version__ import __version__ as version
 from ..exceptions import PyWebIOWarning
@@ -188,6 +190,58 @@ class OriginChecker:
         return origin == host
 
 
+def deserialize_binary_event(data: bytes):
+    """
+    Data format:
+    | event | file_header | file_data | file_header | file_data | ...
+
+    The 8 bytes at the beginning of each segment indicate the number of bytes remaining in the segment.
+
+    event: {
+        event: "from_submit",
+        task_id: that.task_id,
+        data: {
+            input_name => input_data
+        }
+    }
+
+    file_header: {
+        'filename': file name,
+        'size': file size,
+        'mime_type': file type,
+        'last_modified': last_modified timestamp,
+        'input_name': name of input field
+    }
+
+    Example:
+        b'\x00\x00\x00\x00\x00\x00\x00E{"event":"from_submit","task_id":"main-4788341456","data":{"data":1}}\x00\x00\x00\x00\x00\x00\x00Y{"filename":"hello.txt","size":2,"mime_type":"text/plain","last_modified":1617119937.276}\x00\x00\x00\x00\x00\x00\x00\x02ss'
+    """
+    parts = []
+    start_idx = 0
+    while start_idx < len(data):
+        size = int.from_bytes(data[start_idx:start_idx + 8], "big")
+        start_idx += 8
+        content = data[start_idx:start_idx + size]
+        parts.append(content)
+        start_idx += size
+
+    event = json.loads(parts[0])
+    files = defaultdict(list)
+    for idx in range(1, len(parts), 2):
+        f = json.loads(parts[idx])
+        f['content'] = parts[idx+1]
+        input_name = f.pop('input_name')
+        files[input_name].append(f)
+
+    for input_name in list(event['data'].keys()):
+        if input_name in files:
+            event['data'][input_name] = files[input_name]
+
+    return event
+
+
+
+
 def seo(title, description=None, app=None):
     """Set the SEO information of the PyWebIO application (web page information provided when indexed by search engines)
 

+ 22 - 11
webiojs/src/handlers/input.ts

@@ -1,5 +1,5 @@
 import {Command, Session} from "../session";
-import {error_alert, LRUMap, make_set} from "../utils";
+import {error_alert, LRUMap, make_set, serialize_json} from "../utils";
 import {InputItem} from "../models/input/base"
 import {state} from '../state'
 import {all_input_items} from "../models/input"
@@ -203,7 +203,7 @@ class FormController {
             body.append(item.create_element());
         }
 
-        // 事件绑定
+        // submit event
         element.on('submit', 'form', function (e) {
             e.preventDefault(); // avoid to execute the actual submit of the form.
 
@@ -213,14 +213,15 @@ class FormController {
                 if (!that.name2input[name].check_valid())
                     return error_alert(t("error_in_input"));
 
-            let data_keys:string[] = [];
-            let data_values:any[] = [];
+            let data_keys: string[] = [];
+            let data_values: any[] = [];
             $.each(that.name2input, (name, ctrl) => {
                 data_keys.push(name as string);
                 data_values.push(ctrl.get_value());
             });
 
-            let on_process  = (loaded: number, total: number)=>{};
+            let on_process = (loaded: number, total: number) => {
+            };
             // show process bar when there is a file input field
             for (let item of that.spec.inputs) {
                 if (item.type == 'file') {
@@ -229,15 +230,25 @@ class FormController {
                 }
             }
             Promise.all(data_values).then((values) => {
-                let data: { [i: string]: any } = {};
-                for (let idx in data_keys){
-                    data[data_keys[idx]] = values[idx];
+                let input_data: { [i: string]: any } = {};
+                let files: Blob[] = [];
+                for (let idx in data_keys) {
+                    input_data[data_keys[idx]] = values[idx];
+                    if (that.spec.inputs[idx].type == 'file') {
+                        input_data[data_keys[idx]] = [];
+                        files.push(...values[idx]);
+                    }
                 }
-                that.session.send_message({
+                let msg = {
                     event: "from_submit",
                     task_id: that.task_id,
-                    data: data
-                }, on_process);
+                    data: input_data
+                };
+                if (files.length) {
+                    that.session.send_buffer(new Blob([serialize_json(msg), ...files], {type: 'application/octet-stream'}), on_process);
+                } else {
+                    that.session.send_message(msg, on_process);
+                }
             });
 
         });

+ 5 - 18
webiojs/src/models/input/file.ts

@@ -1,6 +1,6 @@
 import {Session} from "../../session";
 import {InputItem} from "./base";
-import {deep_copy} from "../../utils"
+import {deep_copy, serialize_file} from "../../utils";
 import {t} from "../../i18n";
 
 const file_input_tpl = `
@@ -18,7 +18,7 @@ const file_input_tpl = `
 export class File extends InputItem {
     static accept_input_types: string[] = ["file"];
 
-    file_content_promises: Promise<{ filename: string, dataurl: string, mime_type: string, last_modified: number, size: number }>[] = []; // 待上传文件信息
+    files: Blob[] = []; // Files to be uploaded
     valid = true;
 
     constructor(session: Session, task_id: string, spec: any) {
@@ -49,12 +49,11 @@ export class File extends InputItem {
         // 文件选中后先不通知后端
         let that = this;
         input_elem.on('change', function () {
-            that.file_content_promises = [];
+            that.files = [];
             let total_size = 0;
             that.valid = true;
             let file = (input_elem[0] as HTMLInputElement).files;
             for (let f of file) {
-                let fr = new FileReader();
                 total_size += f.size;
 
                 if (that.spec.max_size && f.size > that.spec.max_size) {
@@ -74,20 +73,8 @@ export class File extends InputItem {
                 if (!that.valid) return;
                 that.update_input_helper(-1, {'valid_status': 0});
 
-                that.file_content_promises.push(new Promise((resolve, reject) => {
-                    fr.onload = function () {
-                        resolve({
-                            'filename': f.name,
-                            'size': f.size,
-                            'mime_type': f.type,
-                            'last_modified': f.lastModified / 1000,
-                            'dataurl': fr.result as string
-                        });
-                    };
-                }));
+                that.files.push(serialize_file(f, spec.name));
 
-
-                fr.readAsDataURL(f);
             }
 
         });
@@ -114,7 +101,7 @@ export class File extends InputItem {
     }
 
     get_value(): any {
-        return Promise.all(this.file_content_promises);
+        return this.files;
     }
 
     after_add_to_dom(): any {

+ 39 - 6
webiojs/src/session.ts

@@ -33,6 +33,8 @@ export interface Session {
 
     send_message(msg: ClientEvent, onprogress?: (loaded: number, total: number) => void): void;
 
+    send_buffer(data: Blob, onprogress?: (loaded: number, total: number) => void): void;
+
     close_session(): void;
 
     closed(): boolean;
@@ -133,6 +135,22 @@ export class WebSocketSession implements Session {
         if (this.debug) console.info('<<<', msg);
     }
 
+    send_buffer(data: Blob, onprogress?: (loaded: number, total: number) => void): void {
+        if (this.closed())
+            return error_alert(t("disconnected_with_server"));
+
+        if (this.ws === null)
+            return console.error('WebSocketWebIOSession.ws is null when invoke WebSocketWebIOSession.send_message. ' +
+                'Please call WebSocketWebIOSession.start_session first');
+
+        this.ws.send(data);
+
+        if (onprogress)
+            this.start_onprogress(onprogress);
+
+        if (this.debug) console.info('<<< Blob data...');
+    }
+
     close_session(): void {
         this._closed = true;
         this._on_session_close.call(this.ws, null);
@@ -217,15 +235,31 @@ export class HttpSession implements Session {
     };
 
     send_message(msg: ClientEvent, onprogress?: (loaded: number, total: number) => void): void {
+        if (this.debug) console.info('<<<', msg);
+        this._send({
+            data: JSON.stringify(msg),
+            contentType: "application/json; charset=utf-8",
+        }, onprogress);
+    }
+
+    send_buffer(data: Blob, onprogress?: (loaded: number, total: number) => void): void {
+        if (this.debug) console.info('<<< Blob data...');
+        this._send({
+            data: data,
+            cache: false,
+            processData: false,
+            contentType: 'application/octet-stream',
+        }, onprogress);
+    }
+
+    _send(options: { [key: string]: any; }, onprogress?: (loaded: number, total: number) => void): void {
         if (this.closed())
             return error_alert(t("disconnected_with_server"));
 
-        if (this.debug) console.info('<<<', msg);
         $.ajax({
+            ...options,
             type: "POST",
             url: this.api_url,
-            data: JSON.stringify(msg),
-            contentType: "application/json; charset=utf-8",
             dataType: "json",
             headers: {"webio-session-id": this.webio_session_id},
             success: this._on_request_success.bind(this),
@@ -239,12 +273,11 @@ export class HttpSession implements Session {
                 }, false);
                 return xhr;
             },
-            error: function () {  // todo
-                console.error('Http push event failed, event data: %s', msg);
+            error: function () {
+                console.error('Http push blob data failed');
                 error_alert(t("connect_fail"));
             }
         });
-
     }
 
     close_session(): void {

+ 29 - 0
webiojs/src/utils.ts

@@ -147,4 +147,33 @@ export function error_alert(text: string, duration: number = 1.5) {
         position: 'center',
         backgroundColor: '#e53935',
     }).showToast();
+}
+
+
+// make File object to Blob
+export function serialize_file(file: File, input_name: string) {
+    let header = {
+        'filename': file.name,
+        'size': file.size,
+        'mime_type': file.type,
+        'last_modified': file.lastModified / 1000,
+        'input_name': input_name
+    }
+    return new Blob([serialize_json(header), int2bytes(file.size), file], {type: 'application/octet-stream'});
+}
+
+// make json object to Blob
+export function serialize_json(json_obj: any) {
+    let json_str = JSON.stringify(json_obj);
+    const encoder = new TextEncoder();
+    const json_buf = encoder.encode(json_str).buffer;
+    return new Blob([int2bytes(json_buf.byteLength), json_buf], {type: 'application/octet-stream'});
+}
+
+function int2bytes(num: number) {
+    const buf = new ArrayBuffer(8);
+    const dataView = new DataView(buf);
+    dataView.setUint32(0, (num / 4294967296) | 0); // 4294967296 == 2^32
+    dataView.setUint32(4, num | 0);
+    return buf;
 }