12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758 |
- import hashlib
- import mimetypes
- from datetime import datetime
- from pathlib import Path
- from typing import Generator
- from fastapi import Request
- from fastapi.responses import Response, StreamingResponse
- mimetypes.init()
- def get_range_response(file: Path, request: Request, chunk_size: int) -> Response:
- """Get a Response for the given file, supporting range-requests, E-Tag and Last-Modified."""
- file_size = file.stat().st_size
- last_modified_time = datetime.utcfromtimestamp(file.stat().st_mtime)
- start = 0
- end = file_size - 1
- status_code = 200
- e_tag = hashlib.md5((str(last_modified_time) + str(file_size)).encode()).hexdigest()
- if_match_header = request.headers.get('If-None-Match')
- if if_match_header and if_match_header == e_tag:
- return Response(status_code=304) # Not Modified
- headers = {
- 'E-Tag': e_tag,
- 'Last-Modified': last_modified_time.strftime("%a, %d %b %Y %H:%M:%S GMT")
- }
- range_header = request.headers.get('range')
- media_type = mimetypes.guess_type(str(file))[0] or 'application/octet-stream'
- if range_header is not None:
- byte1, byte2 = range_header.split('=')[1].split('-')
- start = int(byte1)
- if byte2:
- end = int(byte2)
- status_code = 206 # Partial Content
- content_length = end - start + 1
- headers.update({
- 'Content-Length': str(content_length),
- 'Content-Range': f'bytes {start}-{end}/{file_size}',
- 'Accept-Ranges': 'bytes',
- })
- def content_reader(file: Path, start: int, end: int) -> Generator[bytes, None, None]:
- with open(file, 'rb') as data:
- data.seek(start)
- remaining_bytes = end - start + 1
- while remaining_bytes > 0:
- chunk = data.read(min(chunk_size, remaining_bytes))
- if not chunk:
- break
- yield chunk
- remaining_bytes -= len(chunk)
- return StreamingResponse(
- content_reader(file, start, end),
- media_type=media_type,
- headers=headers,
- status_code=status_code,
- )
|