interact.py 10 KB


  1. import tornado.websocket
  2. import time, json
  3. from collections import defaultdict
  4. from .framework import Future, Msg, Global
  5. from collections.abc import Iterable, Mapping, Sequence
  6. import logging
  7. logger = logging.getLogger(__name__)
  8. def run_async(coro):
  9. Global.active_ws.inactive_coro_instances.append(coro)
  10. def send_msg(cmd, spec=None):
  11. msg = dict(command=cmd, spec=spec, coro_id=Global.active_coro_id)
  12. Global.active_ws.write_message(json.dumps(msg))
  13. def get_response(cmd, spec):
  14. send_msg(cmd, spec)
  15. response_msg = yield from Future()
  16. return response_msg
  17. TEXT = 'text'
  18. NUMBER = "number"
  19. PASSWORD = "password"
  20. CHECKBOX = 'checkbox'
  21. RADIO = 'radio'
  22. SELECT = 'select'
  23. def _input_event_handle(valid_funcs, whole_valid_func=None):
  24. """
  25. 根据提供的校验函数处理表单事件
  26. :param valid_funcs: map(name -> valid_func) valid_func 为 None 时,不进行验证
  27. valid_func: callback(data) -> error_msg
  28. :param whole_valid_func: callback(data) -> (name, error_msg)
  29. :return:
  30. """
  31. while True:
  32. event = yield
  33. event_name, event_data = event['event'], event['data']
  34. if event_name == 'input_event':
  35. input_event = event_data['event_name']
  36. if input_event == 'blur':
  37. onblur_name = event_data['name']
  38. valid_func = valid_funcs.get(onblur_name)
  39. if valid_func is None:
  40. continue
  41. error_msg = valid_func(event_data['value'])
  42. if error_msg is not None:
  43. send_msg('update_input', dict(target_name=onblur_name, attributes={
  44. 'valid_status': False,
  45. 'invalid_feedback': error_msg
  46. }))
  47. elif event_name == 'from_submit':
  48. all_valid = True
  49. # 调用输入项验证函数进行校验
  50. for name, valid_func in valid_funcs.items():
  51. if valid_func is None:
  52. continue
  53. error_msg = valid_func(event_data[name])
  54. if error_msg is not None:
  55. all_valid = False
  56. send_msg('update_input', dict(target_name=name, attributes={
  57. 'valid_status': False,
  58. 'invalid_feedback': error_msg
  59. }))
  60. # 调用表单验证函数进行校验
  61. if whole_valid_func:
  62. v_res = whole_valid_func(event_data)
  63. if v_res is not None:
  64. all_valid = False
  65. onblur_name, error_msg = v_res
  66. send_msg('update_input', dict(target_name=onblur_name, attributes={
  67. 'valid_status': False,
  68. 'invalid_feedback': error_msg
  69. }))
  70. if all_valid:
  71. break
  72. return event['data']
  73. def _make_input_spec(label, type, name, valid_func=None, multiple=None, inline=None, other_html_attrs=None,
  74. **other_kwargs):
  75. """
  76. 校验传入input函数和select函数的参数
  77. 生成input_group消息中spec inputs参数列表项
  78. 支持的input类型 TEXT, NUMBER, PASSWORD, CHECKBOX, RADIO, SELECT
  79. """
  80. allowed_type = {TEXT, NUMBER, PASSWORD, CHECKBOX, RADIO, SELECT}
  81. assert type in allowed_type, 'Input type not allowed.'
  82. input_item = other_kwargs
  83. input_item.update(other_html_attrs or {})
  84. input_item.update(dict(label=label, type=type, name=name))
  85. if valid_func is not None and type in (CHECKBOX, RADIO): # CHECKBOX, RADIO 不支持valid_func参数
  86. logger.warning('valid_func can\'t be used when type in (CHECKBOX, RADIO)')
  87. if inline is not None and type not in {CHECKBOX, RADIO}:
  88. logger.warning('inline 只能用于 CHECKBOX, RADIO type, now type:%s', type)
  89. elif inline is not None:
  90. input_item['inline'] = inline
  91. if multiple is not None and type != SELECT:
  92. logger.warning('multiple 参数只能用于SELECT type, now type:%s', type)
  93. elif multiple is not None:
  94. input_item['multiple'] = multiple
  95. if type in {CHECKBOX, RADIO, SELECT}:
  96. assert 'options' in input_item, 'Input type not allowed.'
  97. assert isinstance(input_item['options'], Iterable), 'options must be list type'
  98. # option 可用形式:
  99. # {value:, label:, [selected:,] [disabled:]}
  100. # (value, label, [selected,] [disabled])
  101. # value 单值,label等于value
  102. opts = input_item['options']
  103. opts_res = []
  104. for opt in opts:
  105. if isinstance(opt, Mapping):
  106. assert 'value' in opt and 'label' in opt, 'options item must have value and label key'
  107. elif isinstance(opt, list):
  108. assert len(opt) > 1 and len(opt) <= 4, 'options item format error'
  109. opt = dict(zip(('value', 'label', 'selected', 'disabled'), opt))
  110. else:
  111. opt = dict(value=opt, label=opt)
  112. opts_res.append(opt)
  113. input_item['options'] = opts_res
  114. # todo spec参数中,为None的表示使用默认,不发送
  115. for attr, val in list(input_item.items()):
  116. if val is None:
  117. del input_item[attr]
  118. return input_item
  119. def input(label, type=TEXT, *, valid_func=None, name='data', value='', placeholder='', required=None, readonly=None,
  120. disabled=None, **other_html_attrs):
  121. input_kwargs = dict(locals())
  122. input_kwargs['label'] = ''
  123. input_kwargs['__name__'] = input.__name__
  124. input_kwargs.setdefault('autofocus', True) # 如果没有设置autofocus参数,则开启参数
  125. # 参数检查
  126. allowed_type = {TEXT, NUMBER, PASSWORD}
  127. assert type in allowed_type, 'Input type not allowed.'
  128. data = yield from input_group(label=label, inputs=[input_kwargs])
  129. return data[name]
  130. def select(label, options, type=SELECT, *, multiple=None, valid_func=None, name='data', value='', placeholder='',
  131. required=None, readonly=None, disabled=None, inline=None, **other_html_attrs):
  132. """
  133. 参数值为None表示不指定,使用默认值
  134. :param label:
  135. :param options: option 列表
  136. option 可用形式:
  137. {value:, label:, [selected:,] [disabled:]}
  138. (value, label, [selected,] [disabled])
  139. value 单值,label等于value
  140. :param type:
  141. :param multiple:
  142. :param valid_func:
  143. :param name:
  144. :param value:
  145. :param placeholder:
  146. :param required:
  147. :param readonly:
  148. :param disabled:
  149. :param inline:
  150. :param other_html_attrs:
  151. :return:
  152. """
  153. input_kwargs = dict(locals())
  154. input_kwargs['label'] = ''
  155. input_kwargs['__name__'] = select.__name__
  156. if type == SELECT:
  157. input_kwargs.setdefault('autofocus', True) # 如果没有设置autofocus参数,则开启参数
  158. allowed_type = {CHECKBOX, RADIO, SELECT}
  159. assert type in allowed_type, 'Input type not allowed.'
  160. data = yield from input_group(label=label, inputs=[input_kwargs])
  161. return data[name]
  162. def _make_actions_input_spec(label, buttons, name):
  163. """
  164. :param label:
  165. :param actions: action 列表
  166. action 可用形式:
  167. {value:, label:, [disabled:]}
  168. (value, label, [disabled])
  169. value 单值,label等于value
  170. :return:
  171. """
  172. act_res = []
  173. for act in buttons:
  174. if isinstance(act, Mapping):
  175. assert 'value' in act and 'label' in act, 'actions item must have value and label key'
  176. elif isinstance(act, list):
  177. assert len(act) in (2, 3), 'actions item format error'
  178. act = dict(zip(('value', 'label', 'disabled'), act))
  179. else:
  180. act = dict(value=act, label=act)
  181. act_res.append(act)
  182. input_item = dict(type='actions', label=label, name=name, buttons=act_res)
  183. return input_item
  184. def actions(label, buttons, name='data'):
  185. """
  186. 选择一个动作。UI为多个按钮,点击后会将整个表单提交
  187. :param label:
  188. :param actions: action 列表
  189. action 可用形式:
  190. {value:, label:, [disabled:]}
  191. (value, label, [disabled])
  192. value 单值,label等于value
  193. 实现方式:
  194. 多个type=submit的input组成
  195. [
  196. <button data-name value>label</button>,
  197. ...
  198. ]
  199. """
  200. input_kwargs = dict(label='', buttons=buttons, name=name)
  201. input_kwargs['__name__'] = actions.__name__
  202. data = yield from input_group(label=label, inputs=[input_kwargs])
  203. return data[name]
  204. def input_group(label, inputs, valid_func=None):
  205. """
  206. :param label:
  207. :param inputs: list of generator or dict, dict的话,需要多加一项 __name__ 为当前函数名
  208. :param valid_func: callback(data) -> (name, error_msg)
  209. :return:
  210. """
  211. make_spec_funcs = {
  212. actions.__name__: _make_actions_input_spec,
  213. input.__name__: _make_input_spec,
  214. select.__name__: _make_input_spec,
  215. }
  216. item_valid_funcs = {}
  217. spec_inputs = []
  218. for input_g in inputs:
  219. if isinstance(input_g, dict):
  220. func_name = input_g.pop('__name__')
  221. input_kwargs = input_g
  222. else:
  223. input_kwargs = dict(input_g.gi_frame.f_locals) # 拷贝一份,不可以对locals进行修改
  224. func_name = input_g.__name__
  225. input_name = input_kwargs['name']
  226. item_valid_funcs[input_name] = input_kwargs.get('valid_func')
  227. input_item = make_spec_funcs[func_name](**input_kwargs)
  228. spec_inputs.append(input_item)
  229. if all('autofocus' not in i for i in spec_inputs): # 每一个输入项都没有设置autofocus参数
  230. for i in spec_inputs:
  231. text_inputs = {TEXT, NUMBER, PASSWORD, SELECT} # todo update
  232. if i.get('type') in text_inputs:
  233. i['autofocus'] = True
  234. break
  235. send_msg('input_group', dict(label=label, inputs=spec_inputs))
  236. data = yield from _input_event_handle(item_valid_funcs, valid_func)
  237. send_msg('destroy_form')
  238. return data
  239. def set_title(title):
  240. send_msg('output_ctl', dict(title=title))
  241. def text_print(text, *, ws=None):
  242. msg = dict(command="output", spec=dict(content=text, type='text'))
  243. (ws or Global.active_ws).write_message(json.dumps(msg))
  244. def json_print(obj):
  245. text = "```\n%s\n```" % json.dumps(obj, indent=4, ensure_ascii=False)
  246. text_print(text)