commit 109fbefd419f6ea6ccb6a82e9a3761905d3ed54a Author: Sam G. Date: Fri Apr 19 18:43:52 2024 -0700 initial commit (public reset) diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..d46023a --- /dev/null +++ b/.gitignore @@ -0,0 +1,13 @@ +# generic py +__pycache__/ +.pytest_cache/ +localsys.egg-info/ +.ipynb_checkpoints/ +.python-version + +# vendor and build files +dist/ +build/ +docs/_autoref/ +docs/_autosummary/ +docs/_build/ diff --git a/MANIFEST.in b/MANIFEST.in new file mode 100644 index 0000000..e69de29 diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..b63f792 --- /dev/null +++ b/Makefile @@ -0,0 +1,20 @@ +PYTHON=/home/smgr/.pyenv/versions/execlog/bin/python +BASH=/usr/bin/bash + + +## ------------------ docs ------------------ ## +docs-build: + sphinx-apidoc --module-first --separate -o docs/_autoref/ execlog + make -C docs/ html + +docs-serve: + cd docs/_build/html && python -m http.server 9090 + +docs-clean: + make -C docs/ clean + rm -rf docs/_autoref +## ------------------------------------------ ## + +## ----------------- tests ------------------ ## +test: + pytest --pyargs tests -v diff --git a/README.md b/README.md new file mode 100644 index 0000000..73174c3 --- /dev/null +++ b/README.md @@ -0,0 +1,2 @@ +# Overview +`execlog` is a package diff --git a/docs/Makefile b/docs/Makefile new file mode 100644 index 0000000..d4bb2cb --- /dev/null +++ b/docs/Makefile @@ -0,0 +1,20 @@ +# Minimal makefile for Sphinx documentation +# + +# You can set these variables from the command line, and also +# from the environment for the first two. +SPHINXOPTS ?= +SPHINXBUILD ?= sphinx-build +SOURCEDIR = . +BUILDDIR = _build + +# Put it first so that "make" without argument is like "make help". +help: + @$(SPHINXBUILD) -M help "$(SOURCEDIR)" "$(BUILDDIR)" $(SPHINXOPTS) $(O) + +.PHONY: help Makefile + +# Catch-all target: route all unknown targets to Sphinx using the new +# "make mode" option. $(O) is meant as a shortcut for $(SPHINXOPTS). +%: Makefile + @$(SPHINXBUILD) -M $@ "$(SOURCEDIR)" "$(BUILDDIR)" $(SPHINXOPTS) $(O) diff --git a/docs/conf.py b/docs/conf.py new file mode 100644 index 0000000..fb19593 --- /dev/null +++ b/docs/conf.py @@ -0,0 +1,38 @@ +# Configuration file for the Sphinx documentation builder. 
+# +# For the full list of built-in configuration values, see the documentation: +# https://www.sphinx-doc.org/en/master/usage/configuration.html + +# -- Project information ----------------------------------------------------- +# https://www.sphinx-doc.org/en/master/usage/configuration.html#project-information + +project = 'execlog' +copyright = '2024, Sam Griesemer' +author = 'Sam Griesemer' + +# -- General configuration --------------------------------------------------- +# https://www.sphinx-doc.org/en/master/usage/configuration.html#general-configuration + +extensions = [ + "sphinx.ext.autodoc", + "sphinx.ext.autosummary", + "sphinx.ext.viewcode", + "myst_parser", +] +autosummary_generate = True +autosummary_imported_members = True + +templates_path = ['_templates'] +exclude_patterns = ['_build', 'Thumbs.db', '.DS_Store'] + + + +# -- Options for HTML output ------------------------------------------------- +# https://www.sphinx-doc.org/en/master/usage/configuration.html#options-for-html-output + +html_theme = 'furo' +html_static_path = ['_static'] +#html_sidebars = { +# '**': ['/modules.html'], +#} + diff --git a/docs/index.md b/docs/index.md new file mode 100644 index 0000000..b654582 --- /dev/null +++ b/docs/index.md @@ -0,0 +1,32 @@ +# `execlog` package docs +{ref}`genindex` +{ref}`modindex` +{ref}`search` + +```{eval-rst} +.. autosummary:: + :nosignatures: + + execlog.Handler + execlog.Listener + execlog.Router + execlog.Server +``` + +```{toctree} +:maxdepth: 3 +:caption: Autoref + +_autoref/execlog.rst +``` + +```{toctree} +:maxdepth: 3 +:caption: Contents + +reference/documentation/index +``` + +```{include} ../README.md +``` + diff --git a/docs/make.bat b/docs/make.bat new file mode 100644 index 0000000..32bb245 --- /dev/null +++ b/docs/make.bat @@ -0,0 +1,35 @@ +@ECHO OFF + +pushd %~dp0 + +REM Command file for Sphinx documentation + +if "%SPHINXBUILD%" == "" ( + set SPHINXBUILD=sphinx-build +) +set SOURCEDIR=. +set BUILDDIR=_build + +%SPHINXBUILD% >NUL 2>NUL +if errorlevel 9009 ( + echo. + echo.The 'sphinx-build' command was not found. Make sure you have Sphinx + echo.installed, then set the SPHINXBUILD environment variable to point + echo.to the full path of the 'sphinx-build' executable. Alternatively you + echo.may add the Sphinx directory to PATH. + echo. + echo.If you don't have Sphinx installed, grab it from + echo.https://www.sphinx-doc.org/ + exit /b 1 +) + +if "%1" == "" goto help + +%SPHINXBUILD% -M %1 %SOURCEDIR% %BUILDDIR% %SPHINXOPTS% %O% +goto end + +:help +%SPHINXBUILD% -M help %SOURCEDIR% %BUILDDIR% %SPHINXOPTS% %O% + +:end +popd diff --git a/docs/reference/documentation/index.md b/docs/reference/documentation/index.md new file mode 100644 index 0000000..a14cdde --- /dev/null +++ b/docs/reference/documentation/index.md @@ -0,0 +1,8 @@ +# Documentation + +```{toctree} +:hidden: + +sphinx +``` + diff --git a/docs/reference/documentation/sphinx.md b/docs/reference/documentation/sphinx.md new file mode 100644 index 0000000..33d6f27 --- /dev/null +++ b/docs/reference/documentation/sphinx.md @@ -0,0 +1,111 @@ +# Sphinx +The primary driver of this package's documentation is Sphinx's `autodoc` extension, +using the [Furo theme][1]. + +**High-level details**: + +- `sphinx-apidoc` generates package-based documentation to the `_autoref/` directory, + with navigation available under "Autoref" in the sidebar. +- Markdown-based documentation files are manually written under the `reference/` + directory, showing up under "Contents" in the sidebar. 
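Both steps are driven by the project's top-level `Makefile` (`docs-build` and `docs-serve` targets); the equivalent manual invocation from the package root is roughly:

```sh
# generate per-module RST stubs under docs/_autoref/, then build the HTML site
sphinx-apidoc --module-first --separate -o docs/_autoref/ execlog
make -C docs/ html

# preview the built site locally on port 9090
cd docs/_build/html && python -m http.server 9090
```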
+ +## Detailed directory structure +All files are placed under `docs/sphinx`: + +- `_`-prefixed are Sphinx-managed directories + * `_build/html/` houses output HTML files + * `_autoref/` is the target for module-based RST files written by `autodoc` +- `reference/`: houses all manually written documentation (totally separate from + auto-generated package docs) +- `conf.py`: single Sphinx configuration file +- `index.md`: documentation index, setups up a persistent sidebar across all other pages + +For manually written documentation under `reference/`, topics are nested as needed. Within +a nested directory `reference/`, an `index.md` should created with content like: + +``` +# + +\`\`\`{toctree} +:hidden: + +sub-topic-1.rst +sub-topic-2.rst +... +\`\`\` +``` + +This will add the nested directory to the sidebar navigation, using the name set under the +top-level header. See [Markdown syntax][#markdown-syntax] for more details on the syntax. + +## Sphinx autodoc +Sphinx's `autodoc` extension allows automatic generation of documents according to +(Python) subpackage structure and available docstrings. A few notes here: + +- In the `conf.py` file, autodoc is enabled by adding `"sphinx.ext.autodoc"` to + the extensions list. `"sphinx.ext.viewcode"` can also be added to provide + links to source code. +- Documents are actually generated by calling the `sphinx-apidoc` CLI command. The + current Makefile uses the following call: + + ```sh + sphinx-apidoc --module-first -o docs/sphinx/_autoref/ localsys + ``` + + This writes the automatically generated docs for modules in the package at the + local directory `localsys/` to the `docs/sphinx/_autoref` directory. These are + reStructuredText files by default. + * `--module-first` places the module-level descriptions at the top of the module page. + By default, this is placed at the bottom (oddly), and can be obscured by large lists + of subpackages if this flag isn't provided. + * See available `sphinx-apidoc` options [here][2], as well as more advanced config + [here][3]. + + +## Markdown syntax +The `myst_parser` extension enables Markdown (or something close to it) to be used when +writing documentation files. The Sphinx directives can be difficult to track, and +they change slightly under the MyST Markdown syntax. The following are a few common +blocks: + +**Page hierarchies**: the following will generate link hierarchy according to the provided +pages: + +``` +\`\`\`{toctree} +:maxdepth: +:caption: +:hidden: + +example-file-1 +example-file-2 +example-dir/index +... +\`\`\` +``` + +- `:maxdepth:` limits the depth of nesting +- `:caption:` title for the group of pages +- `:hidden:` if provided, links will only show in the sidebar (hidden on the page) +- Constituent files: listed files will be rendered as a link directly. If a listed file + has a `{toctree}` directive, this tree will be rendered in place of the page's link as a + dropdown. The dropdown will be named according to the file's top-level heading, and + clicking directly on the dropdown header will show that page's content. Files found in + the tree will be placed as links under the dropdown, recursively subject to same rules + described here. 
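As a filled-in instance of the template above, this package's `docs/index.md` registers the auto-generated reference pages with:

```
\`\`\`{toctree}
:maxdepth: 3
:caption: Autoref

_autoref/execlog.rst
\`\`\`
```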
+ +**Include files**: the following will include file content +pages: + +``` +\`\`\`{include} README.md +\`\`\` +``` + +**Reference directives** + + +[1]: https://pradyunsg.me/furo/ +[2]: https://www.sphinx-doc.org/en/master/man/sphinx-apidoc.html +[3]: https://www.sphinx-doc.org/en/master/usage/extensions/autodoc.html# + diff --git a/execlog.egg-info/PKG-INFO b/execlog.egg-info/PKG-INFO new file mode 100644 index 0000000..fba8d5f --- /dev/null +++ b/execlog.egg-info/PKG-INFO @@ -0,0 +1,14 @@ +Metadata-Version: 2.1 +Name: execlog +Version: 0.1.1 +Summary: Lightweight multi-threaded job framework +Author-email: Sam Griesemer +Classifier: Programming Language :: Python :: 3 +Classifier: License :: OSI Approved :: MIT License +Classifier: Operating System :: OS Independent +Requires-Python: >=3.12 +Description-Content-Type: text/markdown +Requires-Dist: tqdm + +# Overview +`execlog` is a package diff --git a/execlog.egg-info/SOURCES.txt b/execlog.egg-info/SOURCES.txt new file mode 100644 index 0000000..82933f5 --- /dev/null +++ b/execlog.egg-info/SOURCES.txt @@ -0,0 +1,19 @@ +MANIFEST.in +README.md +pyproject.toml +execlog/__init__.py +execlog/handler.py +execlog/listener.py +execlog/router.py +execlog/server.py +execlog.egg-info/PKG-INFO +execlog.egg-info/SOURCES.txt +execlog.egg-info/dependency_links.txt +execlog.egg-info/requires.txt +execlog.egg-info/top_level.txt +execlog/listeners/__init__.py +execlog/listeners/path.py +execlog/routers/__init__.py +execlog/routers/path.py +execlog/util/__init__.py +execlog/util/path.py \ No newline at end of file diff --git a/execlog.egg-info/dependency_links.txt b/execlog.egg-info/dependency_links.txt new file mode 100644 index 0000000..8b13789 --- /dev/null +++ b/execlog.egg-info/dependency_links.txt @@ -0,0 +1 @@ + diff --git a/execlog.egg-info/requires.txt b/execlog.egg-info/requires.txt new file mode 100644 index 0000000..78620c4 --- /dev/null +++ b/execlog.egg-info/requires.txt @@ -0,0 +1 @@ +tqdm diff --git a/execlog.egg-info/top_level.txt b/execlog.egg-info/top_level.txt new file mode 100644 index 0000000..9968ff8 --- /dev/null +++ b/execlog.egg-info/top_level.txt @@ -0,0 +1 @@ +execlog diff --git a/execlog/__init__.py b/execlog/__init__.py new file mode 100644 index 0000000..442d884 --- /dev/null +++ b/execlog/__init__.py @@ -0,0 +1,8 @@ +from execlog.handler import Handler +from execlog.listener import Listener +from execlog.router import Router, ChainRouter, Event +from execlog.server import Server + +from execlog import listeners +from execlog import routers +from execlog import util diff --git a/execlog/handler.py b/execlog/handler.py new file mode 100644 index 0000000..f958c76 --- /dev/null +++ b/execlog/handler.py @@ -0,0 +1,150 @@ +import re +import logging +from pathlib import Path + +from inotify_simple import flags +from starlette.endpoints import WebSocketEndpoint + + +logger = logging.getLogger(__name__) + +#page_re = re.compile(r'https?:\/\/.*?\/(.*?)(?:\?|\.html|$)') +page_re = re.compile(r'https?:\/\/.*?\/(.*?)$') + +def client_reload_wrap(reloaded_file): + rpath = Path(reloaded_file) + static_extensions = ['.js', '.css'] + + if rpath.suffix in static_extensions: + return lambda _: True + else: + return lambda c: Path(c).with_suffix('.html') == rpath + + +class Handler(WebSocketEndpoint): + ''' + Subclasses WebSocketEndpoint to be attached to live reload endpoints. 
+ + Note: Reload model + - Served HTML files are generated from templates that include livereload JS and the + target livereload server (port manually set prior to site build). + - When pages are visited (be they served from NGINX or via the development + server), the livereload.js attempts to connect to the known livereload WS + endpoint. + - FastAPI routes the request to _this_ endpoint, and `on_connect` is called. + - Upon successful connection, the livereload JS client sends a "hello" message. + This is picked up as the first post-acceptance message, and captured by the + `on_receive` method. + - `on_receive` subsequently initiates a formal handshake, sending back a "hello" + command and waiting the "info" command from the client. + - If the "info" command is received successfully and contains the requesting + page's URL, the handshake completes and the websocket is added to the class' + `live_clients` tracker. + - Later, when a file in a watch path of the server's watcher is _modified_, + `reload_clients` will be called from within the originating server's event loop, + and pass in the FS event associated with the file change. `client_reload_wrap` + is used to wrap a boolean checker method for whether or not to reload clients + given the FS event. + + TODO: flesh out the reload wrapper to incorporate more complex filters and/or + transformations when determining when to reload certain clients. + ''' + encoding = 'json' + live_clients = {} + + async def on_connect(self, websocket): + await websocket.accept() + + async def on_receive(self, websocket, data): + ''' + Note: On page names + When websockets connect, they simply communicate the exact URL from the origin + page. The websocket is then indexed to possibly variable page names (often + without an `.html` suffix, but occasionally with). The `client_reload_wrap` is + then responsible for taking this client page name and normalizing it to be + matched with full file names (i.e., suffix always included). + ''' + url = await self._lr_handshake(websocket, data) + + if url is None: + logger.warning('Client handshake failed, ignoring') + return + + origin_m = page_re.search(url) + if origin_m is not None: + origin_page = origin_m.group(1) + + # assume index.html if connected to empty name + if origin_page == '': + origin_page = 'index.html' + else: + origin_page = '.null' + + self.live_clients[origin_page] = websocket + logger.info(f'Reloader connected to [{origin_page}] ({len(self.live_clients)} live clients)') + + async def on_disconnect(self, websocket, close_code): + remove_page = None + for page, ws in self.live_clients.items(): + if ws == websocket: + remove_page = page + + if remove_page is not None: + logger.info(f'Client for [{remove_page}] disconnected, removing') + self.live_clients.pop(remove_page) + + @classmethod + async def reload_clients(cls, event): + ''' + Method targeted as a watcher callback. This async method is scheduled in a + thread-safe manner by the watcher to be ran in the FastAPI event loop. + ''' + logger.info(f'> [{event.name}] changed on disk') + should_reload = client_reload_wrap(event.name) + + for page, ws in cls.live_clients.items(): + if should_reload(page): + logger.info(f'>> Reloading client for [{page}]') + await ws.send_json({ + 'command' : 'reload', + 'path' : page, + 'liveCSS' : True, + 'liveImg' : True, + }) + + @staticmethod + async def _lr_handshake(websocket, hello): + ''' + Handshake with livereload.js + + 1. client send 'hello' + 2. server reply 'hello' + 3. client send 'info' + ''' + # 1. 
await client hello after accept + #hello = await websocket.receive_json() + + if hello.get('command') != 'hello': + logger.warning('Client handshake failed at "hello" stage') + return + + # 2. send hello to client + await websocket.send_json({ + 'command': 'hello', + 'protocols': [ + 'http://livereload.com/protocols/official-7', + ], + 'serverName': 'livereload-tornado', + }) + + # 3. await info response + info = await websocket.receive_json() + + if info.get('command') != 'info': + logger.warning('Client handshake failed at "info" stage') + return None + elif 'url' not in info: + logger.warning('Info received from client, but no URL provided') + return None + + return info['url'] diff --git a/execlog/listener.py b/execlog/listener.py new file mode 100644 index 0000000..53ca1f3 --- /dev/null +++ b/execlog/listener.py @@ -0,0 +1,30 @@ +''' +Implements a file system watcher. + +See also: + - https://inotify-simple.readthedocs.io/en/latest/#gracefully-exit-a-blocking-read +''' +import threading + + +class Listener(threading.Thread): + def __init__(self, router): + ''' + Parameters: + workers: number of workers to assign the thread pool when the event loop is + started. Defaults to `None`, which, when passed to + ThreadPoolExecutor, will by default use 5x the number of available + processors on the machine (which the docs claim is a reasonable + assumption given threads are more commonly leveraged for I/O work + rather than intense CPU operations). Given the intended context for + this class, this assumption aligns appropriately. + ''' + super().__init__() + + self.router = router + + def listen(self): + raise NotImplementedError + + def run(self): + raise NotImplementedError diff --git a/execlog/listeners/__init__.py b/execlog/listeners/__init__.py new file mode 100644 index 0000000..0d01181 --- /dev/null +++ b/execlog/listeners/__init__.py @@ -0,0 +1 @@ +from execlog.listeners.path import PathListener diff --git a/execlog/listeners/path.py b/execlog/listeners/path.py new file mode 100644 index 0000000..19a9a52 --- /dev/null +++ b/execlog/listeners/path.py @@ -0,0 +1,388 @@ +''' +Implements a file system watcher. + +See also: + +- https://inotify-simple.readthedocs.io/en/latest/#gracefully-exit-a-blocking-read +''' +#import fnmatch +import os +import time +import select +import logging +from pathlib import Path +from collections import defaultdict + +from inotify_simple import INotify, Event as iEvent, flags as iflags, masks as imasks + +from execlog import util +from execlog.listener import Listener + + +logger = logging.getLogger(__name__) + +# hardcoded file names to ignore +# - "4913" is a temp file created by Vim before editing +IGNORE_PATTERNS = ['4913', '.sync*.db*'] + +class PathListener(Listener): + def __init__(self, router): + ''' + Parameters: + workers: number of workers to assign the thread pool when the event loop is + started. Defaults to `None`, which, when passed to + ThreadPoolExecutor, will by default use 5x the number of available + processors on the machine (which the docs claim is a reasonable + assumption given threads are more commonly leveraged for I/O work + rather than intense CPU operations). Given the intended context for + this class, this assumption aligns appropriately. + + Note: + Due to the nature of INotify, you cannot watch the same path with two + separate flag settings (without creating a new INotify instance). 
Under the + same instance, calling `add_watch` for an already watched path location will + simply return the watch descriptor already associated with that location (and + may update the flags to whatever is passed). However, this location will only + ever be "watched once" by a given INotify instance, so keep this in mind if + multiple downstream callbacks (packaged up by the Router) want to react to the + same path but with different flags (i.e., it won't work as expected). + ''' + super().__init__(router) + + self.started = False + + self.pathmap : dict[str, int] = {} + self.canonmap : dict[int, tuple] = {} + self.watchmap : dict[int, dict[tuple, int]] = defaultdict(lambda: defaultdict(int)) + + self.unmatched_move_froms : dict[int, dict[str, str]] = defaultdict(dict) + + self.inotify = INotify() + + self.read_fd, write_fd = os.pipe() + self.write = os.fdopen(write_fd, "wb") + + def _add_watch( + self, + path, + flags, + lead=None, + remove=False, + ): + if lead is None: lead = '' + path = Path(path) + lead = Path(lead) + + wd = None + fullpath = Path(path, lead) + try: + wd = self.inotify.add_watch(str(fullpath), flags) + self.watchmap[wd][(path, lead)] |= flags + self.pathmap[fullpath] = wd + except FileNotFoundError: + logger.error(f'Directory not found for [{fullpath}] when attempting to watch') + + return wd + + def _add_watch_recursive( + self, + path, + flags, + lead=None, + remove=False, + ): + ''' + Recursively watch directories under path/lead, using `path` as the registered + base. Specifying `lead` gives one control over the subdirectory on which to + recurse; the "origin" the recursion base point. + + Note: on renamed/moved directories + This method is used to reset the mapped path/lead tuples pointed to by certain + watch descriptors when a directory is renamed. iNotify will fire a MOVED event + for the directory (MOVED_FROM/MOVED_TO), but will use the same watch + descriptor for the new directory as it did the old. This will leave all watch + dynamics intact, but new file events will fire and send the old base path to + the router. Explicitly re-watching the renamed directory (and any + subdirectories) will also return that existing watch descriptor. Thus, this + method can just be directly called for directory moves/renames, and WDs in the + `watchmap` will just be used as expected. (One should also call the method + using the old lead and set `remove=True` to remove old tuples out of the + `watchmap`. Note that this doesn't remove the existing watches from iNotify, + just their tracked tuples.) + ''' + if lead is None: + lead = '' + + wds = [] + origin = Path(path, lead) + for subdir in [origin, *util.path.iter_glob_paths('**/', origin)]: + lead = subdir.relative_to(Path(path)) + wd = self._add_watch(path, flags, lead=lead, remove=remove) + if wd is not None: + wds.append(wd) + return wds + + def listen( + self, + path, + flags=None, + ): + ''' + Parameters: + path: Path (directory) to watch with `inotify` + flags: inotify_simple flags matching FS event types allowed to trigger the + callback + debounce: time in milliseconds to debounce file-based events at this path. + Applies to _file targets_; the same filename will have events + "debounced" at this interval (time delta calculated from last + un-rejected match). 
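Example (illustrative; the path and flag set are assumptions, not defaults — in normal use `Router.extend_listener` calls this for each registered endpoint):

    listener.listen(
        '/srv/site',
        flags=iflags.CREATE | iflags.MODIFY | iflags.MOVED_TO,
    )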
+ ''' + path = Path(path) + + if flags is None: + flags = iflags.CREATE | iflags.DELETE | iflags.MODIFY | iflags.DELETE_SELF | iflags.MOVED_TO + + # ensure flags can detect directory events + flags |= iflags.CREATE | iflags.MOVED_FROM | iflags.MOVED_TO + + wds = self._add_watch_recursive(path, flags) + + try: + self.canonmap[wds[0]] = (path, flags) + except IndexError: + logger.error(f'Path {path} returned no INotify watch descriptors') + raise + + def run(self): + ''' + Note: On usage + `start()` is a blocking call. This will hog your main thread if not properly + threaded. If handling this manually in your outer context, you will also need + to make sure to call `.stop()` + + Parameters: + loop: asyncio loop to pass to `_event_loop`; used to schedule async callbacks + when present + ''' + self.started = True + logger.info(f'Starting listener for {len(self.watchmap)} paths') + + for path, flags in self.canonmap.values(): + logger.info(f'> Listening on path {path} for flags {iflags.from_mask(flags)}') + + for (callback, pattern, debounce, delay, *_) in self.router.routemap[path]: + logger.info(f'| > {pattern} -> {callback.__name__} (debounce {debounce}ms, delay {delay}ms)') + + while True: + rlist, _, _ = select.select( + [self.inotify.fileno(), self.read_fd], [], [] + ) + + if self.inotify.fileno() in rlist: + events = self.inotify.read(timeout=0) + self.handle_events(events) + + # check for written stop byte + if self.read_fd in rlist: + os.close(self.read_fd) + self.inotify.close() + return + + def update_moved_from(self, path, lead): + ''' + Update directories on `MOVED_FROM` events. This method gets the existing WD, + removes the old path associated with that WD from the `watchmap` (preventing + events originating from this old path when the new path, which has the _same WD_, + receives an inotify event), and queues the (WD, base-path) tuple to be matched + later in a `MOVED_TO` handler. + + This method isn't a part of a `MOVED_TO` handler because it may be called without + ever having a `MOVED_TO` that follows up. We respond right away in `handle_events` + to `MOVED_FROM` events, keeping the `watchmap` in sync, regardless of whether we + can expect a `MOVED_TO` to sweep through after the fact. + + Note that the `lead` is unique for a given WD and base path. WDs are unique for + filepaths, but inotify uses the same WD for new directories when they experience a + rename (it's the same inode). However, during such a transition, the `watchmap` + can see two different entries for the same WD and basepath: the old tracked path, + and the newly named one (again, still the same inode). So: this method can be + called 1) directly from `MOVED_FROM` events, preemptively wiping the old path from + the tracked dicts, or 2) during handling of a `MOVED_TO` event (in case we don't + allow `MOVED_FROM` events, for instance), given both the new and old paths can be + seen in the `watchmap`. + ''' + wd = self.pathmap.get(Path(path, lead)) + logger.debug(f'> MOVED_FROM update, [{Path(path, lead)}] in pathmap as [{wd}]') + if wd is None: return + + if self.watchmap[wd].pop((path, lead), None): + logger.debug(f'> MOVED_FROM update, popped from watchmap') + self.unmatched_move_froms[wd][path] = lead + + def update_moved_to(self, path, lead): + ''' + Construct synthetic MOVED events. Events are constructed from the path's WD. If + the provided path is not watched, an empty list of events is returned. + + Note: Design details + This method is nuanced. 
It can only be called once a `MOVED_TO` occurs, since + we can't act on a `MOVED_FROM` (we don't know the new target location to look + so we can send file events). When called, we first look for the path's WD in + the `pathmap`. We then check if this WD points to more than one entry with the + same base path (WDs are unique to the path; under the same WD, the same base + path implies the same lead). If so, we know one is the outdated path, and we + push the outdated lead to `update_moved_from`. This would be evidence that the + `MOVED_FROM` event for the move operation wasn't handled in the main event + handling loop. We then check for unmatched move-froms, which should provide + any renamed directories, regardless of whether `MOVED_FROM`s were allowed, to + be detected. Finally, the appropriate `MOVED_FROM`s and `MOVED_TO`s are + handled. To ensure only the correct events match upon handling, we do the + following: + + - First, if a `MOVED_FROM` path is not available, we assume it wasn't queued + by the event and not a watched flag. Given we by default ensure MOVED events + are tracked, regardless of listened paths, this shouldn't be possible, but + if this standard were to change, we won't recursively respond to + `MOVED_FROM`s. This will mean that we can't prevent events from being + matched to old directory names (we've rooted out the ability to tell when + they've changed), and thus can't remove them from the `watchpath` + accordingly. (One functional caveat here: this MOVED_TO handling method + explicitly calls `updated_moved_from`, which should clean up lingering + renamed path targets. This happens recursively if we're watching MOVED_TOs, + so even if standards do change and you don't watch `MOVED_FROM`s, you'll + still get clean up for free due to the robustness of this method. + - If a `MOVED_FROM` lead is found, either due to an inferred matching base + lingering in the `watchmap` or through previously handled `MOVED_FROM` + response, add this path/lead back to the `watchmap`, remove the new + path/lead, and call `handle_events` for the synthetic `MOVED_FROM` events + across files and directories. Once finished, again remove the old path/lead + and add back the new one. + - Submit `MOVED_TO` events to `handle_events`. This will recursively propagate for + subdirectories, each submitting their own `update_moved_to` call, resetting + its own outdated leads and changing them back, all the way down to the + bottom. + + In the odd case where `MOVED_FROM` is registered but not `MOVED_TO`, you will + simply remove the directory causing a `MOVED_FROM` event, with no recursive + propagation. This should likely be changed. 
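As a concrete illustration (directory names hypothetical): renaming a watched subdirectory `docs/sub/` to `docs/sub2/` keeps the same watch descriptor. Called with the new lead (`sub2`), this method temporarily maps that WD back to the old lead so the synthetic `MOVED_FROM` events resolve against `sub/`, then emits matching `MOVED_TO` events for the same files and subdirectories under `sub2/`, so downstream routes see contents leave the old location and arrive at the new one.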
+ ''' + fullpath = Path(path, lead) + + wd = self.pathmap.get(fullpath) + if wd is None: + logger.debug(f'Directory [{fullpath}] moved, but is not watched, ignoring') + return [] + + # inspect registered paths with same WD -- looking for same base path but diff lead + # will be empty if explicitly handled by a MOVED_FROM -- else inferred from watchmap + matching_bases = [pl for pl in self.watchmap[wd] if pl[0] == path and pl[1] != lead] + + # there should be at most one of these, but handle iteratively + for matching_base, old_lead in matching_bases: + self.update_moved_from(matching_base, old_lead) + + # explicit queries for files & dirs faster (tested) than filtering a single query + # using `Path.is_dir`; handle files, then subdirectories + moved_from_events = [] + moved_to_events = [] + for file in util.path.iter_glob_paths('*', fullpath, no_dir=True): + moved_from_events.append(iEvent(wd=wd, mask=iflags.MOVED_FROM, cookie=0, name=file.name)) + moved_to_events.append(iEvent(wd=wd, mask=iflags.MOVED_TO, cookie=0, name=file.name)) + + for subdir in util.path.iter_glob_paths('*/', fullpath): + moved_from_mask = iflags.MOVED_FROM | iflags.ISDIR + moved_from_events.append(iEvent(wd=wd, mask=moved_from_mask, cookie=0, name=subdir.name)) + + moved_to_mask = iflags.MOVED_TO | iflags.ISDIR + moved_to_events.append(iEvent(wd=wd, mask=moved_to_mask, cookie=0, name=subdir.name)) + + # check for unmatched moved froms -- should be enqueued in event loop or just above + moved_from_lead = self.unmatched_move_froms.get(wd, {}).pop(path, None) + if moved_from_lead is None: + logger.debug(f'Couldn\'t find MOVED_FROM origin, just yielding MOVED_TO events') + else: + # temporarily remove new path, add old path to allow MOVED_FROMs to seep through + flags = self.watchmap[wd].pop((path, lead)) # remove new + self.watchmap[wd][(path, moved_from_lead)] = flags # add old + self.handle_events(moved_from_events) + + self.watchmap[wd].pop((path, moved_from_lead)) # remove old + self.watchmap[wd][(path, lead)] = flags # add back new + + self.handle_events(moved_to_events) + + def handle_events(self, events): + ''' + Note: + If `handle_events` is called externally, note that this loop will block in the + calling thread until the jobs have been submitted. It will _not_ block until + jobs have completed, however, as a list of futures is returned. The calling + Watcher instance may have already been started, in which case `run()` will + already be executing in a separate thread. Calling this method externally will + not interfere with this loop insofar as it adds jobs to the same thread pool. + + Because this method only submits jobs associated with the provided `events`, + the calling thread can await the returned list of futures and be confident + that top-level callbacks associated with these file events have completed. Do + note that, if the Watcher has already been started, any propagating file + events will be picked up and possibly processed simultaneously (although their + associated callbacks will have nothing to do with the returned list of futures). 
+ ''' + from execlog.router import Event + + for event in events: + # hard coded ignores + if util.path.glob_match(event.name, IGNORE_PATTERNS): continue + + mask_flags = iflags.from_mask(event.mask) + + if event.wd not in self.watchmap: + raise ValueError(f'Watcher fired for untracked descriptor origin: {event}') + + moved_froms = [] + moved_tos = [] + for (path, lead), flags in self.watchmap[event.wd].items(): + relpath = Path(lead, event.name) + abspath = Path(path, relpath) + + # add new directories + if iflags.ISDIR in mask_flags: + if iflags.CREATE in mask_flags: + logger.debug(f'New directory detected [{relpath}]') + self._add_watch_recursive(path, flags, lead=relpath) + + if iflags.MOVED_FROM in mask_flags: + moved_froms.append((path, relpath)) + + if iflags.MOVED_TO in mask_flags: + moved_tos.append((path, relpath)) + + continue + + logger.debug(f'Watcher fired for [{relpath}]: {mask_flags}') + + route_event = Event(endpoint=str(path), name=str(relpath), action=mask_flags) + self.router.submit(route_event) + + # handle renamed directories; old dir was being watched if these flags + # match. The same WD is used by iNotify for the new dir, so + # recursively update explicitly stored paths. + for path, lead in moved_froms: + logger.debug(f'Directory moved, removing old [{lead}]') + self.update_moved_from(path, lead) + + for path, lead in moved_tos: + logger.debug(f'Directory moved, adding new [{lead}]') + self._add_watch(path, flags, lead=lead) + self.update_moved_to(path, lead) + + def stop(self): + logger.info("Stopping listener...") + + if self.router.thread_pool is not None: + self.router.thread_pool.shutdown() + + # request INotify stop by writing in the pipe, checked in watch loop + if not self.write.closed: + self.write.write(b"\x00") + self.write.close() diff --git a/execlog/router.py b/execlog/router.py new file mode 100644 index 0000000..068346d --- /dev/null +++ b/execlog/router.py @@ -0,0 +1,525 @@ +import time +import asyncio +import logging +import inspect +import traceback +import threading +from pathlib import Path +from typing import Callable +from functools import partial +from colorama import Fore, Style +from collections import namedtuple, defaultdict +from concurrent.futures import ThreadPoolExecutor, wait, as_completed + +from tqdm.auto import tqdm + +logger = logging.getLogger(__name__) + + +Event = namedtuple( + 'Event', + ['endpoint', 'name', 'action'], + defaults=[None, None, None], +) + +class Router: + ''' + Route events to registered callbacks + + Generalized registration includes an endpoint (the origin of an event), a pattern (to + filter events at the endpoint), and a callback (to be executed if pattern is matched). + + The Router _routes_ events to affiliated callbacks in a multi-threaded fashion. A + thread pool handles these jobs as events are submitted, typically by a composing + Listener. The Listener "hears" an event, and passes it on through to a Router to + further filter and delegate any matching follow-up jobs. + + This base Router implements most of the registry and filter model. When events are + submitted for propagation, they are checked for matching routes. Events specify an + origin endpoint, which is used as the filter for attached routes. The event is then + subjected to the `filter` method, which checks if the event matches the registered + `pattern` under the originated `endpoint`. If so, the callback is scheduled for + execution, and the matching event is passed as its sole argument. 
+ + Subclasses are expected to implement (at least) the `filter` method. This function is + responsible for wrapping up the task-specific logic needed to determine if an event, + originating from a known endpoint, matches the callback-specific pattern. This method + needn't handle any other filter logic, like checking if the event originates from the + provided endpoint, as this is already handled by the outer look in `matching_routes`. + + `get_listener` is a convenience method that instantiates and populates an affiliated + Listener over the register paths found in the Router. Listeners require a Router upon + instantiation so events can be propagated to available targets when they occur. + `get_listener()` is the recommended way to attain a Listener. + + Note: on debouncing events + Previously, debouncing was handled by listeners. This logic has been generalized + and moved to this class, as it's general enough to be desired across various + Listener types. We also need unique, identifying info only available with a + `(endpoint, callback, pattern)` triple in order to debounce events in accordance + with their intended target. + + Note: tracking events and serializing callback frames + Although not part of the original implementation, we now track which events have a + callback chain actively being executed, and prevent the same chain from being + started concurrently. If the callback chain is actively running for an event, and + that same event is submitted before this chain finishes, the request is simply + enqueued. The `clear_event` method is attached as a "done callback" to each job + future, and will re-submit the event once the active chain finishes. + + While this could be interpreted as a harsh design choice, it helps prevent many + many thread conflicts (race conditions, writing to the same resources, etc) when + the same function is executed concurrently, many times over. Without waiting + completely for an event to be fully handled, later jobs may complete before + earlier ones, or interact with intermediate disk states (raw file writes, DB + inserts, etc), before the earliest call has had a chance to clean up. + ''' + def __init__(self, loop=None, workers=None, listener_cls=None): + ''' + Parameters: + loop: + workers: + listener_cls: + ''' + self.loop = loop + self.workers = workers + self.listener_cls = listener_cls + + self.routemap : dict[str, list[tuple]] = defaultdict(list) + self.post_callbacks = [] + + # track running jobs by event + self.running_events = defaultdict(set) + + # debounce tracker + self.next_allowed_time = defaultdict(int) + + # store prepped (e.g., delayed) callbacks + self.callback_registry = {} + + self._thread_pool = None + self._route_lock = threading.Lock() + + @property + def thread_pool(self): + if self._thread_pool is None: + self._thread_pool = ThreadPoolExecutor(max_workers=self.workers) + return self._thread_pool + + def register( + self, + endpoint, + callback, + pattern, + debounce=200, + delay=10, + **listener_kwargs, + ): + ''' + Register a route. To be defined by an inheriting class, typically taking a pattern + and a callback. + + Note: Listener arguments + Notice how listener_kwargs are accumulated instead of uniquely assigned to an + endpoint. This is generally acceptable as some listeners may allow various + configurations for the same endpoint. Note, however, for something like the + PathListener, this will have no effect. 
Registering the same endpoint multiple + times won't cause any errors, but the configuration options will only remain + for the last registered group. + + (Update) The above remark about PathListener's is no longer, and likely never + was. Varying flag sets under the same endpoint do in fact have a cumulative + effect, and we need to be able disentangle events accordingly through + submitted event's `action` value. + + Parameters: + pattern: hashable object to be used when filtering event (passed to inherited + `filter(...)`) + callback: callable accepting an event to be executed if when a matching event + is received + ''' + route_tuple = (callback, pattern, debounce, delay, listener_kwargs) + self.routemap[endpoint].append(route_tuple) + + def submit(self, events, callbacks=None): + ''' + Handle a list of events. Each event is matched against the registered callbacks, + and those callbacks are ran concurrently (be it via a thread pool or an asyncio + loop). + ''' + if type(events) is not list: + events = [events] + + futures = [] + for event in events: + future = self.submit_callback(self.submit_event, event, callbacks=callbacks) + future.add_done_callback(lambda f: self.clear_event(event, f)) + futures.append(future) + + return futures + + def submit_event(self, event, callbacks=None): + ''' + Group up and submit all matching callbacks for `event`. All callbacks are ran + concurrently in their own threads, and this method blocks until all are completed. + + In the outer `submit` context, this blocking method is itself ran in its own + thread, and the registered post-callbacks are attached to the completion of this + function, i.e., the finishing of all callbacks matching provided event. + + Note that there are no checks for empty callback lists, where we could exit early. + Here we simply rely on methods doing the right thing: `wait_on_futures` would + simply receive an empty list, for example. Nevertheless, once an event is + submitted with this method, it gets at least a few moments where that event is + considered "running," and will be later popped out by `clear_events` (almost + immediately if there is in fact nothing to do). 
An early exit would simply have to + come after indexing the event in `running_events` + ''' + if callbacks is None: + # ensure same thread gets all matching routes & sets debounce updates; else + # this may be split across threads mid-check, preventing one thread from + # handling the blocking of the entire group + with self._route_lock: + callbacks = self.matching_routes(event) + + # stop early if no work to do + if len(callbacks) == 0: + return [] + + # enqueue requested/matched callbacks and exit if running + event_idx = self.event_index(event) + if event_idx in self.running_events: + self.queue_callbacks(event_idx, callbacks) + return [] + + # callbacks now computed, flush the running event + # note: a separate thread could queue valid callbacks since the running check; + # o/w we know the running index is empty + self.running_events[event_idx] = self.running_events[event_idx] + + # submit matching callbacks and wait for them to complete + future_results = self.wait_on_callbacks(callbacks, event) + + # finally call post event-group callbacks (only if some event callbacks were + # submitted), wait for them to complete + if future_results: + self.wait_on_futures([ + self.submit_callback(post_callback, event, future_results) + for post_callback in self.post_callbacks + ]) + + return future_results + + def submit_callback(self, callback, *args, **kwargs): + ''' + Note: this method is expected to return a future. Perform any event-based + filtering before submitting a callback with this method. + ''' + if inspect.iscoroutinefunction(callback): + if self.loop is None: + self.loop = asyncio.new_event_loop() + asyncio.set_event_loop(self.loop) + + #loop.run_in_executor(executor, loop.create_task, callback(event)) + #future = self.loop.call_soon_threadsafe( + # self.loop.create_task, + future = asyncio.run_coroutine_threadsafe( + callback(*args, **kwargs), + self.loop, + ) + else: + future = self.thread_pool.submit( + callback, *args, **kwargs + ) + future.add_done_callback(handle_exception) + + return future + + def matching_routes(self, event, event_time=None): + ''' + Return eligible matching routes for the provided event. + + Note that we wait as late as possible before enqueuing matches if the event is in + fact already active in a frame. If this method were start filtering results while + the frame is active, and the frame were to finish before all matching callbacks + were determined, we would be perfectly happy to return all matches, and allow the + outer `submit_event` context to run them right away in a newly constructed frame. + The _very_ next thing that gets done is adding this event to the active event + tracker. Otherwise, matching is performed as usual, and eligible callbacks are + simply enqueued for the next event frame, which will be checked in the "done" + callback of the active frame. The logic here should mostly "seal up" any real + opportunities for error, e.g., a frame ending and popping off elements from + `running_events` half-way through their inserting at the end of this method, or + multiple threads checking for matching routes for the same event, and both coming + away with a non-empty set of matches to run. That last example highlights + precisely how the single event-frame model works: many threads might be running + this method at the same time, for the same event (which has fired rapidly), but + only one should be able to "secure the frame" and begin running the matching + callbacks. 
Making the "active frame check" both as late as possible and as close + to the event blocking stage in the tracker (in `submit_event`), we make the + ambiguity gap as small as possible (and almost certainly smaller than any + realistic I/O-bound event duplication). + + Note: on event actions + The debounce reset is now only set if the event is successfully filtered. This + allows some middle ground when trying to depend on event actions: if the + action passes through, we block the whole range of actions until the debounce + window completes. Otherwise, the event remains open, only to be blocked by the + debounce on the first matching action. + ''' + matches = [] + endpoint = event.endpoint + name = event.name + #action = tuple(event.action) # should be more general + event_time = time.time()*1000 if event_time is None else event_time + + for (callback, pattern, debounce, delay, listen_kwargs) in self.routemap[endpoint]: + #index = (endpoint, name, action, callback, pattern, debounce, delay) + index = (endpoint, name, callback, pattern, debounce, delay) + + if event_time < self.next_allowed_time[index]: + # reject event + continue + + if self.filter(event, pattern, **listen_kwargs): + # note that delayed callbacks are added + matches.append(self.get_delayed_callback(callback, delay, index)) + + # set next debounce + self.next_allowed_time[index] = event_time + debounce + + match_text = Style.BRIGHT + Fore.GREEN + 'matched' + + callback_name = str(callback) + if hasattr(callback, '__name__'): + callback_name = callback.__name__ + + logger.info( + f'Event [{name}] {match_text} [{pattern}] under [{endpoint}] for [{callback_name}]' + ) + else: + match_text = Style.BRIGHT + Fore.RED + 'rejected' + logger.debug( + f'Event [{name}] {match_text} against [{pattern}] under [{endpoint}] for [{callback.__name__}]' + ) + + return matches + + def get_delayed_callback(self, callback, delay, index): + ''' + Parameters: + callback: function to wrap + delay: delay in ms + ''' + if index not in self.callback_registry: + async def async_wrap(callback, *args, **kwargs): + await asyncio.sleep(delay/1000) + return await callback(*args, **kwargs) + + def sync_wrap(callback, *args, **kwargs): + time.sleep(delay/1000) + return callback(*args, **kwargs) + + wrapper = None + if inspect.iscoroutinefunction(callback): wrapper = async_wrap + else: wrapper = sync_wrap + + self.callback_registry[index] = partial(wrapper, callback) + + return self.callback_registry[index] + + def wait_on_futures(self, futures): + ''' + Block until all futures in `futures` are complete. Return collected results as a + list, and log warnings when a future fails. 
+ ''' + future_results = [] + for future in as_completed(futures): + try: + future_results.append(future.result()) + except Exception as e: + logger.warning(f"Router callback job failed with exception {e}") + + return future_results + + def wait_on_callbacks(self, callbacks, event, *args, **kwargs): + ''' + Overridable by inheriting classes based on callback structure + ''' + return self.wait_on_futures([ + self.submit_callback(callback, event, *args, **kwargs) + for callback in callbacks + ]) + + def queue_callbacks(self, event_idx, callbacks): + ''' + Overridable by inheriting classes based on callback structure + ''' + self.running_events[event_idx].update(callbacks) + + def filter(self, event, pattern, **listen_kwargs) -> bool: + ''' + Parameters: + listen_kwargs_list: + ''' + raise NotImplementedError + + def add_post_callback(self, callback: Callable): + self.post_callbacks.append(callback) + + def get_listener(self, listener_cls=None): + ''' + Create a new Listener to manage watched routes and their callbacks. + ''' + if listener_cls is None: + listener_cls = self.listener_cls + if listener_cls is None: + raise ValueError('No Listener class provided') + + listener = listener_cls(self) + return self.extend_listener(listener) + + def extend_listener(self, listener): + ''' + Extend a provided Listener object with the Router instance's `listener_kwargs`. + ''' + for endpoint, route_tuples in self.routemap.items(): + for route_tuple in route_tuples: + listen_kwargs = route_tuple[-1] + listener.listen(endpoint, **listen_kwargs) + return listener + + def stop_event(self, event): + ''' + Pop event out of the running events tracker and return it. + ''' + event_idx = self.event_index(event) + return self.running_events.pop(event_idx, None) + + def clear_event(self, event, future): + ''' + Clear an event. Pops the passed event out of `running_events`, and the request + counter is >0, the event is re-submitted. + + This method is attached as a "done" callback to the main event wrapping job + `submit_event`. The `future` given to this method is one to which it was + attached as this "done" callback. This method should only be called when that + `future` is finished running (or failed). If any jobs were submitted in the + wrapper task, the future results here should be non-empty. We use this fact to + filter out non-work threads that call this method. Because even the + `matching_routes` check is threaded, we can't wait to see an event has no work to + schedule, and thus can't prevent this method being attached as a "done" callback. + The check for results from the passed future allows us to know when in fact a + valid frame has finished, and a resubmission may be on the table. 
+ ''' + if not future.result(): return + queued_callbacks = self.stop_event(event) + + # resubmit event if some queued work + if queued_callbacks and len(queued_callbacks) > 0: + logger.debug( + f'Event [{event.name}] resubmitted with [{len(queued_callbacks)}] queued callbacks' + ) + self.submit(event, callbacks=queued_callbacks) + + def event_index(self, event): + return event[:2] + + +class ChainRouter(Router): + ''' + Routes events to registered callbacks + ''' + def __init__(self, ordered_routers): + super().__init__() + + self.ordered_routers = [] + for router in ordered_routers: + self.add_router(router) + + self.running_events = defaultdict(lambda: defaultdict(set)) + + def add_router(self, router): + ''' + TODO: allow positional insertion in ordered list + + Note: the `routemap` extensions here shouldn't be necessary, since 1) route maps + show up only in `matching_routes`, and 2) `matching_routes` is only invoked in + `submit_event`, which is totally overwritten for the ChainRouter type. All events + are routed through to individual Routers, and which point their route maps are + used. + ''' + self.ordered_routers.append(router) + for endpoint, routelist in router.routemap.items(): + self.routemap[endpoint].extend(routelist) + + def matching_routes(self, event, event_time=None): + ''' + Colloquial `callbacks` now used as a dict of lists of callbacks, indexed by + router, and only having keys for routers with non-empty callback lists. + ''' + if event_time is None: + event_time = time.time()*1000 + + route_map = {} + for router in self.ordered_routers: + router_matches = router.matching_routes(event, event_time) + if router_matches: + route_map[router] = router_matches + + return route_map + + def wait_on_callbacks(self, callbacks, event, *args, **kwargs): + ''' + Note: relies on order of callbacks dict matching that of `ordered_routers`, which + should happen in `matching_routes` + ''' + results = {} + for router, callback_list in callbacks.items(): + router_results = router.submit_event(event, callbacks=callback_list) + results[router] = router_results + + return results + + def queue_callbacks(self, event_idx, callbacks): + for router, callback_list in callbacks.items(): + self.running_events[event_idx][router].update(callback_list) + + def stop_event(self, event): + ''' + Sub-routers do not get a "done" callback for their `submit_event` jobs, as they + would if they handled their own event submissions. They will, however, set the + submitted event as "running." We can't rely on sub-routers' "done" callbacks to + "unset" the running event, because the disconnect between the thread completing + and execution of that callback may take too long. + + Instead, we explicitly unset the running event for each of the constituent + sub-routers at the _same time_ we handle the ChainRouter's notion of event's + ending. 
+ ''' + event_idx = self.event_index(event) + for router in self.ordered_routers: + rq_callbacks = router.running_events.pop(event_idx, []) + assert len(rq_callbacks) == 0 + + return self.running_events.pop(event_idx, None) + + def get_listener(self, listener_cls=None): + if listener_cls is None: + for router in self.ordered_routers: + if router.listener_cls is not None: + listener_cls = router.listener_cls + break + + listener = super().get_listener(listener_cls) + for router in self.ordered_routers: + router.extend_listener(listener) + return listener + + +def handle_exception(future): + try: + future.result() + except Exception as e: + print(f"Exception occurred: {e}") + traceback.print_exc() diff --git a/execlog/routers/__init__.py b/execlog/routers/__init__.py new file mode 100644 index 0000000..3ad6969 --- /dev/null +++ b/execlog/routers/__init__.py @@ -0,0 +1 @@ +from execlog.routers.path import PathRouter diff --git a/execlog/routers/path.py b/execlog/routers/path.py new file mode 100644 index 0000000..480e082 --- /dev/null +++ b/execlog/routers/path.py @@ -0,0 +1,90 @@ +import logging +from pathlib import Path + +from execlog.router import Router +from execlog.listeners.path import PathListener +from execlog.util.path import glob_match + + +logger = logging.getLogger(__name__) + +class PathRouter(Router): + def __init__(self, loop=None, workers=None, listener_cls=PathListener): + ''' + Parameters: + workers: number of workers to assign the thread pool when the event loop is + started. Defaults to `None`, which, when passed to + ThreadPoolExecutor, will by default use 5x the number of available + processors on the machine (which the docs claim is a reasonable + assumption given threads are more commonly leveraged for I/O work + rather than intense CPU operations). Given the intended context for + this class, this assumption aligns appropriately. + ''' + super().__init__(loop=loop, workers=workers, listener_cls=listener_cls) + + def register( + self, + path, + func, + glob='**/!(.*|*.tmp|*~)', # recursive, non-temp + debounce=200, + delay=30, + **listener_kwargs, + ): + ''' + Parameters: + path: Path (directory) to watch with `inotify` + func: Callback to run if FS event target matches glob + glob: Relative glob pattern to match files in provided path. The FS event's + filename must match this pattern for the callback to queued. (Default: + "*"; matching all files in path). + listener_kwargs: Additional params for associated listener "listen" routes. + See `PathListener.listen`. + ''' + super().register( + #endpoint=Path(path), + endpoint=path, + callback=func, + pattern=glob, + debounce=debounce, + delay=delay, + **listener_kwargs + ) + + def filter(self, event, glob, **listen_kwargs) -> bool: + ''' + Note: + If `handle_events` is called externally, note that this loop will block in the + calling thread until the jobs have been submitted. It will _not_ block until + jobs have completed, however, as a list of futures is returned. The calling + Watcher instance may have already been started, in which case `run()` will + already be executing in a separate thread. Calling this method externally will + not interfere with this loop insofar as it adds jobs to the same thread pool. + + Because this method only submits jobs associated with the provided `events`, + the calling thread can await the returned list of futures and be confident + that top-level callbacks associated with these file events have completed. 
Do + note that, if the Watcher has already been started, any propagating file + events will be picked up and possibly process simultaneously (although their + associated callbacks will have nothing to do with the return list of futures). + + Parameters: + event : Event instance + glob : Single string or tuple of glob patterns to check against event endpoint + ''' + not_tmp_glob = '**/!(.*|*.tmp|*~)' + if not glob_match(Path(event.name), not_tmp_glob): + return False + + listen_flags = listen_kwargs.get('flags') + # only filter by flags if explicitly specified on registry + # (o/w route likely just wanting to use defaults) + if listen_flags is not None: + # negative filter if action not one of the listened flags + if not any(flag & listen_flags for flag in event.action): + logger.debug( + f'Event [{event.name}] caught in flag filter under [{glob}] for action [{event.action}]' + ) + return False + + return glob_match(Path(event.name), glob) diff --git a/execlog/server.py b/execlog/server.py new file mode 100644 index 0000000..8fbb027 --- /dev/null +++ b/execlog/server.py @@ -0,0 +1,204 @@ +import re +import asyncio +import logging +import threading +from functools import partial + +import uvicorn +from fastapi import FastAPI, WebSocket +from fastapi.staticfiles import StaticFiles +from inotify_simple import flags + +from execlog.handler import Handler as LREndpoint + + +logger = logging.getLogger(__name__) + +class Server: + ''' + Server class. Wraps up a development static file server and live reloader. + ''' + def __init__( + self, + host, + port, + root, + static : bool = False, + livereload : bool = False, + managed_listeners : list | None = None, + ): + ''' + Parameters: + managed_listeners: auxiliary listeners to "attach" to the server process, and to + propagate the shutdown signal to when the server receives an + interrupt. + ''' + self.host = host + self.port = port + self.root = root + self.static = static + self.livereload = livereload + + if managed_listeners is None: + managed_listeners = [] + self.managed_listeners = managed_listeners + + self.listener = None + self.server = None + self.server_text = '' + self.server_args = {} + + self.loop = None + self._server_setup() + + def _wrap_static(self): + self.server.mount("/", StaticFiles(directory=self.root), name="static") + + def _wrap_livereload(self): + self.server.websocket_route('/livereload')(LREndpoint) + #self.server.add_api_websocket_route('/livereload', LREndpoint) + + def _server_setup(self): + ''' + Set up the FastAPI server. Only a single server instance is used here, optionally + mounting the static route (if static serving enabled) and providing a websocket + endpoint (if livereload enabled). + + Note that, when present, the livereload endpoint is registered first, as the order + in which routes are defined matters for FastAPI apps. This allows `/livereload` to + behave appropriately, despite the root re-mount that takes place if serving static + files. 
+        '''
+        # enable propagation and clear handlers for uvicorn internal loggers;
+        # allows logging messages to propagate to my root logger
+        log_config = uvicorn.config.LOGGING_CONFIG
+        log_config['loggers']['uvicorn']['propagate'] = True
+        log_config['loggers']['uvicorn']['handlers'] = []
+        log_config['loggers']['uvicorn.access']['propagate'] = True
+        log_config['loggers']['uvicorn.access']['handlers'] = []
+        log_config['loggers']['uvicorn.error']['propagate'] = False
+        log_config['loggers']['uvicorn.error']['handlers'] = []
+
+        self.server_args['log_config'] = log_config
+        self.server_args['host'] = self.host
+        self.server_args['port'] = self.port
+
+        if self.static or self.livereload:
+            self.server = FastAPI()
+            self.server.on_event('shutdown')(self.shutdown)
+
+            if self.livereload:
+                self._wrap_livereload()
+                self._listener_setup()
+                self.server_text += '+reload'
+
+            if self.static:
+                self._wrap_static()
+                self.server_text += '+static'
+
+    def _listener_setup(self):
+        '''
+        flags.MODIFY okay since we don't need to reload non-existent pages
+        '''
+        from execlog.routers import PathRouter
+
+        if self.loop is None:
+            self.loop = asyncio.new_event_loop()
+            asyncio.set_event_loop(self.loop)
+
+        #self.listener = listener.WatchFS(loop=self.loop)
+        self.router = PathRouter(loop=self.loop)
+        self.router.register(
+            path=str(self.root),
+            func=LREndpoint.reload_clients,
+            delay=100,
+            flags=flags.MODIFY,
+        )
+
+        self.listener = self.router.get_listener()
+        self.managed_listeners.append(self.listener)
+
+    def start(self):
+        '''
+        Start the server.
+
+        Note: Design
+            This method takes on some extra complexity in order to ensure the blocking
+            Watcher and FastAPI's event loop play nicely together. The Watcher's `start()`
+            method runs a blocking call to INotify's `read()`, which obviously cannot be
+            started directly here in the main thread. Here we have a few viable options:
+
+            1. Simply wrap the Watcher's `start` call in a separate thread, e.g.,
+
+               ```py
+               watcher_start = partial(self.watcher.start, loop=loop)
+               threading.Thread(target=self.watcher.start, kwargs={'loop': loop}).start()
+               ```
+
+               This works just fine, and the watcher's registered async callbacks can
+               still use the passed event loop to get messages sent back to open WebSocket
+               clients.
+            2. Run the Watcher's `start` inside a thread managed by the event loop via
+               `loop.run_in_executor`:
+
+               ```py
+               loop.run_in_executor(None, partial(self.watcher.start, loop=loop))
+               ```
+
+               Given that this just runs the target method in a separate thread, it's very
+               similar to option #1. It doesn't even make the outer loop context available
+               to the Watcher, meaning we still have to pass this loop explicitly to the
+               `start` method. The only benefit here (I think? there may actually be no
+               difference) is that it keeps things under one loop, which can be beneficial
+               for shutdown.
+
+               See related discussions:
+
+               - https://stackoverflow.com/questions/55027940/is-run-in-executor-optimized-for-running-in-a-loop-with-coroutines
+               - https://stackoverflow.com/questions/70459437/how-gil-affects-python-asyncio-run-in-executor-with-i-o-bound-tasks
+
+            Once the watcher is started, we can kick off the FastAPI server (which may be
+            serving static files, handling livereload WS connections, or both). We
+            provide `uvicorn` access to the manually created `asyncio` loop used to run
+            the Watcher (in a thread, that is), since that loop is made available to
+            the `Watcher._event_loop` method.
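As a concrete illustration of that hand-off — a sketch only, assuming the async `reload_clients` callback registered in `_listener_setup` and an `event` argument supplied by the watcher:

```py
# from the watcher's thread: queue the async callback on the server's running loop
loop.call_soon_threadsafe(asyncio.create_task, LREndpoint.reload_clients(event))
```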
This ultimately allows async methods to be + registered as callbacks to the Watcher and be ran in a managed loop. In this + case, that loop is managed by FastAPI, which keeps things consistent: the + Watcher can call `loop.call_soon_threadsafe` to queue up a FastAPI-based + response _in the same FastAPI event loop_, despite the trigger for that + response having originated from a separate thread (i.e., where the watcher is + started). This works smoothly, and keeps the primary server's event loop from + being blocked. + + Note that, due to the delicate Watcher behavior, we must perform a shutdown + explicitly in order for things to be handled gracefully. This is done in the + server setup step, where we ensure FastAPI calls `watcher.stop()` during its + shutdown process. + ''' + if self.loop is None: + self.loop = asyncio.new_event_loop() + asyncio.set_event_loop(self.loop) + + for listener in self.managed_listeners: + #loop.run_in_executor(None, partial(self.listener.start, loop=loop)) + if not listener.started: + listener.start() + + if self.server: + logger.info(f'Server{self.server_text} @ http://{self.host}:{self.port}') + + uconfig = uvicorn.Config(app=self.server, loop=self.loop, **self.server_args) + userver = uvicorn.Server(config=uconfig) + self.loop.run_until_complete(userver.serve()) + + def shutdown(self): + ''' + Additional shutdown handling after the FastAPI event loop receives an interrupt. + + Currently this + ''' + logger.info("Shutting down server...") + + # stop attached auxiliary listeners, both internal & external + for listener in self.managed_listeners: + listener.stop() diff --git a/execlog/util/__init__.py b/execlog/util/__init__.py new file mode 100644 index 0000000..fac37f2 --- /dev/null +++ b/execlog/util/__init__.py @@ -0,0 +1 @@ +from execlog.util import path diff --git a/execlog/util/path.py b/execlog/util/path.py new file mode 100644 index 0000000..574a713 --- /dev/null +++ b/execlog/util/path.py @@ -0,0 +1,68 @@ +import re +import inspect +from glob import glob +from pathlib import Path + +from wcmatch import glob as wc_glob + + +camel2snake_regex = re.compile(r'(?, 'R1 ::')]\n", + "INFO:execlog.router:Event [file2] \u001b[1m\u001b[32mmatched [**/!(.*|*.tmp|*~)] under [endpoint_proxy] for [functools.partial(, 'R1 ::')]\n", + "INFO:execlog.router:Event [file3] \u001b[1m\u001b[32mmatched [**/!(.*|*.tmp|*~)] under [endpoint_proxy] for [functools.partial(, 'R1 ::')]\n" + ] + }, + { + "data": { + "text/plain": [ + "[,\n", + " ,\n", + " ]" + ] + }, + "execution_count": 4, + "metadata": {}, + "output_type": "execute_result" + }, + { + "name": "stdout", + "output_type": "stream", + "text": [ + "R1 :: Event(endpoint='endpoint_proxy', name='file1', action=None)\n", + "R1 :: Event(endpoint='endpoint_proxy', name='file2', action=None)\n", + "R1 :: Event(endpoint='endpoint_proxy', name='file3', action=None)\n", + "R1 :: Event(endpoint='endpoint_proxy', name='file3', action=None)\n", + "R1 :: Event(endpoint='endpoint_proxy', name='fileA', action=[])\n", + "R1 :: Event(endpoint='endpoint_proxy', name='fileA', action=[])\n", + "R1 :: Event(endpoint='endpoint_proxy', name='fileA', action=[])\n" + ] + } + ], + "source": [ + "# multi-event single router\n", + "router1.submit(events)" + ] + }, + { + "cell_type": "code", + "execution_count": 5, + "id": "092a21a7-8052-4826-911c-6e4423b075ce", + "metadata": { + "tags": [] + }, + "outputs": [ + { + "name": "stderr", + "output_type": "stream", + "text": [ + "INFO:execlog.router:Event [file1] \u001b[1m\u001b[32mmatched 
[**/!(.*|*.tmp|*~)] under [endpoint_proxy] for [functools.partial(, 'R1 ::')]\n", + "INFO:execlog.router:Event [file1] \u001b[1m\u001b[32mmatched [**/!(.*|*.tmp|*~)] under [endpoint_proxy] for [functools.partial(, 'R2 ::')]\n", + "INFO:execlog.router:Event [file1] \u001b[1m\u001b[32mmatched [**/!(.*|*.tmp|*~)] under [endpoint_proxy] for [functools.partial(, 'R3 ::')]\n" + ] + }, + { + "data": { + "text/plain": [ + "[,\n", + " ,\n", + " ]" + ] + }, + "execution_count": 5, + "metadata": {}, + "output_type": "execute_result" + }, + { + "name": "stderr", + "output_type": "stream", + "text": [ + "INFO:execlog.router:Event [file2] \u001b[1m\u001b[32mmatched [**/!(.*|*.tmp|*~)] under [endpoint_proxy] for [functools.partial(, 'R1 ::')]\n", + "INFO:execlog.router:Event [file2] \u001b[1m\u001b[32mmatched [**/!(.*|*.tmp|*~)] under [endpoint_proxy] for [functools.partial(, 'R2 ::')]\n", + "INFO:execlog.router:Event [file2] \u001b[1m\u001b[32mmatched [**/!(.*|*.tmp|*~)] under [endpoint_proxy] for [functools.partial(, 'R3 ::')]\n", + "INFO:execlog.router:Event [file3] \u001b[1m\u001b[32mmatched [**/!(.*|*.tmp|*~)] under [endpoint_proxy] for [functools.partial(, 'R1 ::')]\n", + "INFO:execlog.router:Event [file3] \u001b[1m\u001b[32mmatched [**/!(.*|*.tmp|*~)] under [endpoint_proxy] for [functools.partial(, 'R2 ::')]\n", + "INFO:execlog.router:Event [file3] \u001b[1m\u001b[32mmatched [**/!(.*|*.tmp|*~)] under [endpoint_proxy] for [functools.partial(, 'R3 ::')]\n" + ] + }, + { + "name": "stdout", + "output_type": "stream", + "text": [ + "R2 :: Event(endpoint='endpoint_proxy', name='file1', action=None)\n", + "R2 :: Event(endpoint='endpoint_proxy', name='file2', action=None)\n", + "R3 :: Event(endpoint='endpoint_proxy', name='file1', action=None)\n", + "R3 :: Event(endpoint='endpoint_proxy', name='file2', action=None)\n", + "R2 :: Event(endpoint='endpoint_proxy', name='file3', action=None)\n", + "R3 :: Event(endpoint='endpoint_proxy', name='file3', action=None)\n" + ] + }, + { + "name": "stderr", + "output_type": "stream", + "text": [ + "INFO:execlog.router:Event [fileA] \u001b[1m\u001b[32mmatched [**/!(.*|*.tmp|*~)] under [endpoint_proxy] for [functools.partial(, 'R1 ::')]\n", + "INFO:execlog.router:Event [fileA] \u001b[1m\u001b[32mmatched [**/!(.*|*.tmp|*~)] under [endpoint_proxy] for [functools.partial(, 'R2 ::')]\n", + "INFO:execlog.router:Event [fileA] \u001b[1m\u001b[32mmatched [**/!(.*|*.tmp|*~)] under [endpoint_proxy] for [functools.partial(, 'R3 ::')]\n" + ] + }, + { + "name": "stdout", + "output_type": "stream", + "text": [ + "R2 :: Event(endpoint='endpoint_proxy', name='fileA', action=[])\n", + "R3 :: Event(endpoint='endpoint_proxy', name='fileA', action=[])\n" + ] + }, + { + "name": "stderr", + "output_type": "stream", + "text": [ + "INFO:execlog.router:Event [fileA] \u001b[1m\u001b[32mmatched [**/!(.*|*.tmp|*~)] under [endpoint_proxy] for [functools.partial(, 'R1 ::')]\n", + "INFO:execlog.router:Event [fileA] \u001b[1m\u001b[32mmatched [**/!(.*|*.tmp|*~)] under [endpoint_proxy] for [functools.partial(, 'R2 ::')]\n", + "INFO:execlog.router:Event [fileA] \u001b[1m\u001b[32mmatched [**/!(.*|*.tmp|*~)] under [endpoint_proxy] for [functools.partial(, 'R3 ::')]\n" + ] + }, + { + "name": "stdout", + "output_type": "stream", + "text": [ + "R2 :: Event(endpoint='endpoint_proxy', name='fileA', action=[])\n", + "R3 :: Event(endpoint='endpoint_proxy', name='fileA', action=[])\n" + ] + }, + { + "name": "stderr", + "output_type": "stream", + "text": [ + "INFO:execlog.router:Event [fileA] 
\u001b[1m\u001b[32mmatched [**/!(.*|*.tmp|*~)] under [endpoint_proxy] for [functools.partial(, 'R1 ::')]\n", + "INFO:execlog.router:Event [fileA] \u001b[1m\u001b[32mmatched [**/!(.*|*.tmp|*~)] under [endpoint_proxy] for [functools.partial(, 'R2 ::')]\n", + "INFO:execlog.router:Event [fileA] \u001b[1m\u001b[32mmatched [**/!(.*|*.tmp|*~)] under [endpoint_proxy] for [functools.partial(, 'R3 ::')]\n" + ] + }, + { + "name": "stdout", + "output_type": "stream", + "text": [ + "R2 :: Event(endpoint='endpoint_proxy', name='fileA', action=[])\n", + "R3 :: Event(endpoint='endpoint_proxy', name='fileA', action=[])\n" + ] + } + ], + "source": [ + "chain_router.submit(events)" + ] + }, + { + "cell_type": "code", + "execution_count": 6, + "id": "52f83d9a-7b12-4891-8b90-986a9ed399d7", + "metadata": { + "tags": [] + }, + "outputs": [ + { + "name": "stderr", + "output_type": "stream", + "text": [ + "INFO:execlog.listeners.path:Starting listener for 1 paths\n", + "INFO:execlog.listeners.path:> Listening on path endpoint_proxy for flags [, , , , , ]\n" + ] + }, + { + "name": "stdout", + "output_type": "stream", + "text": [ + "defaultdict(. at 0x78be3b5db100>, {1: defaultdict(, {(PosixPath('endpoint_proxy'), PosixPath('.')): 1986})})\n" + ] + } + ], + "source": [ + "listener = chain_router.get_listener()\n", + "listener.start()\n", + "\n", + "print(listener.watchmap)" + ] + }, + { + "cell_type": "code", + "execution_count": 9, + "id": "00bb2889-f266-4fb1-9a89-7d7539aba9cf", + "metadata": {}, + "outputs": [ + { + "name": "stderr", + "output_type": "stream", + "text": [ + "DEBUG:execlog.listeners.path:Watcher fired for [fileA]: []\n", + "DEBUG:execlog.listeners.path:Watcher fired for [fileA]: []\n", + "DEBUG:execlog.listeners.path:Watcher fired for [fileA]: []\n" + ] + } + ], + "source": [ + "file_a = Path('endpoint_proxy/fileA')\n", + "file_a.write_text('test text')\n", + "file_a.unlink()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "4e993450-bdb7-4860-ba23-dbc2e5676ace", + "metadata": {}, + "outputs": [], + "source": [] + } + ], + "metadata": { + "kernelspec": { + "display_name": "execlog", + "language": "python", + "name": "execlog" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.12.2" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +} diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..e605e99 --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,25 @@ +[build-system] +requires = ["setuptools"] +build-backend = "setuptools.build_meta" + +[project] +name = "execlog" +version = "0.1.1" +authors = [ + { name="Sam Griesemer", email="samgriesemer@gmail.com" }, +] +description = "Lightweight multi-threaded job framework" +readme = "README.md" +requires-python = ">=3.12" +classifiers = [ + "Programming Language :: Python :: 3", + "License :: OSI Approved :: MIT License", + "Operating System :: OS Independent", +] +dependencies = [ + "tqdm" +] + +[tool.setuptools.packages.find] +include = ["execlog*"] # pattern to match package names + diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..01bae86 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,19 @@ +# -- package-specific -- +fastapi +starlette +uvicorn +inotify_simple +tqdm +wcmatch + +# -- logging -- +colorama + +# -- sphinx docs -- +sphinx +sphinx-togglebutton +furo +myst-parser + +# -- 
testing -- +pytest
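These dependencies back the components introduced above: `fastapi`/`starlette`/`uvicorn` for `Server`, `inotify_simple` and `wcmatch` for the path listener and glob matching, and `pytest` for the test suite. For orientation, a minimal usage sketch mirroring the notebook above — the watched directory, callback, and file name are illustrative, and the directory is assumed to already exist:

```py
from pathlib import Path
from execlog.routers import PathRouter

def announce(event):
    # called from the router's thread pool for each matching filesystem event
    print('changed:', event.name)

router = PathRouter()
router.register(path='endpoint_proxy', func=announce)  # default glob skips hidden/temp files
listener = router.get_listener()
listener.start()                                        # inotify read loop runs off the main thread

Path('endpoint_proxy/fileA').write_text('test text')    # picked up by the listener, routed to `announce`
```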