file_persistent_dict.py 2.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960
  1. from pathlib import Path
  2. from typing import Optional
  3. import aiofiles
  4. from nicegui import background_tasks, core, json
  5. from nicegui.logging import log
  6. from .persistent_dict import PersistentDict
  7. class FilePersistentDict(PersistentDict):
  8. def __init__(self, filepath: Path, encoding: Optional[str] = None, *, indent: bool = False) -> None:
  9. self.filepath = filepath
  10. self.encoding = encoding
  11. self.indent = indent
  12. super().__init__(data={}, on_change=self.backup)
  13. async def initialize(self) -> None:
  14. try:
  15. if self.filepath.exists():
  16. async with aiofiles.open(self.filepath, encoding=self.encoding) as f:
  17. data = json.loads(await f.read())
  18. else:
  19. data = {}
  20. self.update(data)
  21. except Exception:
  22. log.warning(f'Could not load storage file {self.filepath}')
  23. def initialize_sync(self) -> None:
  24. try:
  25. if self.filepath.exists():
  26. data = json.loads(self.filepath.read_text(encoding=self.encoding))
  27. else:
  28. data = {}
  29. self.update(data)
  30. except Exception:
  31. log.warning(f'Could not load storage file {self.filepath}')
  32. def backup(self) -> None:
  33. """Back up the data to the given file path."""
  34. if not self.filepath.exists():
  35. if not self:
  36. return
  37. self.filepath.parent.mkdir(exist_ok=True)
  38. @background_tasks.await_on_shutdown
  39. async def async_backup() -> None:
  40. async with aiofiles.open(self.filepath, 'w', encoding=self.encoding) as f:
  41. await f.write(json.dumps(self, indent=self.indent))
  42. if core.loop and core.loop.is_running():
  43. background_tasks.create_lazy(async_backup(), name=self.filepath.stem)
  44. else:
  45. self.filepath.write_text(json.dumps(self, indent=self.indent), encoding=self.encoding)
  46. def clear(self) -> None:
  47. super().clear()
  48. self.filepath.unlink(missing_ok=True)