|
@@ -989,6 +989,8 @@ class Gui:
|
|
|
context = request.form.get("context", None)
|
|
|
upload_data = request.form.get("upload_data", None)
|
|
|
multiple = "multiple" in request.form and request.form["multiple"] == "True"
|
|
|
+
|
|
|
+ # File parsing and checks
|
|
|
file = request.files.get("blob", None)
|
|
|
if not file:
|
|
|
_warn("upload files: No file part")
|
|
@@ -998,58 +1000,72 @@ class Gui:
|
|
|
if file.filename == "":
|
|
|
_warn("upload files: No selected file")
|
|
|
return ("upload files: No selected file", 400)
|
|
|
+
|
|
|
+ # Path parsing and checks
|
|
|
+ path = request.form.get("path", "")
|
|
|
suffix = ""
|
|
|
complete = True
|
|
|
part = 0
|
|
|
+
|
|
|
if "total" in request.form:
|
|
|
total = int(request.form["total"])
|
|
|
if total > 1 and "part" in request.form:
|
|
|
part = int(request.form["part"])
|
|
|
suffix = f".part.{part}"
|
|
|
complete = part == total - 1
|
|
|
- if file: # and allowed_file(file.filename)
|
|
|
- upload_path = Path(self._get_config("upload_folder", tempfile.gettempdir())).resolve()
|
|
|
+
|
|
|
+ # Extract upload path (when single file is selected, path="" does not change the path)
|
|
|
+ upload_root = os.path.abspath( self._get_config( "upload_folder", tempfile.gettempdir() ) )
|
|
|
+ upload_path = os.path.abspath( os.path.join( upload_root, os.path.dirname(path) ) )
|
|
|
+ if upload_path.startswith( upload_root ):
|
|
|
+ upload_path = Path( upload_path ).resolve()
|
|
|
+ os.makedirs( upload_path, exist_ok=True )
|
|
|
+ # Save file into upload_path directory
|
|
|
file_path = _get_non_existent_file_path(upload_path, secure_filename(file.filename))
|
|
|
- file.save(str(upload_path / (file_path.name + suffix)))
|
|
|
- if complete:
|
|
|
- if part > 0:
|
|
|
- try:
|
|
|
- with open(file_path, "wb") as grouped_file:
|
|
|
- for nb in range(part + 1):
|
|
|
- part_file_path = upload_path / f"{file_path.name}.part.{nb}"
|
|
|
- with open(part_file_path, "rb") as part_file:
|
|
|
- grouped_file.write(part_file.read())
|
|
|
- # remove file_path after it is merged
|
|
|
- part_file_path.unlink()
|
|
|
- except EnvironmentError as ee: # pragma: no cover
|
|
|
- _warn(f"Cannot group file after chunk upload for {file.filename}", ee)
|
|
|
- return (f"Cannot group file after chunk upload for {file.filename}", 500)
|
|
|
- # notify the file is uploaded
|
|
|
- newvalue = str(file_path)
|
|
|
- if multiple and var_name:
|
|
|
- value = _getscopeattr(self, var_name)
|
|
|
- if not isinstance(value, t.List):
|
|
|
- value = [] if value is None else [value]
|
|
|
- value.append(newvalue)
|
|
|
- newvalue = value
|
|
|
- with self._set_locals_context(context):
|
|
|
- if on_upload_action:
|
|
|
- data = {}
|
|
|
- if upload_data:
|
|
|
- try:
|
|
|
- data = json.loads(upload_data)
|
|
|
- except Exception:
|
|
|
- pass
|
|
|
- data["path"] = file_path
|
|
|
- file_fn = self._get_user_function(on_upload_action)
|
|
|
- if not _is_function(file_fn):
|
|
|
- file_fn = _getscopeattr(self, on_upload_action)
|
|
|
- if _is_function(file_fn):
|
|
|
- self._call_function_with_state(
|
|
|
- t.cast(t.Callable, file_fn), ["file_upload", {"args": [data]}]
|
|
|
- )
|
|
|
- else:
|
|
|
- setattr(self._bindings(), var_name, newvalue)
|
|
|
+ file.save( os.path.join( upload_path, (file_path.name + suffix) ) )
|
|
|
+ else:
|
|
|
+ _warn(f"upload files: Path {path} points outside of upload root.")
|
|
|
+ return("upload files: Path part points outside of upload root.", 400)
|
|
|
+
|
|
|
+ if complete:
|
|
|
+ if part > 0:
|
|
|
+ try:
|
|
|
+ with open(file_path, "wb") as grouped_file:
|
|
|
+ for nb in range(part + 1):
|
|
|
+ part_file_path = upload_path / f"{file_path.name}.part.{nb}"
|
|
|
+ with open(part_file_path, "rb") as part_file:
|
|
|
+ grouped_file.write(part_file.read())
|
|
|
+ # remove file_path after it is merged
|
|
|
+ part_file_path.unlink()
|
|
|
+ except EnvironmentError as ee: # pragma: no cover
|
|
|
+ _warn(f"Cannot group file after chunk upload for {file.filename}", ee)
|
|
|
+ return (f"Cannot group file after chunk upload for {file.filename}", 500)
|
|
|
+ # notify the file is uploaded
|
|
|
+ newvalue = str(file_path)
|
|
|
+ if multiple and var_name:
|
|
|
+ value = _getscopeattr(self, var_name)
|
|
|
+ if not isinstance(value, t.List):
|
|
|
+ value = [] if value is None else [value]
|
|
|
+ value.append(newvalue)
|
|
|
+ newvalue = value
|
|
|
+ with self._set_locals_context(context):
|
|
|
+ if on_upload_action:
|
|
|
+ data = {}
|
|
|
+ if upload_data:
|
|
|
+ try:
|
|
|
+ data = json.loads(upload_data)
|
|
|
+ except Exception:
|
|
|
+ pass
|
|
|
+ data["path"] = file_path
|
|
|
+ file_fn = self._get_user_function(on_upload_action)
|
|
|
+ if not _is_function(file_fn):
|
|
|
+ file_fn = _getscopeattr(self, on_upload_action)
|
|
|
+ if _is_function(file_fn):
|
|
|
+ self._call_function_with_state(
|
|
|
+ t.cast(t.Callable, file_fn), ["file_upload", {"args": [data]}]
|
|
|
+ )
|
|
|
+ else:
|
|
|
+ setattr(self._bindings(), var_name, newvalue)
|
|
|
return ("", 200)
|
|
|
|
|
|
def __send_var_list_update( # noqa C901
|