From 24bb04ec5cdad562f9b227abc701e599575161c6 Mon Sep 17 00:00:00 2001 From: "Sam G." Date: Sun, 28 Apr 2024 14:35:04 -0700 Subject: [PATCH] fix sloppy Server event loop management, add Server tests --- execlog/listener.py | 7 +++ execlog/listeners/path.py | 24 +++++++-- execlog/router.py | 62 +++++++++++++---------- execlog/server.py | 79 +++++++++++++++++++++++++---- execlog/util/generic.py | 14 +++--- requirements.txt | 1 + tests/endpoint_proxy/fileB | 1 + tests/test_handler.py | 7 +++ tests/test_listener.py | 82 ++++++++++++++++++++++++++++++ tests/test_router.py | 58 +++++++++++++++++++++ tests/test_server.py | 100 +++++++++++++++++++++++++++++++++++++ 11 files changed, 390 insertions(+), 45 deletions(-) create mode 100644 tests/endpoint_proxy/fileB diff --git a/execlog/listener.py b/execlog/listener.py index ae8e6bd..5e990c4 100644 --- a/execlog/listener.py +++ b/execlog/listener.py @@ -32,3 +32,10 @@ class Listener[E: Event](threading.Thread): attached Router. ''' raise NotImplementedError + + def stop(self): + ''' + Stop listening for events. Typically ends the blocking listen loop and shuts down + resources tied to the attached Router. + ''' + raise NotImplementedError diff --git a/execlog/listeners/path.py b/execlog/listeners/path.py index f328074..752f535 100644 --- a/execlog/listeners/path.py +++ b/execlog/listeners/path.py @@ -355,12 +355,30 @@ class PathListener(Listener[FileEvent]): self.update_moved_to(path, lead) def stop(self): + ''' + Shutdown active listener processes, including the attached router thread pool and + the iNotify event loop. + + Note: + Shutting down the thread pool will wait until pending futures are finished + executing before actually returning. A common source of error is having the + main process exit before final tasks can be submitted, resulting in + RuntimeErrors that cannot "schedule new futures after interpreter shutdown." 
+ So you either need to ensure the final tasks are scheduled before calling + `stop()` (this means more than just a `submit()` call; it must have actually + propagated through to `submit_callback` and reached `thread_pool.submit`) to + allow them to be handled automatically prior to shutdown, or manually wait on + their futures to complete. Otherwise, thread pool shutdown will occur, and + they'll still be making their way out of the queue only to reach the + `thread_pool.submit` after it's had its final boarding call. + ''' logger.info("Stopping listener...") - if self.router.thread_pool is not None: - self.router.thread_pool.shutdown() - # request INotify stop by writing in the pipe, checked in watch loop if not self.write.closed: self.write.write(b"\x00") self.write.close() + + if self.router.thread_pool is not None: + self.router.thread_pool.shutdown() + diff --git a/execlog/router.py b/execlog/router.py index 1cb22ed..35343a1 100644 --- a/execlog/router.py +++ b/execlog/router.py @@ -103,6 +103,9 @@ class Router[E: Event]: # store prepped (e.g., delayed) callbacks self.callback_registry = {} + # track event history + self.event_log = [] + self._thread_pool = None self._route_lock = threading.Lock() @@ -115,15 +118,14 @@ class Router[E: Event]: def register( self, endpoint, - callback, + callback: Callable, pattern, debounce=200, delay=10, **listener_kwargs, ): ''' - Register a route. To be defined by an inheriting class, typically taking a pattern - and a callback. + Register a route. Note: Listener arguments Notice how listener_kwargs are accumulated instead of uniquely assigned to an @@ -139,15 +141,18 @@ class Router[E: Event]: submitted event's `action` value. 
Parameters: - pattern: hashable object to be used when filtering event (passed to inherited - `filter(...)`) + endpoint: callback: callable accepting an event to be executed if when a matching event is received + pattern: hashable object to be used when filtering event (passed to inherited + `filter(...)`) + debounce: + delay: ''' route_tuple = (callback, pattern, debounce, delay, listener_kwargs) self.routemap[endpoint].append(route_tuple) - def submit(self, events:E | list[E], callbacks=None): + def submit(self, events:E | list[E], callbacks:list[Callable]|None=None): ''' Handle a list of events. Each event is matched against the registered callbacks, and those callbacks are ran concurrently (be it via a thread pool or an asyncio @@ -164,7 +169,7 @@ class Router[E: Event]: return futures - def submit_event(self, event, callbacks=None): + def submit_event(self, event: E, callbacks:list[Callable]|None=None): ''' Group up and submit all matching callbacks for `event`. All callbacks are ran concurrently in their own threads, and this method blocks until all are completed. @@ -173,13 +178,11 @@ class Router[E: Event]: thread, and the registered post-callbacks are attached to the completion of this function, i.e., the finishing of all callbacks matching provided event. - Note that there are no checks for empty callback lists, where we could exit early. - Here we simply rely on methods doing the right thing: `wait_on_futures` would - simply receive an empty list, for example. Nevertheless, once an event is - submitted with this method, it gets at least a few moments where that event is - considered "running," and will be later popped out by `clear_events` (almost - immediately if there is in fact nothing to do). An early exit would simply have to - come after indexing the event in `running_events` + Note that an event may not match any routes, in which case the method exits early. + An empty list is returned, and this shows up as the outer future's result. 
In this + case, the event is never considered "running," and the non-result picked up in + `clear_event` will ensure it exits right away (not even attempting to pop the + event from the running list, and for now not tracking it in the event log). ''' if callbacks is None: # ensure same thread gets all matching routes & sets debounce updates; else @@ -216,7 +219,7 @@ class Router[E: Event]: return future_results - def submit_callback(self, callback, *args, **kwargs): + def submit_callback(self, callback: Callable, *args, **kwargs): ''' Note: this method is expected to return a future. Perform any event-based filtering before submitting a callback with this method. @@ -311,7 +314,7 @@ class Router[E: Event]: return matches - def get_delayed_callback(self, callback, delay, index): + def get_delayed_callback(self, callback: Callable, delay: int|float, index): ''' Parameters: callback: function to wrap @@ -348,7 +351,7 @@ class Router[E: Event]: return future_results - def wait_on_callbacks(self, callbacks, event, *args, **kwargs): + def wait_on_callbacks(self, callbacks: list[Callable], event: E, *args, **kwargs): ''' Overridable by inheriting classes based on callback structure ''' @@ -357,16 +360,20 @@ class Router[E: Event]: for callback in callbacks ]) - def queue_callbacks(self, event_idx, callbacks): + def queue_callbacks(self, event_idx, callbacks: list[Callable]): ''' Overridable by inheriting classes based on callback structure ''' self.running_events[event_idx].update(callbacks) - def filter(self, event, pattern, **listen_kwargs) -> bool: + def filter(self, event: E, pattern, **listen_kwargs) -> bool: ''' + Determine if a given event matches the provided pattern + Parameters: - listen_kwargs_list: + event: + pattern: + listen_kwargs: ''' raise NotImplementedError @@ -403,7 +410,7 @@ class Router[E: Event]: event_idx = self.event_index(event) return self.running_events.pop(event_idx, None) - def clear_event(self, event, future): + def clear_event(self, event: 
E, future): ''' Clear an event. Pops the passed event out of `running_events`, and the request counter is >0, the event is re-submitted. @@ -419,10 +426,13 @@ class Router[E: Event]: The check for results from the passed future allows us to know when in fact a valid frame has finished, and a resubmission may be on the table. ''' - if not future.result(): return + result = future.result() + if not result: return + + self.event_log.append((event, result)) queued_callbacks = self.stop_event(event) - # resubmit event if some queued work + # resubmit event if some queued work remains if queued_callbacks and len(queued_callbacks) > 0: logger.debug( f'Event [{event.name}] resubmitted with [{len(queued_callbacks)}] queued callbacks' @@ -433,7 +443,7 @@ class Router[E: Event]: return event[:2] -class ChainRouter(Router): +class ChainRouter[E: Event](Router[E]): ''' Routes events to registered callbacks ''' @@ -460,7 +470,7 @@ class ChainRouter(Router): for endpoint, routelist in router.routemap.items(): self.routemap[endpoint].extend(routelist) - def matching_routes(self, event, event_time=None): + def matching_routes(self, event: E, event_time=None): ''' Colloquial `callbacks` now used as a dict of lists of callbacks, indexed by router, and only having keys for routers with non-empty callback lists. 
@@ -476,7 +486,7 @@ class ChainRouter(Router): return route_map - def wait_on_callbacks(self, callbacks, event, *args, **kwargs): + def wait_on_callbacks(self, callbacks, event: E, *args, **kwargs): ''' Note: relies on order of callbacks dict matching that of `ordered_routers`, which should happen in `matching_routes` diff --git a/execlog/server.py b/execlog/server.py index 19a9cd3..364fccc 100644 --- a/execlog/server.py +++ b/execlog/server.py @@ -17,12 +17,14 @@ import asyncio import logging import threading from functools import partial +from contextlib import asynccontextmanager import uvicorn from inotify_simple import flags from fastapi import FastAPI, WebSocket from fastapi.staticfiles import StaticFiles +from execlog.routers.path import PathRouter from execlog.handler import Handler as LREndpoint @@ -65,6 +67,7 @@ class Server: self.managed_listeners = managed_listeners self.listener = None + self.userver = None self.server = None self.server_text = '' self.server_args = {} @@ -104,9 +107,14 @@ class Server: self.server_args['host'] = self.host self.server_args['port'] = self.port + @asynccontextmanager + async def lifespan(app: FastAPI): + yield + self.shutdown() + if self.static or self.livereload: - self.server = FastAPI() - self.server.on_event('shutdown')(self.shutdown) + self.server = FastAPI(lifespan=lifespan) + #self.server.on_event('shutdown')(self.shutdown) if self.livereload: self._wrap_livereload() @@ -121,8 +129,6 @@ class Server: ''' flags.MODIFY okay since we don't need to reload non-existent pages ''' - from execlog.reloader.router import PathRouter - if self.loop is None: self.loop = asyncio.new_event_loop() asyncio.set_event_loop(self.loop) @@ -195,6 +201,17 @@ class Server: explicitly in order for things to be handled gracefully. This is done in the server setup step, where we ensure FastAPI calls `watcher.stop()` during its shutdown process. 
+ + Note: on event loop management + The uvicorn server is run with `run_until_complete`, intended as a + long-running process to eventually be interrupted or manually disrupted with a + call to `shutdown()`. The `shutdown` call attempts to gracefully shutdown the + uvicorn process by setting a `should_exit` flag. Upon successful shutdown, the + server task will be considered complete, and we can then manually close the + loop following the interruption. So a shutdown call (which is also attached as + a lifespan shutdown callback for the FastAPI object) will disable listeners + and shut down their thread pools, gracefully close up the Uvicorn server and + allow the serve coroutine to complete, and finally close down the event loop. ''' if self.loop is None: self.loop = asyncio.new_event_loop() @@ -209,17 +226,61 @@ class Server: logger.info(f'Server{self.server_text} @ http://{self.host}:{self.port}') uconfig = uvicorn.Config(app=self.server, loop=self.loop, **self.server_args) - userver = uvicorn.Server(config=uconfig) - self.loop.run_until_complete(userver.serve()) + self.userver = uvicorn.Server(config=uconfig) + self.loop.run_until_complete(self.userver.serve()) + self.loop.close() def shutdown(self): ''' Additional shutdown handling after the FastAPI event loop receives an interrupt. - Currently this + This is attached as a "shutdown" callback when creating the FastAPI instance, + which generally appears to hear interrupts and propagate them through. + + This method can also be invoked programmatically, such as from a thread not + handling the main event loop. Note that neither of the following shutdown + approaches of the Uvicorn server appears to work well in this case; they + either stall the calling thread indefinitely (in the second case, when waiting on + the shutdown result), or simply don't shut down the server (in the first). 
Only + setting `should_exit` and allowing for a graceful internal shutdown appears to + both 1) handle this gracefully, and 2) shut down the server at all. + + ``` + self.loop.call_soon_threadsafe(self.userver.shutdown) + + # OR # + + future = asyncio.run_coroutine_threadsafe(self.userver.shutdown(), self.loop) + + # and wait for shutdown + future.result() + ``` + + The shutdown process goes as follows: + + 1. Stop any managed listeners: close out listener loops and/or thread pools by + calling `stop()` on each of the managed listeners. This prioritizes their + closure so that no events can make their way into the queue. + 2. Gracefully shut down the wrapper Uvicorn server. This is the process that + starts the FastAPI server instance; set the `should_exit` flag. + + If this completes successfully, in the thread where Uvicorn was started the server + task should be considered "completed," at which point the event loop can be closed + successfully. ''' logger.info("Shutting down server...") # stop attached auxiliary listeners, both internal & external - for listener in self.managed_listeners: - listener.stop() + if self.managed_listeners: + logger.info(f"Stopping {len(self.managed_listeners)} listeners...") + + for listener in self.managed_listeners: + listener.stop() + + # stop FastAPI server if started + if self.userver is not None: + def set_should_exit(): + self.userver.should_exit = True + + self.loop.call_soon_threadsafe(set_should_exit) + diff --git a/execlog/util/generic.py b/execlog/util/generic.py index b307c0a..bf47862 100644 --- a/execlog/util/generic.py +++ b/execlog/util/generic.py @@ -10,13 +10,13 @@ class ColorFormatter(logging.Formatter): colorama.init(autoreset=True) FORMATS = { - 'x': Fore.YELLOW + _format, - 'listener': Fore.GREEN + _format, - 'handler': Fore.CYAN + _format, - 'server': Style.DIM + Fore.CYAN + _format, - 'router': Fore.MAGENTA + _format, - 'site': Fore.BLUE + _format, - 'utils': Style.DIM + Fore.WHITE + _format, + 'x': 
Fore.YELLOW + _format, + 'listener': Fore.GREEN + _format, + 'handler': Fore.CYAN + _format, + 'server': Style.DIM + Fore.CYAN + _format, + 'router': Fore.MAGENTA + _format, + 'site': Fore.BLUE + _format, + 'utils': Style.DIM + Fore.WHITE + _format, } FORMATS = { k:logging.Formatter(v) for k,v in FORMATS.items() } DEFAULT_LOGGER = logging.Formatter(_format) diff --git a/requirements.txt b/requirements.txt index 01bae86..6b5503b 100644 --- a/requirements.txt +++ b/requirements.txt @@ -5,6 +5,7 @@ uvicorn inotify_simple tqdm wcmatch +websockets # -- logging -- colorama diff --git a/tests/endpoint_proxy/fileB b/tests/endpoint_proxy/fileB new file mode 100644 index 0000000..276e789 --- /dev/null +++ b/tests/endpoint_proxy/fileB @@ -0,0 +1 @@ +test text \ No newline at end of file diff --git a/tests/test_handler.py b/tests/test_handler.py index e69de29..0595b4f 100644 --- a/tests/test_handler.py +++ b/tests/test_handler.py @@ -0,0 +1,7 @@ +# from websockets.sync.client import connect +# +# def hello(): +# with connect("ws://localhost:8765") as websocket: +# websocket.send("Hello world!") +# message = websocket.recv() +# print(f"Received: {message}") diff --git a/tests/test_listener.py b/tests/test_listener.py index e69de29..ad5217f 100644 --- a/tests/test_listener.py +++ b/tests/test_listener.py @@ -0,0 +1,82 @@ +import time +import logging +from pathlib import Path +from functools import partial + +from execlog import util +from execlog import ChainRouter, Event +from execlog.routers import PathRouter +from execlog.listeners import PathListener + + +logger = logging.getLogger() +logger.setLevel(logging.DEBUG) +logger.addHandler(util.generic.TqdmLoggingHandler()) + +# router setup +router1 = PathRouter() +router2 = PathRouter() +router3 = PathRouter() + +chain_router = ChainRouter([router1, router2, router3]) + +# router-1 +router1.register('tests/endpoint_proxy', partial(print, 'R1-1 ::')) + +# router-2 +router2.register('tests/endpoint_proxy', partial(print, 'R2-1 
::')) +router2.register('tests/endpoint_proxy', partial(print, 'R2-2 ::')) + +# router-3 +router3.register('tests/endpoint_proxy', partial(print, 'R3-1 ::')) +router3.register('tests/endpoint_proxy', partial(print, 'R3-2 ::')) +router3.register('tests/endpoint_proxy', partial(print, 'R3-3 ::')) + + +def test_single_path_listener(): + ''' + 1. Get listener for a single router + 2. Start listening for file events + 3. Create a few files under the registered path + 4. Wait a second for inotify to pick up on the events, allow jobs to be submitted to + the router's thread pool + 5. Shutdown the listener; any lingering jobs will be finished if not done already + ''' + listener = router1.get_listener() + + # listener starts in new thread + listener.start() + + file_a = Path('tests/endpoint_proxy/fileA') + file_a.write_text('test text') + file_a.unlink() + + file_b = Path('tests/endpoint_proxy/fileB') + file_b.write_text('test text') + + # allow I/O to propagate + time.sleep(1) + + listener.stop() + + assert True + +def test_chain_path_listener(): + listener = chain_router.get_listener() + + # listener starts in new thread + listener.start() + + file_a = Path('tests/endpoint_proxy/fileA') + file_a.write_text('test text') + file_a.unlink() + + file_b = Path('tests/endpoint_proxy/fileB') + file_b.write_text('test text') + + # allow I/O to propagate + time.sleep(1) + + listener.stop() + + assert True diff --git a/tests/test_router.py b/tests/test_router.py index e69de29..30926c0 100644 --- a/tests/test_router.py +++ b/tests/test_router.py @@ -0,0 +1,58 @@ +import logging +from pathlib import Path +from functools import partial + +from execlog import util +from execlog import ChainRouter, Event +from execlog.routers import PathRouter +from execlog.listeners import PathListener + + +logger = logging.getLogger() +logger.setLevel(logging.DEBUG) +logger.addHandler(util.generic.TqdmLoggingHandler()) + +# router setup +router1 = PathRouter() +router2 = PathRouter() +router3 = 
PathRouter() + +chain_router = ChainRouter([router1, router2, router3]) + +def test_route_registry(): + # router-1 + router1.register('endpoint_proxy', partial(print, 'R1-1 ::')) + + # router-2 + router2.register('endpoint_proxy', partial(print, 'R2-1 ::')) + router2.register('endpoint_proxy', partial(print, 'R2-2 ::')) + + # router-3 + router3.register('endpoint_proxy', partial(print, 'R3-1 ::')) + router3.register('endpoint_proxy', partial(print, 'R3-2 ::')) + router3.register('endpoint_proxy', partial(print, 'R3-3 ::')) + + assert True + +def test_single_router_submission(): + events = [ + Event(endpoint='endpoint_proxy', name='file1'), + Event(endpoint='endpoint_proxy', name='file2'), + Event(endpoint='endpoint_proxy', name='file3'), + ] + futures = router2.submit(events) + router2.wait_on_futures(futures) + + assert True + +def test_chain_router_submission(): + events = [ + Event(endpoint='endpoint_proxy', name='file1'), + Event(endpoint='endpoint_proxy', name='file2'), + Event(endpoint='endpoint_proxy', name='file3'), + ] + futures = chain_router.submit(events) + chain_router.wait_on_futures(futures) + + assert True + diff --git a/tests/test_server.py b/tests/test_server.py index e69de29..4137def 100644 --- a/tests/test_server.py +++ b/tests/test_server.py @@ -0,0 +1,100 @@ +import time +import multiprocessing as mp +import threading +import logging +from pathlib import Path + +from execlog import Server +from execlog.routers import PathRouter + + +logger = logging.getLogger() +logger.setLevel(logging.DEBUG) + +def threaded_start_then_join(server): + thread = threading.Thread(target=server.start) + + # start the server; is a blocking call in that thread + thread.start() + + # short wait here in main thread for some startup procedures + time.sleep(1) + + # call shutdown from this thread + server.shutdown() + + # join the thread back to main thread; if successfully started but shutdown failed, + # joining back would cause indefinite blockage + thread.join() + + 
# doesn't appear to be a more formal way to check if server is officially running; + # done a lot of digging here. No flags, state; I imagine it's actually difficult to + # know if the process is actually stopped. In any case, the above logic is good enough + # for my use case as far as I'm concerned. + return True + + +def test_server_creation(): + server = Server( + host='localhost', + port=8778, + root='.' + ) + + assert threaded_start_then_join(server) + +def test_server_static(): + server = Server( + host='localhost', + port=8778, + root='.', + static=True + ) + + assert threaded_start_then_join(server) + +def test_server_livereload(): + server = Server( + host='localhost', + port=8778, + root='.', + livereload=True, + ) + + assert threaded_start_then_join(server) + +def test_server_with_listeners(): + router1 = PathRouter() + router1.register('tests/endpoint_proxy', lambda _: 'router1 job success') + + router2 = PathRouter() + router2.register('tests/endpoint_proxy', lambda _: 'router2 job success') + + listeners = [router1.get_listener(), router2.get_listener()] + + server = Server( + host='localhost', + port=8778, + root='.', + managed_listeners=listeners, + ) + thread = threading.Thread(target=server.start) + thread.start() + + # write a file to a registered endpoint + file_a = Path('tests/endpoint_proxy/server_file') + file_a.write_text('test text') + file_a.unlink() + + # wait a sec + time.sleep(2) + + # attempt to shutdown the server, join the thread back + # successful if not blocking + server.shutdown() + thread.join() + + # finally check the router event logs + assert router1.event_log[0][1] == ['router1 job success'] + assert router2.event_log[0][1] == ['router2 job success'] +