utils.py 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449
  1. import fnmatch
  2. import json
  3. import socket
  4. import urllib.parse
  5. from collections import defaultdict
  6. from collections import namedtuple
  7. from collections.abc import Mapping, Sequence
  8. from functools import lru_cache
  9. from functools import partial
  10. from os import path, environ
  11. from tornado import template
  12. from ..__version__ import __version__ as version
  13. from ..exceptions import PyWebIOWarning
  14. from ..utils import isgeneratorfunction, iscoroutinefunction, get_function_name, get_function_doc, \
  15. get_function_attr, STATIC_PATH
  16. """
  17. The maximum size in bytes of a http request body or a websocket message, after which the request or websocket is aborted
  18. Set by `start_server()` or `path_deploy()`
  19. Used in `file_upload()` as the `max_size`/`max_total_size` parameter default or to validate the parameter.
  20. """
  21. MAX_PAYLOAD_SIZE = 0
  22. DEFAULT_CDN = "https://cdn.jsdelivr.net/gh/wang0618/PyWebIO-assets@v{version}/"
  23. _global_config = {'title': 'PyWebIO Application'}
  24. config_keys = ['title', 'description', 'js_file', 'js_code', 'css_style', 'css_file', 'theme']
  25. AppMeta = namedtuple('App', config_keys)
  26. _here_dir = path.dirname(path.abspath(__file__))
  27. _index_page_tpl = template.Template(open(path.join(_here_dir, 'tpl', 'index.html'), encoding='utf8').read())
  28. def render_page(app, protocol, cdn):
  29. """渲染前端页面的HTML框架, 支持SEO
  30. :param callable app: PyWebIO app
  31. :param str protocol: 'ws'/'http'
  32. :param bool/str cdn: Whether to use CDN, also accept string as custom CDN URL
  33. :return: bytes content of rendered page
  34. """
  35. assert protocol in ('ws', 'http')
  36. meta = parse_app_metadata(app)
  37. if cdn is True:
  38. base_url = DEFAULT_CDN.format(version=version)
  39. elif not cdn:
  40. base_url = ''
  41. else: # user custom cdn
  42. base_url = cdn.rstrip('/') + '/'
  43. theme = environ.get('PYWEBIO_THEME', meta.theme) or 'default'
  44. check_theme(theme)
  45. return _index_page_tpl.generate(title=meta.title, description=meta.description, protocol=protocol,
  46. script=True, content='', base_url=base_url,
  47. js_file=meta.js_file or [], js_code=meta.js_code, css_style=meta.css_style,
  48. css_file=meta.css_file or [], theme=theme)
  49. @lru_cache(maxsize=64)
  50. def check_theme(theme):
  51. """check theme file existence"""
  52. if not theme:
  53. return
  54. theme_file = path.join(STATIC_PATH, 'css', 'bs-theme', theme + '.min.css')
  55. if not path.isfile(theme_file):
  56. raise RuntimeError("Can't find css file for theme `%s`" % theme)
  57. def cdn_validation(cdn, level='warn', stacklevel=3):
  58. """CDN availability check
  59. :param bool/str cdn: cdn parameter
  60. :param level: warn or error
  61. :param stacklevel: stacklevel=3 to makes the warning refer to cdn_validation() caller’s caller
  62. """
  63. assert level in ('warn', 'error')
  64. if cdn is True and 'dev' in version:
  65. if level == 'warn':
  66. import warnings
  67. warnings.warn("Default CDN is not supported in dev version. Ignore the CDN setting", PyWebIOWarning,
  68. stacklevel=stacklevel)
  69. return False
  70. else:
  71. raise ValueError("Default CDN is not supported in dev version. Please host static files by yourself.")
  72. return cdn
  73. def parse_app_metadata(func):
  74. """Get metadata form pywebio task function, fallback to global config in empty meta field."""
  75. prefix = '_pywebio_'
  76. attrs = get_function_attr(func, [prefix + k for k in config_keys])
  77. meta = AppMeta(**{k: attrs.get(prefix + k) for k in config_keys})
  78. doc = get_function_doc(func)
  79. parts = doc.strip().split('\n\n', 1)
  80. if len(parts) == 2:
  81. title, description = parts
  82. else:
  83. title, description = parts[0], ''
  84. if not meta.title:
  85. meta = meta._replace(title=title, description=description)
  86. # fallback to global config
  87. for key in config_keys:
  88. if not getattr(meta, key, None) and _global_config.get(key):
  89. kwarg = {key: _global_config.get(key)}
  90. meta = meta._replace(**kwarg)
  91. return meta
  92. _app_list_tpl = template.Template("""
  93. <h1>Applications index</h1>
  94. <ul>
  95. {% for name,meta in apps_info.items() %}
  96. <li>
  97. {% if other_arguments is not None %}
  98. <a href="?app={{name}}{{other_arguments}}">{{ meta.title or name }}</a>:
  99. {% else %}
  100. <a href="javascript:WebIO.openApp('{{ name }}', true)">{{ meta.title or name }}</a>:
  101. {% end %}
  102. {% if meta.description %}
  103. {{ meta.description }}
  104. {% else %}
  105. <i>No description.</i>
  106. {% end %}
  107. </li>
  108. {% end %}
  109. </ul>
  110. """.strip())
  111. def get_static_index_content(apps, query_arguments=None):
  112. """生成默认的静态主页
  113. :param callable apps: PyWebIO apps
  114. :param str query_arguments: Url Query Arguments。为None时,表示使用WebIO.openApp跳转
  115. :return: bytes
  116. """
  117. apps_info = {
  118. name: parse_app_metadata(func)
  119. for name, func in apps.items()
  120. }
  121. qs = urllib.parse.parse_qs(query_arguments)
  122. qs.pop('app', None)
  123. other_arguments = urllib.parse.urlencode(qs, doseq=True)
  124. if other_arguments:
  125. other_arguments = '&' + other_arguments
  126. else:
  127. other_arguments = None
  128. content = _app_list_tpl.generate(apps_info=apps_info, other_arguments=other_arguments).decode('utf8')
  129. return content
  130. def _generate_default_index_app(apps):
  131. """默认的主页任务函数"""
  132. content = get_static_index_content(apps)
  133. def index():
  134. from pywebio.output import put_html
  135. put_html(content)
  136. return index
  137. def make_applications(applications):
  138. """格式化 applications 为 任务名->任务函数 的映射, 并提供默认主页
  139. :param applications: 接受 单一任务函数、字典、列表 类型
  140. :return dict: 任务名->任务函数 的映射
  141. """
  142. if isinstance(applications, Sequence): # 列表 类型
  143. applications, app_list = {}, applications
  144. for func in app_list:
  145. name = get_function_name(func)
  146. if name in applications:
  147. raise ValueError("Duplicated application name:%r" % name)
  148. applications[name] = func
  149. elif not isinstance(applications, Mapping): # 单一任务函数 类型
  150. applications = {'index': applications}
  151. # convert dict key to str
  152. applications = {str(k): v for k, v in applications.items()}
  153. for app in applications.values():
  154. assert iscoroutinefunction(app) or isgeneratorfunction(app) or callable(app), \
  155. "Don't support application type:%s" % type(app)
  156. if 'index' not in applications:
  157. applications['index'] = _generate_default_index_app(applications)
  158. return applications
  159. class OriginChecker:
  160. @classmethod
  161. def check_origin(cls, origin, allowed_origins, host):
  162. if cls.is_same_site(origin, host):
  163. return True
  164. return any(
  165. fnmatch.fnmatch(origin, pattern)
  166. for pattern in allowed_origins
  167. )
  168. @staticmethod
  169. def is_same_site(origin, host):
  170. """判断 origin 和 host 是否一致。origin 和 host 都为http协议请求头"""
  171. parsed_origin = urllib.parse.urlparse(origin)
  172. origin = parsed_origin.netloc
  173. origin = origin.lower()
  174. # Check to see that origin matches host directly, including ports
  175. return origin == host
  176. def deserialize_binary_event(data: bytes):
  177. """
  178. Data format:
  179. | event | file_header | file_data | file_header | file_data | ...
  180. The 8 bytes at the beginning of each segment indicate the number of bytes remaining in the segment.
  181. event: {
  182. event: "from_submit",
  183. task_id: that.task_id,
  184. data: {
  185. input_name => input_data
  186. }
  187. }
  188. file_header: {
  189. 'filename': file name,
  190. 'size': file size,
  191. 'mime_type': file type,
  192. 'last_modified': last_modified timestamp,
  193. 'input_name': name of input field
  194. }
  195. Example:
  196. b'\x00\x00\x00\x00\x00\x00\x00E{"event":"from_submit","task_id":"main-4788341456","data":{"data":1}}\x00\x00\x00\x00\x00\x00\x00Y{"filename":"hello.txt","size":2,"mime_type":"text/plain","last_modified":1617119937.276}\x00\x00\x00\x00\x00\x00\x00\x02ss'
  197. """
  198. parts = []
  199. start_idx = 0
  200. while start_idx < len(data):
  201. size = int.from_bytes(data[start_idx:start_idx + 8], "big")
  202. start_idx += 8
  203. content = data[start_idx:start_idx + size]
  204. parts.append(content)
  205. start_idx += size
  206. event = json.loads(parts[0])
  207. files = defaultdict(list)
  208. for idx in range(1, len(parts), 2):
  209. f = json.loads(parts[idx])
  210. f['content'] = parts[idx + 1]
  211. input_name = f.pop('input_name')
  212. files[input_name].append(f)
  213. for input_name in list(event['data'].keys()):
  214. if input_name in files:
  215. event['data'][input_name] = files[input_name]
  216. return event
  217. def get_interface_ip(family: socket.AddressFamily) -> str:
  218. """Get the IP address of an external interface. Used when binding to
  219. 0.0.0.0 or :: to show a more useful URL.
  220. Copy from https://github.com/pallets/werkzeug/blob/df7492ab66aaced5eea964a58309caaadb1e8903/src/werkzeug/serving.py
  221. Under BSD-3-Clause License
  222. """
  223. # arbitrary private address
  224. host = "fd31:f903:5ab5:1::1" if family == socket.AF_INET6 else "10.253.155.219"
  225. with socket.socket(family, socket.SOCK_DGRAM) as s:
  226. try:
  227. s.connect((host, 58162))
  228. except OSError:
  229. return "::1" if family == socket.AF_INET6 else "127.0.0.1"
  230. return s.getsockname()[0] # type: ignore
  231. def print_listen_address(host, port):
  232. if not host:
  233. host = '0.0.0.0'
  234. all_address = False
  235. if host == "0.0.0.0":
  236. all_address = True
  237. host = get_interface_ip(socket.AF_INET)
  238. elif host == "::":
  239. all_address = True
  240. host = get_interface_ip(socket.AF_INET6)
  241. if ':' in host: # ipv6
  242. host = '[%s]' % host
  243. if all_address:
  244. print('Running on all addresses.')
  245. print('Use http://%s:%s/ to access the application' % (host, port))
  246. else:
  247. print('Running on http://%s:%s/' % (host, port))
  248. def seo(title, description=None, app=None):
  249. """Set the SEO information of the PyWebIO application (web page information provided when indexed by search engines)
  250. :param str title: Application title
  251. :param str description: Application description
  252. :param callable app: PyWebIO task function
  253. If ``seo()`` is not used, the `docstring <https://www.python.org/dev/peps/pep-0257/>`_ of the task function will be regarded as SEO information by default.
  254. ``seo()`` can be used in 2 ways: direct call and decorator::
  255. @seo("title", "description")
  256. def foo():
  257. pass
  258. def bar():
  259. pass
  260. def hello():
  261. \"""Application title
  262. Application description...
  263. (A empty line is used to separate the description and title)
  264. \"""
  265. start_server([
  266. foo,
  267. hello,
  268. seo("title", "description", bar),
  269. ])
  270. .. versionadded:: 1.1
  271. .. deprecated:: 1.4
  272. Use :func:`pywebio.config` instead.
  273. """
  274. import warnings
  275. warnings.warn("`pywebio.platform.seo()` is deprecated since v1.4 and will remove in the future version, "
  276. "use `pywebio.config` instead", DeprecationWarning, stacklevel=2)
  277. if app is not None:
  278. return config(title=title, description=description)(app)
  279. return config(title=title, description=description)
  280. def config(*, title=None, description=None, theme=None, js_code=None, js_file=[], css_style=None, css_file=[]):
  281. """PyWebIO application configuration
  282. :param str title: Application title
  283. :param str description: Application description
  284. :param str theme: Application theme. Available themes are: ``dark``, ``sketchy``, ``minty``, ``yeti``.
  285. You can also use environment variable ``PYWEBIO_THEME`` to specify the theme (with high priority).
  286. :demo_host:`Theme preview demo </theme>`
  287. .. collapse:: Open Source Credits
  288. The dark theme is modified from ForEvolve's `bootstrap-dark <https://github.com/ForEvolve/bootstrap-dark>`_.
  289. The sketchy, minty and yeti theme are from `bootswatch <https://bootswatch.com/4/>`_.
  290. :param str js_code: The javascript code that you want to inject to page.
  291. :param str/list js_file: The javascript files that inject to page, can be a URL in str or a list of it.
  292. :param str css_style: The CSS style that you want to inject to page.
  293. :param str/list css_file: The CSS files that inject to page, can be a URL in str or a list of it.
  294. ``config()`` can be used in 2 ways: direct call and decorator.
  295. If you call ``config()`` directly, the configuration will be global.
  296. If you use ``config()`` as decorator, the configuration will only work on single PyWebIO application function.
  297. ::
  298. config(title="My application")
  299. @config(css_style="* { color:red }")
  300. def app():
  301. put_text("hello PyWebIO")
  302. ``title`` and ``description`` are used for SEO, which are provided when indexed by search engines.
  303. If no ``title`` and ``description`` set for a PyWebIO application function,
  304. the `docstring <https://www.python.org/dev/peps/pep-0257/>`_ of the function will be used as title and description by default::
  305. def app():
  306. \"""Application title
  307. Application description...
  308. (A empty line is used to separate the description and title)
  309. \"""
  310. pass
  311. The above code is equal to::
  312. @config(title="Application title", description="Application description...")
  313. def app():
  314. pass
  315. .. versionadded:: 1.4
  316. .. versionchanged:: 1.5
  317. add ``theme`` parameter
  318. """
  319. if isinstance(js_file, str):
  320. js_file = [js_file]
  321. if isinstance(css_file, str):
  322. css_file = [css_file]
  323. configs = locals()
  324. class Decorator:
  325. def __init__(self):
  326. self.called = False
  327. def __call__(self, func):
  328. self.called = True
  329. try:
  330. func = partial(func) # to make a copy of the function
  331. for key, val in configs.items():
  332. if val:
  333. setattr(func, '_pywebio_%s' % key, val)
  334. except Exception:
  335. pass
  336. return func
  337. def __del__(self): # if not called as decorator, set the config to global
  338. if self.called:
  339. return
  340. global _global_config
  341. _global_config = configs
  342. return Decorator()