async_realtime_environment.py 4.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109
  1. import asyncio
  2. from time import monotonic
  3. from typing import Any, Optional, Union
  4. from numpy import Infinity
  5. from simpy.core import EmptySchedule, Environment, Infinity, SimTime, StopSimulation
  6. from simpy.events import NORMAL, URGENT, AllOf, AnyOf, Event, EventPriority, Process, ProcessGenerator, Timeout
  7. from simpy.rt import RealtimeEnvironment
  8. class AsyncRealtimeEnvironment(RealtimeEnvironment):
  9. """A real-time simulation environment that uses asyncio.
  10. The methods step and run are a 1-1 copy of the original methods from simpy.rt.RealtimeEnvironment,
  11. except that they are async and await asyncio.sleep instead of time.sleep.
  12. """
  13. async def step(self) -> None:
  14. """Process the next event after enough real-time has passed for the
  15. event to happen.
  16. The delay is scaled according to the real-time :attr:`factor`. With
  17. :attr:`strict` mode enabled, a :exc:`RuntimeError` will be raised, if
  18. the event is processed too slowly.
  19. """
  20. evt_time = self.peek()
  21. if evt_time is Infinity:
  22. raise EmptySchedule()
  23. real_time = self.real_start + (evt_time - self.env_start) * self.factor
  24. if self.strict and monotonic() - real_time > self.factor:
  25. # Events scheduled for time *t* may take just up to *t+1*
  26. # for their computation, before an error is raised.
  27. delta = monotonic() - real_time
  28. raise RuntimeError(
  29. f'Simulation too slow for real time ({delta:.3f}s).'
  30. )
  31. # Sleep in a loop to fix inaccuracies of windows (see
  32. # http://stackoverflow.com/a/15967564 for details) and to ignore
  33. # interrupts.
  34. while True:
  35. delta = real_time - monotonic()
  36. if delta <= 0:
  37. break
  38. await asyncio.sleep(delta)
  39. Environment.step(self)
  40. async def run(
  41. self, until: Optional[Union[SimTime, Event]] = None
  42. ) -> Optional[Any]:
  43. """Executes :meth:`step()` until the given criterion *until* is met.
  44. - If it is ``None`` (which is the default), this method will return
  45. when there are no further events to be processed.
  46. - If it is an :class:`~simpy.events.Event`, the method will continue
  47. stepping until this event has been triggered and will return its
  48. value. Raises a :exc:`RuntimeError` if there are no further events
  49. to be processed and the *until* event was not triggered.
  50. - If it is a number, the method will continue stepping
  51. until the environment's time reaches *until*.
  52. """
  53. if until is not None:
  54. if not isinstance(until, Event):
  55. # Assume that *until* is a number if it is not None and
  56. # not an event. Create a Timeout(until) in this case.
  57. at: SimTime
  58. if isinstance(until, int):
  59. at = until
  60. else:
  61. at = float(until)
  62. if at <= self.now:
  63. raise ValueError(
  64. f'until(={at}) must be > the current simulation time.'
  65. )
  66. # Schedule the event before all regular timeouts.
  67. until = Event(self)
  68. until._ok = True
  69. until._value = None
  70. self.schedule(until, URGENT, at - self.now)
  71. elif until.callbacks is None:
  72. # Until event has already been processed.
  73. return until.value
  74. until.callbacks.append(StopSimulation.callback)
  75. try:
  76. while True:
  77. await self.step()
  78. except StopSimulation as exc:
  79. return exc.args[0] # == until.value
  80. except EmptySchedule:
  81. if until is not None:
  82. assert not until.triggered
  83. raise RuntimeError(
  84. f'No scheduled events left but "until" event was not '
  85. f'triggered: {until}'
  86. )
  87. return None