From f1e0b5602b41f6da0109ef8ad0cfe68e7ede805e Mon Sep 17 00:00:00 2001 From: "Sam G." Date: Sat, 27 Apr 2024 16:33:08 -0700 Subject: [PATCH] add Event typing, clean up docstrings, add initial tests --- README.md | 7 ++++- execlog/__init__.py | 1 + execlog/event.py | 14 +++++++++ execlog/handler.py | 9 ++++++ execlog/listener.py | 24 +++++++++------ execlog/listeners/path.py | 40 ++++++------------------ execlog/router.py | 41 ++++++++++++++----------- execlog/routers/path.py | 33 ++++++++------------ execlog/server.py | 33 ++++++++++++++++---- execlog/util/__init__.py | 1 + execlog/util/generic.py | 64 +++++++++++++++++++++++++++++++++++++++ execlog/util/path.py | 4 +++ tests/test_handler.py | 0 tests/test_listener.py | 0 tests/test_router.py | 0 tests/test_server.py | 0 16 files changed, 186 insertions(+), 85 deletions(-) create mode 100644 execlog/event.py create mode 100644 execlog/util/generic.py create mode 100644 tests/test_handler.py create mode 100644 tests/test_listener.py create mode 100644 tests/test_router.py create mode 100644 tests/test_server.py diff --git a/README.md b/README.md index 73174c3..96ccac4 100644 --- a/README.md +++ b/README.md @@ -1,2 +1,7 @@ # Overview -`execlog` is a package +`execlog` is a lightweight multi-threaded job framework + +- **Handler**: live-reload handshake manager for connecting pages +- **Listener**: +- **Router** +- **Server**: diff --git a/execlog/__init__.py b/execlog/__init__.py index 442d884..c3723ac 100644 --- a/execlog/__init__.py +++ b/execlog/__init__.py @@ -2,6 +2,7 @@ from execlog.handler import Handler from execlog.listener import Listener from execlog.router import Router, ChainRouter, Event from execlog.server import Server +from execlog.event import Event, FileEvent from execlog import listeners from execlog import routers diff --git a/execlog/event.py b/execlog/event.py new file mode 100644 index 0000000..2bdad80 --- /dev/null +++ b/execlog/event.py @@ -0,0 +1,14 @@ +from collections import namedtuple + + +Event = namedtuple( + 'Event', + ['endpoint', 'name', 'action'], + defaults=[None, None, None], +) + +FileEvent = namedtuple( + 'FileEvent', + ['endpoint', 'name', 'action'], + defaults=[None, None, None], +) diff --git a/execlog/handler.py b/execlog/handler.py index f958c76..6782042 100644 --- a/execlog/handler.py +++ b/execlog/handler.py @@ -1,3 +1,12 @@ +''' +Handler + +Websocket endpoint subclass intended to route websocket connections in a Server context. + +Note: the current Handler class is very specific, tailored entirely to handling a +supported live-reload handshake. This should likely be made more general, but until +separate handshakes or endpoints are needed, it's fine as is. +''' import re import logging from pathlib import Path diff --git a/execlog/listener.py b/execlog/listener.py index 53ca1f3..ae8e6bd 100644 --- a/execlog/listener.py +++ b/execlog/listener.py @@ -2,29 +2,33 @@ Implements a file system watcher. See also: - - https://inotify-simple.readthedocs.io/en/latest/#gracefully-exit-a-blocking-read + +- https://inotify-simple.readthedocs.io/en/latest/#gracefully-exit-a-blocking-read ''' import threading +from execlog.event import Event -class Listener(threading.Thread): - def __init__(self, router): + +class Listener[E: Event](threading.Thread): + def __init__(self, router: 'Router[E]'): ''' Parameters: - workers: number of workers to assign the thread pool when the event loop is - started. Defaults to `None`, which, when passed to - ThreadPoolExecutor, will by default use 5x the number of available - processors on the machine (which the docs claim is a reasonable - assumption given threads are more commonly leveraged for I/O work - rather than intense CPU operations). Given the intended context for - this class, this assumption aligns appropriately. + router: associated Router instance that events should be passed to ''' super().__init__() self.router = router def listen(self): + ''' + Register a new listener endpoint + ''' raise NotImplementedError def run(self): + ''' + Begin listening for events. Typically a blocking loop that passes events to + attached Router. + ''' raise NotImplementedError diff --git a/execlog/listeners/path.py b/execlog/listeners/path.py index 19a9a52..f328074 100644 --- a/execlog/listeners/path.py +++ b/execlog/listeners/path.py @@ -1,11 +1,3 @@ -''' -Implements a file system watcher. - -See also: - -- https://inotify-simple.readthedocs.io/en/latest/#gracefully-exit-a-blocking-read -''' -#import fnmatch import os import time import select @@ -15,27 +7,18 @@ from collections import defaultdict from inotify_simple import INotify, Event as iEvent, flags as iflags, masks as imasks -from execlog import util +from execlog import util +from execlog.event import FileEvent from execlog.listener import Listener logger = logging.getLogger(__name__) -# hardcoded file names to ignore -# - "4913" is a temp file created by Vim before editing -IGNORE_PATTERNS = ['4913', '.sync*.db*'] - -class PathListener(Listener): +class PathListener(Listener[FileEvent]): def __init__(self, router): ''' Parameters: - workers: number of workers to assign the thread pool when the event loop is - started. Defaults to `None`, which, when passed to - ThreadPoolExecutor, will by default use 5x the number of available - processors on the machine (which the docs claim is a reasonable - assumption given threads are more commonly leveraged for I/O work - rather than intense CPU operations). Given the intended context for - this class, this assumption aligns appropriately. + router: associated Router instance that events should be passed to Note: Due to the nature of INotify, you cannot watch the same path with two @@ -60,7 +43,7 @@ class PathListener(Listener): self.inotify = INotify() self.read_fd, write_fd = os.pipe() - self.write = os.fdopen(write_fd, "wb") + self.write = os.fdopen(write_fd, 'wb') def _add_watch( self, @@ -128,14 +111,13 @@ class PathListener(Listener): flags=None, ): ''' + Listen to file events occurring under a provided path, optionally only events + matching provided iNotify flags. + Parameters: path: Path (directory) to watch with `inotify` flags: inotify_simple flags matching FS event types allowed to trigger the callback - debounce: time in milliseconds to debounce file-based events at this path. - Applies to _file targets_; the same filename will have events - "debounced" at this interval (time delta calculated from last - un-rejected match). ''' path = Path(path) @@ -159,10 +141,6 @@ class PathListener(Listener): `start()` is a blocking call. This will hog your main thread if not properly threaded. If handling this manually in your outer context, you will also need to make sure to call `.stop()` - - Parameters: - loop: asyncio loop to pass to `_event_loop`; used to schedule async callbacks - when present ''' self.started = True logger.info(f'Starting listener for {len(self.watchmap)} paths') @@ -332,7 +310,7 @@ class PathListener(Listener): for event in events: # hard coded ignores - if util.path.glob_match(event.name, IGNORE_PATTERNS): continue + if util.path.glob_match(event.name, util.path.IGNORE_PATTERNS): continue mask_flags = iflags.from_mask(event.mask) diff --git a/execlog/router.py b/execlog/router.py index 068346d..1cb22ed 100644 --- a/execlog/router.py +++ b/execlog/router.py @@ -1,3 +1,6 @@ +''' +Router +''' import time import asyncio import logging @@ -8,21 +11,18 @@ from pathlib import Path from typing import Callable from functools import partial from colorama import Fore, Style -from collections import namedtuple, defaultdict +from collections import defaultdict from concurrent.futures import ThreadPoolExecutor, wait, as_completed from tqdm.auto import tqdm +from execlog.event import Event +from execlog.listener import Listener + + logger = logging.getLogger(__name__) - -Event = namedtuple( - 'Event', - ['endpoint', 'name', 'action'], - defaults=[None, None, None], -) - -class Router: +class Router[E: Event]: ''' Route events to registered callbacks @@ -74,16 +74,22 @@ class Router: earlier ones, or interact with intermediate disk states (raw file writes, DB inserts, etc), before the earliest call has had a chance to clean up. ''' - def __init__(self, loop=None, workers=None, listener_cls=None): + listener_cls = Listener[E] + + def __init__(self, loop=None, workers=None): ''' Parameters: loop: - workers: - listener_cls: + workers: number of workers to assign the thread pool when the event loop is + started. Defaults to `None`, which, when passed to + ThreadPoolExecutor, will by default use 5x the number of available + processors on the machine (which the docs claim is a reasonable + assumption given threads are more commonly leveraged for I/O work + rather than intense CPU operations). Given the intended context for + this class, this assumption aligns appropriately. ''' self.loop = loop self.workers = workers - self.listener_cls = listener_cls self.routemap : dict[str, list[tuple]] = defaultdict(list) self.post_callbacks = [] @@ -141,7 +147,7 @@ class Router: route_tuple = (callback, pattern, debounce, delay, listener_kwargs) self.routemap[endpoint].append(route_tuple) - def submit(self, events, callbacks=None): + def submit(self, events:E | list[E], callbacks=None): ''' Handle a list of events. Each event is matched against the registered callbacks, and those callbacks are ran concurrently (be it via a thread pool or an asyncio @@ -235,7 +241,7 @@ class Router: return future - def matching_routes(self, event, event_time=None): + def matching_routes(self, event: E, event_time=None): ''' Return eligible matching routes for the provided event. @@ -288,7 +294,7 @@ class Router: # set next debounce self.next_allowed_time[index] = event_time + debounce - match_text = Style.BRIGHT + Fore.GREEN + 'matched' + match_text = Style.BRIGHT + Fore.GREEN + 'matched' + Fore.RESET callback_name = str(callback) if hasattr(callback, '__name__'): @@ -298,7 +304,7 @@ class Router: f'Event [{name}] {match_text} [{pattern}] under [{endpoint}] for [{callback_name}]' ) else: - match_text = Style.BRIGHT + Fore.RED + 'rejected' + match_text = Style.BRIGHT + Fore.RED + 'rejected' + Fore.RESET logger.debug( f'Event [{name}] {match_text} against [{pattern}] under [{endpoint}] for [{callback.__name__}]' ) @@ -373,6 +379,7 @@ class Router: ''' if listener_cls is None: listener_cls = self.listener_cls + if listener_cls is None: raise ValueError('No Listener class provided') diff --git a/execlog/routers/path.py b/execlog/routers/path.py index 480e082..be7fc02 100644 --- a/execlog/routers/path.py +++ b/execlog/routers/path.py @@ -1,34 +1,25 @@ import logging from pathlib import Path +from typing import Callable from execlog.router import Router -from execlog.listeners.path import PathListener +from execlog.event import FileEvent from execlog.util.path import glob_match +from execlog.listeners.path import PathListener logger = logging.getLogger(__name__) -class PathRouter(Router): - def __init__(self, loop=None, workers=None, listener_cls=PathListener): - ''' - Parameters: - workers: number of workers to assign the thread pool when the event loop is - started. Defaults to `None`, which, when passed to - ThreadPoolExecutor, will by default use 5x the number of available - processors on the machine (which the docs claim is a reasonable - assumption given threads are more commonly leveraged for I/O work - rather than intense CPU operations). Given the intended context for - this class, this assumption aligns appropriately. - ''' - super().__init__(loop=loop, workers=workers, listener_cls=listener_cls) - +class PathRouter(Router[FileEvent]): + listener_cls = PathListener + def register( self, - path, - func, - glob='**/!(.*|*.tmp|*~)', # recursive, non-temp - debounce=200, - delay=30, + path : Path, + func : Callable, + glob : str = '**/!(.*|*.tmp|*~)', # recursive, non-temp + debounce : int|float = 200, + delay : int|float = 30, **listener_kwargs, ): ''' @@ -38,6 +29,8 @@ class PathRouter(Router): glob: Relative glob pattern to match files in provided path. The FS event's filename must match this pattern for the callback to queued. (Default: "*"; matching all files in path). + debounce: + delay: listener_kwargs: Additional params for associated listener "listen" routes. See `PathListener.listen`. ''' diff --git a/execlog/server.py b/execlog/server.py index 8fbb027..19a9cd3 100644 --- a/execlog/server.py +++ b/execlog/server.py @@ -1,3 +1,17 @@ +''' +Server + +Central management object for both file serving systems (static server, live reloading) +and job execution (routing and listening). Routers and Listeners can be started and +managed independently, but a single Server instance can house, start, and shutdown +listeners in one place. + +TODO: as it stands, the Server requires address and port details, effectively needing one +of the HTTP items (static file serving or livereloading) to be initialized appropriately. +But there is a clear use case for just managing disparate Routers and their associated +Listeners. Should perhaps separate this "grouped listener" into another object, or just +make the Server definition more flexible. +''' import re import asyncio import logging @@ -5,9 +19,9 @@ import threading from functools import partial import uvicorn +from inotify_simple import flags from fastapi import FastAPI, WebSocket from fastapi.staticfiles import StaticFiles -from inotify_simple import flags from execlog.handler import Handler as LREndpoint @@ -29,9 +43,16 @@ class Server: ): ''' Parameters: + host: host server address (either 0.0.0.0, 127.0.0.1, localhost) + port: port at which to start the server + root: base path for static files _and_ where router bases are attached (i.e., + when files at this path change, a reload event will be + propagated to a corresponding client page) + static: whether or not to start a static file server + livereload: whether or not to start a livereload server managed_listeners: auxiliary listeners to "attach" to the server process, and to - propagate the shutdown signal to when the server receives an - interrupt. + propagate the shutdown signal to when the server receives an + interrupt. ''' self.host = host self.port = port @@ -66,8 +87,8 @@ class Server: Note that, when present, the livereload endpoint is registered first, as the order in which routes are defined matters for FastAPI apps. This allows `/livereload` to - behave appropriately, despite the root re-mount that takes place if serving static - files. + behave appropriately, even when remounting the root if serving static files + (which, if done in the opposite order, would "eat up" the `/livereload` endpoint). ''' # enable propagation and clear handlers for uvicorn internal loggers; # allows logging messages to propagate to my root logger @@ -100,7 +121,7 @@ class Server: ''' flags.MODIFY okay since we don't need to reload non-existent pages ''' - from localsys.reloader.router import PathRouter + from execlog.reloader.router import PathRouter if self.loop is None: self.loop = asyncio.new_event_loop() diff --git a/execlog/util/__init__.py b/execlog/util/__init__.py index fac37f2..5f3addb 100644 --- a/execlog/util/__init__.py +++ b/execlog/util/__init__.py @@ -1 +1,2 @@ from execlog.util import path +from execlog.util import generic diff --git a/execlog/util/generic.py b/execlog/util/generic.py new file mode 100644 index 0000000..b307c0a --- /dev/null +++ b/execlog/util/generic.py @@ -0,0 +1,64 @@ +import logging + +import tqdm +import colorama +from colorama import Fore, Back, Style + + +class ColorFormatter(logging.Formatter): + _format = '%(levelname)-8s :: %(name)s %(message)s' + colorama.init(autoreset=True) + + FORMATS = { + 'x': Fore.YELLOW + _format, + 'listener': Fore.GREEN + _format, + 'handler': Fore.CYAN + _format, + 'server': Style.DIM + Fore.CYAN + _format, + 'router': Fore.MAGENTA + _format, + 'site': Fore.BLUE + _format, + 'utils': Style.DIM + Fore.WHITE + _format, + } + FORMATS = { k:logging.Formatter(v) for k,v in FORMATS.items() } + DEFAULT_LOGGER = logging.Formatter(_format) + + def format(self, record): + # color by high-level submodule + name_parts = record.name.split('.') + package = ''.join(name_parts[:1]) + submodule = ''.join(name_parts[1:2]) + subsubmodule = '.'.join(name_parts[1:3]) + + formatter = self.DEFAULT_LOGGER + if subsubmodule in self.FORMATS: + formatter = self.FORMATS[subsubmodule] + elif submodule in self.FORMATS: + formatter = self.FORMATS[submodule] + + name = record.name + if package == 'localsys': + name = f'localsys.{subsubmodule}' + + limit = 26 + name = name[:limit] + name = f'[{name}]{"-"*(limit-len(name))}' + record.name = name + + return formatter.format(record) + +class TqdmLoggingHandler(logging.StreamHandler): + def __init__(self, level=logging.NOTSET): + super().__init__(level) + formatter = ColorFormatter() + self.setFormatter(formatter) + + def emit(self, record): + try: + msg = self.format(record) + #tqdm.tqdm.write(msg) + tqdm.tqdm.write(msg, end=self.terminator) + self.flush() + except (KeyboardInterrupt, SystemExit): + raise + except: + self.handleError(record) + diff --git a/execlog/util/path.py b/execlog/util/path.py index 574a713..2403c8d 100644 --- a/execlog/util/path.py +++ b/execlog/util/path.py @@ -6,6 +6,10 @@ from pathlib import Path from wcmatch import glob as wc_glob +# hardcoded file names to ignore +# - "4913" is a temp file created by Vim before editing +IGNORE_PATTERNS = ['4913', '.sync*.db*'] + camel2snake_regex = re.compile(r'(?