# interact.py
  1. import tornado.websocket
  2. import time, json
  3. from collections import defaultdict
  4. from .framework import Future, Msg, Global
  5. from collections.abc import Iterable, Mapping, Sequence
  6. import logging
  7. logger = logging.getLogger(__name__)
  8. def run_async(coro):
  9. Global.active_ws.inactive_coro_instances.append(coro)
  10. def send_msg(cmd, spec=None):
  11. msg = dict(command=cmd, spec=spec, coro_id=Global.active_coro_id)
  12. Global.active_ws.write_message(json.dumps(msg))
  13. def get_response(cmd, spec):
  14. send_msg(cmd, spec)
  15. response_msg = yield from Future()
  16. return response_msg
  17. TEXT = 'text'
  18. NUMBER = "number"
  19. PASSWORD = "password"
  20. CHECKBOX = 'checkbox'
  21. RADIO = 'radio'
  22. SELECT = 'select'
  23. def _input_event_handle(valid_funcs, whole_valid_func=None):
  24. """
  25. 根据提供的校验函数处理表单事件
  26. :param valid_funcs: map(name -> valid_func) valid_func 为 None 时,不进行验证
  27. valid_func: callback(data) -> error_msg
  28. :param whole_valid_func: callback(data) -> (name, error_msg)
  29. :return:
  30. """
  31. while True:
  32. event = yield
  33. event_name, event_data = event['event'], event['data']
  34. if event_name == 'input_event':
  35. input_event = event_data['event_name']
  36. if input_event == 'blur':
  37. onblur_name = event_data['name']
  38. valid_func = valid_funcs.get(onblur_name)
  39. if valid_func is None:
  40. continue
  41. error_msg = valid_func(event_data['value'])
  42. if error_msg is not None:
  43. send_msg('update_input', dict(target_name=onblur_name, attributes={
  44. 'valid_status': False,
  45. 'invalid_feedback': error_msg
  46. }))
  47. elif event_name == 'from_submit':
  48. all_valid = True
  49. # 调用输入项验证函数进行校验
  50. for name, valid_func in valid_funcs.items():
  51. if valid_func is None:
  52. continue
  53. error_msg = valid_func(event_data[name])
  54. if error_msg is not None:
  55. all_valid = False
  56. send_msg('update_input', dict(target_name=name, attributes={
  57. 'valid_status': False,
  58. 'invalid_feedback': error_msg
  59. }))
  60. # 调用表单验证函数进行校验
  61. if whole_valid_func:
  62. v_res = whole_valid_func(event_data)
  63. if v_res is not None:
  64. all_valid = False
  65. onblur_name, error_msg = v_res
  66. send_msg('update_input', dict(target_name=onblur_name, attributes={
  67. 'valid_status': False,
  68. 'invalid_feedback': error_msg
  69. }))
  70. if all_valid:
  71. break
  72. return event['data']
  73. def _make_input_spec(label, type, name, valid_func=None, multiple=None, inline=None, other_html_attrs=None,
  74. **other_kwargs):
  75. """
  76. 校验传入input函数和select函数的参数
  77. 生成input_group消息中spec inputs参数列表项
  78. 支持的input类型 TEXT, NUMBER, PASSWORD, CHECKBOX, RADIO, SELECT
  79. """
  80. allowed_type = {TEXT, NUMBER, PASSWORD, CHECKBOX, RADIO, SELECT}
  81. assert type in allowed_type, 'Input type not allowed.'
  82. input_item = other_kwargs
  83. input_item.update(other_html_attrs or {})
  84. input_item.update(dict(label=label, type=type, name=name))
  85. if valid_func is not None and type in (CHECKBOX, RADIO): # CHECKBOX, RADIO 不支持valid_func参数
  86. logger.warning('valid_func can\'t be used when type in (CHECKBOX, RADIO)')
  87. if inline is not None and type not in {CHECKBOX, RADIO}:
  88. logger.warning('inline 只能用于 CHECKBOX, RADIO type, now type:%s', type)
  89. elif inline is not None:
  90. input_item['inline'] = inline
  91. if multiple is not None and type != SELECT:
  92. logger.warning('multiple 参数只能用于SELECT type, now type:%s', type)
  93. elif multiple is not None:
  94. input_item['multiple'] = multiple
  95. if type in {CHECKBOX, RADIO, SELECT}:
  96. assert 'options' in input_item, 'Input type not allowed.'
  97. assert isinstance(input_item['options'], Iterable), 'options must be list type'
  98. # option 可用形式:
  99. # {value:, label:, [selected:,] [disabled:]}
  100. # (value, label, [selected,] [disabled])
  101. # value 单值,label等于value
  102. opts = input_item['options']
  103. opts_res = []
  104. for opt in opts:
  105. if isinstance(opt, Mapping):
  106. assert 'value' in opt and 'label' in opt, 'options item must have value and label key'
  107. elif isinstance(opt, list):
  108. assert len(opt) > 1 and len(opt) <= 4, 'options item format error'
  109. opt = dict(zip(('value', 'label', 'selected', 'disabled'), opt))
  110. else:
  111. opt = dict(value=opt, label=opt)
  112. opts_res.append(opt)
  113. input_item['options'] = opts_res
  114. # todo spec参数中,为None的表示使用默认,不发送
  115. for attr, val in list(input_item.items()):
  116. if val is None:
  117. del input_item[attr]
  118. return input_item
  119. def input(label, type=TEXT, *, valid_func=None, name='data', value='', placeholder='', required=None, readonly=None,
  120. disabled=None, **other_html_attrs):
  121. input_kwargs = dict(locals())
  122. input_kwargs['label'] = ''
  123. input_kwargs['__name__'] = input.__name__
  124. input_kwargs.setdefault('autofocus', True) # 如果没有设置autofocus参数,则开启参数
  125. # 参数检查
  126. allowed_type = {TEXT, NUMBER, PASSWORD}
  127. assert type in allowed_type, 'Input type not allowed.'
  128. data = yield from input_group(label=label, inputs=[input_kwargs])
  129. return data[name]
  130. def select(label, options, type=SELECT, *, multiple=None, valid_func=None, name='data', value='', placeholder='',
  131. required=None, readonly=None, disabled=None, inline=None, **other_html_attrs):
  132. """
  133. 参数值为None表示不指定,使用默认值
  134. :param label:
  135. :param options: option 列表
  136. option 可用形式:
  137. {value:, label:, [selected:,] [disabled:]}
  138. (value, label, [selected,] [disabled])
  139. value 单值,label等于value
  140. :param type:
  141. :param multiple:
  142. :param valid_func:
  143. :param name:
  144. :param value:
  145. :param placeholder:
  146. :param required:
  147. :param readonly:
  148. :param disabled:
  149. :param inline:
  150. :param other_html_attrs:
  151. :return:
  152. """
  153. input_kwargs = dict(locals())
  154. input_kwargs['label'] = ''
  155. input_kwargs['__name__'] = select.__name__
  156. if type == SELECT:
  157. input_kwargs.setdefault('autofocus', True) # 如果没有设置autofocus参数,则开启参数
  158. allowed_type = {CHECKBOX, RADIO, SELECT}
  159. assert type in allowed_type, 'Input type not allowed.'
  160. data = yield from input_group(label=label, inputs=[input_kwargs])
  161. return data[name]
  162. def _make_actions_input_spec(label, buttons, name):
  163. """
  164. :param label:
  165. :param actions: action 列表
  166. action 可用形式:
  167. {value:, label:, [disabled:]}
  168. (value, label, [disabled])
  169. value 单值,label等于value
  170. :return:
  171. """
  172. act_res = []
  173. for act in buttons:
  174. if isinstance(act, Mapping):
  175. assert 'value' in act and 'label' in act, 'actions item must have value and label key'
  176. elif isinstance(act, list):
  177. assert len(act) in (2, 3), 'actions item format error'
  178. act = dict(zip(('value', 'label', 'disabled'), act))
  179. else:
  180. act = dict(value=act, label=act)
  181. act_res.append(act)
  182. input_item = dict(type='actions', label=label, name=name, buttons=act_res)
  183. return input_item
  184. def actions(label, buttons, name='data'):
  185. """
  186. 选择一个动作。UI为多个按钮,点击后会将整个表单提交
  187. :param label:
  188. :param actions: action 列表
  189. action 可用形式:
  190. {value:, label:, [disabled:]}
  191. (value, label, [disabled])
  192. value 单值,label等于value
  193. 实现方式:
  194. 多个type=submit的input组成
  195. [
  196. <button data-name value>label</button>,
  197. ...
  198. ]
  199. """
  200. input_kwargs = dict(label='', buttons=buttons, name=name)
  201. input_kwargs['__name__'] = actions.__name__
  202. data = yield from input_group(label=label, inputs=[input_kwargs])
  203. return data[name]
  204. def input_group(label, inputs, valid_func=None):
  205. """
  206. :param label:
  207. :param inputs: list of generator or dict, dict的话,需要多加一项 __name__ 为当前函数名
  208. :param valid_func: callback(data) -> (name, error_msg)
  209. :return:
  210. """
  211. make_spec_funcs = {
  212. actions.__name__: _make_actions_input_spec,
  213. input.__name__: _make_input_spec,
  214. select.__name__: _make_input_spec,
  215. }
  216. item_valid_funcs = {}
  217. spec_inputs = []
  218. for input_g in inputs:
  219. if isinstance(input_g, dict):
  220. func_name = input_g.pop('__name__')
  221. input_kwargs = input_g
  222. else:
  223. input_kwargs = dict(input_g.gi_frame.f_locals) # 拷贝一份,不可以对locals进行修改
  224. func_name = input_g.__name__
  225. input_name = input_kwargs['name']
  226. item_valid_funcs[input_name] = input_kwargs.get('valid_func')
  227. input_item = make_spec_funcs[func_name](**input_kwargs)
  228. spec_inputs.append(input_item)
  229. if all('autofocus' not in i for i in spec_inputs): # 每一个输入项都没有设置autofocus参数
  230. for i in spec_inputs:
  231. text_inputs = {TEXT, NUMBER, PASSWORD, SELECT} # todo update
  232. if i.get('type') in text_inputs:
  233. i['autofocus'] = True
  234. break
  235. send_msg('input_group', dict(label=label, inputs=spec_inputs))
  236. data = yield from _input_event_handle(item_valid_funcs, valid_func)
  237. send_msg('destroy_form')
  238. return data
  239. def set_title(title):
  240. send_msg('output_ctl', dict(title=title))
  241. def text_print(text, *, ws=None):
  242. msg = dict(command="output", spec=dict(content=text, type='text'))
  243. (ws or Global.active_ws).write_message(json.dumps(msg))
  244. def json_print(obj):
  245. text = "```\n%s\n```" % json.dumps(obj, indent=4, ensure_ascii=False)
  246. text_print(text)