utils.py 8.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291
  1. import urllib.parse
  2. from collections import namedtuple
  3. from collections.abc import Mapping, Sequence
  4. from functools import partial
  5. from os import path
  6. import fnmatch
  7. from urllib.parse import urlparse
  8. from tornado import template
  9. import json
  10. from collections import defaultdict
  11. from ..__version__ import __version__ as version
  12. from ..exceptions import PyWebIOWarning
  13. from ..utils import isgeneratorfunction, iscoroutinefunction, get_function_name, get_function_doc, \
  14. get_function_seo_info
  # URL template of the default CDN hosting the front-end assets; the
  # ``{version}`` placeholder is filled in with the package version.
  DEFAULT_CDN = "https://cdn.jsdelivr.net/gh/wang0618/PyWebIO-assets@v{version}/"

  # Title/description pair describing one application.
  AppMeta = namedtuple('App', 'title description')

  # Directory that contains this module; the bundled templates live below it.
  _here_dir = path.dirname(path.abspath(__file__))

  # The page skeleton is read once, at import time. The context manager closes
  # the file handle right away instead of leaving it open until it is
  # garbage-collected.
  with open(path.join(_here_dir, 'tpl', 'index.html'), encoding='utf8') as _index_page_file:
      _index_page_tpl = template.Template(_index_page_file.read())
  def render_page(app, protocol, cdn):
      """Render the HTML skeleton of the front-end page, including SEO metadata.

      :param callable app: PyWebIO app
      :param str protocol: 'ws'/'http'
      :param bool/str cdn: ``True`` selects the default CDN, a falsy value
          disables the CDN, and any other value is used as a custom CDN URL
      :return: bytes content of rendered page
      """
      assert protocol in ('ws', 'http')
      meta = parse_app_metadata(app)

      if cdn is True:
          base_url = DEFAULT_CDN.format(version=version)
      elif cdn:
          # user custom cdn: make sure it ends with exactly one slash
          base_url = cdn.rstrip('/') + '/'
      else:
          base_url = ''

      page_title = meta.title or 'PyWebIO Application'
      return _index_page_tpl.generate(
          title=page_title,
          description=meta.description,
          protocol=protocol,
          script=True,
          content='',
          base_url=base_url,
      )
  def cdn_validation(cdn, level='warn', stacklevel=3):
      """CDN availability check

      :param bool/str cdn: cdn parameter
      :param level: warn or error
      :param int stacklevel: ``stacklevel`` passed to ``warnings.warn`` when a
          warning is issued
      :return: ``cdn`` unchanged, or ``False`` when the default CDN was requested
          in a dev version and ``level`` is ``'warn'``
      :raises ValueError: the default CDN was requested in a dev version and
          ``level`` is ``'error'``
      """
      assert level in ('warn', 'error')

      default_cdn_in_dev_version = cdn is True and 'dev' in version
      if not default_cdn_in_dev_version:
          return cdn

      if level != 'warn':
          raise ValueError("Default CDN is not supported in dev version. Please host static files by yourself.")

      import warnings
      warnings.warn("Default CDN is not supported in dev version. Ignore the CDN setting", PyWebIOWarning,
                    stacklevel=stacklevel)
      return False
  def parse_app_metadata(func):
      """Extract the title and description of a PyWebIO app.

      SEO information attached to ``func`` takes precedence. Otherwise the
      function's doc text is split at its first blank line: the part before it
      is the title and the remainder (possibly empty) is the description.

      :param callable func: PyWebIO app
      :return: ``AppMeta(title, description)``
      """
      seo_info = get_function_seo_info(func)
      if seo_info:
          return AppMeta(*seo_info)

      doc = get_function_doc(func)
      # partition() leaves the description empty when there is no blank line
      title, _, description = doc.strip().partition('\n\n')
      return AppMeta(title, description)
  # Template of the default application index: an HTML list with one entry per
  # item of ``apps_info`` (name -> object with ``title`` and ``description``).
  # When ``other_arguments`` is not None, each entry is a plain ``?app=<name>``
  # link with ``other_arguments`` appended; otherwise the link calls
  # ``WebIO.openApp`` from JavaScript. Entries without a description show a
  # "No description." placeholder.
  _app_list_tpl = template.Template("""
  <h1>Applications index</h1>
  <ul>
  {% for name,meta in apps_info.items() %}
  <li>
  {% if other_arguments is not None %}
  <a href="?app={{name}}{{other_arguments}}">{{ meta.title or name }}</a>:
  {% else %}
  <a href="javascript:WebIO.openApp('{{ name }}', true)">{{ meta.title or name }}</a>:
  {% end %}
  {% if meta.description %}
  {{ meta.description }}
  {% else %}
  <i>No description.</i>
  {% end %}
  </li>
  {% end %}
  </ul>
  """.strip())
  def get_static_index_content(apps, query_arguments=None):
      """Generate the default static index page that lists all applications.

      :param dict apps: mapping of application name to PyWebIO app
      :param str query_arguments: URL query arguments. The ``app`` argument is
          dropped and the remaining ones are appended to every application link.
          When nothing remains (this includes ``None``), the links use
          ``WebIO.openApp`` to jump to the application instead.
      :return: str, the rendered HTML
      """
      apps_info = {}
      for name, func in apps.items():
          apps_info[name] = parse_app_metadata(func)

      # forward every query argument except the one that selects the app
      forwarded = {
          key: values
          for key, values in urllib.parse.parse_qs(query_arguments).items()
          if key != 'app'
      }
      encoded = urllib.parse.urlencode(forwarded, doseq=True)
      other_arguments = '&' + encoded if encoded else None

      rendered = _app_list_tpl.generate(apps_info=apps_info, other_arguments=other_arguments)
      return rendered.decode('utf8')
  def _generate_default_index_app(apps):
      """Build the task function used as the default index page.

      :param dict apps: mapping of application name to PyWebIO app
      :return: a task function that outputs the application list as HTML
      """
      # The page is rendered once here, not on every call of the task function.
      index_html = get_static_index_content(apps)

      def index():
          from pywebio.output import put_html
          put_html(index_html)

      return index
  def make_applications(applications):
      """Normalise ``applications`` into a task name -> task function mapping
      and provide a default index page when none is given.

      :param applications: a single task function, a dict, or a list
      :return dict: mapping of task name -> task function
      :raises ValueError: two task functions in a list have the same name
      """
      if isinstance(applications, Sequence):  # list type
          named_apps = {}
          for task in applications:
              name = get_function_name(task)
              if name in named_apps:
                  raise ValueError("Duplicated application name:%r" % name)
              named_apps[name] = task
      elif isinstance(applications, Mapping):  # dict type
          named_apps = applications
      else:  # single task function type
          named_apps = {'index': applications}

      # convert dict keys to str; this builds a new dict, so adding the index
      # entry below does not touch a mapping passed in by the caller
      result = {str(key): task for key, task in named_apps.items()}

      for task in result.values():
          assert iscoroutinefunction(task) or isgeneratorfunction(task) or callable(task), \
              "Don't support application type:%s" % type(task)

      if 'index' not in result:
          result['index'] = _generate_default_index_app(result)
      return result
  class OriginChecker:
      """Checks whether the ``Origin`` of a request is acceptable."""

      @classmethod
      def check_origin(cls, origin, allowed_origins, host):
          """Return True if ``origin`` is the same site as ``host`` or matches
          one of the allowed patterns.

          :param str origin: value of the ``Origin`` request header
          :param allowed_origins: iterable of ``fnmatch``-style patterns that are
              matched against the whole ``origin`` string
          :param str host: value of the ``Host`` request header
          :return: bool
          """
          if cls.is_same_site(origin, host):
              return True

          return any(
              fnmatch.fnmatch(origin, pattern)
              for pattern in allowed_origins
          )

      @staticmethod
      def is_same_site(origin, host):
          """Check whether ``origin`` and ``host`` refer to the same site.

          Both values are HTTP request headers. Only the network location
          (host name and port) of ``origin`` is compared; its scheme is ignored.

          :param str origin: value of the ``Origin`` request header
          :param str host: value of the ``Host`` request header
          :return: bool
          """
          origin_netloc = urlparse(origin).netloc.lower()

          # Host names are case-insensitive, so normalise the Host header the
          # same way as the origin. A non-string host (e.g. a missing header
          # passed as None) is compared as-is and therefore never matches.
          if isinstance(host, str):
              host = host.lower()

          # Check to see that origin matches host directly, including ports
          return origin_netloc == host
  def deserialize_binary_event(data: bytes):
      """Decode a binary event message that may carry uploaded files.

      Data format::

          | event | file_header | file_data | file_header | file_data | ...

      Each segment is preceded by 8 bytes that hold, as a big-endian integer,
      the number of bytes of the segment that follows.

      ``event`` is a JSON object::

          {
              event: "from_submit",
              task_id: that.task_id,
              data: {
                  input_name => input_data
              }
          }

      ``file_header`` is a JSON object::

          {
              'filename': file name,
              'size': file size,
              'mime_type': file type,
              'last_modified': last_modified timestamp,
              'input_name': name of input field
          }

      :param bytes data: the raw message
      :return: the decoded ``event`` dict. For every key of ``event['data']``
          that files were sent for, the value is replaced by a list of file
          dicts: the file header without ``input_name`` plus a ``content`` key
          holding the raw file bytes.
      """
      # Split the message into its length-prefixed segments.
      segments = []
      cursor = 0
      total = len(data)
      while cursor < total:
          body_start = cursor + 8
          body_size = int.from_bytes(data[cursor:body_start], "big")
          cursor = body_start + body_size
          segments.append(data[body_start:cursor])

      event = json.loads(segments[0])

      # After the event, segments alternate: file header, file content, ...
      files = defaultdict(list)
      for header_idx in range(1, len(segments), 2):
          file_info = json.loads(segments[header_idx])
          file_info['content'] = segments[header_idx + 1]
          files[file_info.pop('input_name')].append(file_info)

      # Only inputs that are present in the event data receive their files.
      form_data = event['data']
      for input_name in form_data.keys() & files.keys():
          form_data[input_name] = files[input_name]

      return event
  def seo(title, description=None, app=None):
      """Set the SEO information of the PyWebIO application (web page information provided when indexed by search engines)

      :param str title: Application title
      :param str description: Application description
      :param callable app: PyWebIO task function
      :return: a decorator when ``app`` is None, otherwise the decorated ``app``

      If ``seo()`` is not used, the `docstring <https://www.python.org/dev/peps/pep-0257/>`_ of the task function will be regarded as SEO information by default.

      ``seo()`` can be used in 2 ways: direct call and decorator::

          @seo("title", "description")
          def foo():
              pass

          def bar():
              pass

          def hello():
              \"""Application title

              Application description...
              (An empty line is used to separate the description and title)
              \"""

          start_server([
              foo,
              hello,
              seo("title", "description", bar),
          ])

      .. versionadded:: 1.1
      """

      def decorator(func):
          decorated = func
          try:
              # Wrap in a partial so the attributes are set on a new object
              # rather than on the function that was passed in.
              decorated = partial(func)
              decorated._pywebio_title = title
              decorated._pywebio_description = description or ''
          except Exception:
              # Best effort: hand back whatever we have if wrapping fails.
              pass
          return decorated

      if app is None:
          return decorator
      return decorator(app)