Forráskód Böngészése

use new run module where possible

Falko Schindler 1 éve
szülő
commit
902f8e5c2e

+ 3 - 11
examples/ai_interface/main.py

@@ -1,25 +1,17 @@
 #!/usr/bin/env python3
-import asyncio
-import functools
 import io
-from typing import Callable
 
 import replicate  # very nice API to run AI models; see https://replicate.com/
 
-from nicegui import ui
+from nicegui import run, ui
 from nicegui.events import UploadEventArguments
 
 
-async def io_bound(callback: Callable, *args: any, **kwargs: any):
-    '''Makes a blocking function awaitable; pass function as first parameter and its arguments as the rest'''
-    return await asyncio.get_event_loop().run_in_executor(None, functools.partial(callback, *args, **kwargs))
-
-
 async def transcribe(e: UploadEventArguments):
     transcription.text = 'Transcribing...'
     model = replicate.models.get('openai/whisper')
     version = model.versions.get('30414ee7c4fffc37e260fcab7842b5be470b9b840f2b608f5baa9bbef9a259ed')
-    prediction = await io_bound(version.predict, audio=io.BytesIO(e.content.read()))
+    prediction = await run.io_bound(version.predict, audio=io.BytesIO(e.content.read()))
     text = prediction.get('transcription', 'no transcription')
     transcription.set_text(f'result: "{text}"')
 
@@ -28,7 +20,7 @@ async def generate_image():
     image.source = 'https://dummyimage.com/600x400/ccc/000000.png&text=building+image...'
     model = replicate.models.get('stability-ai/stable-diffusion')
     version = model.versions.get('db21e45d3f7023abc2a46ee38a23973f6dce16bb082a930b0c49861f96d1e5bf')
-    prediction = await io_bound(version.predict, prompt=prompt.value)
+    prediction = await run.io_bound(version.predict, prompt=prompt.value)
     image.source = prediction[0]
 
 # User Interface

+ 3 - 10
examples/opencv_webcam/main.py

@@ -1,7 +1,5 @@
 #!/usr/bin/env python3
-import asyncio
 import base64
-import concurrent.futures
 import signal
 import time
 
@@ -10,10 +8,8 @@ import numpy as np
 from fastapi import Response
 
 import nicegui.globals
-from nicegui import app, ui
+from nicegui import app, run, ui
 
-# We need an executor to schedule CPU-intensive tasks with `loop.run_in_executor()`.
-process_pool_executor = concurrent.futures.ProcessPoolExecutor()
 # In case you don't have a webcam, this will provide a black placeholder image.
 black_1px = 'iVBORw0KGgoAAAANSUhEUgAAAAEAAAABCAYAAAAfFcSJAAAAAXNSR0IArs4c6QAAAA1JREFUGFdjYGBg+A8AAQQBAHAgZQsAAAAASUVORK5CYII='
 placeholder = Response(content=base64.b64decode(black_1px.encode('ascii')), media_type='image/png')
@@ -31,14 +27,13 @@ def convert(frame: np.ndarray) -> bytes:
 async def grab_video_frame() -> Response:
     if not video_capture.isOpened():
         return placeholder
-    loop = asyncio.get_running_loop()
     # The `video_capture.read` call is a blocking function.
     # So we run it in a separate thread (default executor) to avoid blocking the event loop.
-    _, frame = await loop.run_in_executor(None, video_capture.read)
+    _, frame = await run.io_bound(video_capture.read)
     if frame is None:
         return placeholder
     # `convert` is a CPU-intensive function, so we run it in a separate process to avoid blocking the event loop and GIL.
-    jpeg = await loop.run_in_executor(process_pool_executor, convert, frame)
+    jpeg = await run.cpu_bound(convert, frame)
     return Response(content=jpeg, media_type='image/jpeg')
 
 # For non-flickering image updates an interactive image is much better than `ui.image()`.
@@ -68,8 +63,6 @@ async def cleanup() -> None:
     await disconnect()
     # Release the webcam hardware so it can be used by other applications again.
     video_capture.release()
-    # The process pool executor must be shutdown when the app is closed, otherwise the process will not exit.
-    process_pool_executor.shutdown()
 
 app.on_shutdown(cleanup)
 # We also need to disconnect clients when the app is stopped with Ctrl+C,

+ 3 - 12
examples/progress/main.py

@@ -1,16 +1,12 @@
 #!/usr/bin/env python3
-import asyncio
 import time
-from concurrent.futures import ProcessPoolExecutor
 from multiprocessing import Manager, Queue
 
-from nicegui import app, ui
-
-pool = ProcessPoolExecutor()
+from nicegui import run, ui
 
 
 def heavy_computation(q: Queue) -> str:
-    '''Some heavy computation that updates the progress bar through the queue.'''
+    """Run some heavy computation that updates the progress bar through the queue."""
     n = 50
     for i in range(n):
         # Perform some heavy computation
@@ -23,11 +19,9 @@ def heavy_computation(q: Queue) -> str:
 
 @ui.page('/')
 def main_page():
-
     async def start_computation():
         progressbar.visible = True
-        loop = asyncio.get_running_loop()
-        result = await loop.run_in_executor(pool, heavy_computation, queue)
+        result = await run.cpu_bound(heavy_computation, queue)
         ui.notify(result)
         progressbar.visible = False
 
@@ -42,7 +36,4 @@ def main_page():
     progressbar.visible = False
 
 
-# stop the pool when the app is closed; will not cancel any running tasks
-app.on_shutdown(pool.shutdown)
-
 ui.run()

+ 2 - 2
nicegui/native.py

@@ -1,4 +1,3 @@
-import asyncio
 import inspect
 import warnings
 from dataclasses import dataclass, field
@@ -8,6 +7,7 @@ from typing import Any, Callable, Dict, Optional, Tuple
 
 from .dataclasses import KWONLY_SLOTS
 from .globals import log
+from .run_executor import io_bound
 
 method_queue: Queue = Queue()
 response_queue: Queue = Queue()
@@ -123,7 +123,7 @@ try:
                     log.exception(f'error in {name}')
                     return None
             name = inspect.currentframe().f_back.f_code.co_name  # type: ignore
-            return await asyncio.get_event_loop().run_in_executor(None, partial(wrapper, *args, **kwargs))
+            return await io_bound(wrapper, *args, **kwargs)
 
         def signal_server_shutdown(self) -> None:
             self._send()

+ 2 - 3
nicegui/welcome.py

@@ -1,9 +1,9 @@
-import asyncio
 import os
 import socket
 from typing import List
 
 from . import globals  # pylint: disable=redefined-builtin
+from .run_executor import io_bound
 
 try:
     import netifaces
@@ -33,8 +33,7 @@ async def print_message() -> None:
     print('NiceGUI ready to go ', end='', flush=True)
     host = os.environ['NICEGUI_HOST']
     port = os.environ['NICEGUI_PORT']
-    loop = asyncio.get_running_loop()
-    ips = set((await loop.run_in_executor(None, get_all_ips)) if host == '0.0.0.0' else [])
+    ips = set((await io_bound(get_all_ips)) if host == '0.0.0.0' else [])
     ips.discard('127.0.0.1')
     urls = [(f'http://{ip}:{port}' if port != '80' else f'http://{ip}') for ip in ['localhost'] + sorted(ips)]
     globals.app.urls.update(urls)