server.py 1.3 KB

12345678910111213141516171819202122232425262728293031323334353637
  1. from __future__ import annotations
  2. import multiprocessing
  3. import socket
  4. from typing import List, Optional
  5. import uvicorn
  6. from . import globals # pylint: disable=redefined-builtin
  7. from . import storage # pylint: disable=redefined-builtin
  8. from . import native as native_module
  9. class CustomServerConfig(uvicorn.Config):
  10. storage_secret: Optional[str] = None
  11. method_queue: Optional[multiprocessing.Queue] = None
  12. response_queue: Optional[multiprocessing.Queue] = None
  13. class Server(uvicorn.Server):
  14. instance: Server
  15. @classmethod
  16. def create_singleton(cls, config: CustomServerConfig) -> None:
  17. """Create a singleton instance of the server."""
  18. cls.instance = cls(config=config)
  19. def run(self, sockets: Optional[List[socket.socket]] = None) -> None:
  20. self.instance = self
  21. assert isinstance(self.config, CustomServerConfig)
  22. if self.config.method_queue is not None and self.config.response_queue is not None:
  23. native_module.method_queue = self.config.method_queue
  24. native_module.response_queue = self.config.response_queue
  25. globals.app.native.main_window = native_module.WindowProxy()
  26. storage.set_storage_secret(self.config.storage_secret)
  27. super().run(sockets=sockets)