io_ctrl.py 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397
  1. """
  2. 输入输出的底层实现函数
  3. """
  4. import inspect
  5. import json
  6. import logging
  7. from collections import UserList
  8. from functools import partial, wraps
  9. from .session import chose_impl, next_client_event, get_current_task_id, get_current_session
  10. from .utils import random_str
  11. logger = logging.getLogger(__name__)
  12. def scope2dom(name, no_css_selector=False):
  13. """Get the CSS selector/element name actually used in the front-end html page
  14. :param str/tuple name: When it is str, it is regarded as the Dom ID name;
  15. when tuple, the format is (css selector, element name)
  16. """
  17. selector = '#'
  18. if isinstance(name, tuple):
  19. selector, name = name
  20. name = name.replace(' ', '-')
  21. if no_css_selector:
  22. selector = ''
  23. return '%spywebio-scope-%s' % (selector, name)
  24. class Output:
  25. """ ``put_xxx()`` 类函数的返回值
  26. 若 ``put_xxx()`` 调用的返回值没有被变量接收,则直接将消息发送到会话;
  27. 否则消息则作为其他消息的一部分
  28. """
  29. @staticmethod
  30. def json_encoder(obj, ignore_error=False):
  31. """json序列化与输出相关消息的Encoder函数 """
  32. if isinstance(obj, Output):
  33. return obj.embed_data()
  34. elif isinstance(obj, OutputList):
  35. return obj.data
  36. if not ignore_error:
  37. raise TypeError('Object of type %s is not JSON serializable' % obj.__class__.__name__)
  38. @classmethod
  39. def dump_dict(cls, data):
  40. # todo 使用其他方式来转换spec
  41. return json.loads(json.dumps(data, default=cls.json_encoder))
  42. @classmethod
  43. def safely_destruct(cls, obj):
  44. """安全销毁 OutputReturn 对象/包含OutputReturn对象的dict/list, 使 OutputReturn.__del__ 不进行任何操作"""
  45. try:
  46. json.dumps(obj, default=partial(cls.json_encoder, ignore_error=True))
  47. except Exception:
  48. pass
  49. def __init__(self, spec, on_embed=None):
  50. self.processed = False
  51. self.on_embed = on_embed or (lambda d: d)
  52. try:
  53. self.spec = type(self).dump_dict(spec) # this may raise TypeError
  54. except TypeError:
  55. self.processed = True
  56. type(self).safely_destruct(spec)
  57. raise
  58. # For Context manager
  59. self.enabled_context_manager = False
  60. self.container_selector = None
  61. self.container_dom_id = None # todo: this name is ambiguous, rename it to `scope_name` or others
  62. self.custom_enter = None
  63. self.custom_exit = None
  64. # Try to make sure current session exist.
  65. # If we leave the session interaction in `Output.__del__`,
  66. # the Exception raised from there will be ignored by python interpreter,
  67. # thus we can't end some session in some cases.
  68. # See also: https://github.com/pywebio/PyWebIO/issues/243
  69. get_current_session()
  70. def enable_context_manager(self, container_selector=None, container_dom_id=None, custom_enter=None,
  71. custom_exit=None):
  72. self.enabled_context_manager = True
  73. self.container_selector = container_selector
  74. self.container_dom_id = container_dom_id
  75. self.custom_enter = custom_enter
  76. self.custom_exit = custom_exit
  77. return self
  78. def __enter__(self):
  79. if not self.enabled_context_manager:
  80. raise RuntimeError("This output function can't be used as context manager!")
  81. if self.custom_enter:
  82. return self.custom_enter(self)
  83. self.container_dom_id = self.container_dom_id or random_str(10)
  84. self.spec['container_selector'] = self.container_selector
  85. self.spec['container_dom_id'] = scope2dom(self.container_dom_id, no_css_selector=True)
  86. self.send()
  87. get_current_session().push_scope(self.container_dom_id)
  88. return self.container_dom_id
  89. def __exit__(self, exc_type, exc_val, exc_tb):
  90. """
  91. If this method returns True,
  92. it means that the context manager can handle the exception,
  93. so that the with statement terminates the propagation of the exception
  94. """
  95. if self.custom_exit:
  96. return self.custom_exit(self, exc_type=exc_type, exc_val=exc_val, exc_tb=exc_tb)
  97. get_current_session().pop_scope()
  98. return False # Propagate Exception
  99. def embed_data(self):
  100. """返回供嵌入到其他消息中的数据,可以设置一些默认值"""
  101. self.processed = True
  102. return self.on_embed(self.spec)
  103. def send(self):
  104. """发送输出内容到Client"""
  105. self.processed = True
  106. send_msg('output', self.spec)
  107. show = send # `show` is a more user-friendly name
  108. def style(self, css_style):
  109. """Set css style for output
  110. :param str css_style: CSS style string
  111. Example::
  112. put_text('hello').style('color: red; font-size: 20px')
  113. put_row([
  114. put_text('hello').style('color: red'),
  115. put_markdown('markdown')
  116. ]).style('margin-top: 20px')
  117. """
  118. self.spec.setdefault('style', '')
  119. self.spec['style'] += ';%s' % css_style
  120. return self
  121. def onclick(self, callback):
  122. """Add click callback to this widget.
  123. :param callable callback: Callback which will be called when the widget is clicked.
  124. """
  125. callback_id = output_register_callback(lambda _: callback())
  126. self.spec.setdefault('click_callback_id', '')
  127. self.spec['click_callback_id'] += callback_id
  128. return self
  129. def __del__(self):
  130. """返回值没有被变量接收时的操作:直接输出消息"""
  131. if not self.processed:
  132. # avoid `Exception ignored in xxx` error log
  133. try:
  134. self.send()
  135. except Exception:
  136. pass
  137. class OutputList(UserList):
  138. """
  139. 用于 style 对输出列表设置样式时的返回值
  140. """
  141. def __del__(self):
  142. """返回值没有被变量接收时的操作:顺序输出其持有的内容"""
  143. for o in self.data:
  144. o.__del__() # lgtm [py/explicit-call-to-delete]
  145. def safely_destruct_output_when_exp(content_param):
  146. """装饰器生成: 异常时安全释放 Output 对象
  147. :param content_param: 含有Output实例的参数名或参数名列表
  148. :type content_param: list/str
  149. :return: 装饰器
  150. """
  151. def decorator(func):
  152. sig = inspect.signature(func)
  153. @wraps(func)
  154. def inner(*args, **kwargs):
  155. try:
  156. return func(*args, **kwargs)
  157. except Exception:
  158. # 发生异常,安全地释放 Output 对象
  159. params = [content_param] if isinstance(content_param, str) else content_param
  160. bound = sig.bind(*args, **kwargs).arguments
  161. for param in params:
  162. if bound.get(param):
  163. Output.safely_destruct(bound.get(param))
  164. raise
  165. return inner
  166. return decorator
  167. def send_msg(cmd, spec=None, task_id=None):
  168. msg = dict(command=cmd, spec=spec, task_id=task_id or get_current_task_id())
  169. get_current_session().send_task_command(msg)
  170. def single_input_kwargs(single_input_return):
  171. try:
  172. # 协程模式下,单项输入为协程对象,可以通过send(None)来获取传入单项输入的参数字典
  173. # In the coroutine mode, the item of `inputs` is coroutine object.
  174. # using `send(None)` to get the single input function's parameter dict.
  175. single_input_return.send(None)
  176. except StopIteration as e: # This is in the coroutine mode
  177. input_kwargs = e.args[0]
  178. except AttributeError: # This is in the thread mode
  179. input_kwargs = single_input_return
  180. else:
  181. raise RuntimeError("Can't get kwargs from single input")
  182. return input_kwargs
  183. @chose_impl
  184. def single_input(item_spec, valid_func, preprocess_func, onchange_func):
  185. """
  186. Note: 鲁棒性在上层完成
  187. 将单个input构造成input_group,并获取返回值
  188. :param item_spec: 单个输入项的参数 'name' must in item_spec, 参数一定已经验证通过
  189. :param valid_func: Not None
  190. :param onchange_func: Not None
  191. :param preprocess_func: Not None, 预处理函数,在收到用户提交的单项输入的原始数据后用于在校验前对数据进行预处理
  192. """
  193. if item_spec.get('name') is None: # single input
  194. item_spec['name'] = 'data'
  195. else: # as input_group item
  196. # use `single_input_kwargs()` to get the returned value
  197. return dict(item_spec=item_spec, valid_func=valid_func,
  198. preprocess_func=preprocess_func, onchange_func=onchange_func)
  199. label = item_spec['label']
  200. name = item_spec['name']
  201. # todo 是否可以原地修改spec
  202. item_spec['label'] = ''
  203. item_spec.setdefault('auto_focus', True) # 如果没有设置autofocus参数,则开启参数 todo CHECKBOX, RADIO 特殊处理
  204. spec = dict(label=label, inputs=[item_spec])
  205. data = yield input_control(spec=spec,
  206. preprocess_funcs={name: preprocess_func},
  207. item_valid_funcs={name: valid_func},
  208. onchange_funcs={name: onchange_func})
  209. if not data: # form cancel
  210. return None
  211. return data[name]
  212. @chose_impl
  213. def input_control(spec, preprocess_funcs, item_valid_funcs, onchange_funcs, form_valid_funcs=None):
  214. """
  215. 发送input命令,监听事件,验证输入项,返回结果
  216. :param spec:
  217. :param preprocess_funcs: keys 严格等于 spec中的name集合
  218. :param item_valid_funcs: keys 严格等于 spec中的name集合
  219. :param onchange_funcs: keys 严格等于 spec中的name集合
  220. :param form_valid_funcs: can be ``None``
  221. :return:
  222. """
  223. send_msg('input_group', spec)
  224. data = yield input_event_handle(item_valid_funcs, form_valid_funcs, preprocess_funcs, onchange_funcs)
  225. send_msg('destroy_form')
  226. return data
  227. def check_item(name, data, valid_func, preprocess_func):
  228. try:
  229. data = preprocess_func(data)
  230. error_msg = valid_func(data)
  231. except Exception as e:
  232. logger.warning('Get %r in valid_func for name:"%s"', e, name)
  233. from pywebio.session import info as session_info
  234. error_msg = '字段内容不合法' if 'zh' in session_info.user_language else 'Your input is not valid'
  235. if error_msg is not None:
  236. send_msg('update_input', dict(target_name=name, attributes={
  237. 'valid_status': False,
  238. 'invalid_feedback': error_msg
  239. }))
  240. return False
  241. else:
  242. send_msg('update_input', dict(target_name=name, attributes={
  243. 'valid_status': 0, # valid_status为0表示清空valid_status标志
  244. }))
  245. return True
  246. def trigger_onchange(event_data, onchange_funcs):
  247. name = event_data['name']
  248. onchange_func = onchange_funcs[name]
  249. # save current input name to session, so that the `input_update()` function can get it
  250. task_id = get_current_task_id()
  251. onchange_trigger_key = 'onchange_trigger-' + task_id
  252. previous_name = get_current_session().internal_save.get(onchange_trigger_key)
  253. get_current_session().internal_save[onchange_trigger_key] = name # used in `pywebio.input.input_update()`
  254. try:
  255. onchange_func(event_data['value'])
  256. except Exception as e:
  257. logger.warning('Get %r in onchange function for name:"%s"', e, name)
  258. finally:
  259. if previous_name is None:
  260. get_current_session().internal_save.pop(onchange_trigger_key, None)
  261. else:
  262. get_current_session().internal_save[onchange_trigger_key] = previous_name
@chose_impl
def input_event_handle(item_valid_funcs, form_valid_funcs, preprocess_funcs, onchange_funcs):
    """
    Handle the events of a form using the given validation functions.

    Loops over client events until the form is submitted with valid data or cancelled.

    :param item_valid_funcs: map(name -> valid_func)
        valid_func: callback(data) -> error_msg or None
    :param form_valid_funcs: callback(data) -> (name, error_msg) or None
    :param preprocess_funcs: map(name -> process_func)
    :param onchange_funcs: map(name -> onchange_func)
    :return: dict of preprocessed values keyed by the names in the submit event,
        or ``None`` when the form is cancelled
    :raises ValueError: if ``form_valid_funcs`` returns a value other than ``None``
        that can't be unpacked into ``(name, error_msg)``
    """
    while True:
        event = yield next_client_event()
        event_name, event_data = event['event'], event['data']
        if event_name == 'input_event':
            input_event = event_data['event_name']
            if input_event == 'blur':
                # Validate the single item that has just lost focus
                onblur_name = event_data['name']
                check_item(onblur_name, event_data['value'], item_valid_funcs[onblur_name],
                           preprocess_funcs[onblur_name])
            elif input_event == 'change':
                trigger_onchange(event_data, onchange_funcs)

        elif event_name == 'from_submit':
            all_valid = True

            # Run the validation function of every input item.
            # No early exit: every item gets its own feedback message.
            for name, valid_func in item_valid_funcs.items():
                if not check_item(name, event_data[name], valid_func, preprocess_funcs[name]):
                    all_valid = False

            if all_valid:  # todo: reduce the number of preprocess_funcs[name] calls
                data = {name: preprocess_funcs[name](val) for name, val in event_data.items()}
                # Run the form-level validation function
                if form_valid_funcs:
                    v_res = form_valid_funcs(data)
                    if v_res is not None:
                        all_valid = False
                        try:
                            onblur_name, error_msg = v_res
                        except Exception:
                            # Use `raise Exception from None` to disable exception chaining
                            # see: https://docs.python.org/3/tutorial/errors.html#exception-chaining
                            raise ValueError("The `validate` function for input group must "
                                             "return `(name, error_msg)` when validation failed.") from None

                        # Show the form-level error on the item named by the validator
                        send_msg('update_input', dict(target_name=onblur_name, attributes={
                            'valid_status': False,
                            'invalid_feedback': error_msg
                        }))

            if all_valid:
                break
        elif event_name == 'from_cancel':
            data = None
            break
        else:
            logger.warning("Unhandled Event: %s", event)

    return data
  317. def output_register_callback(callback, **options):
  318. """向当前会话注册回调函数"""
  319. task_id = get_current_session().register_callback(callback, **options)
  320. return task_id