interact.py 9.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272
  1. import tornado.websocket
  2. import time, json
  3. from collections import defaultdict
  4. from .framework import Future, Msg, Global
  5. from collections.abc import Iterable, Mapping, Sequence
  6. import logging
  7. logger = logging.getLogger(__name__)
  8. def run_async(coro):
  9. Global.active_ws.inactive_coro_instances.append(coro)
  10. def send_msg(cmd, spec=None):
  11. msg = dict(command=cmd, spec=spec, coro_id=Global.active_coro_id)
  12. Global.active_ws.write_message(json.dumps(msg))
  13. def get_response(cmd, spec):
  14. send_msg(cmd, spec)
  15. response_msg = yield from Future()
  16. return response_msg
  17. TEXT = 'text'
  18. NUMBER = "number"
  19. PASSWORD = "password"
  20. CHECKBOX = 'checkbox'
  21. RADIO = 'radio'
  22. SELECT = 'select'
  23. def _input_event_handle(valid_funcs, whole_valid_func=None):
  24. """
  25. 根据提供的校验函数处理表单事件
  26. :param valid_funcs: map(name -> valid_func) valid_func 为 None 时,不进行验证
  27. valid_func: callback(data) -> error_msg
  28. :param whole_valid_func: callback(data) -> (name, error_msg)
  29. :return:
  30. """
  31. while True:
  32. event = yield
  33. event_name, event_data = event['event'], event['data']
  34. if event_name == 'input_event':
  35. input_event = event_data['event_name']
  36. if input_event == 'on_blur':
  37. onblur_name = event_data['name']
  38. valid_func = valid_funcs.get(onblur_name)
  39. if valid_func is None:
  40. continue
  41. error_msg = valid_func(event_data['value'])
  42. if error_msg is not None:
  43. send_msg('update_input', dict(target_name=onblur_name, attributes={
  44. 'valid_status': False,
  45. 'invalid_feedback': error_msg
  46. }))
  47. elif event_name == 'from_submit':
  48. all_valid = True
  49. # 调用输入项验证函数进行校验
  50. for name, valid_func in valid_funcs.items():
  51. if valid_func is None:
  52. continue
  53. error_msg = valid_func(event_data[name])
  54. if error_msg is not None:
  55. all_valid = False
  56. send_msg('update_input', dict(target_name=name, attributes={
  57. 'valid_status': False,
  58. 'invalid_feedback': error_msg
  59. }))
  60. # 调用表单验证函数进行校验
  61. if whole_valid_func:
  62. v_res = whole_valid_func(event_data)
  63. if v_res is not None:
  64. all_valid = False
  65. onblur_name, error_msg = v_res
  66. send_msg('update_input', dict(target_name=onblur_name, attributes={
  67. 'valid_status': False,
  68. 'invalid_feedback': error_msg
  69. }))
  70. if all_valid:
  71. break
  72. return event['data']
  73. def _make_input_spec(label, type, name, valid_func=None, multiple=None, inline=None, other_html_attrs=None,
  74. **other_kwargs):
  75. """
  76. 校验传入input函数和select函数的参数
  77. 生成input_group消息中spec inputs参数列表项
  78. 支持的input类型 TEXT, NUMBER, PASSWORD, CHECKBOX, RADIO, SELECT
  79. """
  80. allowed_type = {TEXT, NUMBER, PASSWORD, CHECKBOX, RADIO, SELECT}
  81. assert type in allowed_type, 'Input type not allowed.'
  82. input_item = other_kwargs
  83. input_item.update(other_html_attrs or {})
  84. input_item.update(dict(label=label, type=type, name=name))
  85. if valid_func is not None and type in (CHECKBOX, RADIO): # CHECKBOX, RADIO 不支持valid_func参数
  86. logger.warning('valid_func can\'t be used when type in (CHECKBOX, RADIO)')
  87. if inline is not None and type not in {CHECKBOX, RADIO}:
  88. logger.warning('inline 只能用于 CHECKBOX, RADIO type')
  89. elif inline is not None:
  90. input_item['inline'] = inline
  91. if multiple is not None and type != SELECT:
  92. logger.warning('multiple 参数只能用于SELECT type')
  93. elif multiple is not None:
  94. input_item['multiple'] = multiple
  95. if type in {CHECKBOX, RADIO, SELECT}:
  96. assert 'options' in input_item, 'Input type not allowed.'
  97. assert isinstance(input_item['options'], Iterable), 'options must be list type'
  98. # option 可用形式:
  99. # {value:, label:, [checked:,] [disabled:]}
  100. # (value, label, [checked,] [disabled])
  101. # value 单值,label等于value
  102. opts = input_item['options']
  103. opts_res = []
  104. for opt in opts:
  105. if isinstance(opt, Mapping):
  106. assert 'value' in opt and 'label' in opt, 'options item must have value and label key'
  107. elif isinstance(opt, list):
  108. assert len(opt) > 1 and len(opt) <= 4, 'options item format error'
  109. opt = dict(zip(('value', 'label', 'checked', 'disabled'), opt))
  110. else:
  111. opt = dict(value=opt, label=opt)
  112. opts_res.append(opt)
  113. input_item['options'] = opts_res
  114. # todo spec参数中,为默认值的可以不发送
  115. return input_item
  116. def input(label, type=TEXT, *, valid_func=None, name='data', value='', placeholder='', required=False, readonly=False,
  117. disabled=False, **other_html_attrs):
  118. input_kwargs = dict(locals())
  119. input_kwargs['label'] = ''
  120. input_kwargs['__name__'] = input.__name__
  121. input_kwargs.setdefault('autofocus', True) # 如果没有设置autofocus参数,则开启参数
  122. # 参数检查
  123. allowed_type = {TEXT, NUMBER, PASSWORD}
  124. assert type in allowed_type, 'Input type not allowed.'
  125. data = yield from input_group(label=label, inputs=[input_kwargs])
  126. return data[name]
  127. def select(label, options, type=SELECT, *, multiple=False, valid_func=None, name='data', value='', placeholder='',
  128. required=False, readonly=False, disabled=False, inline=False, **other_html_attrs):
  129. input_kwargs = dict(locals())
  130. input_kwargs['label'] = ''
  131. input_kwargs['__name__'] = select.__name__
  132. allowed_type = {CHECKBOX, RADIO, SELECT}
  133. assert type in allowed_type, 'Input type not allowed.'
  134. data = yield from input_group(label=label, inputs=[input_kwargs])
  135. return data[name]
  136. def _make_actions_input_spec(label, actions, name):
  137. """
  138. :param label:
  139. :param actions: action 列表
  140. action 可用形式:
  141. {value:, label:, [disabled:]}
  142. (value, label, [disabled])
  143. value 单值,label等于value
  144. :return:
  145. """
  146. act_res = []
  147. for act in actions:
  148. if isinstance(act, Mapping):
  149. assert 'value' in act and 'label' in act, 'actions item must have value and label key'
  150. elif isinstance(act, Sequence):
  151. assert len(act) in (2, 3), 'actions item format error'
  152. act = dict(zip(('value', 'label', 'disabled'), act))
  153. else:
  154. act = dict(value=act, label=act)
  155. act_res.append(act)
  156. input_item = dict(type='buttons', label=label, name=name, actions=actions)
  157. return input_item
  158. def actions(label, actions, name='data'):
  159. """
  160. 选择一个动作。UI为多个按钮,点击后会将整个表单提交
  161. :param label:
  162. :param actions: action 列表
  163. action 可用形式:
  164. {value:, label:, [disabled:]}
  165. (value, label, [disabled])
  166. value 单值,label等于value
  167. 实现方式:
  168. 多个type=submit的input组成
  169. [
  170. <button data-name value>label</button>,
  171. ...
  172. ]
  173. """
  174. input_kwargs = dict(label='', actions=actions, name=name)
  175. input_kwargs['__name__'] = select.__name__
  176. data = yield from input_group(label=label, inputs=[input_kwargs])
  177. return data[name]
  178. def input_group(label, inputs, valid_func=None):
  179. """
  180. :param label:
  181. :param inputs: list of generator or dict, dict的话,需要多加一项 __name__ 为当前函数名
  182. :param valid_func: callback(data) -> (name, error_msg)
  183. :return:
  184. """
  185. make_spec_funcs = {
  186. actions.__name__: _make_actions_input_spec,
  187. input.__name__: _make_input_spec,
  188. select.__name__: _make_input_spec
  189. }
  190. item_valid_funcs = {}
  191. spec_inputs = []
  192. for input_g in inputs:
  193. if isinstance(input_g, dict):
  194. func_name = input_g.pop('__name__')
  195. input_kwargs = input_g
  196. else:
  197. input_kwargs = dict(input_g.gi_frame.f_locals) # 拷贝一份,不可以对locals进行修改
  198. func_name = input_g.__name__
  199. input_name = input_kwargs['name']
  200. item_valid_funcs[input_name] = input_kwargs['valid_func']
  201. input_item = make_spec_funcs[func_name](**input_kwargs)
  202. spec_inputs.append(input_item)
  203. if all('autofocus' not in i for i in spec_inputs): # 每一个输入项都没有设置autofocus参数
  204. for i in spec_inputs:
  205. text_inputs = {TEXT, NUMBER, PASSWORD} # todo update
  206. if i.get('type') in text_inputs:
  207. i['autofocus'] = True
  208. break
  209. send_msg('input_group', dict(label=label, inputs=spec_inputs))
  210. data = yield from _input_event_handle(item_valid_funcs, valid_func)
  211. send_msg('destroy_form')
  212. return data
  213. def ctrl_coro(ctrl_info):
  214. msg = dict(command="ctrl", spec=ctrl_info)
  215. Global.active_ws.write_message(json.dumps(msg))
  216. def text_print(text, *, ws=None):
  217. msg = dict(command="output", spec=dict(content=text, type='text'))
  218. (ws or Global.active_ws).write_message(json.dumps(msg))
  219. def json_print(obj):
  220. text = "```\n%s\n```" % json.dumps(obj, indent=4, ensure_ascii=False)
  221. text_print(text)