traffic_tracking.py

import logging
import os
import pickle
import threading
import time
from typing import Dict, Set

from starlette.requests import Request

from nicegui import ui

VISITS_FILE = 'traffic_data/visits.pickle'
SESSIONS_FILE = 'traffic_data/sessions.pickle'

# page views and unique sessions per day (keys are days since the Unix epoch)
visits: Dict[int, int] = {}
sessions: Dict[int, Set[str]] = {}

# restore previously persisted counters, if any
os.makedirs(os.path.dirname(VISITS_FILE), exist_ok=True)
os.makedirs(os.path.dirname(SESSIONS_FILE), exist_ok=True)
try:
    with open(VISITS_FILE, 'rb') as f:
        visits = pickle.load(f)
    with open(SESSIONS_FILE, 'rb') as f:
        sessions = pickle.load(f)
except FileNotFoundError:
    pass  # first run: start with empty counters
except Exception:
    logging.exception('Error loading traffic data')
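
# Shape of the persisted data, with hypothetical example values:
#   visits   == {19675: 321, 19676: 280}       # day index (days since Unix epoch) -> page views
#   sessions == {19676: {'d3adb33f', '...'}}   # day index -> set of session IDs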


def keep_backups() -> None:
    """Write the traffic data to disk every 10 seconds in a background thread."""
    def _save() -> None:
        while True:
            try:
                with open(VISITS_FILE, 'wb') as f:
                    pickle.dump(visits, f)
                with open(SESSIONS_FILE, 'wb') as f:
                    pickle.dump(sessions, f)
            except Exception:
                logging.exception('Error saving traffic data')
            time.sleep(10)

    # daemon thread so the endless save loop does not block interpreter shutdown
    t = threading.Thread(target=_save, name='Save Traffic Data', daemon=True)
    t.start()


class TrafficChart(ui.chart):
    """Line chart of daily page views and sessions; hidden until enough data exists."""

    def __init__(self) -> None:
        ui.on_connect(self.on_connect)
        ui.timer(10, self.update_visibility)
        super().__init__({
            'title': {'text': 'Page Visits'},
            'navigation': {'buttonOptions': {'enabled': False}},
            'chart': {'type': 'line'},
            'yAxis': {'title': False, 'type': 'logarithmic'},
            'xAxis': {
                'type': 'datetime',
                'labels': {'format': '{value:%b %e}'},
            },
            'series': [
                {'name': 'Views', 'data': []},
                {'name': 'Sessions', 'data': []},
            ],
        })
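
    # NOTE: ui.chart wraps Highcharts, so the dictionary above follows the Highcharts
    # configuration schema; the datetime x-axis expects [milliseconds since epoch, value]
    # pairs, which on_connect() below produces via day_to_milliseconds().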

    def on_connect(self, request: Request) -> None:
        # ignore monitoring, web crawlers and the like
        agent = request.headers.get('user-agent', '').lower()
        if any(s in agent for s in ('bot', 'spider', 'crawler', 'monitor', 'curl', 'wget', 'python-requests', 'kuma')):
            return

        def seconds_to_day(seconds: float) -> int:
            return int(seconds / 60 / 60 / 24)

        def day_to_milliseconds(day: int) -> float:
            return day * 24 * 60 * 60 * 1000

        today = seconds_to_day(time.time())
        visits[today] = visits.get(today, 0) + 1
        self.options.series[0].data[:] = [[day_to_milliseconds(day), count] for day, count in visits.items()]
        # drop the first day; its data is incomplete, depending on when the app was deployed
        self.options.series[0].data[:] = self.options.series[0].data[1:]
        if today not in sessions:
            sessions[today] = set()
        sessions[today].add(request.session_id)  # session_id is attached to the request elsewhere in the app
        self.options.series[1].data[:] = [[day_to_milliseconds(day), len(s)] for day, s in sessions.items()]
        # drop the first day; its data is incomplete, depending on when the app was deployed
        self.options.series[1].data[:] = self.options.series[1].data[1:]
        self.update()

    def update_visibility(self) -> None:
        # only show the chart once there are at least three days of data
        self.visible = len(visits) >= 3 and len(sessions) >= 3
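
# Usage sketch (hypothetical, not part of the original module): an application entry
# point could start the periodic backups and place the chart on a page roughly like
# this, assuming NiceGUI's standard ui.page decorator and ui.run entry point:
#
#     from nicegui import ui
#     import traffic_tracking
#
#     traffic_tracking.keep_backups()  # start the background save thread
#
#     @ui.page('/')
#     def index() -> None:
#         traffic_tracking.TrafficChart()  # registers its connect handler and visibility timer
#
#     ui.run()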