add Event typing, clean up docstrings, add initial tests

This commit is contained in:
Sam G. 2024-04-27 16:33:08 -07:00
parent 109fbefd41
commit 6c335c2a9a
21 changed files with 424 additions and 166 deletions

View File

@ -1,2 +1,7 @@
# Overview # Overview
`execlog` is a package `execlog` is a lightweight multi-threaded job framework
- **Handler**: live-reload handshake manager for connecting pages
- **Listener**:
- **Router**
- **Server**:

View File

@ -11,4 +11,9 @@ Description-Content-Type: text/markdown
Requires-Dist: tqdm Requires-Dist: tqdm
# Overview # Overview
`execlog` is a package `execlog` is a lightweight multi-threaded job framework
- **Handler**: live-reload handshake manager for connecting pages
- **Listener**:
- **Router**
- **Server**:

View File

@ -2,6 +2,7 @@ MANIFEST.in
README.md README.md
pyproject.toml pyproject.toml
execlog/__init__.py execlog/__init__.py
execlog/event.py
execlog/handler.py execlog/handler.py
execlog/listener.py execlog/listener.py
execlog/router.py execlog/router.py
@ -16,4 +17,9 @@ execlog/listeners/path.py
execlog/routers/__init__.py execlog/routers/__init__.py
execlog/routers/path.py execlog/routers/path.py
execlog/util/__init__.py execlog/util/__init__.py
execlog/util/generic.py
execlog/util/path.py execlog/util/path.py
tests/test_handler.py
tests/test_listener.py
tests/test_router.py
tests/test_server.py

View File

@ -2,6 +2,7 @@ from execlog.handler import Handler
from execlog.listener import Listener from execlog.listener import Listener
from execlog.router import Router, ChainRouter, Event from execlog.router import Router, ChainRouter, Event
from execlog.server import Server from execlog.server import Server
from execlog.event import Event, FileEvent
from execlog import listeners from execlog import listeners
from execlog import routers from execlog import routers

14
execlog/event.py Normal file
View File

@ -0,0 +1,14 @@
from collections import namedtuple
Event = namedtuple(
'Event',
['endpoint', 'name', 'action'],
defaults=[None, None, None],
)
FileEvent = namedtuple(
'FileEvent',
['endpoint', 'name', 'action'],
defaults=[None, None, None],
)

View File

@ -1,3 +1,12 @@
'''
Handler
Websocket endpoint subclass intended to route websocket connections in a Server context.
Note: the current Handler class is very specific, tailored entirely to handling a
supported live-reload handshake. This should likely be made more general, but until
separate handshakes or endpoints are needed, it's fine as is.
'''
import re import re
import logging import logging
from pathlib import Path from pathlib import Path

View File

@ -2,29 +2,33 @@
Implements a file system watcher. Implements a file system watcher.
See also: See also:
- https://inotify-simple.readthedocs.io/en/latest/#gracefully-exit-a-blocking-read - https://inotify-simple.readthedocs.io/en/latest/#gracefully-exit-a-blocking-read
''' '''
import threading import threading
from execlog.event import Event
class Listener(threading.Thread):
def __init__(self, router): class Listener[E: Event](threading.Thread):
def __init__(self, router: 'Router[E]'):
''' '''
Parameters: Parameters:
workers: number of workers to assign the thread pool when the event loop is router: associated Router instance that events should be passed to
started. Defaults to `None`, which, when passed to
ThreadPoolExecutor, will by default use 5x the number of available
processors on the machine (which the docs claim is a reasonable
assumption given threads are more commonly leveraged for I/O work
rather than intense CPU operations). Given the intended context for
this class, this assumption aligns appropriately.
''' '''
super().__init__() super().__init__()
self.router = router self.router = router
def listen(self): def listen(self):
'''
Register a new listener endpoint
'''
raise NotImplementedError raise NotImplementedError
def run(self): def run(self):
'''
Begin listening for events. Typically a blocking loop that passes events to
attached Router.
'''
raise NotImplementedError raise NotImplementedError

View File

@ -1,11 +1,3 @@
'''
Implements a file system watcher.
See also:
- https://inotify-simple.readthedocs.io/en/latest/#gracefully-exit-a-blocking-read
'''
#import fnmatch
import os import os
import time import time
import select import select
@ -16,26 +8,17 @@ from collections import defaultdict
from inotify_simple import INotify, Event as iEvent, flags as iflags, masks as imasks from inotify_simple import INotify, Event as iEvent, flags as iflags, masks as imasks
from execlog import util from execlog import util
from execlog.event import FileEvent
from execlog.listener import Listener from execlog.listener import Listener
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
# hardcoded file names to ignore class PathListener(Listener[FileEvent]):
# - "4913" is a temp file created by Vim before editing
IGNORE_PATTERNS = ['4913', '.sync*.db*']
class PathListener(Listener):
def __init__(self, router): def __init__(self, router):
''' '''
Parameters: Parameters:
workers: number of workers to assign the thread pool when the event loop is router: associated Router instance that events should be passed to
started. Defaults to `None`, which, when passed to
ThreadPoolExecutor, will by default use 5x the number of available
processors on the machine (which the docs claim is a reasonable
assumption given threads are more commonly leveraged for I/O work
rather than intense CPU operations). Given the intended context for
this class, this assumption aligns appropriately.
Note: Note:
Due to the nature of INotify, you cannot watch the same path with two Due to the nature of INotify, you cannot watch the same path with two
@ -60,7 +43,7 @@ class PathListener(Listener):
self.inotify = INotify() self.inotify = INotify()
self.read_fd, write_fd = os.pipe() self.read_fd, write_fd = os.pipe()
self.write = os.fdopen(write_fd, "wb") self.write = os.fdopen(write_fd, 'wb')
def _add_watch( def _add_watch(
self, self,
@ -128,14 +111,13 @@ class PathListener(Listener):
flags=None, flags=None,
): ):
''' '''
Listen to file events occurring under a provided path, optionally only events
matching provided iNotify flags.
Parameters: Parameters:
path: Path (directory) to watch with `inotify` path: Path (directory) to watch with `inotify`
flags: inotify_simple flags matching FS event types allowed to trigger the flags: inotify_simple flags matching FS event types allowed to trigger the
callback callback
debounce: time in milliseconds to debounce file-based events at this path.
Applies to _file targets_; the same filename will have events
"debounced" at this interval (time delta calculated from last
un-rejected match).
''' '''
path = Path(path) path = Path(path)
@ -159,10 +141,6 @@ class PathListener(Listener):
`start()` is a blocking call. This will hog your main thread if not properly `start()` is a blocking call. This will hog your main thread if not properly
threaded. If handling this manually in your outer context, you will also need threaded. If handling this manually in your outer context, you will also need
to make sure to call `.stop()` to make sure to call `.stop()`
Parameters:
loop: asyncio loop to pass to `_event_loop`; used to schedule async callbacks
when present
''' '''
self.started = True self.started = True
logger.info(f'Starting listener for {len(self.watchmap)} paths') logger.info(f'Starting listener for {len(self.watchmap)} paths')
@ -332,7 +310,7 @@ class PathListener(Listener):
for event in events: for event in events:
# hard coded ignores # hard coded ignores
if util.path.glob_match(event.name, IGNORE_PATTERNS): continue if util.path.glob_match(event.name, util.path.IGNORE_PATTERNS): continue
mask_flags = iflags.from_mask(event.mask) mask_flags = iflags.from_mask(event.mask)

View File

@ -1,3 +1,6 @@
'''
Router
'''
import time import time
import asyncio import asyncio
import logging import logging
@ -8,21 +11,18 @@ from pathlib import Path
from typing import Callable from typing import Callable
from functools import partial from functools import partial
from colorama import Fore, Style from colorama import Fore, Style
from collections import namedtuple, defaultdict from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor, wait, as_completed from concurrent.futures import ThreadPoolExecutor, wait, as_completed
from tqdm.auto import tqdm from tqdm.auto import tqdm
from execlog.event import Event
from execlog.listener import Listener
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
class Router[E: Event]:
Event = namedtuple(
'Event',
['endpoint', 'name', 'action'],
defaults=[None, None, None],
)
class Router:
''' '''
Route events to registered callbacks Route events to registered callbacks
@ -74,16 +74,22 @@ class Router:
earlier ones, or interact with intermediate disk states (raw file writes, DB earlier ones, or interact with intermediate disk states (raw file writes, DB
inserts, etc), before the earliest call has had a chance to clean up. inserts, etc), before the earliest call has had a chance to clean up.
''' '''
def __init__(self, loop=None, workers=None, listener_cls=None): listener_cls = Listener[E]
def __init__(self, loop=None, workers=None):
''' '''
Parameters: Parameters:
loop: loop:
workers: workers: number of workers to assign the thread pool when the event loop is
listener_cls: started. Defaults to `None`, which, when passed to
ThreadPoolExecutor, will by default use 5x the number of available
processors on the machine (which the docs claim is a reasonable
assumption given threads are more commonly leveraged for I/O work
rather than intense CPU operations). Given the intended context for
this class, this assumption aligns appropriately.
''' '''
self.loop = loop self.loop = loop
self.workers = workers self.workers = workers
self.listener_cls = listener_cls
self.routemap : dict[str, list[tuple]] = defaultdict(list) self.routemap : dict[str, list[tuple]] = defaultdict(list)
self.post_callbacks = [] self.post_callbacks = []
@ -141,7 +147,7 @@ class Router:
route_tuple = (callback, pattern, debounce, delay, listener_kwargs) route_tuple = (callback, pattern, debounce, delay, listener_kwargs)
self.routemap[endpoint].append(route_tuple) self.routemap[endpoint].append(route_tuple)
def submit(self, events, callbacks=None): def submit(self, events:E | list[E], callbacks=None):
''' '''
Handle a list of events. Each event is matched against the registered callbacks, Handle a list of events. Each event is matched against the registered callbacks,
and those callbacks are ran concurrently (be it via a thread pool or an asyncio and those callbacks are ran concurrently (be it via a thread pool or an asyncio
@ -235,7 +241,7 @@ class Router:
return future return future
def matching_routes(self, event, event_time=None): def matching_routes(self, event: E, event_time=None):
''' '''
Return eligible matching routes for the provided event. Return eligible matching routes for the provided event.
@ -288,7 +294,7 @@ class Router:
# set next debounce # set next debounce
self.next_allowed_time[index] = event_time + debounce self.next_allowed_time[index] = event_time + debounce
match_text = Style.BRIGHT + Fore.GREEN + 'matched' match_text = Style.BRIGHT + Fore.GREEN + 'matched' + Fore.RESET
callback_name = str(callback) callback_name = str(callback)
if hasattr(callback, '__name__'): if hasattr(callback, '__name__'):
@ -298,7 +304,7 @@ class Router:
f'Event [{name}] {match_text} [{pattern}] under [{endpoint}] for [{callback_name}]' f'Event [{name}] {match_text} [{pattern}] under [{endpoint}] for [{callback_name}]'
) )
else: else:
match_text = Style.BRIGHT + Fore.RED + 'rejected' match_text = Style.BRIGHT + Fore.RED + 'rejected' + Fore.RESET
logger.debug( logger.debug(
f'Event [{name}] {match_text} against [{pattern}] under [{endpoint}] for [{callback.__name__}]' f'Event [{name}] {match_text} against [{pattern}] under [{endpoint}] for [{callback.__name__}]'
) )
@ -373,6 +379,7 @@ class Router:
''' '''
if listener_cls is None: if listener_cls is None:
listener_cls = self.listener_cls listener_cls = self.listener_cls
if listener_cls is None: if listener_cls is None:
raise ValueError('No Listener class provided') raise ValueError('No Listener class provided')

View File

@ -1,34 +1,25 @@
import logging import logging
from pathlib import Path from pathlib import Path
from typing import Callable
from execlog.router import Router from execlog.router import Router
from execlog.listeners.path import PathListener from execlog.event import FileEvent
from execlog.util.path import glob_match from execlog.util.path import glob_match
from execlog.listeners.path import PathListener
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
class PathRouter(Router): class PathRouter(Router[FileEvent]):
def __init__(self, loop=None, workers=None, listener_cls=PathListener): listener_cls = PathListener
'''
Parameters:
workers: number of workers to assign the thread pool when the event loop is
started. Defaults to `None`, which, when passed to
ThreadPoolExecutor, will by default use 5x the number of available
processors on the machine (which the docs claim is a reasonable
assumption given threads are more commonly leveraged for I/O work
rather than intense CPU operations). Given the intended context for
this class, this assumption aligns appropriately.
'''
super().__init__(loop=loop, workers=workers, listener_cls=listener_cls)
def register( def register(
self, self,
path, path : Path,
func, func : Callable,
glob='**/!(.*|*.tmp|*~)', # recursive, non-temp glob : str = '**/!(.*|*.tmp|*~)', # recursive, non-temp
debounce=200, debounce : int|float = 200,
delay=30, delay : int|float = 30,
**listener_kwargs, **listener_kwargs,
): ):
''' '''
@ -38,6 +29,8 @@ class PathRouter(Router):
glob: Relative glob pattern to match files in provided path. The FS event's glob: Relative glob pattern to match files in provided path. The FS event's
filename must match this pattern for the callback to queued. (Default: filename must match this pattern for the callback to queued. (Default:
"*"; matching all files in path). "*"; matching all files in path).
debounce:
delay:
listener_kwargs: Additional params for associated listener "listen" routes. listener_kwargs: Additional params for associated listener "listen" routes.
See `PathListener.listen`. See `PathListener.listen`.
''' '''

View File

@ -1,3 +1,17 @@
'''
Server
Central management object for both file serving systems (static server, live reloading)
and job execution (routing and listening). Routers and Listeners can be started and
managed independently, but a single Server instance can house, start, and shutdown
listeners in one place.
TODO: as it stands, the Server requires address and port details, effectively needing one
of the HTTP items (static file serving or livereloading) to be initialized appropriately.
But there is a clear use case for just managing disparate Routers and their associated
Listeners. Should perhaps separate this "grouped listener" into another object, or just
make the Server definition more flexible.
'''
import re import re
import asyncio import asyncio
import logging import logging
@ -5,9 +19,9 @@ import threading
from functools import partial from functools import partial
import uvicorn import uvicorn
from inotify_simple import flags
from fastapi import FastAPI, WebSocket from fastapi import FastAPI, WebSocket
from fastapi.staticfiles import StaticFiles from fastapi.staticfiles import StaticFiles
from inotify_simple import flags
from execlog.handler import Handler as LREndpoint from execlog.handler import Handler as LREndpoint
@ -29,6 +43,13 @@ class Server:
): ):
''' '''
Parameters: Parameters:
host: host server address (either 0.0.0.0, 127.0.0.1, localhost)
port: port at which to start the server
root: base path for static files _and_ where router bases are attached (i.e.,
when files at this path change, a reload event will be
propagated to a corresponding client page)
static: whether or not to start a static file server
livereload: whether or not to start a livereload server
managed_listeners: auxiliary listeners to "attach" to the server process, and to managed_listeners: auxiliary listeners to "attach" to the server process, and to
propagate the shutdown signal to when the server receives an propagate the shutdown signal to when the server receives an
interrupt. interrupt.
@ -66,8 +87,8 @@ class Server:
Note that, when present, the livereload endpoint is registered first, as the order Note that, when present, the livereload endpoint is registered first, as the order
in which routes are defined matters for FastAPI apps. This allows `/livereload` to in which routes are defined matters for FastAPI apps. This allows `/livereload` to
behave appropriately, despite the root re-mount that takes place if serving static behave appropriately, even when remounting the root if serving static files
files. (which, if done in the opposite order, would "eat up" the `/livereload` endpoint).
''' '''
# enable propagation and clear handlers for uvicorn internal loggers; # enable propagation and clear handlers for uvicorn internal loggers;
# allows logging messages to propagate to my root logger # allows logging messages to propagate to my root logger
@ -100,7 +121,7 @@ class Server:
''' '''
flags.MODIFY okay since we don't need to reload non-existent pages flags.MODIFY okay since we don't need to reload non-existent pages
''' '''
from localsys.reloader.router import PathRouter from execlog.reloader.router import PathRouter
if self.loop is None: if self.loop is None:
self.loop = asyncio.new_event_loop() self.loop = asyncio.new_event_loop()

View File

@ -1 +1,2 @@
from execlog.util import path from execlog.util import path
from execlog.util import generic

64
execlog/util/generic.py Normal file
View File

@ -0,0 +1,64 @@
import logging
import tqdm
import colorama
from colorama import Fore, Back, Style
class ColorFormatter(logging.Formatter):
_format = '%(levelname)-8s :: %(name)s %(message)s'
colorama.init(autoreset=True)
FORMATS = {
'x': Fore.YELLOW + _format,
'listener': Fore.GREEN + _format,
'handler': Fore.CYAN + _format,
'server': Style.DIM + Fore.CYAN + _format,
'router': Fore.MAGENTA + _format,
'site': Fore.BLUE + _format,
'utils': Style.DIM + Fore.WHITE + _format,
}
FORMATS = { k:logging.Formatter(v) for k,v in FORMATS.items() }
DEFAULT_LOGGER = logging.Formatter(_format)
def format(self, record):
# color by high-level submodule
name_parts = record.name.split('.')
package = ''.join(name_parts[:1])
submodule = ''.join(name_parts[1:2])
subsubmodule = '.'.join(name_parts[1:3])
formatter = self.DEFAULT_LOGGER
if subsubmodule in self.FORMATS:
formatter = self.FORMATS[subsubmodule]
elif submodule in self.FORMATS:
formatter = self.FORMATS[submodule]
name = record.name
if package == 'localsys':
name = f'localsys.{subsubmodule}'
limit = 26
name = name[:limit]
name = f'[{name}]{"-"*(limit-len(name))}'
record.name = name
return formatter.format(record)
class TqdmLoggingHandler(logging.StreamHandler):
def __init__(self, level=logging.NOTSET):
super().__init__(level)
formatter = ColorFormatter()
self.setFormatter(formatter)
def emit(self, record):
try:
msg = self.format(record)
#tqdm.tqdm.write(msg)
tqdm.tqdm.write(msg, end=self.terminator)
self.flush()
except (KeyboardInterrupt, SystemExit):
raise
except:
self.handleError(record)

View File

@ -6,6 +6,10 @@ from pathlib import Path
from wcmatch import glob as wc_glob from wcmatch import glob as wc_glob
# hardcoded file names to ignore
# - "4913" is a temp file created by Vim before editing
IGNORE_PATTERNS = ['4913', '.sync*.db*']
camel2snake_regex = re.compile(r'(?<!^)(?=[A-Z])') camel2snake_regex = re.compile(r'(?<!^)(?=[A-Z])')
def iter_nested_paths(path: Path, ext: str = None, no_dir=False, relative=False): def iter_nested_paths(path: Path, ext: str = None, no_dir=False, relative=False):

109
notebooks/listener.ipynb Normal file
View File

@ -0,0 +1,109 @@
{
"cells": [
{
"cell_type": "code",
"execution_count": 1,
"id": "db82df24-b51b-4315-b104-bd6337f44acc",
"metadata": {},
"outputs": [
{
"name": "stderr",
"output_type": "stream",
"text": [
"/home/smgr/.pyenv/versions/execlog/lib/python3.12/site-packages/tqdm/auto.py:21: TqdmWarning: IProgress not found. Please update jupyter and ipywidgets. See https://ipywidgets.readthedocs.io/en/stable/user_install.html\n",
" from .autonotebook import tqdm as notebook_tqdm\n"
]
}
],
"source": [
"from pathlib import Path\n",
"\n",
"from router_env import chain_router, events"
]
},
{
"cell_type": "code",
"execution_count": 2,
"id": "07edc761-cdd6-4df3-a4d0-f1e256431621",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"defaultdict(<function PathListener.__init__.<locals>.<lambda> at 0x7fee0e5f9800>, {1: defaultdict(<class 'int'>, {(PosixPath('endpoint_proxy'), PosixPath('.')): 1986})})\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"INFO:execlog.listeners.path:Starting listener for 1 paths\n",
"INFO:execlog.listeners.path:> Listening on path endpoint_proxy for flags [<flags.MODIFY: 2>, <flags.MOVED_FROM: 64>, <flags.MOVED_TO: 128>, <flags.CREATE: 256>, <flags.DELETE: 512>, <flags.DELETE_SELF: 1024>]\n"
]
}
],
"source": [
"listener = chain_router.get_listener()\n",
"listener.start()\n",
"\n",
"print(listener.watchmap)"
]
},
{
"cell_type": "code",
"execution_count": 3,
"id": "e049fd73-227e-4574-bcad-3dbeff99804f",
"metadata": {},
"outputs": [
{
"name": "stderr",
"output_type": "stream",
"text": [
"DEBUG:execlog.listeners.path:Watcher fired for [fileA]: [<flags.CREATE: 256>]\n",
"INFO:execlog.router:Event [fileA] \u001b[1m\u001b[32mmatched [**/!(.*|*.tmp|*~)] under [endpoint_proxy] for [functools.partial(<built-in function print>, 'R1-1 ::')]\n",
"DEBUG:execlog.listeners.path:Watcher fired for [fileA]: [<flags.MODIFY: 2>]\n",
"INFO:execlog.router:Event [fileA] \u001b[1m\u001b[32mmatched [**/!(.*|*.tmp|*~)] under [endpoint_proxy] for [functools.partial(<built-in function print>, 'R2-1 ::')]\n",
"DEBUG:execlog.listeners.path:Watcher fired for [fileA]: [<flags.DELETE: 512>]\n",
"INFO:execlog.router:Event [fileA] \u001b[1m\u001b[32mmatched [**/!(.*|*.tmp|*~)] under [endpoint_proxy] for [functools.partial(<built-in function print>, 'R2-2 ::')]\n",
"INFO:execlog.router:Event [fileA] \u001b[1m\u001b[32mmatched [**/!(.*|*.tmp|*~)] under [endpoint_proxy] for [functools.partial(<built-in function print>, 'R3-1 ::')]\n"
]
}
],
"source": [
"file_a = Path('endpoint_proxy/fileA')\n",
"file_a.write_text('test text')\n",
"file_a.unlink()"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "891a8bfd-465a-4c5d-a5d8-ab71f61cc624",
"metadata": {},
"outputs": [],
"source": []
}
],
"metadata": {
"kernelspec": {
"display_name": "execlog",
"language": "python",
"name": "execlog"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.12.2"
}
},
"nbformat": 4,
"nbformat_minor": 5
}

View File

@ -2,21 +2,12 @@
"cells": [ "cells": [
{ {
"cell_type": "code", "cell_type": "code",
"execution_count": 1, "execution_count": null,
"id": "cd0a1da5-da7a-4181-bea3-07d02991398f", "id": "718618b7-132f-44e0-8cad-6a912a623c82",
"metadata": { "metadata": {
"tags": [] "tags": []
}, },
"outputs": [ "outputs": [],
{
"name": "stderr",
"output_type": "stream",
"text": [
"/home/smgr/.pyenv/versions/execlog/lib/python3.12/site-packages/tqdm/auto.py:21: TqdmWarning: IProgress not found. Please update jupyter and ipywidgets. See https://ipywidgets.readthedocs.io/en/stable/user_install.html\n",
" from .autonotebook import tqdm as notebook_tqdm\n"
]
}
],
"source": [ "source": [
"import logging\n", "import logging\n",
"from pathlib import Path\n", "from pathlib import Path\n",
@ -29,6 +20,17 @@
"logging.basicConfig(level=logging.DEBUG)" "logging.basicConfig(level=logging.DEBUG)"
] ]
}, },
{
"cell_type": "markdown",
"id": "f3fd536f-75d6-408d-88b2-1e4a1b62a51e",
"metadata": {},
"source": [
"# Router setup\n",
"Create individual \"frame\" routers, and attach them in a chain.\n",
"\n",
"A matching event will first be processed by matching callbacks in `router1` in parallel, blocking until all are completed, and then pass on to the next router (`router2`) to repeat the same process. This trajectory can occur in parallel for several events."
]
},
{ {
"cell_type": "code", "cell_type": "code",
"execution_count": 2, "execution_count": 2,
@ -43,6 +45,22 @@
"chain_router = ChainRouter([router1, router2, router3])" "chain_router = ChainRouter([router1, router2, router3])"
] ]
}, },
{
"cell_type": "markdown",
"id": "5a4a1e7a-ee8b-4eec-97dd-ed9abac73989",
"metadata": {},
"source": [
"Register callbacks to each of the routers. The `Router` objects are of type `PathRouter`, so `.register` takes a path endpoint and a function that accepts `Event`s.\n",
"\n",
"Events are created with the registered endpoint path, and a `name` parameter with the filename at that path target. Here one callback is attached to Router 1, two to Router 2, and three to Router 3. A given matching event should have a trajectory that looks like:\n",
"\n",
"```\n",
"Event -> Router-1 -> C1-1 -- blocking --> Router-2 --> C2-1 -- blocking --> Router-3 --> C3-1 -->\n",
" \\-> C2-2 -/ \\-> C3-2 -/\n",
" \\-> C3-3 -/\n",
"```"
]
},
{ {
"cell_type": "code", "cell_type": "code",
"execution_count": 3, "execution_count": 3,
@ -63,6 +81,14 @@
"]" "]"
] ]
}, },
{
"cell_type": "markdown",
"id": "b3f98138-e381-4a98-af33-0e5310f305ce",
"metadata": {},
"source": [
"Submit the event list to an individual router. Each event will be handled in its own thread, until the thread limit is reached, at which point the events remain in a queue until they can be processed."
]
},
{ {
"cell_type": "code", "cell_type": "code",
"execution_count": 4, "execution_count": 4,
@ -83,9 +109,9 @@
{ {
"data": { "data": {
"text/plain": [ "text/plain": [
"[<Future at 0x78be3b5c2cf0 state=running>,\n", "[<Future at 0x7802dbfc62d0 state=running>,\n",
" <Future at 0x78be3b5c2ab0 state=running>,\n", " <Future at 0x7802dbfc6900 state=running>,\n",
" <Future at 0x78be3b5c34a0 state=running>]" " <Future at 0x7802e03660c0 state=running>]"
] ]
}, },
"execution_count": 4, "execution_count": 4,
@ -99,9 +125,6 @@
"R1 :: Event(endpoint='endpoint_proxy', name='file1', action=None)\n", "R1 :: Event(endpoint='endpoint_proxy', name='file1', action=None)\n",
"R1 :: Event(endpoint='endpoint_proxy', name='file2', action=None)\n", "R1 :: Event(endpoint='endpoint_proxy', name='file2', action=None)\n",
"R1 :: Event(endpoint='endpoint_proxy', name='file3', action=None)\n", "R1 :: Event(endpoint='endpoint_proxy', name='file3', action=None)\n",
"R1 :: Event(endpoint='endpoint_proxy', name='file3', action=None)\n",
"R1 :: Event(endpoint='endpoint_proxy', name='fileA', action=[<flags.CREATE: 256>])\n",
"R1 :: Event(endpoint='endpoint_proxy', name='fileA', action=[<flags.MODIFY: 2>])\n",
"R1 :: Event(endpoint='endpoint_proxy', name='fileA', action=[<flags.CREATE: 256>])\n" "R1 :: Event(endpoint='endpoint_proxy', name='fileA', action=[<flags.CREATE: 256>])\n"
] ]
} }
@ -111,6 +134,14 @@
"router1.submit(events)" "router1.submit(events)"
] ]
}, },
{
"cell_type": "markdown",
"id": "e4de6182-aae6-43e0-9d14-04f1e291fe41",
"metadata": {},
"source": [
"Submit the event list to the chain router. Each event will be processed independently and in parallel, so long as there are threads available. Each event will make its way through the router chain, blocking until all matching callbacks for a given router are completed."
]
},
{ {
"cell_type": "code", "cell_type": "code",
"execution_count": 5, "execution_count": 5,
@ -123,7 +154,6 @@
"name": "stderr", "name": "stderr",
"output_type": "stream", "output_type": "stream",
"text": [ "text": [
"INFO:execlog.router:Event [file1] \u001b[1m\u001b[32mmatched [**/!(.*|*.tmp|*~)] under [endpoint_proxy] for [functools.partial(<built-in function print>, 'R1 ::')]\n",
"INFO:execlog.router:Event [file1] \u001b[1m\u001b[32mmatched [**/!(.*|*.tmp|*~)] under [endpoint_proxy] for [functools.partial(<built-in function print>, 'R2 ::')]\n", "INFO:execlog.router:Event [file1] \u001b[1m\u001b[32mmatched [**/!(.*|*.tmp|*~)] under [endpoint_proxy] for [functools.partial(<built-in function print>, 'R2 ::')]\n",
"INFO:execlog.router:Event [file1] \u001b[1m\u001b[32mmatched [**/!(.*|*.tmp|*~)] under [endpoint_proxy] for [functools.partial(<built-in function print>, 'R3 ::')]\n" "INFO:execlog.router:Event [file1] \u001b[1m\u001b[32mmatched [**/!(.*|*.tmp|*~)] under [endpoint_proxy] for [functools.partial(<built-in function print>, 'R3 ::')]\n"
] ]
@ -131,9 +161,9 @@
{ {
"data": { "data": {
"text/plain": [ "text/plain": [
"[<Future at 0x78be40f24e60 state=running>,\n", "[<Future at 0x7802dbfec500 state=running>,\n",
" <Future at 0x78be3b5c3740 state=running>,\n", " <Future at 0x7802dbfecb90 state=running>,\n",
" <Future at 0x78be3b5c1430 state=running>]" " <Future at 0x7802dbfec800 state=running>]"
] ]
}, },
"execution_count": 5, "execution_count": 5,
@ -144,10 +174,8 @@
"name": "stderr", "name": "stderr",
"output_type": "stream", "output_type": "stream",
"text": [ "text": [
"INFO:execlog.router:Event [file2] \u001b[1m\u001b[32mmatched [**/!(.*|*.tmp|*~)] under [endpoint_proxy] for [functools.partial(<built-in function print>, 'R1 ::')]\n",
"INFO:execlog.router:Event [file2] \u001b[1m\u001b[32mmatched [**/!(.*|*.tmp|*~)] under [endpoint_proxy] for [functools.partial(<built-in function print>, 'R2 ::')]\n", "INFO:execlog.router:Event [file2] \u001b[1m\u001b[32mmatched [**/!(.*|*.tmp|*~)] under [endpoint_proxy] for [functools.partial(<built-in function print>, 'R2 ::')]\n",
"INFO:execlog.router:Event [file2] \u001b[1m\u001b[32mmatched [**/!(.*|*.tmp|*~)] under [endpoint_proxy] for [functools.partial(<built-in function print>, 'R3 ::')]\n", "INFO:execlog.router:Event [file2] \u001b[1m\u001b[32mmatched [**/!(.*|*.tmp|*~)] under [endpoint_proxy] for [functools.partial(<built-in function print>, 'R3 ::')]\n",
"INFO:execlog.router:Event [file3] \u001b[1m\u001b[32mmatched [**/!(.*|*.tmp|*~)] under [endpoint_proxy] for [functools.partial(<built-in function print>, 'R1 ::')]\n",
"INFO:execlog.router:Event [file3] \u001b[1m\u001b[32mmatched [**/!(.*|*.tmp|*~)] under [endpoint_proxy] for [functools.partial(<built-in function print>, 'R2 ::')]\n", "INFO:execlog.router:Event [file3] \u001b[1m\u001b[32mmatched [**/!(.*|*.tmp|*~)] under [endpoint_proxy] for [functools.partial(<built-in function print>, 'R2 ::')]\n",
"INFO:execlog.router:Event [file3] \u001b[1m\u001b[32mmatched [**/!(.*|*.tmp|*~)] under [endpoint_proxy] for [functools.partial(<built-in function print>, 'R3 ::')]\n" "INFO:execlog.router:Event [file3] \u001b[1m\u001b[32mmatched [**/!(.*|*.tmp|*~)] under [endpoint_proxy] for [functools.partial(<built-in function print>, 'R3 ::')]\n"
] ]
@ -158,59 +186,10 @@
"text": [ "text": [
"R2 :: Event(endpoint='endpoint_proxy', name='file1', action=None)\n", "R2 :: Event(endpoint='endpoint_proxy', name='file1', action=None)\n",
"R2 :: Event(endpoint='endpoint_proxy', name='file2', action=None)\n", "R2 :: Event(endpoint='endpoint_proxy', name='file2', action=None)\n",
"R2 :: Event(endpoint='endpoint_proxy', name='file3', action=None)\n",
"R3 :: Event(endpoint='endpoint_proxy', name='file1', action=None)\n", "R3 :: Event(endpoint='endpoint_proxy', name='file1', action=None)\n",
"R3 :: Event(endpoint='endpoint_proxy', name='file2', action=None)\n", "R3 :: Event(endpoint='endpoint_proxy', name='file2', action=None)\n",
"R2 :: Event(endpoint='endpoint_proxy', name='file3', action=None)\n", "R3 :: Event(endpoint='endpoint_proxy', name='file3', action=None)\n",
"R3 :: Event(endpoint='endpoint_proxy', name='file3', action=None)\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"INFO:execlog.router:Event [fileA] \u001b[1m\u001b[32mmatched [**/!(.*|*.tmp|*~)] under [endpoint_proxy] for [functools.partial(<built-in function print>, 'R1 ::')]\n",
"INFO:execlog.router:Event [fileA] \u001b[1m\u001b[32mmatched [**/!(.*|*.tmp|*~)] under [endpoint_proxy] for [functools.partial(<built-in function print>, 'R2 ::')]\n",
"INFO:execlog.router:Event [fileA] \u001b[1m\u001b[32mmatched [**/!(.*|*.tmp|*~)] under [endpoint_proxy] for [functools.partial(<built-in function print>, 'R3 ::')]\n"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"R2 :: Event(endpoint='endpoint_proxy', name='fileA', action=[<flags.CREATE: 256>])\n",
"R3 :: Event(endpoint='endpoint_proxy', name='fileA', action=[<flags.CREATE: 256>])\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"INFO:execlog.router:Event [fileA] \u001b[1m\u001b[32mmatched [**/!(.*|*.tmp|*~)] under [endpoint_proxy] for [functools.partial(<built-in function print>, 'R1 ::')]\n",
"INFO:execlog.router:Event [fileA] \u001b[1m\u001b[32mmatched [**/!(.*|*.tmp|*~)] under [endpoint_proxy] for [functools.partial(<built-in function print>, 'R2 ::')]\n",
"INFO:execlog.router:Event [fileA] \u001b[1m\u001b[32mmatched [**/!(.*|*.tmp|*~)] under [endpoint_proxy] for [functools.partial(<built-in function print>, 'R3 ::')]\n"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"R2 :: Event(endpoint='endpoint_proxy', name='fileA', action=[<flags.MODIFY: 2>])\n",
"R3 :: Event(endpoint='endpoint_proxy', name='fileA', action=[<flags.MODIFY: 2>])\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"INFO:execlog.router:Event [fileA] \u001b[1m\u001b[32mmatched [**/!(.*|*.tmp|*~)] under [endpoint_proxy] for [functools.partial(<built-in function print>, 'R1 ::')]\n",
"INFO:execlog.router:Event [fileA] \u001b[1m\u001b[32mmatched [**/!(.*|*.tmp|*~)] under [endpoint_proxy] for [functools.partial(<built-in function print>, 'R2 ::')]\n",
"INFO:execlog.router:Event [fileA] \u001b[1m\u001b[32mmatched [**/!(.*|*.tmp|*~)] under [endpoint_proxy] for [functools.partial(<built-in function print>, 'R3 ::')]\n"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"R2 :: Event(endpoint='endpoint_proxy', name='fileA', action=[<flags.CREATE: 256>])\n", "R2 :: Event(endpoint='endpoint_proxy', name='fileA', action=[<flags.CREATE: 256>])\n",
"R3 :: Event(endpoint='endpoint_proxy', name='fileA', action=[<flags.CREATE: 256>])\n" "R3 :: Event(endpoint='endpoint_proxy', name='fileA', action=[<flags.CREATE: 256>])\n"
] ]
@ -232,15 +211,21 @@
"name": "stderr", "name": "stderr",
"output_type": "stream", "output_type": "stream",
"text": [ "text": [
"INFO:execlog.listeners.path:Starting listener for 1 paths\n", "INFO:execlog.listeners.path:Starting listener for 1 paths\n"
"INFO:execlog.listeners.path:> Listening on path endpoint_proxy for flags [<flags.MODIFY: 2>, <flags.MOVED_FROM: 64>, <flags.MOVED_TO: 128>, <flags.CREATE: 256>, <flags.DELETE: 512>, <flags.DELETE_SELF: 1024>]\n"
] ]
}, },
{ {
"name": "stdout", "name": "stdout",
"output_type": "stream", "output_type": "stream",
"text": [ "text": [
"defaultdict(<function PathListener.__init__.<locals>.<lambda> at 0x78be3b5db100>, {1: defaultdict(<class 'int'>, {(PosixPath('endpoint_proxy'), PosixPath('.')): 1986})})\n" "defaultdict(<function PathListener.__init__.<locals>.<lambda> at 0x7802dbfdfba0>, {1: defaultdict(<class 'int'>, {(PosixPath('endpoint_proxy'), PosixPath('.')): 1986})})\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"INFO:execlog.listeners.path:> Listening on path endpoint_proxy for flags [<flags.MODIFY: 2>, <flags.MOVED_FROM: 64>, <flags.MOVED_TO: 128>, <flags.CREATE: 256>, <flags.DELETE: 512>, <flags.DELETE_SELF: 1024>]\n"
] ]
} }
], ],
@ -253,7 +238,7 @@
}, },
{ {
"cell_type": "code", "cell_type": "code",
"execution_count": 9, "execution_count": 7,
"id": "00bb2889-f266-4fb1-9a89-7d7539aba9cf", "id": "00bb2889-f266-4fb1-9a89-7d7539aba9cf",
"metadata": {}, "metadata": {},
"outputs": [ "outputs": [
@ -262,7 +247,10 @@
"output_type": "stream", "output_type": "stream",
"text": [ "text": [
"DEBUG:execlog.listeners.path:Watcher fired for [fileA]: [<flags.CREATE: 256>]\n", "DEBUG:execlog.listeners.path:Watcher fired for [fileA]: [<flags.CREATE: 256>]\n",
"INFO:execlog.router:Event [fileA] \u001b[1m\u001b[32mmatched [**/!(.*|*.tmp|*~)] under [endpoint_proxy] for [functools.partial(<built-in function print>, 'R1 ::')]\n",
"DEBUG:execlog.listeners.path:Watcher fired for [fileA]: [<flags.MODIFY: 2>]\n", "DEBUG:execlog.listeners.path:Watcher fired for [fileA]: [<flags.MODIFY: 2>]\n",
"INFO:execlog.router:Event [fileA] \u001b[1m\u001b[32mmatched [**/!(.*|*.tmp|*~)] under [endpoint_proxy] for [functools.partial(<built-in function print>, 'R2 ::')]\n",
"INFO:execlog.router:Event [fileA] \u001b[1m\u001b[32mmatched [**/!(.*|*.tmp|*~)] under [endpoint_proxy] for [functools.partial(<built-in function print>, 'R3 ::')]\n",
"DEBUG:execlog.listeners.path:Watcher fired for [fileA]: [<flags.DELETE: 512>]\n" "DEBUG:execlog.listeners.path:Watcher fired for [fileA]: [<flags.DELETE: 512>]\n"
] ]
} }

49
notebooks/router_env.py Normal file
View File

@ -0,0 +1,49 @@
import logging
from pathlib import Path
from functools import partial
from execlog import util
from execlog import ChainRouter, Event
from execlog.routers import PathRouter
from execlog.listeners import PathListener
logger = logging.getLogger()
logger.setLevel(logging.DEBUG)
logger.addHandler(util.generic.TqdmLoggingHandler())
# router setup
router1 = PathRouter()
router2 = PathRouter()
router3 = PathRouter()
chain_router = ChainRouter([router1, router2, router3])
# router-1
router1.register('endpoint_proxy', partial(print, 'R1-1 ::'))
# router-2
router2.register('endpoint_proxy', partial(print, 'R2-1 ::'))
router2.register('endpoint_proxy', partial(print, 'R2-2 ::'))
# router-3
router3.register('endpoint_proxy', partial(print, 'R3-1 ::'))
router3.register('endpoint_proxy', partial(print, 'R3-2 ::'))
router3.register('endpoint_proxy', partial(print, 'R3-3 ::'))
events = [
Event(endpoint='endpoint_proxy', name='file1'),
Event(endpoint='endpoint_proxy', name='file2'),
Event(endpoint='endpoint_proxy', name='file3'),
]
if __name__ == '__main__':
futures = chain_router.submit(events)
chain_router.wait_on_futures(futures)
#listener = chain_router.get_listener()
#listener.start()
#file_a = Path('endpoint_proxy/fileA')
#file_a.write_text('test text')
#file_a.unlink()

0
tests/test_handler.py Normal file
View File

0
tests/test_listener.py Normal file
View File

0
tests/test_router.py Normal file
View File

0
tests/test_server.py Normal file
View File