1
0
Эх сурвалжийг харах

Merge pull request #955 from zauberzeug/simpy_example

Add SimPy example
Falko Schindler 2 жил өмнө
parent
commit
141338d73f

+ 1 - 1
.github/workflows/test.yml

@@ -25,7 +25,7 @@ jobs:
           poetry config virtualenvs.create false
           poetry install
           # install packages to run the examples
-          pip install opencv-python opencv-contrib-python-headless httpx replicate langchain openai
+          pip install opencv-python opencv-contrib-python-headless httpx replicate langchain openai simpy
           # try fix issue with importlib_resources
           pip install importlib-resources
       - name: test startup

+ 109 - 0
examples/simpy/async_realtime_environment.py

@@ -0,0 +1,109 @@
+import asyncio
+from time import monotonic
+from typing import Any, Optional, Union
+
+from numpy import Infinity
+from simpy.core import EmptySchedule, Environment, Infinity, SimTime, StopSimulation
+from simpy.events import URGENT, Event
+from simpy.rt import RealtimeEnvironment
+
+
+class AsyncRealtimeEnvironment(RealtimeEnvironment):
+    """A real-time simulation environment that uses asyncio.
+
+    The methods step and run are a 1-1 copy of the original methods from simpy.rt.RealtimeEnvironment,
+    except that they are async and await asyncio.sleep instead of time.sleep.
+    """
+
+    async def step(self) -> None:
+        """Process the next event after enough real-time has passed for the
+        event to happen.
+
+        The delay is scaled according to the real-time :attr:`factor`. With
+        :attr:`strict` mode enabled, a :exc:`RuntimeError` will be raised, if
+        the event is processed too slowly.
+
+        """
+        evt_time = self.peek()
+
+        if evt_time is Infinity:
+            raise EmptySchedule()
+
+        real_time = self.real_start + (evt_time - self.env_start) * self.factor
+
+        if self.strict and monotonic() - real_time > self.factor:
+            # Events scheduled for time *t* may take just up to *t+1*
+            # for their computation, before an error is raised.
+            delta = monotonic() - real_time
+            raise RuntimeError(
+                f'Simulation too slow for real time ({delta:.3f}s).'
+            )
+
+        # Sleep in a loop to fix inaccuracies of windows (see
+        # http://stackoverflow.com/a/15967564 for details) and to ignore
+        # interrupts.
+        while True:
+            delta = real_time - monotonic()
+            if delta <= 0:
+                break
+            await asyncio.sleep(delta)
+
+        Environment.step(self)
+
+    async def run(
+        self, until: Optional[Union[SimTime, Event]] = None
+    ) -> Optional[Any]:
+        """Executes :meth:`step()` until the given criterion *until* is met.
+
+        - If it is ``None`` (which is the default), this method will return
+          when there are no further events to be processed.
+
+        - If it is an :class:`~simpy.events.Event`, the method will continue
+          stepping until this event has been triggered and will return its
+          value.  Raises a :exc:`RuntimeError` if there are no further events
+          to be processed and the *until* event was not triggered.
+
+        - If it is a number, the method will continue stepping
+          until the environment's time reaches *until*.
+
+        """
+        if until is not None:
+            if not isinstance(until, Event):
+                # Assume that *until* is a number if it is not None and
+                # not an event.  Create a Timeout(until) in this case.
+                at: SimTime
+                if isinstance(until, int):
+                    at = until
+                else:
+                    at = float(until)
+
+                if at <= self.now:
+                    raise ValueError(
+                        f'until(={at}) must be > the current simulation time.'
+                    )
+
+                # Schedule the event before all regular timeouts.
+                until = Event(self)
+                until._ok = True
+                until._value = None
+                self.schedule(until, URGENT, at - self.now)
+
+            elif until.callbacks is None:
+                # Until event has already been processed.
+                return until.value
+
+            until.callbacks.append(StopSimulation.callback)
+
+        try:
+            while True:
+                await self.step()
+        except StopSimulation as exc:
+            return exc.args[0]  # == until.value
+        except EmptySchedule:
+            if until is not None:
+                assert not until.triggered
+                raise RuntimeError(
+                    f'No scheduled events left but "until" event was not '
+                    f'triggered: {until}'
+                )
+        return None

+ 49 - 0
examples/simpy/main.py

@@ -0,0 +1,49 @@
+#!/usr/bin/env python3
+import asyncio
+import datetime
+
+from async_realtime_environment import AsyncRealtimeEnvironment
+
+from nicegui import ui
+
+start_time = datetime.datetime.now()
+
+
+def clock(env):
+    while True:
+        simulation_time = start_time + datetime.timedelta(seconds=env.now)
+        clock_label.text = simulation_time.strftime('%H:%M:%S')
+        yield env.timeout(1)
+
+
+def traffic_light(env):
+    while True:
+        light.classes('bg-green-500', remove='bg-red-500')
+        yield env.timeout(30)
+        light.classes('bg-yellow-500', remove='bg-green-500')
+        yield env.timeout(5)
+        light.classes('bg-red-500', remove='bg-yellow-500')
+        yield env.timeout(20)
+
+
+async def run_simpy():
+    env = AsyncRealtimeEnvironment(factor=0.1)  # fast forward simulation with 1/10th of realtime
+    env.process(traffic_light(env))
+    env.process(clock(env))
+    try:
+        await env.run(until=300)  # run until 300 seconds of simulation time have passed
+    except asyncio.CancelledError:
+        return
+    ui.notify('Simulation completed')
+    content.classes('opacity-0')  # fade out the content
+
+# define the UI
+with ui.column().classes('absolute-center items-center transition-opacity duration-500') as content:
+    ui.label('SimPy Traffic Light Demo').classes('text-2xl mb-6')
+    light = ui.element('div').classes('w-10 h-10 rounded-full shadow-lg transition')
+    clock_label = ui.label()
+
+# start the simpy simulation as an async task in the background as soon as the UI is ready
+ui.timer(0, run_simpy, once=True)
+
+ui.run()

+ 2 - 0
examples/simpy/requirements.txt

@@ -0,0 +1,2 @@
+nicegui>=1.2
+simpy