initial commit (public reset)

This commit is contained in:
Sam G. 2024-04-19 18:43:52 -07:00
commit 109fbefd41
29 changed files with 2131 additions and 0 deletions

13
.gitignore vendored Normal file

@ -0,0 +1,13 @@
# generic py
__pycache__/
.pytest_cache/
localsys.egg-info/
.ipynb_checkpoints/
.python-version
# vendor and build files
dist/
build/
docs/_autoref/
docs/_autosummary/
docs/_build/

0
MANIFEST.in Normal file

20
Makefile Normal file

@ -0,0 +1,20 @@
PYTHON=/home/smgr/.pyenv/versions/execlog/bin/python
BASH=/usr/bin/bash
## ------------------ docs ------------------ ##
docs-build:
sphinx-apidoc --module-first --separate -o docs/_autoref/ execlog
make -C docs/ html
docs-serve:
cd docs/_build/html && python -m http.server 9090
docs-clean:
make -C docs/ clean
rm -rf docs/_autoref
## ------------------------------------------ ##
## ----------------- tests ------------------ ##
test:
pytest --pyargs tests -v

2
README.md Normal file

@ -0,0 +1,2 @@
# Overview
`execlog` is a lightweight multi-threaded job framework.
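A minimal usage sketch (illustrative; it assumes the `PathRouter` API defined in this package):

```python
from execlog.routers import PathRouter

def on_change(event):
    # event is an execlog Event namedtuple: (endpoint, name, action)
    print(f'{event.name} changed under {event.endpoint}')

router = PathRouter()
router.register('path/to/watch', on_change, glob='**/*.md')

listener = router.get_listener()
listener.start()  # runs in its own thread; call listener.stop() to shut down
```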

20
docs/Makefile Normal file

@ -0,0 +1,20 @@
# Minimal makefile for Sphinx documentation
#
# You can set these variables from the command line, and also
# from the environment for the first two.
SPHINXOPTS ?=
SPHINXBUILD ?= sphinx-build
SOURCEDIR = .
BUILDDIR = _build
# Put it first so that "make" without argument is like "make help".
help:
@$(SPHINXBUILD) -M help "$(SOURCEDIR)" "$(BUILDDIR)" $(SPHINXOPTS) $(O)
.PHONY: help Makefile
# Catch-all target: route all unknown targets to Sphinx using the new
# "make mode" option. $(O) is meant as a shortcut for $(SPHINXOPTS).
%: Makefile
@$(SPHINXBUILD) -M $@ "$(SOURCEDIR)" "$(BUILDDIR)" $(SPHINXOPTS) $(O)

38
docs/conf.py Normal file

@ -0,0 +1,38 @@
# Configuration file for the Sphinx documentation builder.
#
# For the full list of built-in configuration values, see the documentation:
# https://www.sphinx-doc.org/en/master/usage/configuration.html
# -- Project information -----------------------------------------------------
# https://www.sphinx-doc.org/en/master/usage/configuration.html#project-information
project = 'execlog'
copyright = '2024, Sam Griesemer'
author = 'Sam Griesemer'
# -- General configuration ---------------------------------------------------
# https://www.sphinx-doc.org/en/master/usage/configuration.html#general-configuration
extensions = [
"sphinx.ext.autodoc",
"sphinx.ext.autosummary",
"sphinx.ext.viewcode",
"myst_parser",
]
autosummary_generate = True
autosummary_imported_members = True
templates_path = ['_templates']
exclude_patterns = ['_build', 'Thumbs.db', '.DS_Store']
# -- Options for HTML output -------------------------------------------------
# https://www.sphinx-doc.org/en/master/usage/configuration.html#options-for-html-output
html_theme = 'furo'
html_static_path = ['_static']
#html_sidebars = {
# '**': ['/modules.html'],
#}

32
docs/index.md Normal file

@ -0,0 +1,32 @@
# `execlog` package docs
{ref}`genindex`
{ref}`modindex`
{ref}`search`
```{eval-rst}
.. autosummary::
:nosignatures:
execlog.Handler
execlog.Listener
execlog.Router
execlog.Server
```
```{toctree}
:maxdepth: 3
:caption: Autoref
_autoref/execlog.rst
```
```{toctree}
:maxdepth: 3
:caption: Contents
reference/documentation/index
```
```{include} ../README.md
```

35
docs/make.bat Normal file

@ -0,0 +1,35 @@
@ECHO OFF
pushd %~dp0
REM Command file for Sphinx documentation
if "%SPHINXBUILD%" == "" (
set SPHINXBUILD=sphinx-build
)
set SOURCEDIR=.
set BUILDDIR=_build
%SPHINXBUILD% >NUL 2>NUL
if errorlevel 9009 (
echo.
echo.The 'sphinx-build' command was not found. Make sure you have Sphinx
echo.installed, then set the SPHINXBUILD environment variable to point
echo.to the full path of the 'sphinx-build' executable. Alternatively you
echo.may add the Sphinx directory to PATH.
echo.
echo.If you don't have Sphinx installed, grab it from
echo.https://www.sphinx-doc.org/
exit /b 1
)
if "%1" == "" goto help
%SPHINXBUILD% -M %1 %SOURCEDIR% %BUILDDIR% %SPHINXOPTS% %O%
goto end
:help
%SPHINXBUILD% -M help %SOURCEDIR% %BUILDDIR% %SPHINXOPTS% %O%
:end
popd

docs/reference/documentation/index.md Normal file

@ -0,0 +1,8 @@
# Documentation
```{toctree}
:hidden:
sphinx
```

docs/reference/documentation/sphinx.md Normal file

@ -0,0 +1,111 @@
# Sphinx
The primary driver of this package's documentation is Sphinx's `autodoc` extension,
using the [Furo theme][1].
**High-level details**:
- `sphinx-apidoc` generates package-based documentation to the `_autoref/` directory,
with navigation available under "Autoref" in the sidebar.
- Markdown-based documentation files are manually written under the `reference/`
directory, showing up under "Contents" in the sidebar.
## Detailed directory structure
All files are placed under `docs/`:
- `_`-prefixed directories are Sphinx-managed
* `_build/html/` houses the output HTML files
* `_autoref/` is the target for module-based RST files written by `autodoc`
- `reference/`: houses all manually written documentation (totally separate from the
auto-generated package docs)
- `conf.py`: single Sphinx configuration file
- `index.md`: documentation index, sets up a persistent sidebar across all other pages
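Concretely, the layout used by this package looks roughly like (illustrative; the generated directories only appear after a build):
```
docs/
├── _autoref/        # generated by sphinx-apidoc
├── _build/html/     # rendered HTML output
├── reference/       # manually written docs
├── conf.py
├── index.md
└── Makefile
```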
For manually written documentation under `reference/`, topics are nested as needed. Within
a nested directory `reference/<topic>`, an `index.md` should be created with content like:
```
# <Topic>
\`\`\`{toctree}
:hidden:
sub-topic-1.rst
sub-topic-2.rst
...
\`\`\`
```
This will add the nested directory to the sidebar navigation, using the name set under the
top-level header. See [Markdown syntax](#markdown-syntax) for more details on the syntax.
## Sphinx autodoc
Sphinx's `autodoc` extension allows automatic generation of documents according to
(Python) subpackage structure and available docstrings. A few notes here:
- In the `conf.py` file, autodoc is enabled by adding `"sphinx.ext.autodoc"` to
the extensions list. `"sphinx.ext.viewcode"` can also be added to provide
links to source code.
- Documents are actually generated by calling the `sphinx-apidoc` CLI command. The
current Makefile uses the following call:
```sh
sphinx-apidoc --module-first --separate -o docs/_autoref/ execlog
```
This writes the automatically generated docs for modules in the package at the
local directory `execlog/` to the `docs/_autoref` directory. These are
reStructuredText files by default.
* `--module-first` places the module-level descriptions at the top of the module page.
By default, this is placed at the bottom (oddly), and can be obscured by large lists
of subpackages if this flag isn't provided.
* See available `sphinx-apidoc` options [here][2], as well as more advanced config
[here][3].
## Markdown syntax
The `myst_parser` extension enables Markdown (or something close to it) to be used when
writing documentation files. The Sphinx directives can be difficult to track, and
they change slightly under the MyST Markdown syntax. The following are a few common
blocks:
**Page hierarchies**: the following will generate link hierarchy according to the provided
pages:
```
\`\`\`{toctree}
:maxdepth: <n>
:caption: <caption>
:hidden:
example-file-1
example-file-2
example-dir/index
...
\`\`\`
```
- `:maxdepth:` limits the depth of nesting
- `:caption:` title for the group of pages
- `:hidden:` if provided, links will only show in the sidebar (hidden on the page)
- Constituent files: listed files will be rendered as a link directly. If a listed file
has a `{toctree}` directive, this tree will be rendered in place of the page's link as a
dropdown. The dropdown will be named according to the file's top-level heading, and
clicking directly on the dropdown header will show that page's content. Files found in
the tree will be placed as links under the dropdown, recursively subject to the same rules
described here.
**Include files**: the following will include the content of another file in place:
```
\`\`\`{include} README.md
\`\`\`
```
**Reference directives**
[1]: https://pradyunsg.me/furo/
[2]: https://www.sphinx-doc.org/en/master/man/sphinx-apidoc.html
[3]: https://www.sphinx-doc.org/en/master/usage/extensions/autodoc.html#

14
execlog.egg-info/PKG-INFO Normal file

@ -0,0 +1,14 @@
Metadata-Version: 2.1
Name: execlog
Version: 0.1.1
Summary: Lightweight multi-threaded job framework
Author-email: Sam Griesemer <samgriesemer@gmail.com>
Classifier: Programming Language :: Python :: 3
Classifier: License :: OSI Approved :: MIT License
Classifier: Operating System :: OS Independent
Requires-Python: >=3.12
Description-Content-Type: text/markdown
Requires-Dist: tqdm
# Overview
`execlog` is a lightweight multi-threaded job framework.

execlog.egg-info/SOURCES.txt Normal file

@ -0,0 +1,19 @@
MANIFEST.in
README.md
pyproject.toml
execlog/__init__.py
execlog/handler.py
execlog/listener.py
execlog/router.py
execlog/server.py
execlog.egg-info/PKG-INFO
execlog.egg-info/SOURCES.txt
execlog.egg-info/dependency_links.txt
execlog.egg-info/requires.txt
execlog.egg-info/top_level.txt
execlog/listeners/__init__.py
execlog/listeners/path.py
execlog/routers/__init__.py
execlog/routers/path.py
execlog/util/__init__.py
execlog/util/path.py

execlog.egg-info/dependency_links.txt Normal file

@ -0,0 +1 @@

execlog.egg-info/requires.txt Normal file

@ -0,0 +1 @@
tqdm

execlog.egg-info/top_level.txt Normal file

@ -0,0 +1 @@
execlog

8
execlog/__init__.py Normal file

@ -0,0 +1,8 @@
from execlog.handler import Handler
from execlog.listener import Listener
from execlog.router import Router, ChainRouter, Event
from execlog.server import Server
from execlog import listeners
from execlog import routers
from execlog import util

150
execlog/handler.py Normal file

@ -0,0 +1,150 @@
import re
import logging
from pathlib import Path
from inotify_simple import flags
from starlette.endpoints import WebSocketEndpoint
logger = logging.getLogger(__name__)
#page_re = re.compile(r'https?:\/\/.*?\/(.*?)(?:\?|\.html|$)')
page_re = re.compile(r'https?:\/\/.*?\/(.*?)$')
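# For example (illustrative), the capture group strips the scheme and host:
#   page_re.search('http://localhost:8000/blog/post').group(1) == 'blog/post'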
def client_reload_wrap(reloaded_file):
rpath = Path(reloaded_file)
static_extensions = ['.js', '.css']
if rpath.suffix in static_extensions:
return lambda _: True
else:
return lambda c: Path(c).with_suffix('.html') == rpath
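# For example (illustrative):
#   client_reload_wrap('site/style.css')('any/page')    -> True   (static asset: reload every client)
#   client_reload_wrap('blog/post.html')('blog/post')   -> True
#   client_reload_wrap('blog/post.html')('other/page')  -> False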
class Handler(WebSocketEndpoint):
'''
Subclasses WebSocketEndpoint to be attached to live reload endpoints.
Note: Reload model
- Served HTML files are generated from templates that include livereload JS and the
target livereload server (port manually set prior to site build).
- When pages are visited (be they served from NGINX or via the development
server), the livereload.js attempts to connect to the known livereload WS
endpoint.
- FastAPI routes the request to _this_ endpoint, and `on_connect` is called.
- Upon successful connection, the livereload JS client sends a "hello" message.
This is picked up as the first post-acceptance message, and captured by the
`on_receive` method.
- `on_receive` subsequently initiates a formal handshake, sending back a "hello"
command and waiting for the "info" command from the client.
- If the "info" command is received successfully and contains the requesting
page's URL, the handshake completes and the websocket is added to the class'
`live_clients` tracker.
- Later, when a file in a watch path of the server's watcher is _modified_,
`reload_clients` will be called from within the originating server's event loop,
and pass in the FS event associated with the file change. `client_reload_wrap`
is used to wrap a boolean checker method for whether or not to reload clients
given the FS event.
TODO: flesh out the reload wrapper to incorporate more complex filters and/or
transformations when determining when to reload certain clients.
'''
encoding = 'json'
live_clients = {}
async def on_connect(self, websocket):
await websocket.accept()
async def on_receive(self, websocket, data):
'''
Note: On page names
When websockets connect, they simply communicate the exact URL from the origin
page. The websocket is then indexed to possibly variable page names (often
without an `.html` suffix, but occasionally with). The `client_reload_wrap` is
then responsible for taking this client page name and normalizing it to be
matched with full file names (i.e., suffix always included).
'''
url = await self._lr_handshake(websocket, data)
if url is None:
logger.warning('Client handshake failed, ignoring')
return
origin_m = page_re.search(url)
if origin_m is not None:
origin_page = origin_m.group(1)
# assume index.html if connected to empty name
if origin_page == '':
origin_page = 'index.html'
else:
origin_page = '<unidentified>.null'
self.live_clients[origin_page] = websocket
logger.info(f'Reloader connected to [{origin_page}] ({len(self.live_clients)} live clients)')
async def on_disconnect(self, websocket, close_code):
remove_page = None
for page, ws in self.live_clients.items():
if ws == websocket:
remove_page = page
if remove_page is not None:
logger.info(f'Client for [{remove_page}] disconnected, removing')
self.live_clients.pop(remove_page)
@classmethod
async def reload_clients(cls, event):
'''
Method targeted as a watcher callback. This async method is scheduled in a
thread-safe manner by the watcher to be run in the FastAPI event loop.
'''
logger.info(f'> [{event.name}] changed on disk')
should_reload = client_reload_wrap(event.name)
for page, ws in cls.live_clients.items():
if should_reload(page):
logger.info(f'>> Reloading client for [{page}]')
await ws.send_json({
'command' : 'reload',
'path' : page,
'liveCSS' : True,
'liveImg' : True,
})
@staticmethod
async def _lr_handshake(websocket, hello):
'''
Handshake with livereload.js
1. client sends 'hello'
2. server replies 'hello'
3. client sends 'info'
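Example exchange (illustrative; exact fields depend on the livereload client):
client -> {"command": "hello", "protocols": ["http://livereload.com/protocols/official-7"]}
server -> {"command": "hello", "protocols": [...], "serverName": ...} (sent below)
client -> {"command": "info", "url": "http://localhost:8000/blog/post", ...}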
'''
# 1. await client hello after accept
#hello = await websocket.receive_json()
if hello.get('command') != 'hello':
logger.warning('Client handshake failed at "hello" stage')
return
# 2. send hello to client
await websocket.send_json({
'command': 'hello',
'protocols': [
'http://livereload.com/protocols/official-7',
],
'serverName': 'livereload-tornado',
})
# 3. await info response
info = await websocket.receive_json()
if info.get('command') != 'info':
logger.warning('Client handshake failed at "info" stage')
return None
elif 'url' not in info:
logger.warning('Info received from client, but no URL provided')
return None
return info['url']

30
execlog/listener.py Normal file

@ -0,0 +1,30 @@
'''
Implements a file system watcher.
See also:
- https://inotify-simple.readthedocs.io/en/latest/#gracefully-exit-a-blocking-read
'''
import threading
class Listener(threading.Thread):
def __init__(self, router):
'''
Parameters:
router: Router instance to which events are submitted. The Listener "hears"
events at its registered endpoints and passes them through to the Router
for filtering and callback dispatch.
'''
super().__init__()
self.router = router
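# Subclasses implement `listen()` to register an endpoint to watch and `run()` as the
# blocking watch loop executed by `Thread.start()`; see PathListener for the
# inotify-based implementation provided by this package.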
def listen(self):
raise NotImplementedError
def run(self):
raise NotImplementedError

execlog/listeners/__init__.py Normal file

@ -0,0 +1 @@
from execlog.listeners.path import PathListener

388
execlog/listeners/path.py Normal file

@ -0,0 +1,388 @@
'''
Implements a file system watcher.
See also:
- https://inotify-simple.readthedocs.io/en/latest/#gracefully-exit-a-blocking-read
'''
#import fnmatch
import os
import time
import select
import logging
from pathlib import Path
from collections import defaultdict
from inotify_simple import INotify, Event as iEvent, flags as iflags, masks as imasks
from execlog import util
from execlog.listener import Listener
logger = logging.getLogger(__name__)
# hardcoded file names to ignore
# - "4913" is a temp file created by Vim before editing
IGNORE_PATTERNS = ['4913', '.sync*.db*']
class PathListener(Listener):
def __init__(self, router):
'''
Parameters:
router: Router instance to which detected file system events are submitted
for filtering and callback dispatch.
Note:
Due to the nature of INotify, you cannot watch the same path with two
separate flag settings (without creating a new INotify instance). Under the
same instance, calling `add_watch` for an already watched path location will
simply return the watch descriptor already associated with that location (and
may update the flags to whatever is passed). However, this location will only
ever be "watched once" by a given INotify instance, so keep this in mind if
multiple downstream callbacks (packaged up by the Router) want to react to the
same path but with different flags (i.e., it won't work as expected).
'''
super().__init__(router)
self.started = False
self.pathmap : dict[str, int] = {}
self.canonmap : dict[int, tuple] = {}
self.watchmap : dict[int, dict[tuple, int]] = defaultdict(lambda: defaultdict(int))
self.unmatched_move_froms : dict[int, dict[str, str]] = defaultdict(dict)
self.inotify = INotify()
self.read_fd, write_fd = os.pipe()
self.write = os.fdopen(write_fd, "wb")
def _add_watch(
self,
path,
flags,
lead=None,
remove=False,
):
if lead is None: lead = ''
path = Path(path)
lead = Path(lead)
wd = None
fullpath = Path(path, lead)
try:
wd = self.inotify.add_watch(str(fullpath), flags)
self.watchmap[wd][(path, lead)] |= flags
self.pathmap[fullpath] = wd
except FileNotFoundError:
logger.error(f'Directory not found for [{fullpath}] when attempting to watch')
return wd
def _add_watch_recursive(
self,
path,
flags,
lead=None,
remove=False,
):
'''
Recursively watch directories under path/lead, using `path` as the registered
base. Specifying `lead` gives one control over the subdirectory on which to
recurse; the "origin" `Path(path, lead)` is the recursion base point.
Note: on renamed/moved directories
This method is used to reset the mapped path/lead tuples pointed to by certain
watch descriptors when a directory is renamed. iNotify will fire a MOVED event
for the directory (MOVED_FROM/MOVED_TO), but will use the same watch
descriptor for the new directory as it did the old. This will leave all watch
dynamics intact, but new file events will fire and send the old base path to
the router. Explicitly re-watching the renamed directory (and any
subdirectories) will also return that existing watch descriptor. Thus, this
method can just be directly called for directory moves/renames, and WDs in the
`watchmap` will just be used as expected. (One should also call the method
using the old lead and set `remove=True` to remove old tuples out of the
`watchmap`. Note that this doesn't remove the existing watches from iNotify,
just their tracked tuples.)
'''
if lead is None:
lead = ''
wds = []
origin = Path(path, lead)
for subdir in [origin, *util.path.iter_glob_paths('**/', origin)]:
lead = subdir.relative_to(Path(path))
wd = self._add_watch(path, flags, lead=lead, remove=remove)
if wd is not None:
wds.append(wd)
return wds
def listen(
self,
path,
flags=None,
):
'''
Parameters:
path: Path (directory) to watch with `inotify`
flags: inotify_simple flags matching FS event types allowed to trigger the
callback. Defaults to CREATE, DELETE, MODIFY, DELETE_SELF, and MOVED_TO.
(Debouncing is handled by the composing Router rather than at this level.)
'''
path = Path(path)
if flags is None:
flags = iflags.CREATE | iflags.DELETE | iflags.MODIFY | iflags.DELETE_SELF | iflags.MOVED_TO
# ensure flags can detect directory events
flags |= iflags.CREATE | iflags.MOVED_FROM | iflags.MOVED_TO
wds = self._add_watch_recursive(path, flags)
try:
self.canonmap[wds[0]] = (path, flags)
except IndexError:
logger.error(f'Path {path} returned no INotify watch descriptors')
raise
def run(self):
'''
Note: On usage
This is the blocking watch loop (the thread body executed by `start()`); it will
hog the calling thread if invoked directly rather than via `start()`. In either
case, make sure `.stop()` is eventually called so the loop can exit gracefully.
'''
self.started = True
logger.info(f'Starting listener for {len(self.watchmap)} paths')
for path, flags in self.canonmap.values():
logger.info(f'> Listening on path {path} for flags {iflags.from_mask(flags)}')
for (callback, pattern, debounce, delay, *_) in self.router.routemap[path]:
logger.info(f'| > {pattern} -> {callback.__name__} (debounce {debounce}ms, delay {delay}ms)')
while True:
rlist, _, _ = select.select(
[self.inotify.fileno(), self.read_fd], [], []
)
if self.inotify.fileno() in rlist:
events = self.inotify.read(timeout=0)
self.handle_events(events)
# check for written stop byte
if self.read_fd in rlist:
os.close(self.read_fd)
self.inotify.close()
return
def update_moved_from(self, path, lead):
'''
Update directories on `MOVED_FROM` events. This method gets the existing WD,
removes the old path associated with that WD from the `watchmap` (preventing
events originating from this old path when the new path, which has the _same WD_,
receives an inotify event), and queues the (WD, base-path) tuple to be matched
later in a `MOVED_TO` handler.
This method isn't a part of a `MOVED_TO` handler because it may be called without
ever having a `MOVED_TO` that follows up. We respond right away in `handle_events`
to `MOVED_FROM` events, keeping the `watchmap` in sync, regardless of whether we
can expect a `MOVED_TO` to sweep through after the fact.
Note that the `lead` is unique for a given WD and base path. WDs are unique for
filepaths, but inotify uses the same WD for new directories when they experience a
rename (it's the same inode). However, during such a transition, the `watchmap`
can see two different entries for the same WD and basepath: the old tracked path,
and the newly named one (again, still the same inode). So: this method can be
called 1) directly from `MOVED_FROM` events, preemptively wiping the old path from
the tracked dicts, or 2) during handling of a `MOVED_TO` event (in case we don't
allow `MOVED_FROM` events, for instance), given both the new and old paths can be
seen in the `watchmap`.
'''
wd = self.pathmap.get(Path(path, lead))
logger.debug(f'> MOVED_FROM update, [{Path(path, lead)}] in pathmap as [{wd}]')
if wd is None: return
if self.watchmap[wd].pop((path, lead), None):
logger.debug(f'> MOVED_FROM update, popped from watchmap')
self.unmatched_move_froms[wd][path] = lead
def update_moved_to(self, path, lead):
'''
Construct synthetic MOVED events. Events are constructed from the path's WD. If
the provided path is not watched, an empty list of events is returned.
Note: Design details
This method is nuanced. It can only be called once a `MOVED_TO` occurs, since
we can't act on a `MOVED_FROM` alone (we don't know the new target location to look
in, so we can't send file events). When called, we first look for the path's WD in
the `pathmap`. We then check if this WD points to more than one entry with the
same base path (WDs are unique to the path; under the same WD, the same base
path implies the same lead). If so, we know one is the outdated path, and we
push the outdated lead to `update_moved_from`. This would be evidence that the
`MOVED_FROM` event for the move operation wasn't handled in the main event
handling loop. We then check for unmatched move-froms, which should provide
any renamed directories, regardless of whether `MOVED_FROM`s were allowed, to
be detected. Finally, the appropriate `MOVED_FROM`s and `MOVED_TO`s are
handled. To ensure only the correct events match upon handling, we do the
following:
- First, if a `MOVED_FROM` path is not available, we assume it wasn't queued
by the event and not a watched flag. Given we by default ensure MOVED events
are tracked, regardless of listened paths, this shouldn't be possible, but
if this standard were to change, we won't recursively respond to
`MOVED_FROM`s. This will mean that we can't prevent events from being
matched to old directory names (we've rooted out the ability to tell when
they've changed), and thus can't remove them from the `watchmap`
accordingly. (One functional caveat here: this MOVED_TO handling method
explicitly calls `update_moved_from`, which should clean up lingering
renamed path targets. This happens recursively if we're watching MOVED_TOs,
so even if standards do change and you don't watch `MOVED_FROM`s, you'll
still get cleanup for free due to the robustness of this method.)
- If a `MOVED_FROM` lead is found, either due to an inferred matching base
lingering in the `watchmap` or through previously handled `MOVED_FROM`
response, add this path/lead back to the `watchmap`, remove the new
path/lead, and call `handle_events` for the synthetic `MOVED_FROM` events
across files and directories. Once finished, again remove the old path/lead
and add back the new one.
- Submit `MOVED_TO` events to `handle_events`. This will recursively propagate for
subdirectories, each submitting their own `update_moved_to` call, resetting
its own outdated leads and changing them back, all the way down to the
bottom.
In the odd case where `MOVED_FROM` is registered but not `MOVED_TO`, you will
simply remove the directory causing a `MOVED_FROM` event, with no recursive
propagation. This should likely be changed.
'''
fullpath = Path(path, lead)
wd = self.pathmap.get(fullpath)
if wd is None:
logger.debug(f'Directory [{fullpath}] moved, but is not watched, ignoring')
return []
# inspect registered paths with same WD -- looking for same base path but diff lead
# will be empty if explicitly handled by a MOVED_FROM -- else inferred from watchmap
matching_bases = [pl for pl in self.watchmap[wd] if pl[0] == path and pl[1] != lead]
# there should be at most one of these, but handle iteratively
for matching_base, old_lead in matching_bases:
self.update_moved_from(matching_base, old_lead)
# explicit queries for files & dirs faster (tested) than filtering a single query
# using `Path.is_dir`; handle files, then subdirectories
moved_from_events = []
moved_to_events = []
for file in util.path.iter_glob_paths('*', fullpath, no_dir=True):
moved_from_events.append(iEvent(wd=wd, mask=iflags.MOVED_FROM, cookie=0, name=file.name))
moved_to_events.append(iEvent(wd=wd, mask=iflags.MOVED_TO, cookie=0, name=file.name))
for subdir in util.path.iter_glob_paths('*/', fullpath):
moved_from_mask = iflags.MOVED_FROM | iflags.ISDIR
moved_from_events.append(iEvent(wd=wd, mask=moved_from_mask, cookie=0, name=subdir.name))
moved_to_mask = iflags.MOVED_TO | iflags.ISDIR
moved_to_events.append(iEvent(wd=wd, mask=moved_to_mask, cookie=0, name=subdir.name))
# check for unmatched moved froms -- should be enqueued in event loop or just above
moved_from_lead = self.unmatched_move_froms.get(wd, {}).pop(path, None)
if moved_from_lead is None:
logger.debug(f'Couldn\'t find MOVED_FROM origin, just yielding MOVED_TO events')
else:
# temporarily remove new path, add old path to allow MOVED_FROMs to seep through
flags = self.watchmap[wd].pop((path, lead)) # remove new
self.watchmap[wd][(path, moved_from_lead)] = flags # add old
self.handle_events(moved_from_events)
self.watchmap[wd].pop((path, moved_from_lead)) # remove old
self.watchmap[wd][(path, lead)] = flags # add back new
self.handle_events(moved_to_events)
def handle_events(self, events):
'''
Note:
If `handle_events` is called externally, note that this loop will block in the
calling thread until the jobs have been submitted. It will _not_ block until
jobs have completed, however, as a list of futures is returned. The calling
Watcher instance may have already been started, in which case `run()` will
already be executing in a separate thread. Calling this method externally will
not interfere with this loop insofar as it adds jobs to the same thread pool.
Because this method only submits jobs associated with the provided `events`,
the calling thread can await the returned list of futures and be confident
that top-level callbacks associated with these file events have completed. Do
note that, if the Watcher has already been started, any propagating file
events will be picked up and possibly processed simultaneously (although their
associated callbacks will have nothing to do with the returned list of futures).
'''
from execlog.router import Event
for event in events:
# hard coded ignores
if util.path.glob_match(event.name, IGNORE_PATTERNS): continue
mask_flags = iflags.from_mask(event.mask)
if event.wd not in self.watchmap:
raise ValueError(f'Watcher fired for untracked descriptor origin: {event}')
moved_froms = []
moved_tos = []
for (path, lead), flags in self.watchmap[event.wd].items():
relpath = Path(lead, event.name)
abspath = Path(path, relpath)
# add new directories
if iflags.ISDIR in mask_flags:
if iflags.CREATE in mask_flags:
logger.debug(f'New directory detected [{relpath}]')
self._add_watch_recursive(path, flags, lead=relpath)
if iflags.MOVED_FROM in mask_flags:
moved_froms.append((path, relpath))
if iflags.MOVED_TO in mask_flags:
moved_tos.append((path, relpath))
continue
logger.debug(f'Watcher fired for [{relpath}]: {mask_flags}')
route_event = Event(endpoint=str(path), name=str(relpath), action=mask_flags)
self.router.submit(route_event)
# handle renamed directories; old dir was being watched if these flags
# match. The same WD is used by iNotify for the new dir, so
# recursively update explicitly stored paths.
for path, lead in moved_froms:
logger.debug(f'Directory moved, removing old [{lead}]')
self.update_moved_from(path, lead)
for path, lead in moved_tos:
logger.debug(f'Directory moved, adding new [{lead}]')
self._add_watch(path, flags, lead=lead)
self.update_moved_to(path, lead)
def stop(self):
logger.info("Stopping listener...")
if self.router.thread_pool is not None:
self.router.thread_pool.shutdown()
# request INotify stop by writing in the pipe, checked in watch loop
if not self.write.closed:
self.write.write(b"\x00")
self.write.close()

525
execlog/router.py Normal file

@ -0,0 +1,525 @@
import time
import asyncio
import logging
import inspect
import traceback
import threading
from pathlib import Path
from typing import Callable
from functools import partial
from colorama import Fore, Style
from collections import namedtuple, defaultdict
from concurrent.futures import ThreadPoolExecutor, wait, as_completed
from tqdm.auto import tqdm
logger = logging.getLogger(__name__)
Event = namedtuple(
'Event',
['endpoint', 'name', 'action'],
defaults=[None, None, None],
)
class Router:
'''
Route events to registered callbacks
Generalized registration includes an endpoint (the origin of an event), a pattern (to
filter events at the endpoint), and a callback (to be executed if pattern is matched).
The Router _routes_ events to affiliated callbacks in a multi-threaded fashion. A
thread pool handles these jobs as events are submitted, typically by a composing
Listener. The Listener "hears" an event, and passes it on through to a Router to
further filter and delegate any matching follow-up jobs.
This base Router implements most of the registry and filter model. When events are
submitted for propagation, they are checked for matching routes. Events specify an
origin endpoint, which is used as the filter for attached routes. The event is then
subjected to the `filter` method, which checks if the event matches the registered
`pattern` under the originated `endpoint`. If so, the callback is scheduled for
execution, and the matching event is passed as its sole argument.
Subclasses are expected to implement (at least) the `filter` method. This function is
responsible for wrapping up the task-specific logic needed to determine if an event,
originating from a known endpoint, matches the callback-specific pattern. This method
needn't handle any other filter logic, like checking if the event originates from the
provided endpoint, as this is already handled by the outer loop in `matching_routes`.
`get_listener` is a convenience method that instantiates and populates an affiliated
Listener over the registered paths found in the Router. Listeners require a Router upon
instantiation so events can be propagated to available targets when they occur.
`get_listener()` is the recommended way to attain a Listener.
Note: on debouncing events
Previously, debouncing was handled by listeners. This logic has been generalized
and moved to this class, as it's general enough to be desired across various
Listener types. We also need unique, identifying info only available with a
`(endpoint, callback, pattern)` triple in order to debounce events in accordance
with their intended target.
Note: tracking events and serializing callback frames
Although not part of the original implementation, we now track which events have a
callback chain actively being executed, and prevent the same chain from being
started concurrently. If the callback chain is actively running for an event, and
that same event is submitted before this chain finishes, the request is simply
enqueued. The `clear_event` method is attached as a "done callback" to each job
future, and will re-submit the event once the active chain finishes.
While this could be interpreted as a harsh design choice, it helps prevent many
thread conflicts (race conditions, writing to the same resources, etc.) when
the same function is executed concurrently, many times over. Without waiting
completely for an event to be fully handled, later jobs may complete before
earlier ones, or interact with intermediate disk states (raw file writes, DB
inserts, etc), before the earliest call has had a chance to clean up.
'''
def __init__(self, loop=None, workers=None, listener_cls=None):
'''
Parameters:
loop: asyncio event loop used to schedule coroutine callbacks; created lazily if
not provided
workers: maximum number of threads for the callback thread pool (passed through
to ThreadPoolExecutor; `None` uses its default)
listener_cls: default Listener class to instantiate in `get_listener()`
'''
self.loop = loop
self.workers = workers
self.listener_cls = listener_cls
self.routemap : dict[str, list[tuple]] = defaultdict(list)
self.post_callbacks = []
# track running jobs by event
self.running_events = defaultdict(set)
# debounce tracker
self.next_allowed_time = defaultdict(int)
# store prepped (e.g., delayed) callbacks
self.callback_registry = {}
self._thread_pool = None
self._route_lock = threading.Lock()
@property
def thread_pool(self):
if self._thread_pool is None:
self._thread_pool = ThreadPoolExecutor(max_workers=self.workers)
return self._thread_pool
def register(
self,
endpoint,
callback,
pattern,
debounce=200,
delay=10,
**listener_kwargs,
):
'''
Register a route. To be defined by an inheriting class, typically taking a pattern
and a callback.
Note: Listener arguments
Notice how listener_kwargs are accumulated instead of uniquely assigned to an
endpoint. This is generally acceptable as some listeners may allow various
configurations for the same endpoint. Note, however, for something like the
PathListener, this will have no effect. Registering the same endpoint multiple
times won't cause any errors, but the configuration options will only remain
for the last registered group.
(Update) The above remark about PathListeners no longer holds, and likely never
did. Varying flag sets under the same endpoint do in fact have a cumulative
effect, and we need to be able to disentangle events accordingly through the
submitted event's `action` value.
Parameters:
pattern: hashable object to be used when filtering event (passed to inherited
`filter(...)`)
callback: callable accepting an event, to be executed when a matching event
is received
'''
route_tuple = (callback, pattern, debounce, delay, listener_kwargs)
self.routemap[endpoint].append(route_tuple)
def submit(self, events, callbacks=None):
'''
Handle a list of events. Each event is matched against the registered callbacks,
and those callbacks are run concurrently (be it via a thread pool or an asyncio
loop).
'''
if type(events) is not list:
events = [events]
futures = []
for event in events:
future = self.submit_callback(self.submit_event, event, callbacks=callbacks)
future.add_done_callback(lambda f: self.clear_event(event, f))
futures.append(future)
return futures
def submit_event(self, event, callbacks=None):
'''
Group up and submit all matching callbacks for `event`. All callbacks are run
concurrently in their own threads, and this method blocks until all are completed.
In the outer `submit` context, this blocking method is itself ran in its own
thread, and the registered post-callbacks are attached to the completion of this
function, i.e., the finishing of all callbacks matching provided event.
Note that there are no checks for empty callback lists, where we could exit early.
Here we simply rely on methods doing the right thing: `wait_on_futures` would
simply receive an empty list, for example. Nevertheless, once an event is
submitted with this method, it gets at least a few moments where that event is
considered "running," and will be later popped out by `clear_events` (almost
immediately if there is in fact nothing to do). An early exit would simply have to
come after indexing the event in `running_events`
'''
if callbacks is None:
# ensure same thread gets all matching routes & sets debounce updates; else
# this may be split across threads mid-check, preventing one thread from
# handling the blocking of the entire group
with self._route_lock:
callbacks = self.matching_routes(event)
# stop early if no work to do
if len(callbacks) == 0:
return []
# enqueue requested/matched callbacks and exit if running
event_idx = self.event_index(event)
if event_idx in self.running_events:
self.queue_callbacks(event_idx, callbacks)
return []
# callbacks now computed; touch the defaultdict entry to mark the event as running
# note: a separate thread could queue valid callbacks since the running check;
# o/w we know the running index is empty
self.running_events[event_idx] = self.running_events[event_idx]
# submit matching callbacks and wait for them to complete
future_results = self.wait_on_callbacks(callbacks, event)
# finally call post event-group callbacks (only if some event callbacks were
# submitted), wait for them to complete
if future_results:
self.wait_on_futures([
self.submit_callback(post_callback, event, future_results)
for post_callback in self.post_callbacks
])
return future_results
def submit_callback(self, callback, *args, **kwargs):
'''
Note: this method is expected to return a future. Perform any event-based
filtering before submitting a callback with this method.
'''
if inspect.iscoroutinefunction(callback):
if self.loop is None:
self.loop = asyncio.new_event_loop()
asyncio.set_event_loop(self.loop)
#loop.run_in_executor(executor, loop.create_task, callback(event))
#future = self.loop.call_soon_threadsafe(
# self.loop.create_task,
future = asyncio.run_coroutine_threadsafe(
callback(*args, **kwargs),
self.loop,
)
else:
future = self.thread_pool.submit(
callback, *args, **kwargs
)
future.add_done_callback(handle_exception)
return future
def matching_routes(self, event, event_time=None):
'''
Return eligible matching routes for the provided event.
Note that we wait as late as possible before enqueuing matches if the event is in
fact already active in a frame. If this method were to start filtering results while
the frame is active, and the frame were to finish before all matching callbacks
were determined, we would be perfectly happy to return all matches, and allow the
outer `submit_event` context to run them right away in a newly constructed frame.
The _very_ next thing that gets done is adding this event to the active event
tracker. Otherwise, matching is performed as usual, and eligible callbacks are
simply enqueued for the next event frame, which will be checked in the "done"
callback of the active frame. The logic here should mostly "seal up" any real
opportunities for error, e.g., a frame ending and popping off elements from
`running_events` half-way through their inserting at the end of this method, or
multiple threads checking for matching routes for the same event, and both coming
away with a non-empty set of matches to run. That last example highlights
precisely how the single event-frame model works: many threads might be running
this method at the same time, for the same event (which has fired rapidly), but
only one should be able to "secure the frame" and begin running the matching
callbacks. Making the "active frame check" both as late as possible and as close
to the event blocking stage in the tracker (in `submit_event`), we make the
ambiguity gap as small as possible (and almost certainly smaller than any
realistic I/O-bound event duplication).
Note: on event actions
The debounce reset is now only set if the event is successfully filtered. This
allows some middle ground when trying to depend on event actions: if the
action passes through, we block the whole range of actions until the debounce
window completes. Otherwise, the event remains open, only to be blocked by the
debounce on the first matching action.
'''
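# Debounce sketch (illustrative): with debounce=200, an event matched at t=1000ms sets
# next_allowed_time[index] to 1200ms, so an identical event arriving at t=1150ms is
# rejected, while one arriving at t=1250ms passes and resets the window.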
matches = []
endpoint = event.endpoint
name = event.name
#action = tuple(event.action) # should be more general
event_time = time.time()*1000 if event_time is None else event_time
for (callback, pattern, debounce, delay, listen_kwargs) in self.routemap[endpoint]:
#index = (endpoint, name, action, callback, pattern, debounce, delay)
index = (endpoint, name, callback, pattern, debounce, delay)
if event_time < self.next_allowed_time[index]:
# reject event
continue
if self.filter(event, pattern, **listen_kwargs):
# note that delayed callbacks are added
matches.append(self.get_delayed_callback(callback, delay, index))
# set next debounce
self.next_allowed_time[index] = event_time + debounce
match_text = Style.BRIGHT + Fore.GREEN + 'matched'
callback_name = str(callback)
if hasattr(callback, '__name__'):
callback_name = callback.__name__
logger.info(
f'Event [{name}] {match_text} [{pattern}] under [{endpoint}] for [{callback_name}]'
)
else:
match_text = Style.BRIGHT + Fore.RED + 'rejected'
logger.debug(
f'Event [{name}] {match_text} against [{pattern}] under [{endpoint}] for [{callback.__name__}]'
)
return matches
def get_delayed_callback(self, callback, delay, index):
'''
Parameters:
callback: function to wrap
delay: delay in ms
'''
if index not in self.callback_registry:
async def async_wrap(callback, *args, **kwargs):
await asyncio.sleep(delay/1000)
return await callback(*args, **kwargs)
def sync_wrap(callback, *args, **kwargs):
time.sleep(delay/1000)
return callback(*args, **kwargs)
wrapper = None
if inspect.iscoroutinefunction(callback): wrapper = async_wrap
else: wrapper = sync_wrap
self.callback_registry[index] = partial(wrapper, callback)
return self.callback_registry[index]
def wait_on_futures(self, futures):
'''
Block until all futures in `futures` are complete. Return collected results as a
list, and log warnings when a future fails.
'''
future_results = []
for future in as_completed(futures):
try:
future_results.append(future.result())
except Exception as e:
logger.warning(f"Router callback job failed with exception {e}")
return future_results
def wait_on_callbacks(self, callbacks, event, *args, **kwargs):
'''
Overridable by inheriting classes based on callback structure
'''
return self.wait_on_futures([
self.submit_callback(callback, event, *args, **kwargs)
for callback in callbacks
])
def queue_callbacks(self, event_idx, callbacks):
'''
Overridable by inheriting classes based on callback structure
'''
self.running_events[event_idx].update(callbacks)
def filter(self, event, pattern, **listen_kwargs) -> bool:
'''
Determine whether `event` matches the registered `pattern`; to be implemented by
inheriting classes.
Parameters:
event: submitted Event to check
pattern: registered pattern to match the event against
listen_kwargs: listener kwargs registered alongside the route
'''
raise NotImplementedError
def add_post_callback(self, callback: Callable):
self.post_callbacks.append(callback)
def get_listener(self, listener_cls=None):
'''
Create a new Listener to manage watched routes and their callbacks.
'''
if listener_cls is None:
listener_cls = self.listener_cls
if listener_cls is None:
raise ValueError('No Listener class provided')
listener = listener_cls(self)
return self.extend_listener(listener)
def extend_listener(self, listener):
'''
Extend a provided Listener object with the Router instance's `listener_kwargs`.
'''
for endpoint, route_tuples in self.routemap.items():
for route_tuple in route_tuples:
listen_kwargs = route_tuple[-1]
listener.listen(endpoint, **listen_kwargs)
return listener
def stop_event(self, event):
'''
Pop event out of the running events tracker and return it.
'''
event_idx = self.event_index(event)
return self.running_events.pop(event_idx, None)
def clear_event(self, event, future):
'''
Clear an event. Pops the passed event out of `running_events`, and if any callbacks
were queued for it while it ran, the event is re-submitted.
This method is attached as a "done" callback to the main event wrapping job
`submit_event`. The `future` given to this method is one to which it was
attached as this "done" callback. This method should only be called when that
`future` is finished running (or failed). If any jobs were submitted in the
wrapper task, the future results here should be non-empty. We use this fact to
filter out non-work threads that call this method. Because even the
`matching_routes` check is threaded, we can't wait to see an event has no work to
schedule, and thus can't prevent this method being attached as a "done" callback.
The check for results from the passed future allows us to know when in fact a
valid frame has finished, and a resubmission may be on the table.
'''
if not future.result(): return
queued_callbacks = self.stop_event(event)
# resubmit event if some queued work
if queued_callbacks and len(queued_callbacks) > 0:
logger.debug(
f'Event [{event.name}] resubmitted with [{len(queued_callbacks)}] queued callbacks'
)
self.submit(event, callbacks=queued_callbacks)
def event_index(self, event):
return event[:2]
class ChainRouter(Router):
'''
Routes events to registered callbacks
'''
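# Example (illustrative; `convert_router` and `publish_router` are placeholder Routers):
#   chain = ChainRouter([convert_router, publish_router])
#   listener = chain.get_listener()
#   listener.start()
# Each event's callbacks run router-group by router-group, in the order given above.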
def __init__(self, ordered_routers):
super().__init__()
self.ordered_routers = []
for router in ordered_routers:
self.add_router(router)
self.running_events = defaultdict(lambda: defaultdict(set))
def add_router(self, router):
'''
TODO: allow positional insertion in ordered list
Note: the `routemap` extensions here shouldn't be necessary, since 1) route maps
show up only in `matching_routes`, and 2) `matching_routes` is only invoked in
`submit_event`, which is totally overwritten for the ChainRouter type. All events
are routed through to the individual Routers, at which point their route maps are
used.
'''
self.ordered_routers.append(router)
for endpoint, routelist in router.routemap.items():
self.routemap[endpoint].extend(routelist)
def matching_routes(self, event, event_time=None):
'''
The usual `callbacks` list is here a dict of callback lists, indexed by router,
with keys present only for routers with non-empty callback lists.
'''
if event_time is None:
event_time = time.time()*1000
route_map = {}
for router in self.ordered_routers:
router_matches = router.matching_routes(event, event_time)
if router_matches:
route_map[router] = router_matches
return route_map
def wait_on_callbacks(self, callbacks, event, *args, **kwargs):
'''
Note: relies on order of callbacks dict matching that of `ordered_routers`, which
should happen in `matching_routes`
'''
results = {}
for router, callback_list in callbacks.items():
router_results = router.submit_event(event, callbacks=callback_list)
results[router] = router_results
return results
def queue_callbacks(self, event_idx, callbacks):
for router, callback_list in callbacks.items():
self.running_events[event_idx][router].update(callback_list)
def stop_event(self, event):
'''
Sub-routers do not get a "done" callback for their `submit_event` jobs, as they
would if they handled their own event submissions. They will, however, set the
submitted event as "running." We can't rely on sub-routers' "done" callbacks to
"unset" the running event, because the disconnect between the thread completing
and execution of that callback may take too long.
Instead, we explicitly unset the running event for each of the constituent
sub-routers at the _same time_ we handle the ChainRouter's notion of event's
ending.
'''
event_idx = self.event_index(event)
for router in self.ordered_routers:
rq_callbacks = router.running_events.pop(event_idx, [])
assert len(rq_callbacks) == 0
return self.running_events.pop(event_idx, None)
def get_listener(self, listener_cls=None):
if listener_cls is None:
for router in self.ordered_routers:
if router.listener_cls is not None:
listener_cls = router.listener_cls
break
listener = super().get_listener(listener_cls)
for router in self.ordered_routers:
router.extend_listener(listener)
return listener
def handle_exception(future):
try:
future.result()
except Exception as e:
print(f"Exception occurred: {e}")
traceback.print_exc()

execlog/routers/__init__.py Normal file

@ -0,0 +1 @@
from execlog.routers.path import PathRouter

90
execlog/routers/path.py Normal file

@ -0,0 +1,90 @@
import logging
from pathlib import Path
from execlog.router import Router
from execlog.listeners.path import PathListener
from execlog.util.path import glob_match
logger = logging.getLogger(__name__)
class PathRouter(Router):
def __init__(self, loop=None, workers=None, listener_cls=PathListener):
'''
Parameters:
workers: number of workers to assign the thread pool when the event loop is
started. Defaults to `None`, which, when passed to
ThreadPoolExecutor, will by default use 5x the number of available
processors on the machine (which the docs claim is a reasonable
assumption given threads are more commonly leveraged for I/O work
rather than intense CPU operations). Given the intended context for
this class, this assumption aligns appropriately.
'''
super().__init__(loop=loop, workers=workers, listener_cls=listener_cls)
def register(
self,
path,
func,
glob='**/!(.*|*.tmp|*~)', # recursive, non-temp
debounce=200,
delay=30,
**listener_kwargs,
):
'''
Parameters:
path: Path (directory) to watch with `inotify`
func: Callback to run if FS event target matches glob
glob: Relative glob pattern to match files in provided path. The FS event's
filename must match this pattern for the callback to be queued. (Default:
a recursive match of all non-hidden, non-temporary files in the path.)
listener_kwargs: Additional params for associated listener "listen" routes.
See `PathListener.listen`.
'''
super().register(
#endpoint=Path(path),
endpoint=path,
callback=func,
pattern=glob,
debounce=debounce,
delay=delay,
**listener_kwargs
)
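# Example registration (illustrative; `compile_note` is a placeholder callback):
#   router = PathRouter()
#   router.register('notes/', compile_note, glob='**/*.md', debounce=500)
#   router.get_listener().start()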
def filter(self, event, glob, **listen_kwargs) -> bool:
'''
Determine whether a file event should trigger the registered callback, by matching
the event's filename against the route's glob (and, when `flags` were provided at
registration, against the listened-for flags).
Parameters:
event : Event instance
glob : Single string or tuple of glob patterns to check against the event's filename
'''
not_tmp_glob = '**/!(.*|*.tmp|*~)'
if not glob_match(Path(event.name), not_tmp_glob):
return False
listen_flags = listen_kwargs.get('flags')
# only filter by flags if explicitly specified on registry
# (o/w route likely just wanting to use defaults)
if listen_flags is not None:
# negative filter if action not one of the listened flags
if not any(flag & listen_flags for flag in event.action):
logger.debug(
f'Event [{event.name}] caught in flag filter under [{glob}] for action [{event.action}]'
)
return False
return glob_match(Path(event.name), glob)

204
execlog/server.py Normal file

@ -0,0 +1,204 @@
import re
import asyncio
import logging
import threading
from functools import partial
import uvicorn
from fastapi import FastAPI, WebSocket
from fastapi.staticfiles import StaticFiles
from inotify_simple import flags
from execlog.handler import Handler as LREndpoint
logger = logging.getLogger(__name__)
class Server:
'''
Server class. Wraps up a development static file server and live reloader.
'''
def __init__(
self,
host,
port,
root,
static : bool = False,
livereload : bool = False,
managed_listeners : list | None = None,
):
'''
Parameters:
managed_listeners: auxiliary listeners to "attach" to the server process, and to
propagate the shutdown signal to when the server receives an
interrupt.
'''
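# Example (illustrative): serve a static site with livereload enabled:
#   server = Server('localhost', 8000, 'docs/_build/html', static=True, livereload=True)
#   server.start()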
self.host = host
self.port = port
self.root = root
self.static = static
self.livereload = livereload
if managed_listeners is None:
managed_listeners = []
self.managed_listeners = managed_listeners
self.listener = None
self.server = None
self.server_text = ''
self.server_args = {}
self.loop = None
self._server_setup()
def _wrap_static(self):
self.server.mount("/", StaticFiles(directory=self.root), name="static")
def _wrap_livereload(self):
self.server.websocket_route('/livereload')(LREndpoint)
#self.server.add_api_websocket_route('/livereload', LREndpoint)
def _server_setup(self):
'''
Set up the FastAPI server. Only a single server instance is used here, optionally
mounting the static route (if static serving enabled) and providing a websocket
endpoint (if livereload enabled).
Note that, when present, the livereload endpoint is registered first, as the order
in which routes are defined matters for FastAPI apps. This allows `/livereload` to
behave appropriately, despite the root re-mount that takes place if serving static
files.
'''
# enable propagation and clear handlers for uvicorn internal loggers;
# allows logging messages to propagate to my root logger
log_config = uvicorn.config.LOGGING_CONFIG
log_config['loggers']['uvicorn']['propagate'] = True
log_config['loggers']['uvicorn']['handlers'] = []
log_config['loggers']['uvicorn.access']['propagate'] = True
log_config['loggers']['uvicorn.access']['handlers'] = []
log_config['loggers']['uvicorn.error']['propagate'] = False
log_config['loggers']['uvicorn.error']['handlers'] = []
self.server_args['log_config'] = log_config
self.server_args['host'] = self.host
self.server_args['port'] = self.port
if self.static or self.livereload:
self.server = FastAPI()
self.server.on_event('shutdown')(self.shutdown)
if self.livereload:
self._wrap_livereload()
self._listener_setup()
self.server_text += '+reload'
if self.static:
self._wrap_static()
self.server_text += '+static'
def _listener_setup(self):
'''
flags.MODIFY okay since we don't need to reload non-existent pages
'''
from execlog.routers import PathRouter
if self.loop is None:
self.loop = asyncio.new_event_loop()
asyncio.set_event_loop(self.loop)
#self.listener = listener.WatchFS(loop=self.loop)
self.router = PathRouter(loop=self.loop)
self.router.register(
path=str(self.root),
func=LREndpoint.reload_clients,
delay=100,
flags=flags.MODIFY,
)
self.listener = self.router.get_listener()
self.managed_listeners.append(self.listener)
def start(self):
'''
Start the server.
Note: Design
This method takes on some extra complexity in order to ensure the blocking
Watcher and FastAPI's event loop play nicely together. The Watcher's `start()`
method runs a blocking call to INotify's `read()`, which obviously cannot be
started directly here in the main thread. Here we have a few viable options:
1. Simply wrap the Watcher's `start` call in a separate thread, e.g.,
```py
watcher_start = partial(self.watcher.start, loop=loop)
threading.Thread(target=watcher_start).start()
```
This works just fine, and the watcher's registered async callbacks can
still use the passed event loop to get messages sent back to open WebSocket
clients.
2. Run the Watcher's `start` inside a thread managed by event loop via
`loop.run_in_executor`:
```py
loop.run_in_executor(None, partial(self.watcher.start, loop=loop))
```
Given that this just runs the target method in a separate thread, it's very
similar to option #1. It doesn't even make the outer loop context available
to the Watcher, meaning we still have to pass this loop explicitly to the
`start` method. The only benefit here (I think? there may actually be no
difference) is that it keeps things under one loop, which can be beneficial
for shutdown.
See related discussions:
- https://stackoverflow.com/questions/55027940/is-run-in-executor-optimized-for-running-in-a-loop-with-coroutines
- https://stackoverflow.com/questions/70459437/how-gil-affects-python-asyncio-run-in-executor-with-i-o-bound-tasks
Once the watcher is started, we can kick off the FastAPI server (which may be
serving static files, handling livereload WS connections, or both). We
provide `uvicorn` access to the manually created `asyncio` loop used to
run the Watcher (in a thread, that is), since that loop is made available to
the `Watcher._event_loop` method. This ultimately allows async methods to be
registered as callbacks to the Watcher and be ran in a managed loop. In this
case, that loop is managed by FastAPI, which keeps things consistent: the
Watcher can call `loop.call_soon_threadsafe` to queue up a FastAPI-based
response _in the same FastAPI event loop_, despite the trigger for that
response having originated from a separate thread (i.e., where the watcher is
started). This works smoothly, and keeps the primary server's event loop from
being blocked.
Note that, due to the delicate Watcher behavior, we must perform a shutdown
explicitly in order for things to be handled gracefully. This is done in the
server setup step, where we ensure FastAPI calls `watcher.stop()` during its
shutdown process.
'''
if self.loop is None:
self.loop = asyncio.new_event_loop()
asyncio.set_event_loop(self.loop)
for listener in self.managed_listeners:
#loop.run_in_executor(None, partial(self.listener.start, loop=loop))
if not listener.started:
listener.start()
if self.server:
logger.info(f'Server{self.server_text} @ http://{self.host}:{self.port}')
uconfig = uvicorn.Config(app=self.server, loop=self.loop, **self.server_args)
userver = uvicorn.Server(config=uconfig)
self.loop.run_until_complete(userver.serve())
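# Sketch of the thread/loop handoff described in the docstring above (assumed shape;
# the actual wiring lives in the listener/watcher implementation):
#
#   def run_watcher(loop):
#       ...  # blocking INotify read loop (separate thread)
#       loop.call_soon_threadsafe(
#           loop.create_task,
#           LREndpoint.reload_clients(event),  # async callback queued on the FastAPI loop
#       )
#
#   threading.Thread(target=run_watcher, args=(self.loop,)).start()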
def shutdown(self):
'''
Additional shutdown handling after the FastAPI event loop receives an interrupt.
Currently this stops all managed listeners attached to the server, both internal
and external.
'''
logger.info("Shutting down server...")
# stop attached auxiliary listeners, both internal & external
for listener in self.managed_listeners:
listener.stop()

1
execlog/util/__init__.py Normal file
View File

@ -0,0 +1 @@
from execlog.util import path

68
execlog/util/path.py Normal file
View File

@ -0,0 +1,68 @@
import re
import inspect
from glob import glob
from pathlib import Path
from wcmatch import glob as wc_glob
camel2snake_regex = re.compile(r'(?<!^)(?=[A-Z])')
def iter_nested_paths(path: Path, ext: str = None, no_dir=False, relative=False):
if ext is None: ext = ''
return iter_glob_paths(f'**/!(.*|*.tmp|*~)*{ext}', path, no_dir=no_dir, relative=relative)
def iter_glob_paths(_glob, path: Path, no_dir=False, relative=False):
'''
wc_glob should ignore hidden files and directories by default; `**` only matches
non-hidden directories/files, `*` only non-hidden files.
Note: Pattern quirks
- Under `wc_glob`, `**` (when GLOBSTAR enabled) will match all files and
directories, recursively. Contrast this with `Path.rglob('**')`, which matches
just directories. For `wcmatch`, use `**/` to recover the directory-only
behavior.
Note: Pattern behavior (a usage sketch follows the function body)
- `*`: all files and dirs, non-recursive
- `**`: all files and dirs, recursive
- `*/`: all dirs, non-recursive
- `**/`: all dirs, recursive
- All file (no dir) equivalents: either of the first two, with `no_dir=True`
'''
flags = wc_glob.GLOBSTAR | wc_glob.EXTGLOB | wc_glob.NEGATE
if no_dir:
flags |= wc_glob.NODIR
glob_list = list(map(
Path,
wc_glob.glob(_glob, root_dir=path, flags=flags)
))
if not relative:
glob_list = [Path(path, p) for p in glob_list]
return glob_list
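# Usage sketch for the pattern behavior noted above (paths are hypothetical):
#
#   iter_glob_paths('**/', Path('docs'))               # all non-hidden dirs, recursive
#   iter_glob_paths('*', Path('docs'))                  # non-hidden files & dirs, top level
#   iter_glob_paths('**', Path('docs'), no_dir=True)    # all non-hidden files, recursive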
def get_running_path():
'''
Try to get the location of the current running script. If there are issues getting
this, defaults to CWD.
'''
try:
calling_module = inspect.getmodule(inspect.stack()[-1][0])
return Path(calling_module.__file__).parent
except Exception:
return Path.cwd()
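# e.g. (illustrative): calling get_running_path() from /home/user/project/run.py
# returns Path('/home/user/project'); on failure it falls back to the CWD.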
def glob_match(filename, patterns):
'''
Convenience wrapper for wcmatch.glob.globmatch, with standard `**` support. `*` won't
match across separators here, unlike `fnmatch.fnmatch`. Returns a boolean indicating
if the filename matches at least one of the provided patterns.
'''
return wc_glob.globmatch(filename, patterns, flags=wc_glob.GLOBSTAR | wc_glob.EXTGLOB | wc_glob.NEGATE)
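# Usage sketch (illustrative filenames):
#
#   glob_match('notes/2024/todo.md', ['**/*.md', '**/*.rst'])   # True: `**` spans separators
#   glob_match('notes/2024/todo.md', ['*.md'])                   # False: `*` stops at separators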
def camel_to_snake(text):
return camel2snake_regex.sub('_', text).lower()
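# e.g. (illustrative): camel_to_snake('PathRouter') -> 'path_router'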

306
notebooks/router.ipynb Normal file
View File

@ -0,0 +1,306 @@
{
"cells": [
{
"cell_type": "code",
"execution_count": 1,
"id": "cd0a1da5-da7a-4181-bea3-07d02991398f",
"metadata": {
"tags": []
},
"outputs": [
{
"name": "stderr",
"output_type": "stream",
"text": [
"/home/smgr/.pyenv/versions/execlog/lib/python3.12/site-packages/tqdm/auto.py:21: TqdmWarning: IProgress not found. Please update jupyter and ipywidgets. See https://ipywidgets.readthedocs.io/en/stable/user_install.html\n",
" from .autonotebook import tqdm as notebook_tqdm\n"
]
}
],
"source": [
"import logging\n",
"from pathlib import Path\n",
"from functools import partial\n",
"\n",
"from execlog import ChainRouter, Event\n",
"from execlog.routers import PathRouter\n",
"from execlog.listeners import PathListener\n",
"\n",
"logging.basicConfig(level=logging.DEBUG)"
]
},
{
"cell_type": "code",
"execution_count": 2,
"id": "f8b834c2-808e-4bd0-87eb-932dc42ba390",
"metadata": {},
"outputs": [],
"source": [
"router1 = PathRouter()\n",
"router2 = PathRouter()\n",
"router3 = PathRouter()\n",
"\n",
"chain_router = ChainRouter([router1, router2, router3])"
]
},
{
"cell_type": "code",
"execution_count": 3,
"id": "c5bd34bb-ce1d-4719-a77a-ee13a95c3c78",
"metadata": {
"tags": []
},
"outputs": [],
"source": [
"router1.register('endpoint_proxy', partial(print, 'R1 ::'))\n",
"router2.register('endpoint_proxy', partial(print, 'R2 ::'))\n",
"router3.register('endpoint_proxy', partial(print, 'R3 ::'))\n",
"\n",
"events = [\n",
" Event(endpoint='endpoint_proxy', name='file1'),\n",
" Event(endpoint='endpoint_proxy', name='file2'),\n",
" Event(endpoint='endpoint_proxy', name='file3'),\n",
"]"
]
},
{
"cell_type": "code",
"execution_count": 4,
"id": "d7354e95-b7ce-4b8a-bcc2-cb8b161b8bae",
"metadata": {
"scrolled": true
},
"outputs": [
{
"name": "stderr",
"output_type": "stream",
"text": [
"INFO:execlog.router:Event [file1] \u001b[1m\u001b[32mmatched [**/!(.*|*.tmp|*~)] under [endpoint_proxy] for [functools.partial(<built-in function print>, 'R1 ::')]\n",
"INFO:execlog.router:Event [file2] \u001b[1m\u001b[32mmatched [**/!(.*|*.tmp|*~)] under [endpoint_proxy] for [functools.partial(<built-in function print>, 'R1 ::')]\n",
"INFO:execlog.router:Event [file3] \u001b[1m\u001b[32mmatched [**/!(.*|*.tmp|*~)] under [endpoint_proxy] for [functools.partial(<built-in function print>, 'R1 ::')]\n"
]
},
{
"data": {
"text/plain": [
"[<Future at 0x78be3b5c2cf0 state=running>,\n",
" <Future at 0x78be3b5c2ab0 state=running>,\n",
" <Future at 0x78be3b5c34a0 state=running>]"
]
},
"execution_count": 4,
"metadata": {},
"output_type": "execute_result"
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"R1 :: Event(endpoint='endpoint_proxy', name='file1', action=None)\n",
"R1 :: Event(endpoint='endpoint_proxy', name='file2', action=None)\n",
"R1 :: Event(endpoint='endpoint_proxy', name='file3', action=None)\n",
"R1 :: Event(endpoint='endpoint_proxy', name='file3', action=None)\n",
"R1 :: Event(endpoint='endpoint_proxy', name='fileA', action=[<flags.CREATE: 256>])\n",
"R1 :: Event(endpoint='endpoint_proxy', name='fileA', action=[<flags.MODIFY: 2>])\n",
"R1 :: Event(endpoint='endpoint_proxy', name='fileA', action=[<flags.CREATE: 256>])\n"
]
}
],
"source": [
"# multi-event single router\n",
"router1.submit(events)"
]
},
{
"cell_type": "code",
"execution_count": 5,
"id": "092a21a7-8052-4826-911c-6e4423b075ce",
"metadata": {
"tags": []
},
"outputs": [
{
"name": "stderr",
"output_type": "stream",
"text": [
"INFO:execlog.router:Event [file1] \u001b[1m\u001b[32mmatched [**/!(.*|*.tmp|*~)] under [endpoint_proxy] for [functools.partial(<built-in function print>, 'R1 ::')]\n",
"INFO:execlog.router:Event [file1] \u001b[1m\u001b[32mmatched [**/!(.*|*.tmp|*~)] under [endpoint_proxy] for [functools.partial(<built-in function print>, 'R2 ::')]\n",
"INFO:execlog.router:Event [file1] \u001b[1m\u001b[32mmatched [**/!(.*|*.tmp|*~)] under [endpoint_proxy] for [functools.partial(<built-in function print>, 'R3 ::')]\n"
]
},
{
"data": {
"text/plain": [
"[<Future at 0x78be40f24e60 state=running>,\n",
" <Future at 0x78be3b5c3740 state=running>,\n",
" <Future at 0x78be3b5c1430 state=running>]"
]
},
"execution_count": 5,
"metadata": {},
"output_type": "execute_result"
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"INFO:execlog.router:Event [file2] \u001b[1m\u001b[32mmatched [**/!(.*|*.tmp|*~)] under [endpoint_proxy] for [functools.partial(<built-in function print>, 'R1 ::')]\n",
"INFO:execlog.router:Event [file2] \u001b[1m\u001b[32mmatched [**/!(.*|*.tmp|*~)] under [endpoint_proxy] for [functools.partial(<built-in function print>, 'R2 ::')]\n",
"INFO:execlog.router:Event [file2] \u001b[1m\u001b[32mmatched [**/!(.*|*.tmp|*~)] under [endpoint_proxy] for [functools.partial(<built-in function print>, 'R3 ::')]\n",
"INFO:execlog.router:Event [file3] \u001b[1m\u001b[32mmatched [**/!(.*|*.tmp|*~)] under [endpoint_proxy] for [functools.partial(<built-in function print>, 'R1 ::')]\n",
"INFO:execlog.router:Event [file3] \u001b[1m\u001b[32mmatched [**/!(.*|*.tmp|*~)] under [endpoint_proxy] for [functools.partial(<built-in function print>, 'R2 ::')]\n",
"INFO:execlog.router:Event [file3] \u001b[1m\u001b[32mmatched [**/!(.*|*.tmp|*~)] under [endpoint_proxy] for [functools.partial(<built-in function print>, 'R3 ::')]\n"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"R2 :: Event(endpoint='endpoint_proxy', name='file1', action=None)\n",
"R2 :: Event(endpoint='endpoint_proxy', name='file2', action=None)\n",
"R3 :: Event(endpoint='endpoint_proxy', name='file1', action=None)\n",
"R3 :: Event(endpoint='endpoint_proxy', name='file2', action=None)\n",
"R2 :: Event(endpoint='endpoint_proxy', name='file3', action=None)\n",
"R3 :: Event(endpoint='endpoint_proxy', name='file3', action=None)\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"INFO:execlog.router:Event [fileA] \u001b[1m\u001b[32mmatched [**/!(.*|*.tmp|*~)] under [endpoint_proxy] for [functools.partial(<built-in function print>, 'R1 ::')]\n",
"INFO:execlog.router:Event [fileA] \u001b[1m\u001b[32mmatched [**/!(.*|*.tmp|*~)] under [endpoint_proxy] for [functools.partial(<built-in function print>, 'R2 ::')]\n",
"INFO:execlog.router:Event [fileA] \u001b[1m\u001b[32mmatched [**/!(.*|*.tmp|*~)] under [endpoint_proxy] for [functools.partial(<built-in function print>, 'R3 ::')]\n"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"R2 :: Event(endpoint='endpoint_proxy', name='fileA', action=[<flags.CREATE: 256>])\n",
"R3 :: Event(endpoint='endpoint_proxy', name='fileA', action=[<flags.CREATE: 256>])\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"INFO:execlog.router:Event [fileA] \u001b[1m\u001b[32mmatched [**/!(.*|*.tmp|*~)] under [endpoint_proxy] for [functools.partial(<built-in function print>, 'R1 ::')]\n",
"INFO:execlog.router:Event [fileA] \u001b[1m\u001b[32mmatched [**/!(.*|*.tmp|*~)] under [endpoint_proxy] for [functools.partial(<built-in function print>, 'R2 ::')]\n",
"INFO:execlog.router:Event [fileA] \u001b[1m\u001b[32mmatched [**/!(.*|*.tmp|*~)] under [endpoint_proxy] for [functools.partial(<built-in function print>, 'R3 ::')]\n"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"R2 :: Event(endpoint='endpoint_proxy', name='fileA', action=[<flags.MODIFY: 2>])\n",
"R3 :: Event(endpoint='endpoint_proxy', name='fileA', action=[<flags.MODIFY: 2>])\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"INFO:execlog.router:Event [fileA] \u001b[1m\u001b[32mmatched [**/!(.*|*.tmp|*~)] under [endpoint_proxy] for [functools.partial(<built-in function print>, 'R1 ::')]\n",
"INFO:execlog.router:Event [fileA] \u001b[1m\u001b[32mmatched [**/!(.*|*.tmp|*~)] under [endpoint_proxy] for [functools.partial(<built-in function print>, 'R2 ::')]\n",
"INFO:execlog.router:Event [fileA] \u001b[1m\u001b[32mmatched [**/!(.*|*.tmp|*~)] under [endpoint_proxy] for [functools.partial(<built-in function print>, 'R3 ::')]\n"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"R2 :: Event(endpoint='endpoint_proxy', name='fileA', action=[<flags.CREATE: 256>])\n",
"R3 :: Event(endpoint='endpoint_proxy', name='fileA', action=[<flags.CREATE: 256>])\n"
]
}
],
"source": [
"chain_router.submit(events)"
]
},
{
"cell_type": "code",
"execution_count": 6,
"id": "52f83d9a-7b12-4891-8b90-986a9ed399d7",
"metadata": {
"tags": []
},
"outputs": [
{
"name": "stderr",
"output_type": "stream",
"text": [
"INFO:execlog.listeners.path:Starting listener for 1 paths\n",
"INFO:execlog.listeners.path:> Listening on path endpoint_proxy for flags [<flags.MODIFY: 2>, <flags.MOVED_FROM: 64>, <flags.MOVED_TO: 128>, <flags.CREATE: 256>, <flags.DELETE: 512>, <flags.DELETE_SELF: 1024>]\n"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"defaultdict(<function PathListener.__init__.<locals>.<lambda> at 0x78be3b5db100>, {1: defaultdict(<class 'int'>, {(PosixPath('endpoint_proxy'), PosixPath('.')): 1986})})\n"
]
}
],
"source": [
"listener = chain_router.get_listener()\n",
"listener.start()\n",
"\n",
"print(listener.watchmap)"
]
},
{
"cell_type": "code",
"execution_count": 9,
"id": "00bb2889-f266-4fb1-9a89-7d7539aba9cf",
"metadata": {},
"outputs": [
{
"name": "stderr",
"output_type": "stream",
"text": [
"DEBUG:execlog.listeners.path:Watcher fired for [fileA]: [<flags.CREATE: 256>]\n",
"DEBUG:execlog.listeners.path:Watcher fired for [fileA]: [<flags.MODIFY: 2>]\n",
"DEBUG:execlog.listeners.path:Watcher fired for [fileA]: [<flags.DELETE: 512>]\n"
]
}
],
"source": [
"file_a = Path('endpoint_proxy/fileA')\n",
"file_a.write_text('test text')\n",
"file_a.unlink()"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "4e993450-bdb7-4860-ba23-dbc2e5676ace",
"metadata": {},
"outputs": [],
"source": []
}
],
"metadata": {
"kernelspec": {
"display_name": "execlog",
"language": "python",
"name": "execlog"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.12.2"
}
},
"nbformat": 4,
"nbformat_minor": 5
}

25
pyproject.toml Normal file
View File

@ -0,0 +1,25 @@
[build-system]
requires = ["setuptools"]
build-backend = "setuptools.build_meta"
[project]
name = "execlog"
version = "0.1.1"
authors = [
{ name="Sam Griesemer", email="samgriesemer@gmail.com" },
]
description = "Lightweight multi-threaded job framework"
readme = "README.md"
requires-python = ">=3.12"
classifiers = [
"Programming Language :: Python :: 3",
"License :: OSI Approved :: MIT License",
"Operating System :: OS Independent",
]
dependencies = [
"tqdm"
]
[tool.setuptools.packages.find]
include = ["execlog*"] # pattern to match package names

19
requirements.txt Normal file
View File

@ -0,0 +1,19 @@
# -- package-specific --
fastapi
starlette
uvicorn
inotify_simple
tqdm
wcmatch
# -- logging --
colorama
# -- sphinx docs --
sphinx
sphinx-togglebutton
furo
myst-parser
# -- testing --
pytest