output.py 5.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184
  1. import json
  2. import logging
  3. from collections.abc import Mapping
  4. from base64 import b64encode
  5. from .framework import Global, Task
  6. from .input_ctrl import send_msg, single_input, input_control, next_event, run_async
  7. from .output_ctl import register_callback
  8. import asyncio
  9. import inspect
  10. def set_title(title):
  11. send_msg('output_ctl', dict(title=title))
  12. def set_output_fixed_height(enabled=True):
  13. send_msg('output_ctl', dict(output_fixed_height=enabled))
  14. def set_auto_scroll_bottom(enabled=True):
  15. send_msg('output_ctl', dict(auto_scroll_bottom=enabled))
  16. def set_anchor(name):
  17. """
  18. 在当前输出处标记锚点。 若已经存在name锚点,则先将旧锚点删除
  19. """
  20. inner_ancher_name = 'pywebio-anchor-%s' % name
  21. send_msg('output_ctl', dict(set_anchor=inner_ancher_name))
  22. def clear_before(anchor):
  23. """清除anchor锚点之前输出的内容"""
  24. inner_ancher_name = 'pywebio-anchor-%s' % anchor
  25. send_msg('output_ctl', dict(clear_before=inner_ancher_name))
  26. def clear_after(anchor):
  27. """清除anchor锚点之后输出的内容"""
  28. inner_ancher_name = 'pywebio-anchor-%s' % anchor
  29. send_msg('output_ctl', dict(clear_after=inner_ancher_name))
  30. def clear_range(start_anchor, end_ancher):
  31. """
  32. 清除start_anchor-end_ancher锚点之间输出的内容.
  33. 若 start_anchor 或 end_ancher 不存在,则不进行任何操作
  34. """
  35. inner_start_anchor_name = 'pywebio-anchor-%s' % start_anchor
  36. inner_end_ancher_name = 'pywebio-anchor-%s' % end_ancher
  37. send_msg('output_ctl', dict(clear_range=[inner_start_anchor_name, inner_end_ancher_name]))
  38. def scroll_to(anchor):
  39. """将页面滚动到anchor锚点处"""
  40. inner_ancher_name = 'pywebio-anchor-%s' % anchor
  41. send_msg('output_ctl', dict(scroll_to=inner_ancher_name))
  42. def text_print(text, *, ws=None):
  43. if text is None:
  44. text = ''
  45. msg = dict(command="output", spec=dict(content=text, type='text'))
  46. (ws or Global.active_ws).write_message(json.dumps(msg))
  47. def json_print(obj):
  48. text = "```\n%s\n```" % json.dumps(obj, indent=4, ensure_ascii=False)
  49. text_print(text)
  50. def put_markdown(mdcontent, strip_indent=0, lstrip=False):
  51. """
  52. 输出Markdown内容。当在函数中使用Python的三引号语法输出多行内容时,为了排版美观可能会对Markdown文本进行缩进,
  53. 这时候,可以设置strip_indent或lstrip来防止Markdown错误解析
  54. :param mdcontent: Markdown文本
  55. :param strip_indent: 去除行开始的缩进空白数。
  56. :param lstrip: 是否去除行开始的空白。
  57. :return:
  58. """
  59. if strip_indent:
  60. lines = (
  61. i[:strip_indent] if (i[:strip_indent] == ' ' * strip_indent) else i
  62. for i in mdcontent.splitlines()
  63. )
  64. mdcontent = '\n'.join(lines)
  65. if lstrip:
  66. lines = (i.lstrip() for i in mdcontent.splitlines())
  67. mdcontent = '\n'.join(lines)
  68. text_print(mdcontent)
  69. def put_table(tdata, header=None):
  70. """
  71. 输出表格
  72. :param tdata: list of list|dict
  73. :param header: 列表,当tdata为字典列表时,header指定表头顺序
  74. :return:
  75. """
  76. if header:
  77. tdata = [
  78. [row.get(k, '') for k in header]
  79. for row in tdata
  80. ]
  81. def quote(data):
  82. return str(data).replace('|', r'\|')
  83. # 防止当tdata只有一行时,无法显示表格
  84. if len(tdata) == 1:
  85. tdata[0:0] = [' '] * len(tdata[0])
  86. header = "|%s|" % "|".join(map(quote, tdata[0]))
  87. res = [header]
  88. res.append("|%s|" % "|".join(['----'] * len(tdata[0])))
  89. for tr in tdata[1:]:
  90. t = "|%s|" % "|".join(map(quote, tr))
  91. res.append(t)
  92. text_print('\n'.join(res))
  93. def _format_button(buttons):
  94. """
  95. 格式化按钮参数
  96. :param buttons: button列表, button可用形式:
  97. {value:, label:, }
  98. (value, label,)
  99. value 单值,label等于value
  100. :return: [{value:, label:, }, ...]
  101. """
  102. btns = []
  103. for btn in buttons:
  104. if isinstance(btn, Mapping):
  105. assert 'value' in btn and 'label' in btn, 'actions item must have value and label key'
  106. elif isinstance(btn, list):
  107. assert len(btn) == 2, 'actions item format error'
  108. btn = dict(zip(('value', 'label'), btn))
  109. else:
  110. btn = dict(value=btn, label=btn)
  111. btns.append(btn)
  112. return btns
  113. def td_buttons(buttons, onclick, save=None, mutex_mode=False):
  114. """
  115. 在表格中显示一组按钮
  116. 参数含义同 buttons 函数
  117. :return:
  118. """
  119. btns = _format_button(buttons)
  120. callback_id = register_callback(onclick, save, mutex_mode)
  121. tpl = '<button type="button" value="{value}" class="btn btn-primary btn-sm" ' \
  122. 'onclick="WebIO.DisplayAreaButtonOnClick(this, \'%s\')">{label}</button>' % callback_id
  123. btns_html = [tpl.format(**b) for b in btns]
  124. return ' '.join(btns_html)
  125. def buttons(buttons, onclick, small=False, save=None, mutex_mode=False):
  126. """
  127. 显示一组按钮
  128. :param buttons: button列表, button可用形式: value 只能为字符串
  129. {value:, label:, }
  130. (value, label,)
  131. value 单值,label等于value
  132. :param onclick: CallBack(btn_value, save) CallBack can be generator function or coroutine function
  133. :param save:
  134. :param mutex_mode: 互斥模式,回调在运行过程中,无法响应同一回调,仅当onclick为协程函数时有效
  135. :return:
  136. """
  137. btns = _format_button(buttons)
  138. callback_id = register_callback(onclick, save, mutex_mode)
  139. send_msg('output', dict(type='buttons', callback_id=callback_id, buttons=btns, small=small))
  140. def put_file(name, content):
  141. """
  142. :param name: file name
  143. :param content: bytes-like object
  144. :return:
  145. """
  146. content = b64encode(content).decode('ascii')
  147. send_msg('output', dict(type='file', name=name, content=content))