1
0

chat_room.py 3.7 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879
  1. import asyncio
  2. from pywebio import start_server
  3. from pywebio.input import *
  4. from pywebio.output import *
  5. from pywebio.session import defer_call, info as session_info, run_async
  6. # 最大消息记录保存
  7. MAX_MESSAGES_CNT = 10 ** 4
  8. chat_msgs = [] # 聊天记录 (name, msg)
  9. online_users = set() # 在线用户
  10. def t(eng, chinese):
  11. """return English or Chinese text according to the user's browser language"""
  12. return chinese if 'zh' in session_info.user_language else eng
  13. async def refresh_msg(my_name, msg_box):
  14. """send new message to current session"""
  15. global chat_msgs
  16. last_idx = len(chat_msgs)
  17. while True:
  18. await asyncio.sleep(0.5)
  19. for m in chat_msgs[last_idx:]:
  20. if m[0] != my_name: # only refresh message that not sent by current user
  21. msg_box.append(put_markdown('`%s`: %s' % m, sanitize=True))
  22. # remove expired message
  23. if len(chat_msgs) > MAX_MESSAGES_CNT:
  24. chat_msgs = chat_msgs[len(chat_msgs) // 2:]
  25. last_idx = len(chat_msgs)
  26. async def main():
  27. """PyWebIO chat room
  28. You can chat with everyone currently online.
  29. 和当前所有在线的人聊天
  30. """
  31. global chat_msgs
  32. put_markdown(t("## PyWebIO chat room\nWelcome to the chat room, you can chat with all the people currently online. You can open this page in multiple tabs of your browser to simulate a multi-user environment. This application uses less than 90 lines of code, the source code is [here](https://github.com/wang0618/PyWebIO/blob/dev/demos/chat_room.py)", "## PyWebIO聊天室\n欢迎来到聊天室,你可以和当前所有在线的人聊天。你可以在浏览器的多个标签页中打开本页面来测试聊天效果。本应用使用不到90行代码实现,源代码[链接](https://github.com/wang0618/PyWebIO/blob/dev/demos/chat_room.py)"))
  33. msg_box = output()
  34. put_scrollable(msg_box, height=300, keep_bottom=True)
  35. nickname = await input(t("Your nickname", "请输入你的昵称"), required=True, validate=lambda n: t('This name is already been used', '昵称已被使用') if n in online_users or n == '📢' else None)
  36. online_users.add(nickname)
  37. chat_msgs.append(('📢', '`%s` joins the room. %s users currently online' % (nickname, len(online_users))))
  38. msg_box.append(put_markdown('`📢`: `%s` join the room. %s users currently online' % (nickname, len(online_users)), sanitize=True))
  39. @defer_call
  40. def on_close():
  41. online_users.remove(nickname)
  42. chat_msgs.append(('📢', '`%s` leaves the room. %s users currently online' % (nickname, len(online_users))))
  43. refresh_task = run_async(refresh_msg(nickname, msg_box))
  44. while True:
  45. data = await input_group(t('Send message', '发送消息'), [
  46. input(name='msg', help_text=t('Message content supports inline Markdown syntax', '消息内容支持行内Markdown语法')),
  47. actions(name='cmd', buttons=[t('Send', '发送'), t('Multiline Input', '多行输入'), {'label': t('Exit', '退出'), 'type': 'cancel'}])
  48. ], validate=lambda d: ('msg', 'Message content cannot be empty') if d['cmd'] == t('Send', '发送') and not d['msg'] else None)
  49. if data is None:
  50. break
  51. if data['cmd'] == t('Multiline Input', '多行输入'):
  52. data['msg'] = '\n' + await textarea('Message content', help_text=t('Message content supports Markdown syntax', '消息内容支持Markdown语法'))
  53. msg_box.append(put_markdown('`%s`: %s' % (nickname, data['msg']), sanitize=True))
  54. chat_msgs.append((nickname, data['msg']))
  55. refresh_task.close()
  56. toast("You have left the chat room")
  57. if __name__ == '__main__':
  58. start_server(main, debug=True, port=8080)