Compare commits


No commits in common. "3b6a09bb72637a46f126b1cbcdee64494ffc674e" and "eae2058c2d2d67f255b3d544f5d34b7a66a6ff93" have entirely different histories.

14 changed files with 207 additions and 479 deletions

View File

@@ -15,22 +15,13 @@ author = 'Sam Griesemer'
extensions = [
    "sphinx.ext.autodoc",
-   "sphinx.ext.autosummary",    # enables a directive to be specified manually that gathers
-                                # module/object summary details in a table
-   "sphinx.ext.viewcode",       # allow viewing source in the HTML pages
-   "myst_parser",               # only really applies to manual docs; docstrings still need RST-like
-   "sphinx.ext.napoleon",       # enables Google-style docstring formats
-   "sphinx_autodoc_typehints",  # external extension that allows arg types to be inferred by type hints
+   "sphinx.ext.autosummary",
+   "sphinx.ext.viewcode",
+   "myst_parser",
]
autosummary_generate = True
autosummary_imported_members = True
-# include __init__ definitions in autodoc
-autodoc_default_options = {
-    'special-members': '__init__',
-}
-#smartquotes = True
templates_path = ['_templates']
exclude_patterns = ['_build', 'Thumbs.db', '.DS_Store']

View File

@@ -3,23 +3,19 @@
{ref}`modindex`
{ref}`search`
-## Top-level module overview
```{eval-rst}
.. autosummary::
   :nosignatures:
-   :recursive:
   execlog.Handler
   execlog.Listener
   execlog.Router
   execlog.Server
-   execlog.listeners
```
-## Auto-reference contents
```{toctree}
:maxdepth: 3
-:caption: Autoref
_autoref/execlog.rst
```

View File

@@ -105,42 +105,7 @@ pages:
**Reference directives**
-## Notes on docstring syntax
-- Code literals need to be surrounded in two backticks, e.g., "``variable``". Sphinx will
-  also complain if you make the reference plural by having an "s" after the backtick; it
-  needs to go on the inside.
-- MyST parsing, even if enabled, doesn't apply to docstrings. You need to use RST
-  generally, with a few directives being different under extensions like `napoleon`.
-- Code blocks and admonitions need a space between the heading and the rest of the
-  content.
-Nice syntax cheatsheet [here][4]
-General docstring structure should be structured as follows:
-```python
-def example_function(a, b):
-    '''
-    Minimal function description. (either first sentence or line; gets used in
-    autosummaries)
-    Additional exposition, unwrapped by admonitions.
-    .. admonition:: Admonition description
-        Indented content, code blocks, lists, etc
-    Parameters:
-        a: a's description
-        b: b's description
-    Returns:
-        <return-type>: Description of return value
-    '''
-    ...
-```
[1]: https://pradyunsg.me/furo/
[2]: https://www.sphinx-doc.org/en/master/man/sphinx-apidoc.html
[3]: https://www.sphinx-doc.org/en/master/usage/extensions/autodoc.html#
-[4]: https://sphinx-tutorial.readthedocs.io/cheatsheet/

View File

@@ -1,9 +1,9 @@
from execlog import util
from execlog import routers
from execlog import listeners
from execlog.server import Server
from execlog.handler import Handler
from execlog.listener import Listener
from execlog.router import Router, ChainRouter, Event
from execlog.server import Server
from execlog.event import Event, FileEvent
from execlog.router import Router, ChainRouter, Event, RouterBuilder, route
from execlog import listeners
from execlog import routers
from execlog import util

View File

@@ -11,5 +11,4 @@ FileEvent = namedtuple(
    'FileEvent',
    ['endpoint', 'name', 'action'],
    defaults=[None, None, None],
-   # action is 32bit flag mask
)
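For context on the `action` field noted above (a 32-bit inotify flag mask): a minimal sketch of building such a `FileEvent` from an `inotify_simple` event. The wiring is illustrative, not the listener's actual code; `to_file_event` is a hypothetical helper.

```python
# Illustrative sketch: decode an inotify_simple event into the FileEvent
# namedtuple shown above; `to_file_event` is a hypothetical helper.
from collections import namedtuple
from inotify_simple import flags

FileEvent = namedtuple(
    'FileEvent',
    ['endpoint', 'name', 'action'],
    defaults=[None, None, None],
)

def to_file_event(watch_path, inotify_event):
    # inotify_event.mask is the raw 32-bit flag mask; keep the decoded flags
    return FileEvent(
        endpoint=str(watch_path),
        name=inotify_event.name,
        action=flags.from_mask(inotify_event.mask),
    )
```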

View File

@ -34,25 +34,24 @@ class Handler(WebSocketEndpoint):
''' '''
Subclasses WebSocketEndpoint to be attached to live reload endpoints. Subclasses WebSocketEndpoint to be attached to live reload endpoints.
.. admonition:: Reload model Note: Reload model
- Served HTML files are generated from templates that include livereload JS and the - Served HTML files are generated from templates that include livereload JS and the
target livereload server (port manually set prior to site build). target livereload server (port manually set prior to site build).
- When pages are visited (be they served from NGINX or via the development - When pages are visited (be they served from NGINX or via the development
server), the livereload.js attempts to connect to the known livereload WS server), the livereload.js attempts to connect to the known livereload WS
endpoint. endpoint.
- FastAPI routes the request to _this_ endpoint, and ``on_connect`` is called. - FastAPI routes the request to _this_ endpoint, and `on_connect` is called.
- Upon successful connection, the livereload JS client sends a "hello" message. - Upon successful connection, the livereload JS client sends a "hello" message.
This is picked up as the first post-acceptance message, and captured by the This is picked up as the first post-acceptance message, and captured by the
``on_receive`` method. `on_receive` method.
- ``on_receive`` subsequently initiates a formal handshake, sending back a "hello" - `on_receive` subsequently initiates a formal handshake, sending back a "hello"
command and waiting for the "info" command from the client. command and waiting for the "info" command from the client.
- If the "info" command is received successfully and contains the requesting - If the "info" command is received successfully and contains the requesting
page's URL, the handshake completes and the websocket is added to the class' page's URL, the handshake completes and the websocket is added to the class'
``live_clients`` tracker. `live_clients` tracker.
- Later, when a file in a watch path of the server's watcher is _modified_, - Later, when a file in a watch path of the server's watcher is _modified_,
``reload_clients`` will be called from within the originating server's event loop, `reload_clients` will be called from within the originating server's event loop,
and pass in the FS event associated with the file change. ``client_reload_wrap`` and pass in the FS event associated with the file change. `client_reload_wrap`
is used to wrap a boolean checker method for whether or not to reload clients is used to wrap a boolean checker method for whether or not to reload clients
given the FS event. given the FS event.
@ -67,11 +66,10 @@ class Handler(WebSocketEndpoint):
async def on_receive(self, websocket, data): async def on_receive(self, websocket, data):
''' '''
.. admonition:: On page names Note: On page names
When websockets connect, they simply communicate the exact URL from the origin When websockets connect, they simply communicate the exact URL from the origin
page. The websocket is then indexed to possibly variable page names (often page. The websocket is then indexed to possibly variable page names (often
without an ``.html`` suffix, but occasionally with). The ``client_reload_wrap`` is without an `.html` suffix, but occasionally with). The `client_reload_wrap` is
then responsible for taking this client page name and normalizing it to be then responsible for taking this client page name and normalizing it to be
matched with full file names (i.e., suffix always included). matched with full file names (i.e., suffix always included).
''' '''
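The reload model above is essentially a two-step hello/info handshake followed by page-keyed reload pushes. A minimal sketch of that flow, assuming Starlette's `WebSocketEndpoint` and hypothetical names (`live_clients`, the message fields, the page-name normalization); it is not the actual `Handler` implementation.

```python
# Hedged sketch of the hello/info handshake described above; the class name,
# live_clients layout, and page-name normalization are assumptions.
import json
from starlette.endpoints import WebSocketEndpoint

class LiveReloadSketch(WebSocketEndpoint):
    encoding = 'text'
    live_clients: dict = {}   # normalized page name -> websocket

    async def on_receive(self, websocket, data):
        message = json.loads(data)
        if message.get('command') == 'hello':
            # first post-acceptance message: reply "hello", then await "info"
            await websocket.send_json({'command': 'hello'})
        elif message.get('command') == 'info' and 'url' in message:
            # handshake complete: index the socket by its page name
            page = message['url'].rstrip('/').removesuffix('.html') or 'index'
            self.live_clients[page] = websocket

    @classmethod
    async def reload_clients(cls, file_event):
        # called from the originating server's event loop on file modification
        for page, websocket in cls.live_clients.items():
            await websocket.send_json({'command': 'reload', 'path': page})
```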

View File

@@ -1,4 +1,5 @@
'''
+Implements a file system watcher.
See also:
@@ -10,9 +11,6 @@ from execlog.event import Event
class Listener[E: Event](threading.Thread):
-    '''
-    Implements a file system watcher.
-    '''
    def __init__(self, router: 'Router[E]'):
        '''
        Parameters:

View File

@@ -1,5 +1 @@
-'''
-Thing
-'''
from execlog.listeners.path import PathListener

View File

@@ -5,11 +5,9 @@ import logging
from pathlib import Path
from collections import defaultdict
-from colorama import Fore, Back, Style
from inotify_simple import INotify, Event as iEvent, flags as iflags, masks as imasks
from execlog import util
-from execlog.util.generic import color_text
from execlog.event import FileEvent
from execlog.listener import Listener
@ -25,7 +23,7 @@ class PathListener(Listener[FileEvent]):
Note: Note:
Due to the nature of INotify, you cannot watch the same path with two Due to the nature of INotify, you cannot watch the same path with two
separate flag settings (without creating a new INotify instance). Under the separate flag settings (without creating a new INotify instance). Under the
same instance, calling ``add_watch`` for an already watched path location will same instance, calling `add_watch` for an already watched path location will
simply return the watch descriptor already associated with that location (and simply return the watch descriptor already associated with that location (and
may update the flags to whatever is passed). However, this location will only may update the flags to whatever is passed). However, this location will only
ever be "watched once" by a given INotify instance, so keep this in mind if ever be "watched once" by a given INotify instance, so keep this in mind if
@ -77,8 +75,8 @@ class PathListener(Listener[FileEvent]):
remove=False, remove=False,
): ):
''' '''
Recursively watch directories under path/lead, using ``path`` as the registered Recursively watch directories under path/lead, using `path` as the registered
base. Specifying ``lead`` gives one control over the subdirectory on which to base. Specifying `lead` gives one control over the subdirectory on which to
recurse; the "origin" being the recursion base point. recurse; the "origin" being the recursion base point.
Note: on renamed/moved directories Note: on renamed/moved directories
@ -90,9 +88,9 @@ class PathListener(Listener[FileEvent]):
the router. Explicitly re-watching the renamed directory (and any the router. Explicitly re-watching the renamed directory (and any
subdirectories) will also return that existing watch descriptor. Thus, this subdirectories) will also return that existing watch descriptor. Thus, this
method can just be directly called for directory moves/renames, and WDs in the method can just be directly called for directory moves/renames, and WDs in the
``watchmap`` will just be used as expected. (One should also call the method `watchmap` will just be used as expected. (One should also call the method
using the old lead and set ``remove=True`` to remove old tuples out of the using the old lead and set `remove=True` to remove old tuples out of the
``watchmap``. Note that this doesn't remove the existing watches from iNotify, `watchmap`. Note that this doesn't remove the existing watches from iNotify,
just their tracked tuples.) just their tracked tuples.)
''' '''
if lead is None: if lead is None:
@ -113,16 +111,15 @@ class PathListener(Listener[FileEvent]):
flags=None, flags=None,
): ):
''' '''
Listen to file events occurring under a provided path, optionally excluding those Listen to file events occurring under a provided path, optionally only events
not matching the provided iNotify flags. matching provided iNotify flags.
Parameters: Parameters:
path: Path (directory) to watch with ``inotify`` path: Path (directory) to watch with `inotify`
flags: inotify_simple flags matching FS event types allowed to trigger the flags: inotify_simple flags matching FS event types allowed to trigger the
callback callback
''' '''
#path = Path(path) path = Path(path)
path = str(path)
if flags is None: if flags is None:
flags = iflags.CREATE | iflags.DELETE | iflags.MODIFY | iflags.DELETE_SELF | iflags.MOVED_TO flags = iflags.CREATE | iflags.DELETE | iflags.MODIFY | iflags.DELETE_SELF | iflags.MOVED_TO
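For reference, the default flag set above maps directly onto `inotify_simple`; a standalone illustration of the underlying calls that `listen()` builds on (the path and timeout are arbitrary examples, not values from the diff):

```python
# Standalone inotify_simple illustration; not the PathListener API itself.
from inotify_simple import INotify, flags as iflags

inotify = INotify()
watch_flags = (
    iflags.CREATE | iflags.DELETE | iflags.MODIFY
    | iflags.DELETE_SELF | iflags.MOVED_TO
)
wd = inotify.add_watch('/tmp/watched-dir', watch_flags)   # one WD per path

for event in inotify.read(timeout=1000):                  # blocks up to 1s
    print(event.name, iflags.from_mask(event.mask))
```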
@ -140,31 +137,19 @@ class PathListener(Listener[FileEvent]):
def run(self): def run(self):
''' '''
Start the (blocking) iNotify event loop
Note: On usage Note: On usage
``start()`` is a blocking call. This will hog your main thread if not properly `start()` is a blocking call. This will hog your main thread if not properly
threaded. If handling this manually in your outer context, you will also need threaded. If handling this manually in your outer context, you will also need
to make sure to call ``.stop()`` to make sure to call `.stop()`
''' '''
self.started = True self.started = True
logger.info( logger.info(f'Starting listener for {len(self.watchmap)} paths')
color_text(
f'Starting listener for {len(self.watchmap)} paths',
Fore.GREEN,
)
)
for path, flags in self.canonmap.values(): for path, flags in self.canonmap.values():
logger.info(f'> Listening on path {path} for flags {iflags.from_mask(flags)}') logger.info(f'> Listening on path {path} for flags {iflags.from_mask(flags)}')
for (callback, pattern, debounce, delay, *_) in self.router.routemap[path]: for (callback, pattern, debounce, delay, *_) in self.router.routemap[path]:
logger.info( logger.info(f'| > {pattern} -> {callback.__name__} (debounce {debounce}ms, delay {delay}ms)')
color_text(
f'| > {pattern} -> {callback.__name__} (debounce {debounce}ms, delay {delay}ms)',
Style.DIM,
)
)
while True: while True:
rlist, _, _ = select.select( rlist, _, _ = select.select(
@ -183,30 +168,26 @@ class PathListener(Listener[FileEvent]):
def update_moved_from(self, path, lead): def update_moved_from(self, path, lead):
''' '''
Update directories on ``MOVED_FROM`` events. Update directories on `MOVED_FROM` events. This method gets the existing WD,
removes the old path associated with that WD from the `watchmap` (preventing
events originating from this old path when the new path, which has the _same WD_,
receives an inotify event), and queues the (WD, base-path) tuple to be matched
later in a `MOVED_TO` handler.
.. admonition:: Additional details This method isn't a part of a `MOVED_TO` handler because it may be called without
ever having a `MOVED_TO` that follows up. We respond right away in `handle_events`
to `MOVED_FROM` events, keeping the `watchmap` in sync, regardless of whether we
can expect a `MOVED_TO` to sweep through after the fact.
This method gets the existing WD, removes the old path associated with that WD Note that the `lead` is unique for a given WD and base path. WDs are unique for
from the ``watchmap`` (preventing events originating from this old path when the
new path, which has the *same WD*, receives an inotify event), and queues the (WD,
base path) tuple to be matched later in a ``MOVED_TO`` handler.
This method isn't a part of a ``MOVED_TO`` handler because it may be called
without ever having a ``MOVED_TO`` that follows up. We respond right away in
``handle_events`` to ``MOVED_FROM`` events, keeping the ``watchmap`` in sync,
regardless of whether we can expect a ``MOVED_TO`` to sweep through after the
fact.
Note that the ``lead`` is unique for a given WD and base path. WDs are unique for
filepaths, but inotify uses the same WD for new directories when they experience a filepaths, but inotify uses the same WD for new directories when they experience a
rename (it's the same inode). However, during such a transition, the ``watchmap`` rename (it's the same inode). However, during such a transition, the `watchmap`
can see two different entries for the same WD and basepath: the old tracked path, can see two different entries for the same WD and basepath: the old tracked path,
and the newly named one (again, still the same inode). So: this method can be and the newly named one (again, still the same inode). So: this method can be
called 1) directly from ``MOVED_FROM`` events, preemptively wiping the old path called 1) directly from `MOVED_FROM` events, preemptively wiping the old path from
from the tracked dicts, or 2) during handling of a ``MOVED_TO`` event (in case we the tracked dicts, or 2) during handling of a `MOVED_TO` event (in case we don't
don't allow ``MOVED_FROM`` events, for instance), given both the new and old paths allow `MOVED_FROM` events, for instance), given both the new and old paths can be
can be seen in the ``watchmap``. seen in the `watchmap`.
''' '''
wd = self.pathmap.get(Path(path, lead)) wd = self.pathmap.get(Path(path, lead))
logger.debug(f'> MOVED_FROM update, [{Path(path, lead)}] in pathmap as [{wd}]') logger.debug(f'> MOVED_FROM update, [{Path(path, lead)}] in pathmap as [{wd}]')
@ -221,47 +202,46 @@ class PathListener(Listener[FileEvent]):
Construct synthetic MOVED events. Events are constructed from the path's WD. If Construct synthetic MOVED events. Events are constructed from the path's WD. If
the provided path is not watched, an empty list of events is returned. the provided path is not watched, an empty list of events is returned.
.. admonition:: Design details Note: Design details
This method is nuanced. It can only be called once a `MOVED_TO` occurs, since
This method is nuanced. It can only be called once a ``MOVED_TO`` occurs, since we can't act on a `MOVED_FROM` (we don't know the new target location to look
we can't act on a ``MOVED_FROM`` (we don't know the new target location to look
so we can send file events). When called, we first look for the path's WD in so we can send file events). When called, we first look for the path's WD in
the ``pathmap``. We then check if this WD points to more than one entry with the the `pathmap`. We then check if this WD points to more than one entry with the
same base path (WDs are unique to the path; under the same WD, the same base same base path (WDs are unique to the path; under the same WD, the same base
path implies the same lead). If so, we know one is the outdated path, and we path implies the same lead). If so, we know one is the outdated path, and we
push the outdated lead to ``update_moved_from``. This would be evidence that the push the outdated lead to `update_moved_from`. This would be evidence that the
``MOVED_FROM`` event for the move operation wasn't handled in the main event `MOVED_FROM` event for the move operation wasn't handled in the main event
handling loop. We then check for unmatched move-froms, which should provide handling loop. We then check for unmatched move-froms, which should provide
any renamed directories, regardless of whether ``MOVED_FROMs`` were allowed, to any renamed directories, regardless of whether `MOVED_FROM`s were allowed, to
be detected. Finally, the appropriate ``MOVED_FROMs`` and ``MOVED_TOs`` are be detected. Finally, the appropriate `MOVED_FROM`s and `MOVED_TO`s are
handled. To ensure only the correct events match upon handling, we do the handled. To ensure only the correct events match upon handling, we do the
following: following:
- First, if a ``MOVED_FROM`` path is not available, we assume it wasn't queued - First, if a `MOVED_FROM` path is not available, we assume it wasn't queued
by the event and not a watched flag. Given we by default ensure MOVED events by the event and not a watched flag. Given we by default ensure MOVED events
are tracked, regardless of listened paths, this shouldn't be possible, but are tracked, regardless of listened paths, this shouldn't be possible, but
if this standard were to change, we won't recursively respond to if this standard were to change, we won't recursively respond to
``MOVED_FROMs``. This will mean that we can't prevent events from being `MOVED_FROM`s. This will mean that we can't prevent events from being
matched to old directory names (we've rooted out the ability to tell when matched to old directory names (we've rooted out the ability to tell when
they've changed), and thus can't remove them from the ``watchpath`` they've changed), and thus can't remove them from the `watchpath`
accordingly. (One functional caveat here: this MOVED_TO handling method accordingly. (One functional caveat here: this MOVED_TO handling method
explicitly calls ``update_moved_from``, which should clean up lingering explicitly calls `update_moved_from`, which should clean up lingering
renamed path targets. This happens recursively if we're watching MOVED_TOs, renamed path targets. This happens recursively if we're watching MOVED_TOs,
so even if standards do change and you don't watch ``MOVED_FROMs``, you'll so even if standards do change and you don't watch `MOVED_FROM`s, you'll
still get clean up for free due to the robustness of this method. still get clean up for free due to the robustness of this method.
- If a ``MOVED_FROM`` lead is found, either due to an inferred matching base - If a `MOVED_FROM` lead is found, either due to an inferred matching base
lingering in the ``watchmap`` or through previously handled ``MOVED_FROM`` lingering in the `watchmap` or through previously handled `MOVED_FROM`
response, add this path/lead back to the ``watchmap``, remove the new response, add this path/lead back to the `watchmap`, remove the new
path/lead, and call ``handle_events`` for the synthetic ``MOVED_FROM`` events path/lead, and call `handle_events` for the synthetic `MOVED_FROM` events
across files and directories. Once finished, again remove the old path/lead across files and directories. Once finished, again remove the old path/lead
and add back the new one. and add back the new one.
- Submit ``MOVED_TO`` events to ``handle_events``. This will recursively propagate for - Submit `MOVED_TO` events to `handle_events`. This will recursively propagate for
subdirectories, each submitting their own ``update_moved_to`` call, resetting subdirectories, each submitting their own `update_moved_to` call, resetting
its own outdated leads and changing them back, all the way down to the its own outdated leads and changing them back, all the way down to the
bottom. bottom.
In the odd case where ``MOVED_FROM`` is registered but not ``MOVED_TO``, you will In the odd case where `MOVED_FROM` is registered but not `MOVED_TO`, you will
simply remove the directory causing a ``MOVED_FROM`` event, with no recursive simply remove the directory causing a `MOVED_FROM` event, with no recursive
propagation. This should likely be changed. propagation. This should likely be changed.
''' '''
fullpath = Path(path, lead) fullpath = Path(path, lead)
@ -280,7 +260,7 @@ class PathListener(Listener[FileEvent]):
self.update_moved_from(matching_base, old_lead) self.update_moved_from(matching_base, old_lead)
# explicit queries for files & dirs faster (tested) than filtering a single query # explicit queries for files & dirs faster (tested) than filtering a single query
# using ``Path.is_dir``; handle files, then subdirectories # using `Path.is_dir`; handle files, then subdirectories
moved_from_events = [] moved_from_events = []
moved_to_events = [] moved_to_events = []
for file in util.path.iter_glob_paths('*', fullpath, no_dir=True): for file in util.path.iter_glob_paths('*', fullpath, no_dir=True):
@ -312,14 +292,14 @@ class PathListener(Listener[FileEvent]):
def handle_events(self, events): def handle_events(self, events):
''' '''
Note: Note:
If ``handle_events`` is called externally, note that this loop will block in the If `handle_events` is called externally, note that this loop will block in the
calling thread until the jobs have been submitted. It will *not* block until calling thread until the jobs have been submitted. It will _not_ block until
jobs have completed, however, as a list of futures is returned. The calling jobs have completed, however, as a list of futures is returned. The calling
Watcher instance may have already been started, in which case ``run()`` will Watcher instance may have already been started, in which case `run()` will
already be executing in a separate thread. Calling this method externally will already be executing in a separate thread. Calling this method externally will
not interfere with this loop insofar as it adds jobs to the same thread pool. not interfere with this loop insofar as it adds jobs to the same thread pool.
Because this method only submits jobs associated with the provided ``events``, Because this method only submits jobs associated with the provided `events`,
the calling thread can await the returned list of futures and be confident the calling thread can await the returned list of futures and be confident
that top-level callbacks associated with these file events have completed. Do that top-level callbacks associated with these file events have completed. Do
note that, if the Watcher has already been started, any propagating file note that, if the Watcher has already been started, any propagating file
@ -385,12 +365,12 @@ class PathListener(Listener[FileEvent]):
main process exit before final tasks can be submitted, resulting in main process exit before final tasks can be submitted, resulting in
RuntimeErrors that cannot "schedule new futures after interpreter shutdown." RuntimeErrors that cannot "schedule new futures after interpreter shutdown."
So you either need to ensure the final tasks are scheduled before calling So you either need to ensure the final tasks are scheduled before calling
``stop()`` (this means more than just a ``submit()`` call; it must have actually `stop()` (this means more than just a `submit()` call; it must have actually
propagated through to ``submit_callback`` and reached ``thread_pool.submit``) to propagated through to `submit_callback` and reached `thread_pool.submit`) to
allow them to be handled automatically prior to shutdown, or manually wait on allow them to be handled automatically prior to shutdown, or manually wait on
their futures to complete. Otherwise, thread pool shutdown will occur, and their futures to complete. Otherwise, thread pool shutdown will occur, and
they'll still be making their way out of the queue only to reach the they'll still be making their way out of the queue only to reach the
``thread_pool.submit`` after it's had its final boarding call. `thread_pool.submit` after it's had its final boarding call.
''' '''
logger.info("Stopping listener...") logger.info("Stopping listener...")

View File

@@ -8,15 +8,14 @@ import inspect
import traceback
import threading
from pathlib import Path
-from typing import Any, Callable
+from typing import Callable
+from functools import partial
from colorama import Fore, Style
from collections import defaultdict
-from functools import partial, update_wrapper
from concurrent.futures import ThreadPoolExecutor, wait, as_completed
from tqdm.auto import tqdm
-from execlog.util.generic import color_text
from execlog.event import Event
from execlog.listener import Listener
@ -27,8 +26,6 @@ class Router[E: Event]:
''' '''
Route events to registered callbacks Route events to registered callbacks
.. note::
Generalized registration includes an endpoint (the origin of an event), a pattern (to Generalized registration includes an endpoint (the origin of an event), a pattern (to
filter events at the endpoint), and a callback (to be executed if pattern is matched). filter events at the endpoint), and a callback (to be executed if pattern is matched).
@ -40,36 +37,34 @@ class Router[E: Event]:
This base Router implements most of the registry and filter model. When events are This base Router implements most of the registry and filter model. When events are
submitted for propagation, they are checked for matching routes. Events specify an submitted for propagation, they are checked for matching routes. Events specify an
origin endpoint, which is used as the filter for attached routes. The event is then origin endpoint, which is used as the filter for attached routes. The event is then
subjected to the ``filter`` method, which checks if the event matches the registered subjected to the `filter` method, which checks if the event matches the registered
``pattern`` under the originated ``endpoint``. If so, the callback is scheduled for `pattern` under the originated `endpoint`. If so, the callback is scheduled for
execution, and the matching event is passed as its sole argument. execution, and the matching event is passed as its sole argument.
Subclasses are expected to implement (at least) the ``filter`` method. This function is Subclasses are expected to implement (at least) the `filter` method. This function is
responsible for wrapping up the task-specific logic needed to determine if an event, responsible for wrapping up the task-specific logic needed to determine if an event,
originating from a known endpoint, matches the callback-specific pattern. This method originating from a known endpoint, matches the callback-specific pattern. This method
needn't handle any other filter logic, like checking if the event originates from the needn't handle any other filter logic, like checking if the event originates from the
provided endpoint, as this is already handled by the outer loop in ``matching_routes``. provided endpoint, as this is already handled by the outer loop in `matching_routes`.
``get_listener`` is a convenience method that instantiates and populates an affiliated `get_listener` is a convenience method that instantiates and populates an affiliated
Listener over the register paths found in the Router. Listeners require a Router upon Listener over the register paths found in the Router. Listeners require a Router upon
instantiation so events can be propagated to available targets when they occur. instantiation so events can be propagated to available targets when they occur.
``get_listener()`` is the recommended way to attain a Listener. `get_listener()` is the recommended way to attain a Listener.
.. admonition:: on debouncing events
Note: on debouncing events
Previously, debouncing was handled by listeners. This logic has been generalized Previously, debouncing was handled by listeners. This logic has been generalized
and moved to this class, as it's general enough to be desired across various and moved to this class, as it's general enough to be desired across various
Listener types. We also need unique, identifying info only available with a Listener types. We also need unique, identifying info only available with a
``(endpoint, callback, pattern)`` triple in order to debounce events in accordance `(endpoint, callback, pattern)` triple in order to debounce events in accordance
with their intended target. with their intended target.
.. admonition:: tracking events and serializing callback frames Note: tracking events and serializing callback frames
Although not part of the original implementation, we now track which events have a Although not part of the original implementation, we now track which events have a
callback chain actively being executed, and prevent the same chain from being callback chain actively being executed, and prevent the same chain from being
started concurrently. If the callback chain is actively running for an event, and started concurrently. If the callback chain is actively running for an event, and
that same event is submitted before this chain finishes, the request is simply that same event is submitted before this chain finishes, the request is simply
enqueued. The ``clear_event`` method is attached as a "done callback" to each job enqueued. The `clear_event` method is attached as a "done callback" to each job
future, and will re-submit the event once the active chain finishes. future, and will re-submit the event once the active chain finishes.
While this could be interpreted as a harsh design choice, it helps prevent many While this could be interpreted as a harsh design choice, it helps prevent many
@ -86,7 +81,7 @@ class Router[E: Event]:
Parameters: Parameters:
loop: loop:
workers: number of workers to assign the thread pool when the event loop is workers: number of workers to assign the thread pool when the event loop is
started. Defaults to ``None``, which, when passed to started. Defaults to `None`, which, when passed to
ThreadPoolExecutor, will by default use 5x the number of available ThreadPoolExecutor, will by default use 5x the number of available
processors on the machine (which the docs claim is a reasonable processors on the machine (which the docs claim is a reasonable
assumption given threads are more commonly leveraged for I/O work assumption given threads are more commonly leveraged for I/O work
@ -143,21 +138,21 @@ class Router[E: Event]:
(Update) The above remark about PathListener's is no longer the case, and likely never (Update) The above remark about PathListener's is no longer the case, and likely never
was. Varying flag sets under the same endpoint do in fact have a cumulative was. Varying flag sets under the same endpoint do in fact have a cumulative
effect, and we need to be able to disentangle events accordingly through effect, and we need to be able to disentangle events accordingly through
submitted event's ``action`` value. submitted event's `action` value.
Parameters: Parameters:
endpoint: endpoint:
callback: callable accepting an event to be executed when a matching event callback: callable accepting an event to be executed when a matching event
is received is received
pattern: hashable object to be used when filtering event (passed to inherited pattern: hashable object to be used when filtering event (passed to inherited
``filter(...)``) `filter(...)`)
debounce: debounce:
delay: delay:
''' '''
route_tuple = (callback, pattern, debounce, delay, listener_kwargs) route_tuple = (callback, pattern, debounce, delay, listener_kwargs)
self.routemap[endpoint].append(route_tuple) self.routemap[endpoint].append(route_tuple)
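Registration above just appends a `(callback, pattern, debounce, delay, listener_kwargs)` tuple under the endpoint. A hedged usage sketch against the `PathRouter` subclass appearing later in this compare; the import path, directory, glob, and timings are assumptions, not taken from the diff.

```python
# Hedged usage sketch; parameter names follow the PathRouter.register docstring
# later in this compare, everything else is an invented example.
from execlog.routers import PathRouter   # import path assumed

def on_markdown_change(event):
    print(f'{event.name} changed under {event.endpoint}')

router = PathRouter()
router.register(
    path='notes/',              # endpoint: directory to watch with inotify
    func=on_markdown_change,    # callback run when a matching event arrives
    glob='*.md',                # relative glob filtering events at the endpoint
    debounce=500,               # ms
    delay=100,                  # ms
)

listener = router.get_listener()   # recommended way to obtain a Listener
```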
def submit(self, events: E | list[E], callbacks: list[Callable] | None = None): def submit(self, events:E | list[E], callbacks:list[Callable]|None=None):
''' '''
Handle a list of events. Each event is matched against the registered callbacks, Handle a list of events. Each event is matched against the registered callbacks,
and those callbacks are run concurrently (be it via a thread pool or an asyncio and those callbacks are run concurrently (be it via a thread pool or an asyncio
@ -174,19 +169,19 @@ class Router[E: Event]:
return futures return futures
def submit_event(self, event: E, callbacks: list[Callable] | None = None): def submit_event(self, event: E, callbacks:list[Callable]|None=None):
''' '''
Group up and submit all matching callbacks for ``event``. All callbacks are run Group up and submit all matching callbacks for `event`. All callbacks are run
concurrently in their own threads, and this method blocks until all are completed. concurrently in their own threads, and this method blocks until all are completed.
In the outer ``submit`` context, this blocking method is itself ran in its own In the outer `submit` context, this blocking method is itself ran in its own
thread, and the registered post-callbacks are attached to the completion of this thread, and the registered post-callbacks are attached to the completion of this
function, i.e., the finishing of all callbacks matching provided event. function, i.e., the finishing of all callbacks matching provided event.
Note that an event may not match any routes, in which case the method exits early. Note that an event may not match any routes, in which case the method exits early.
An empty list is returned, and this shows up as the outer future's result. In this An empty list is returned, and this shows up as the outer future's result. In this
case, the event is never considered "running," and the non-result picked up in case, the event is never considered "running," and the non-result picked up in
``clear_event`` will ensure it exits right away (not even attempting to pop the `clear_event` will ensure it exits right away (not even attempting to pop the
event from the running list, and for now not tracking it in the event log). event from the running list, and for now not tracking it in the event log).
''' '''
if callbacks is None: if callbacks is None:
@ -257,20 +252,20 @@ class Router[E: Event]:
fact already active in a frame. If this method were to start filtering results while fact already active in a frame. If this method were to start filtering results while
the frame is active, and the frame were to finish before all matching callbacks the frame is active, and the frame were to finish before all matching callbacks
were determined, we would be perfectly happy to return all matches, and allow the were determined, we would be perfectly happy to return all matches, and allow the
outer ``submit_event`` context to run them right away in a newly constructed frame. outer `submit_event` context to run them right away in a newly constructed frame.
The _very_ next thing that gets done is adding this event to the active event The _very_ next thing that gets done is adding this event to the active event
tracker. Otherwise, matching is performed as usual, and eligible callbacks are tracker. Otherwise, matching is performed as usual, and eligible callbacks are
simply enqueued for the next event frame, which will be checked in the "done" simply enqueued for the next event frame, which will be checked in the "done"
callback of the active frame. The logic here should mostly "seal up" any real callback of the active frame. The logic here should mostly "seal up" any real
opportunities for error, e.g., a frame ending and popping off elements from opportunities for error, e.g., a frame ending and popping off elements from
``running_events`` half-way through their inserting at the end of this method, or `running_events` half-way through their inserting at the end of this method, or
multiple threads checking for matching routes for the same event, and both coming multiple threads checking for matching routes for the same event, and both coming
away with a non-empty set of matches to run. That last example highlights away with a non-empty set of matches to run. That last example highlights
precisely how the single event-frame model works: many threads might be running precisely how the single event-frame model works: many threads might be running
this method at the same time, for the same event (which has fired rapidly), but this method at the same time, for the same event (which has fired rapidly), but
only one should be able to "secure the frame" and begin running the matching only one should be able to "secure the frame" and begin running the matching
callbacks. Making the "active frame check" both as late as possible and as close callbacks. Making the "active frame check" both as late as possible and as close
to the event blocking stage in the tracker (in ``submit_event``), we make the to the event blocking stage in the tracker (in `submit_event`), we make the
ambiguity gap as small as possible (and almost certainly smaller than any ambiguity gap as small as possible (and almost certainly smaller than any
realistic I/O-bound event duplication). realistic I/O-bound event duplication).
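The debounce bookkeeping referenced above (and visible below as `self.next_allowed_time[index] = event_time + debounce`) is a per-(endpoint, callback, pattern) rate limit; a self-contained sketch with hypothetical names:

```python
# Minimal sketch of the per-route debounce check; `index` stands in for the
# (endpoint, callback, pattern) triple used by the Router.
import time
from collections import defaultdict

next_allowed_time = defaultdict(float)

def debounced(index, debounce_ms, event_time=None):
    '''Return True if the event should be dropped for this route.'''
    if event_time is None:
        event_time = time.time() * 1000          # debounce values are in ms
    if event_time < next_allowed_time[index]:
        return True                              # still inside the window
    next_allowed_time[index] = event_time + debounce_ms
    return False
```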
@ -295,15 +290,6 @@ class Router[E: Event]:
# reject event # reject event
continue continue
callback_name = str(callback)
if hasattr(callback, '__name__'):
callback_name = callback.__name__
name_text = color_text(name, Fore.BLUE)
pattern_text = color_text(pattern, Fore.BLUE)
endpoint_text = color_text(endpoint, Fore.BLUE)
callback_text = color_text(callback_name[:50], Fore.BLUE)
if self.filter(event, pattern, **listen_kwargs): if self.filter(event, pattern, **listen_kwargs):
# note that delayed callbacks are added # note that delayed callbacks are added
matches.append(self.get_delayed_callback(callback, delay, index)) matches.append(self.get_delayed_callback(callback, delay, index))
@ -311,14 +297,19 @@ class Router[E: Event]:
# set next debounce # set next debounce
self.next_allowed_time[index] = event_time + debounce self.next_allowed_time[index] = event_time + debounce
match_text = color_text('matched', Style.BRIGHT, Fore.GREEN) match_text = Style.BRIGHT + Fore.GREEN + 'matched' + Fore.RESET
callback_name = str(callback)
if hasattr(callback, '__name__'):
callback_name = callback.__name__
logger.info( logger.info(
f'Event [{name_text}] {match_text} [{pattern_text}] under [{endpoint_text}] for [{callback_text}]' f'Event [{name}] {match_text} [{pattern}] under [{endpoint}] for [{callback_name}]'
) )
else: else:
match_text = color_text('rejected', Style.BRIGHT, Fore.RED) match_text = Style.BRIGHT + Fore.RED + 'rejected' + Fore.RESET
logger.debug( logger.debug(
f'Event [{name_text}] {match_text} against [{pattern_text}] under [{endpoint_text}] for [{callback_text}]' f'Event [{name}] {match_text} against [{pattern}] under [{endpoint}] for [{callback.__name__}]'
) )
return matches return matches
@ -348,7 +339,7 @@ class Router[E: Event]:
def wait_on_futures(self, futures): def wait_on_futures(self, futures):
''' '''
Block until all futures in ``futures`` are complete. Return collected results as a Block until all futures in `futures` are complete. Return collected results as a
list, and log warnings when a future fails. list, and log warnings when a future fails.
''' '''
future_results = [] future_results = []
@ -377,7 +368,7 @@ class Router[E: Event]:
def filter(self, event: E, pattern, **listen_kwargs) -> bool: def filter(self, event: E, pattern, **listen_kwargs) -> bool:
''' '''
Determine if a given event matches the provided pattern Determine if a given event matches the provided pattern
Parameters: Parameters:
event: event:
@ -404,7 +395,7 @@ class Router[E: Event]:
def extend_listener(self, listener): def extend_listener(self, listener):
''' '''
Extend a provided Listener object with the Router instance's ``listener_kwargs``. Extend a provided Listener object with the Router instance's `listener_kwargs`.
''' '''
for endpoint, route_tuples in self.routemap.items(): for endpoint, route_tuples in self.routemap.items():
for route_tuple in route_tuples: for route_tuple in route_tuples:
@ -421,16 +412,16 @@ class Router[E: Event]:
def clear_event(self, event: E, future): def clear_event(self, event: E, future):
''' '''
Clear an event. Pops the passed event out of ``running_events``, and, if the request Clear an event. Pops the passed event out of `running_events`, and, if the request
counter is >0, the event is re-submitted. counter is >0, the event is re-submitted.
This method is attached as a "done" callback to the main event wrapping job This method is attached as a "done" callback to the main event wrapping job
``submit_event``. The ``future`` given to this method is one to which it was `submit_event`. The `future` given to this method is one to which it was
attached as this "done" callback. This method should only be called when that attached as this "done" callback. This method should only be called when that
``future`` is finished running (or failed). If any jobs were submitted in the `future` is finished running (or failed). If any jobs were submitted in the
wrapper task, the future results here should be non-empty. We use this fact to wrapper task, the future results here should be non-empty. We use this fact to
filter out non-work threads that call this method. Because even the filter out non-work threads that call this method. Because even the
``matching_routes`` check is threaded, we can't wait to see an event has no work to `matching_routes` check is threaded, we can't wait to see an event has no work to
schedule, and thus can't prevent this method being attached as a "done" callback. schedule, and thus can't prevent this method being attached as a "done" callback.
The check for results from the passed future allows us to know when in fact a The check for results from the passed future allows us to know when in fact a
valid frame has finished, and a resubmission may be on the table. valid frame has finished, and a resubmission may be on the table.
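The "done callback" wiring described above can be sketched with `concurrent.futures` directly: the event is bound into the callback with `functools.partial`, and resubmission is decided from the finished future's result. Names are illustrative, not the Router's exact internals.

```python
# Hedged sketch of attaching a per-event "done" callback, as described above.
from functools import partial
from concurrent.futures import ThreadPoolExecutor

def submit_event(event):
    return [f'callback ran for {event}']     # stand-in for the real frame

def clear_event(event, future):
    # runs when the frame finishes; an empty result means no work was scheduled
    if future.result():
        print(f'{event}: frame done, resubmit if requests queued up meanwhile')

pool = ThreadPoolExecutor()
future = pool.submit(submit_event, 'file.txt')
future.add_done_callback(partial(clear_event, 'file.txt'))
pool.shutdown(wait=True)
```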
@@ -469,13 +460,11 @@ class ChainRouter[E: Event](Router[E]):
'''
TODO: allow positional insertion in ordered list

-.. note::
-
-    the ``routemap`` extensions here shouldn't be necessary, since 1) route maps
-    show up only in ``matching_routes``, and 2) ``matching_routes`` is only
-    invoked in ``submit_event``, which is totally overwritten for the ChainRouter
-    type. All events are routed through to individual Routers, at which point
-    their route maps are used.
+Note: the `routemap` extensions here shouldn't be necessary, since 1) route maps
+show up only in `matching_routes`, and 2) `matching_routes` is only invoked in
+`submit_event`, which is totally overwritten for the ChainRouter type. All events
+are routed through to individual Routers, at which point their route maps are
+used.
'''
self.ordered_routers.append(router)
for endpoint, routelist in router.routemap.items():
@ -483,7 +472,7 @@ class ChainRouter[E: Event](Router[E]):
def matching_routes(self, event: E, event_time=None): def matching_routes(self, event: E, event_time=None):
''' '''
Colloquial ``callbacks`` now used as a dict of lists of callbacks, indexed by Colloquial `callbacks` now used as a dict of lists of callbacks, indexed by
router, and only having keys for routers with non-empty callback lists. router, and only having keys for routers with non-empty callback lists.
''' '''
if event_time is None: if event_time is None:
@ -499,8 +488,8 @@ class ChainRouter[E: Event](Router[E]):
def wait_on_callbacks(self, callbacks, event: E, *args, **kwargs): def wait_on_callbacks(self, callbacks, event: E, *args, **kwargs):
''' '''
Note: relies on order of callbacks dict matching that of ``ordered_routers``, which Note: relies on order of callbacks dict matching that of `ordered_routers`, which
should happen in ``matching_routes`` should happen in `matching_routes`
''' '''
results = {} results = {}
for router, callback_list in callbacks.items(): for router, callback_list in callbacks.items():
@ -515,14 +504,14 @@ class ChainRouter[E: Event](Router[E]):
def stop_event(self, event): def stop_event(self, event):
''' '''
Sub-routers do not get a "done" callback for their ``submit_event`` jobs, as they Sub-routers do not get a "done" callback for their `submit_event` jobs, as they
would if they handled their own event submissions. They will, however, set the would if they handled their own event submissions. They will, however, set the
submitted event as "running." We can't rely on sub-routers' "done" callbacks to submitted event as "running." We can't rely on sub-routers' "done" callbacks to
"unset" the running event, because the disconnect between the thread completing "unset" the running event, because the disconnect between the thread completing
and execution of that callback may take too long. and execution of that callback may take too long.
Instead, we explicitly unset the running event for each of the constituent Instead, we explicitly unset the running event for each of the constituent
sub-routers at the *same time* we handle the ChainRouter's notion of event's sub-routers at the _same time_ we handle the ChainRouter's notion of event's
ending. ending.
''' '''
event_idx = self.event_index(event) event_idx = self.event_index(event)
@ -551,141 +540,3 @@ def handle_exception(future):
except Exception as e: except Exception as e:
print(f"Exception occurred: {e}") print(f"Exception occurred: {e}")
traceback.print_exc() traceback.print_exc()
# RouterBuilder
def route(router, route_group, **route_kwargs):
    def decorator(f):
        f._route_data = (router, route_group, route_kwargs)
        return f
    return decorator
class RouteRegistryMeta(type):
    '''
    Metaclass handling route registry at the class level.
    '''
    def __new__(cls, name, bases, attrs):
        route_registry = defaultdict(lambda: defaultdict(list))

        def register_route(method):
            nonlocal route_registry
            if hasattr(method, '_route_data'):
                router, route_group, route_kwargs = method._route_data
                route_registry[router][route_group].append((method, route_kwargs))

        # add registered superclass methods; iterate over bases (usually just one), then
        # that base's chain down (reversed), then methods from each subclass
        for base in bases:
            for _class in reversed(base.mro()):
                methods = inspect.getmembers(_class, predicate=inspect.isfunction)
                for _, method in methods:
                    register_route(method)

        # add final registered formats for the current class, overwriting any found in
        # superclass chain
        for attr_name, attr_value in attrs.items():
            register_route(attr_value)

        attrs['route_registry'] = route_registry

        return super().__new__(cls, name, bases, attrs)
class RouterBuilder(metaclass=RouteRegistryMeta):
'''
Builds a (Chain)Router using attached methods and passed options.
This class can be subtyped and desired router methods attached using the provided
``route`` decorator. This facilitates two separate grouping mechanisms:
1. Group methods by frame (i.e., attach to the same router in a chain router)
2. Group by registry equivalence (i.e., within a frame, registered with the same
parameters)
These groups are indicated by the following collation syntax:
.. code-block:: python
@route('<router>/<frame>', '<route-group>', **route_kwargs)
def method(...):
...
and the following is a specific example:
.. code-block:: python
@route(router='convert', route_group='file', debounce=500)
def file_convert_1(self, event):
...
which will attach the method to the "convert" router (or "frame" in a chain router
context) using parameters (endpoint, pattern, and other keyword args) associated with
the "file" route group (as indexed by the ``register_map`` provided on instantiation)
with the ``debounce`` route keyword (which will override the same keyword values if
set in the route group). Note that the exact same ``@route`` signature can be used for
an arbitrary number of methods to be handled in parallel by the associated Router.
Note that there is one reserved route group keyword: "post," for post callbacks.
Multiple post-callbacks for a particular router can be specified with the same ID
syntax above.
.. admonition:: Map structures
The following is a more intuitive breakdown of the maps involved, provided and
computed on instantiation:
.. code-block:: python
# provided
register_map[<router-name>] -> ( Router, { <type>: ( ( endpoint, pattern ), **kwargs ) } )
# computed
routers[<router-name>][<type>] -> [... <methods> ...]
.. admonition:: TODO
Consider "flattening" the ``register_map`` to be indexed only by ``<type>``,
effectively forcing the 2nd grouping mechanism to be provided here (while the 1st
is handled by the method registration within the body of the class). This properly
separates the group mechanisms and is a bit more elegant, but reduces the
flexibility a bit (possibly in a good way, though).
'''
    def __init__(
        self,
        register_map: dict[str, tuple[Router, dict[str, tuple[tuple[str, str], dict[str, Any]]]]],
    ):
        self.register_map = register_map

        # register
        for router_name, (router, router_options) in self.register_map.items():
            for route_group, method_arg_list in self.route_registry[router_name].items():
                # get post-callbacks for reserved key "post"
                # assumed no kwargs for passthrough
                if route_group == 'post':
                    for method, _ in method_arg_list:
                        router.add_post_callback(method)
                    continue

                group_options = router_options.get(route_group)
                if group_options is None:
                    continue

                # "group_route_kwargs" are route kwargs provided @ group level
                # "method_route_kwargs" are route kwargs provided @ method level
                # |-> considered more specific and will override group kwargs
                (endpoint, pattern), group_route_kwargs = group_options
                for method, method_route_kwargs in method_arg_list:
                    router.register(
                        endpoint,
                        update_wrapper(partial(method, self), method),
                        pattern,
                        **{
                            **group_route_kwargs,
                            **method_route_kwargs
                        }
                    )

    def get_router(self, router_key_list: list[str]):
        return ChainRouter([self.register_map[k][0] for k in router_key_list])
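Putting the removed builder together: a hedged end-to-end sketch of subclassing `RouterBuilder`, grouping methods with `@route`, and building a `ChainRouter` from a `register_map` shaped like the docstring above. The router name, endpoint, pattern, and kwargs are invented, and the `PathRouter` import path is assumed.

```python
# Hedged usage sketch of the RouterBuilder/route machinery removed above; it
# relies on the `route` decorator and `RouterBuilder` defined in that block.
from execlog.routers import PathRouter   # import path assumed

class SiteRoutes(RouterBuilder):
    @route('convert', 'file', debounce=500)
    def file_convert(self, event):
        print(f'converting {event.name}')

    @route('convert', 'post')             # reserved "post" group: post-callback
    def after_convert(self, *args, **kwargs):
        print('conversion pass finished')

builder = SiteRoutes(
    register_map={
        # <router-name>: (Router, { <route-group>: ((endpoint, pattern), route_kwargs) })
        'convert': (PathRouter(), {'file': (('content/', '*.md'), {'delay': 100})}),
    }
)
chain_router = builder.get_router(['convert'])   # ChainRouter over the named frames
```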

View File

@ -24,7 +24,7 @@ class PathRouter(Router[FileEvent]):
): ):
''' '''
Parameters: Parameters:
path: Path (directory) to watch with ``inotify`` path: Path (directory) to watch with `inotify`
func: Callback to run if FS event target matches glob func: Callback to run if FS event target matches glob
glob: Relative glob pattern to match files in provided path. The FS event's glob: Relative glob pattern to match files in provided path. The FS event's
filename must match this pattern for the callback to queued. (Default: filename must match this pattern for the callback to queued. (Default:
@ -32,7 +32,7 @@ class PathRouter(Router[FileEvent]):
debounce: debounce:
delay: delay:
listener_kwargs: Additional params for associated listener "listen" routes. listener_kwargs: Additional params for associated listener "listen" routes.
See ``PathListener.listen``. See `PathListener.listen`.
''' '''
super().register( super().register(
#endpoint=Path(path), #endpoint=Path(path),
@ -47,14 +47,14 @@ class PathRouter(Router[FileEvent]):
def filter(self, event, glob, **listen_kwargs) -> bool: def filter(self, event, glob, **listen_kwargs) -> bool:
''' '''
Note: Note:
If ``handle_events`` is called externally, note that this loop will block in the If `handle_events` is called externally, note that this loop will block in the
calling thread until the jobs have been submitted. It will _not_ block until calling thread until the jobs have been submitted. It will _not_ block until
jobs have completed, however, as a list of futures is returned. The calling jobs have completed, however, as a list of futures is returned. The calling
Watcher instance may have already been started, in which case ``run()`` will Watcher instance may have already been started, in which case `run()` will
already be executing in a separate thread. Calling this method externally will already be executing in a separate thread. Calling this method externally will
not interfere with this loop insofar as it adds jobs to the same thread pool. not interfere with this loop insofar as it adds jobs to the same thread pool.
Because this method only submits jobs associated with the provided ``events``, Because this method only submits jobs associated with the provided `events`,
the calling thread can await the returned list of futures and be confident the calling thread can await the returned list of futures and be confident
that top-level callbacks associated with these file events have completed. Do that top-level callbacks associated with these file events have completed. Do
note that, if the Watcher has already been started, any propagating file note that, if the Watcher has already been started, any propagating file

View File

@@ -6,16 +6,13 @@ and job execution (routing and listening). Routers and Listeners can be started
managed independently, but a single Server instance can house, start, and shutdown
listeners in one place.

-.. admonition:: todo
-
-    As it stands, the Server requires address and port details, effectively needing one
-    of the HTTP items (static file serving or livereloading) to be initialized appropriately.
-    But there is a clear use case for just managing disparate Routers and their associated
-    Listeners. Should perhaps separate this "grouped listener" into another object, or just
-    make the Server definition more flexible.
+TODO: as it stands, the Server requires address and port details, effectively needing one
+of the HTTP items (static file serving or livereloading) to be initialized appropriately.
+But there is a clear use case for just managing disparate Routers and their associated
+Listeners. Should perhaps separate this "grouped listener" into another object, or just
+make the Server definition more flexible.
'''
import re
-import signal
import asyncio
import logging
import threading
@@ -35,7 +32,7 @@ logger = logging.getLogger(__name__)
class Server:
    '''
-    Wraps up a development static file server and live reloader.
+    Server class. Wraps up a development static file server and live reloader.
    '''
    def __init__(
        self,
@ -92,9 +89,9 @@ class Server:
endpoint (if livereload enabled). endpoint (if livereload enabled).
Note that, when present, the livereload endpoint is registered first, as the order Note that, when present, the livereload endpoint is registered first, as the order
in which routes are defined matters for FastAPI apps. This allows ``/livereload`` to in which routes are defined matters for FastAPI apps. This allows `/livereload` to
behave appropriately, even when remounting the root if serving static files behave appropriately, even when remounting the root if serving static files
(which, if done in the opposite order, would "eat up" the ``/livereload`` endpoint). (which, if done in the opposite order, would "eat up" the `/livereload` endpoint).
''' '''
# enable propagation and clear handlers for uvicorn internal loggers; # enable propagation and clear handlers for uvicorn internal loggers;
# allows logging messages to propagate to my root logger # allows logging messages to propagate to my root logger
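The ordering constraint can be sketched with a bare FastAPI app (illustrative only; the `static/` directory and endpoint body are assumptions rather than execlog's actual setup):

```py
from fastapi import FastAPI, WebSocket
from fastapi.staticfiles import StaticFiles

app = FastAPI()

# register the livereload WebSocket route first...
@app.websocket('/livereload')
async def livereload(websocket: WebSocket):
    await websocket.accept()

# ...then remount the root for static files; doing this first would
# "eat up" the /livereload path
app.mount('/', StaticFiles(directory='static', html=True), name='static')
```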
@ -152,35 +149,33 @@ class Server:
''' '''
Start the server. Start the server.
.. admonition:: Design Note: Design
This method takes on some extra complexity in order to ensure the blocking This method takes on some extra complexity in order to ensure the blocking
Watcher and FastAPI's event loop play nicely together. The Watcher's ``start()`` Watcher and FastAPI's event loop play nicely together. The Watcher's `start()`
method runs a blocking call to INotify's ``read()``, which obviously cannot be method runs a blocking call to INotify's `read()`, which obviously cannot be
started directly here in the main thread. Here we have a few viable options: started directly here in the main thread. Here we have a few viable options:
1. Simply wrap the Watcher's ``start`` call in a separate thread, e.g., 1. Simply wrap the Watcher's `start` call in a separate thread, e.g.,
.. code-block:: python
```py
watcher_start = partial(self.watcher.start, loop=loop) watcher_start = partial(self.watcher.start, loop=loop)
threading.Thread(target=watcher_start).start() threading.Thread(target=watcher_start).start()
```
This works just fine, and the watcher's registered async callbacks can This works just fine, and the watcher's registered async callbacks can
still use the passed event loop to get messages sent back to open WebSocket still use the passed event loop to get messages sent back to open WebSocket
clients. clients.
2. Run the Watcher's `start` inside a thread managed by the event loop via
`loop.run_in_executor`:
2. Run the Watcher's ``start`` inside a thread managed by the event loop via ```py
``loop.run_in_executor``:
.. code-block:: python
loop.run_in_executor(None, partial(self.watcher.start, loop=loop)) loop.run_in_executor(None, partial(self.watcher.start, loop=loop))
```
Given that this just runs the target method in a separate thread, it's very Given that this just runs the target method in a separate thread, it's very
similar to option #1. It doesn't even make the outer loop context available similar to option #1. It doesn't even make the outer loop context available
to the Watcher, meaning we still have to pass this loop explicitly to the to the Watcher, meaning we still have to pass this loop explicitly to the
``start`` method. The only benefit here (I think? there may actually be no `start` method. The only benefit here (I think? there may actually be no
difference) is that it keeps things under one loop, which can be beneficial difference) is that it keeps things under one loop, which can be beneficial
for shutdown. for shutdown.
@ -191,12 +186,12 @@ class Server:
Once the watcher is started, we can kick off the FastAPI server (which may be Once the watcher is started, we can kick off the FastAPI server (which may be
serving static files, handling livereload WS connections, or both). We serving static files, handling livereload WS connections, or both). We
provide ``uvicorn`` access to the manually created ``asyncio`` loop used to provide `uvicorn` access to the manually created `asyncio` loop used to
run the Watcher (in a thread, that is), since that loop is made available to run the Watcher (in a thread, that is), since that loop is made available to
the ``Watcher._event_loop`` method. This ultimately allows async methods to be the `Watcher._event_loop` method. This ultimately allows async methods to be
registered as callbacks to the Watcher and be run in a managed loop. In this registered as callbacks to the Watcher and be run in a managed loop. In this
case, that loop is managed by FastAPI, which keeps things consistent: the case, that loop is managed by FastAPI, which keeps things consistent: the
Watcher can call ``loop.call_soon_threadsafe`` to queue up a FastAPI-based Watcher can call `loop.call_soon_threadsafe` to queue up a FastAPI-based
response _in the same FastAPI event loop_, despite the trigger for that response _in the same FastAPI event loop_, despite the trigger for that
response having originated from a separate thread (i.e., where the watcher is response having originated from a separate thread (i.e., where the watcher is
started). This works smoothly, and keeps the primary server's event loop from started). This works smoothly, and keeps the primary server's event loop from
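Condensed, the loop-sharing arrangement described here looks roughly like the following (a sketch under assumptions: `watcher` exposes a blocking `start(loop=...)` and `app` is the FastAPI instance; this is not the project's literal code):

```py
import asyncio
import threading
from functools import partial

import uvicorn

loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)

# run the blocking watcher in its own thread, handing it the loop so its
# async callbacks can be queued back onto it via call_soon_threadsafe
threading.Thread(target=partial(watcher.start, loop=loop), daemon=True).start()

# drive uvicorn's serve() coroutine on that same loop, so FastAPI and the
# watcher callbacks share a single event loop
userver = uvicorn.Server(uvicorn.Config(app))
loop.run_until_complete(userver.serve())
loop.close()
```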
@ -204,15 +199,14 @@ class Server:
Note that, due to the delicate Watcher behavior, we must perform a shutdown Note that, due to the delicate Watcher behavior, we must perform a shutdown
explicitly in order for things to be handled gracefully. This is done in the explicitly in order for things to be handled gracefully. This is done in the
server setup step, where we ensure FastAPI calls ``watcher.stop()`` during its server setup step, where we ensure FastAPI calls `watcher.stop()` during its
shutdown process. shutdown process.
.. admonition:: on event loop management Note: on event loop management
The uvicorn server is run with `run_until_complete`, intended as a
The uvicorn server is run with ``run_until_complete``, intended as a
long-running process to eventually be interrupted or manually disrupted with a long-running process to eventually be interrupted or manually disrupted with a
call to ``shutdown()``. The ``shutdown`` call attempts to gracefully shut down the call to `shutdown()`. The `shutdown` call attempts to gracefully shut down the
uvicorn process by setting a ``should_exit`` flag. Upon successful shutdown, the uvicorn process by setting a `should_exit` flag. Upon successful shutdown, the
server task will be considered complete, and we can then manually close the server task will be considered complete, and we can then manually close the
loop following the interruption. So a shutdown call (which is also attached as loop following the interruption. So a shutdown call (which is also attached as
a lifespan shutdown callback for the FastAPI object) will disable listeners a lifespan shutdown callback for the FastAPI object) will disable listeners
@ -240,8 +234,6 @@ class Server:
''' '''
Additional shutdown handling after the FastAPI event loop receives an interrupt. Additional shutdown handling after the FastAPI event loop receives an interrupt.
.. admonition:: Usage
This is attached as a "shutdown" callback when creating the FastAPI instance, This is attached as a "shutdown" callback when creating the FastAPI instance,
which generally appears to hear interrupts and propagate them through. which generally appears to hear interrupts and propagate them through.
@ -250,11 +242,10 @@ class Server:
approaches of the Uvicorn server do not appear to work well in this case; they approaches of the Uvicorn server do not appear to work well in this case; they
both stall the calling thread indefinitely (in the second case, when waiting on both stall the calling thread indefinitely (in the second case, when waiting on
the shutdown result), or simply don't shut down the server (in the first). Only the shutdown result), or simply don't shut down the server (in the first). Only
setting ``should_exit`` and allowing for a graceful internal shutdown appears to setting `should_exit` and allowing for a graceful internal shutdown appears to
both 1) handle this gracefully, and 2) shut down the server at all. both 1) handle this gracefully, and 2) shut down the server at all.
.. code-block:: python ```
self.loop.call_soon_threadsafe(self.userver.shutdown) self.loop.call_soon_threadsafe(self.userver.shutdown)
# OR # # OR #
@ -263,14 +254,15 @@ class Server:
# and wait for shutdown # and wait for shutdown
future.result() future.result()
```
The shutdown process goes as follows: The shutdown process goes as follows:
1. Stop any managed listeners: close out listener loops and/or thread pools by 1. Stop any managed listeners: close out listener loops and/or thread pools by
calling ``stop()`` on each of the managed listeners. This prioritizes their calling `stop()` on each of the managed listeners. This prioritizes their
closure so that no events can make their way into the queue. closure so that no events can make their way into the queue.
2. Gracefully shut down the wrapped Uvicorn server. This is the process that 2. Gracefully shut down the wrapped Uvicorn server. This is the process that
starts the FastAPI server instance; set the ``should_exit`` flag. starts the FastAPI server instance; set the `should_exit` flag.
If this completes successfully, in the thread where Uvicorn was started, the server If this completes successfully, in the thread where Uvicorn was started, the server
task should be considered "completed," at which point the event loop can be closed task should be considered "completed," at which point the event loop can be closed
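As a rough sketch, the two steps above condense to something like this (attribute names follow the docstring; treat it as illustrative rather than the class's exact body):

```py
def shutdown(self):
    # 1) stop managed listeners first, so no new events reach the queue
    for listener in self.managed_listeners:
        listener.stop()

    # 2) ask the wrapped uvicorn server to exit gracefully by setting its
    #    should_exit flag from inside the server's own event loop
    def set_should_exit():
        self.userver.should_exit = True

    self.loop.call_soon_threadsafe(set_should_exit)
```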
@ -292,36 +284,3 @@ class Server:
self.loop.call_soon_threadsafe(set_should_exit) self.loop.call_soon_threadsafe(set_should_exit)
class ListenerServer:
'''
Server abstraction to handle disparate listeners.
'''
def __init__(
self,
managed_listeners : list | None = None,
):
if managed_listeners is None:
managed_listeners = []
self.managed_listeners = managed_listeners
def start(self):
signal.signal(signal.SIGINT, lambda s,f: self.shutdown())
signal.signal(signal.SIGTERM, lambda s,f: self.shutdown())
for listener in self.managed_listeners:
#loop.run_in_executor(None, partial(self.listener.start, loop=loop))
if not listener.started:
listener.start()
for listener in self.managed_listeners:
listener.join()
def shutdown(self):
# stop attached auxiliary listeners, both internal & external
if self.managed_listeners:
logger.info(f"Stopping {len(self.managed_listeners)} listeners...")
for listener in self.managed_listeners:
listener.stop()
View File
@ -5,10 +5,6 @@ import colorama
from colorama import Fore, Back, Style from colorama import Fore, Back, Style
def color_text(text, *colorama_args):
return f"{''.join(colorama_args)}{text}{Style.RESET_ALL}"
class ColorFormatter(logging.Formatter): class ColorFormatter(logging.Formatter):
_format = '%(levelname)-8s :: %(name)s %(message)s' _format = '%(levelname)-8s :: %(name)s %(message)s'
colorama.init(autoreset=True) colorama.init(autoreset=True)
View File
@ -15,7 +15,6 @@ sphinx
sphinx-togglebutton sphinx-togglebutton
furo furo
myst-parser myst-parser
sphinx-autodoc-typehints
# -- testing --- # -- testing ---
pytest pytest