From 6c335c2a9ac4f52191317d1ffa4eb0b1fd543686 Mon Sep 17 00:00:00 2001 From: "Sam G." Date: Sat, 27 Apr 2024 16:33:08 -0700 Subject: [PATCH] add Event typing, clean up docstrings, add initial tests --- README.md | 7 +- execlog.egg-info/PKG-INFO | 7 +- execlog.egg-info/SOURCES.txt | 8 +- execlog/__init__.py | 1 + execlog/event.py | 14 ++++ execlog/handler.py | 9 +++ execlog/listener.py | 24 +++--- execlog/listeners/path.py | 40 +++------- execlog/router.py | 41 ++++++---- execlog/routers/path.py | 33 ++++---- execlog/server.py | 33 ++++++-- execlog/util/__init__.py | 1 + execlog/util/generic.py | 64 +++++++++++++++ execlog/util/path.py | 4 + notebooks/listener.ipynb | 109 ++++++++++++++++++++++++++ notebooks/router.ipynb | 146 ++++++++++++++++------------------- notebooks/router_env.py | 49 ++++++++++++ tests/test_handler.py | 0 tests/test_listener.py | 0 tests/test_router.py | 0 tests/test_server.py | 0 21 files changed, 424 insertions(+), 166 deletions(-) create mode 100644 execlog/event.py create mode 100644 execlog/util/generic.py create mode 100644 notebooks/listener.ipynb create mode 100644 notebooks/router_env.py create mode 100644 tests/test_handler.py create mode 100644 tests/test_listener.py create mode 100644 tests/test_router.py create mode 100644 tests/test_server.py diff --git a/README.md b/README.md index 73174c3..96ccac4 100644 --- a/README.md +++ b/README.md @@ -1,2 +1,7 @@ # Overview -`execlog` is a package +`execlog` is a lightweight multi-threaded job framework + +- **Handler**: live-reload handshake manager for connecting pages +- **Listener**: file system (inotify) watcher that forwards events to an attached Router +- **Router**: routes matching events to registered callbacks, with per-route debouncing and delay +- **Server**: optional static file and livereload server that also starts and shuts down managed Routers and Listeners diff --git a/execlog.egg-info/PKG-INFO b/execlog.egg-info/PKG-INFO index fba8d5f..4678c71 100644 --- a/execlog.egg-info/PKG-INFO +++ b/execlog.egg-info/PKG-INFO @@ -11,4 +11,9 @@ Description-Content-Type: text/markdown Requires-Dist: tqdm # Overview -`execlog` is a package +`execlog` is a lightweight multi-threaded job framework + +- **Handler**: live-reload handshake manager for connecting pages +- **Listener**: file system (inotify) watcher that forwards events to an attached Router +- **Router**: routes matching events to registered callbacks, with per-route debouncing and delay +- **Server**: optional static file and livereload server that also starts and shuts down managed Routers and Listeners diff --git a/execlog.egg-info/SOURCES.txt b/execlog.egg-info/SOURCES.txt index 82933f5..193cd72 100644 --- a/execlog.egg-info/SOURCES.txt +++ b/execlog.egg-info/SOURCES.txt @@ -2,6 +2,7 @@ MANIFEST.in README.md pyproject.toml execlog/__init__.py +execlog/event.py execlog/handler.py execlog/listener.py execlog/router.py @@ -16,4 +17,9 @@ execlog/listeners/path.py execlog/routers/__init__.py execlog/routers/path.py execlog/util/__init__.py -execlog/util/path.py \ No newline at end of file +execlog/util/generic.py +execlog/util/path.py +tests/test_handler.py +tests/test_listener.py +tests/test_router.py +tests/test_server.py \ No newline at end of file diff --git a/execlog/__init__.py b/execlog/__init__.py index 442d884..c3723ac 100644 --- a/execlog/__init__.py +++ b/execlog/__init__.py @@ -2,6 +2,7 @@ from execlog.handler import Handler from execlog.listener import Listener from execlog.router import Router, ChainRouter, Event from execlog.server import Server +from execlog.event import Event, FileEvent from execlog import listeners from execlog import routers diff --git a/execlog/event.py b/execlog/event.py new file mode 100644 index 0000000..2bdad80 --- /dev/null +++ b/execlog/event.py @@ -0,0 +1,14 @@ +from collections import namedtuple + + +Event = namedtuple( + 'Event', + ['endpoint', 'name', 'action'], + defaults=[None, None, None], +) + +FileEvent = namedtuple( + 'FileEvent', + ['endpoint', 'name', 'action'], + defaults=[None, None, None], +) diff --git 
a/execlog/handler.py b/execlog/handler.py index f958c76..6782042 100644 --- a/execlog/handler.py +++ b/execlog/handler.py @@ -1,3 +1,12 @@ +''' +Handler + +Websocket endpoint subclass intended to route websocket connections in a Server context. + +Note: the current Handler class is very specific, tailored entirely to handling a +supported live-reload handshake. This should likely be made more general, but until +separate handshakes or endpoints are needed, it's fine as is. +''' import re import logging from pathlib import Path diff --git a/execlog/listener.py b/execlog/listener.py index 53ca1f3..ae8e6bd 100644 --- a/execlog/listener.py +++ b/execlog/listener.py @@ -2,29 +2,33 @@ Implements a file system watcher. See also: - - https://inotify-simple.readthedocs.io/en/latest/#gracefully-exit-a-blocking-read + +- https://inotify-simple.readthedocs.io/en/latest/#gracefully-exit-a-blocking-read ''' import threading +from execlog.event import Event -class Listener(threading.Thread): - def __init__(self, router): + +class Listener[E: Event](threading.Thread): + def __init__(self, router: 'Router[E]'): ''' Parameters: - workers: number of workers to assign the thread pool when the event loop is - started. Defaults to `None`, which, when passed to - ThreadPoolExecutor, will by default use 5x the number of available - processors on the machine (which the docs claim is a reasonable - assumption given threads are more commonly leveraged for I/O work - rather than intense CPU operations). Given the intended context for - this class, this assumption aligns appropriately. + router: associated Router instance that events should be passed to ''' super().__init__() self.router = router def listen(self): + ''' + Register a new listener endpoint + ''' raise NotImplementedError def run(self): + ''' + Begin listening for events. Typically a blocking loop that passes events to + attached Router. + ''' raise NotImplementedError diff --git a/execlog/listeners/path.py b/execlog/listeners/path.py index 19a9a52..f328074 100644 --- a/execlog/listeners/path.py +++ b/execlog/listeners/path.py @@ -1,11 +1,3 @@ -''' -Implements a file system watcher. - -See also: - -- https://inotify-simple.readthedocs.io/en/latest/#gracefully-exit-a-blocking-read -''' -#import fnmatch import os import time import select @@ -15,27 +7,18 @@ from collections import defaultdict from inotify_simple import INotify, Event as iEvent, flags as iflags, masks as imasks -from execlog import util +from execlog import util +from execlog.event import FileEvent from execlog.listener import Listener logger = logging.getLogger(__name__) -# hardcoded file names to ignore -# - "4913" is a temp file created by Vim before editing -IGNORE_PATTERNS = ['4913', '.sync*.db*'] - -class PathListener(Listener): +class PathListener(Listener[FileEvent]): def __init__(self, router): ''' Parameters: - workers: number of workers to assign the thread pool when the event loop is - started. Defaults to `None`, which, when passed to - ThreadPoolExecutor, will by default use 5x the number of available - processors on the machine (which the docs claim is a reasonable - assumption given threads are more commonly leveraged for I/O work - rather than intense CPU operations). Given the intended context for - this class, this assumption aligns appropriately. 
+ router: associated Router instance that events should be passed to Note: Due to the nature of INotify, you cannot watch the same path with two @@ -60,7 +43,7 @@ class PathListener(Listener): self.inotify = INotify() self.read_fd, write_fd = os.pipe() - self.write = os.fdopen(write_fd, "wb") + self.write = os.fdopen(write_fd, 'wb') def _add_watch( self, @@ -128,14 +111,13 @@ class PathListener(Listener): flags=None, ): ''' + Listen to file events occurring under a provided path, optionally only events + matching provided iNotify flags. + Parameters: path: Path (directory) to watch with `inotify` flags: inotify_simple flags matching FS event types allowed to trigger the callback - debounce: time in milliseconds to debounce file-based events at this path. - Applies to _file targets_; the same filename will have events - "debounced" at this interval (time delta calculated from last - un-rejected match). ''' path = Path(path) @@ -159,10 +141,6 @@ class PathListener(Listener): `start()` is a blocking call. This will hog your main thread if not properly threaded. If handling this manually in your outer context, you will also need to make sure to call `.stop()` - - Parameters: - loop: asyncio loop to pass to `_event_loop`; used to schedule async callbacks - when present ''' self.started = True logger.info(f'Starting listener for {len(self.watchmap)} paths') @@ -332,7 +310,7 @@ class PathListener(Listener): for event in events: # hard coded ignores - if util.path.glob_match(event.name, IGNORE_PATTERNS): continue + if util.path.glob_match(event.name, util.path.IGNORE_PATTERNS): continue mask_flags = iflags.from_mask(event.mask) diff --git a/execlog/router.py b/execlog/router.py index 068346d..1cb22ed 100644 --- a/execlog/router.py +++ b/execlog/router.py @@ -1,3 +1,6 @@ +''' +Router +''' import time import asyncio import logging @@ -8,21 +11,18 @@ from pathlib import Path from typing import Callable from functools import partial from colorama import Fore, Style -from collections import namedtuple, defaultdict +from collections import defaultdict from concurrent.futures import ThreadPoolExecutor, wait, as_completed from tqdm.auto import tqdm +from execlog.event import Event +from execlog.listener import Listener + + logger = logging.getLogger(__name__) - -Event = namedtuple( - 'Event', - ['endpoint', 'name', 'action'], - defaults=[None, None, None], -) - -class Router: +class Router[E: Event]: ''' Route events to registered callbacks @@ -74,16 +74,22 @@ class Router: earlier ones, or interact with intermediate disk states (raw file writes, DB inserts, etc), before the earliest call has had a chance to clean up. ''' - def __init__(self, loop=None, workers=None, listener_cls=None): + listener_cls = Listener[E] + + def __init__(self, loop=None, workers=None): ''' Parameters: loop: - workers: - listener_cls: + workers: number of workers to assign the thread pool when the event loop is + started. Defaults to `None`, which, when passed to + ThreadPoolExecutor, will by default use 5x the number of available + processors on the machine (which the docs claim is a reasonable + assumption given threads are more commonly leveraged for I/O work + rather than intense CPU operations). Given the intended context for + this class, this assumption aligns appropriately. 
''' self.loop = loop self.workers = workers - self.listener_cls = listener_cls self.routemap : dict[str, list[tuple]] = defaultdict(list) self.post_callbacks = [] @@ -141,7 +147,7 @@ class Router: route_tuple = (callback, pattern, debounce, delay, listener_kwargs) self.routemap[endpoint].append(route_tuple) - def submit(self, events, callbacks=None): + def submit(self, events:E | list[E], callbacks=None): ''' Handle a list of events. Each event is matched against the registered callbacks, and those callbacks are ran concurrently (be it via a thread pool or an asyncio @@ -235,7 +241,7 @@ class Router: return future - def matching_routes(self, event, event_time=None): + def matching_routes(self, event: E, event_time=None): ''' Return eligible matching routes for the provided event. @@ -288,7 +294,7 @@ class Router: # set next debounce self.next_allowed_time[index] = event_time + debounce - match_text = Style.BRIGHT + Fore.GREEN + 'matched' + match_text = Style.BRIGHT + Fore.GREEN + 'matched' + Fore.RESET callback_name = str(callback) if hasattr(callback, '__name__'): @@ -298,7 +304,7 @@ class Router: f'Event [{name}] {match_text} [{pattern}] under [{endpoint}] for [{callback_name}]' ) else: - match_text = Style.BRIGHT + Fore.RED + 'rejected' + match_text = Style.BRIGHT + Fore.RED + 'rejected' + Fore.RESET logger.debug( f'Event [{name}] {match_text} against [{pattern}] under [{endpoint}] for [{callback.__name__}]' ) @@ -373,6 +379,7 @@ class Router: ''' if listener_cls is None: listener_cls = self.listener_cls + if listener_cls is None: raise ValueError('No Listener class provided') diff --git a/execlog/routers/path.py b/execlog/routers/path.py index 480e082..be7fc02 100644 --- a/execlog/routers/path.py +++ b/execlog/routers/path.py @@ -1,34 +1,25 @@ import logging from pathlib import Path +from typing import Callable from execlog.router import Router -from execlog.listeners.path import PathListener +from execlog.event import FileEvent from execlog.util.path import glob_match +from execlog.listeners.path import PathListener logger = logging.getLogger(__name__) -class PathRouter(Router): - def __init__(self, loop=None, workers=None, listener_cls=PathListener): - ''' - Parameters: - workers: number of workers to assign the thread pool when the event loop is - started. Defaults to `None`, which, when passed to - ThreadPoolExecutor, will by default use 5x the number of available - processors on the machine (which the docs claim is a reasonable - assumption given threads are more commonly leveraged for I/O work - rather than intense CPU operations). Given the intended context for - this class, this assumption aligns appropriately. - ''' - super().__init__(loop=loop, workers=workers, listener_cls=listener_cls) - +class PathRouter(Router[FileEvent]): + listener_cls = PathListener + def register( self, - path, - func, - glob='**/!(.*|*.tmp|*~)', # recursive, non-temp - debounce=200, - delay=30, + path : Path, + func : Callable, + glob : str = '**/!(.*|*.tmp|*~)', # recursive, non-temp + debounce : int|float = 200, + delay : int|float = 30, + **listener_kwargs, ): ''' @@ -38,6 +29,8 @@ class PathRouter(Router): glob: Relative glob pattern to match files in provided path. The FS event's filename must match this pattern for the callback to queued. (Default: "*"; matching all files in path). + debounce: time in milliseconds to debounce file events matching this route; the same filename will not re-trigger its callback until this interval has passed (measured from the last accepted match) + delay: listener_kwargs: Additional params for associated listener "listen" routes. See `PathListener.listen`. 
''' diff --git a/execlog/server.py b/execlog/server.py index 8fbb027..19a9cd3 100644 --- a/execlog/server.py +++ b/execlog/server.py @@ -1,3 +1,17 @@ +''' +Server + +Central management object for both file serving systems (static server, live reloading) +and job execution (routing and listening). Routers and Listeners can be started and +managed independently, but a single Server instance can house, start, and shutdown +listeners in one place. + +TODO: as it stands, the Server requires address and port details, effectively needing one +of the HTTP items (static file serving or livereloading) to be initialized appropriately. +But there is a clear use case for just managing disparate Routers and their associated +Listeners. Should perhaps separate this "grouped listener" into another object, or just +make the Server definition more flexible. +''' import re import asyncio import logging @@ -5,9 +19,9 @@ import threading from functools import partial import uvicorn +from inotify_simple import flags from fastapi import FastAPI, WebSocket from fastapi.staticfiles import StaticFiles -from inotify_simple import flags from execlog.handler import Handler as LREndpoint @@ -29,9 +43,16 @@ class Server: ): ''' Parameters: + host: host server address (either 0.0.0.0, 127.0.0.1, localhost) + port: port at which to start the server + root: base path for static files _and_ where router bases are attached (i.e., + when files at this path change, a reload event will be + propagated to a corresponding client page) + static: whether or not to start a static file server + livereload: whether or not to start a livereload server managed_listeners: auxiliary listeners to "attach" to the server process, and to - propagate the shutdown signal to when the server receives an - interrupt. + propagate the shutdown signal to when the server receives an + interrupt. ''' self.host = host self.port = port @@ -66,8 +87,8 @@ class Server: Note that, when present, the livereload endpoint is registered first, as the order in which routes are defined matters for FastAPI apps. This allows `/livereload` to - behave appropriately, despite the root re-mount that takes place if serving static - files. + behave appropriately, even when remounting the root if serving static files + (which, if done in the opposite order, would "eat up" the `/livereload` endpoint). 
''' # enable propagation and clear handlers for uvicorn internal loggers; # allows logging messages to propagate to my root logger @@ -100,7 +121,7 @@ class Server: ''' flags.MODIFY okay since we don't need to reload non-existent pages ''' - from localsys.reloader.router import PathRouter + from execlog.reloader.router import PathRouter if self.loop is None: self.loop = asyncio.new_event_loop() diff --git a/execlog/util/__init__.py b/execlog/util/__init__.py index fac37f2..5f3addb 100644 --- a/execlog/util/__init__.py +++ b/execlog/util/__init__.py @@ -1 +1,2 @@ from execlog.util import path +from execlog.util import generic diff --git a/execlog/util/generic.py b/execlog/util/generic.py new file mode 100644 index 0000000..b307c0a --- /dev/null +++ b/execlog/util/generic.py @@ -0,0 +1,64 @@ +import logging + +import tqdm +import colorama +from colorama import Fore, Back, Style + + +class ColorFormatter(logging.Formatter): + _format = '%(levelname)-8s :: %(name)s %(message)s' + colorama.init(autoreset=True) + + FORMATS = { + 'x': Fore.YELLOW + _format, + 'listener': Fore.GREEN + _format, + 'handler': Fore.CYAN + _format, + 'server': Style.DIM + Fore.CYAN + _format, + 'router': Fore.MAGENTA + _format, + 'site': Fore.BLUE + _format, + 'utils': Style.DIM + Fore.WHITE + _format, + } + FORMATS = { k:logging.Formatter(v) for k,v in FORMATS.items() } + DEFAULT_LOGGER = logging.Formatter(_format) + + def format(self, record): + # color by high-level submodule + name_parts = record.name.split('.') + package = ''.join(name_parts[:1]) + submodule = ''.join(name_parts[1:2]) + subsubmodule = '.'.join(name_parts[1:3]) + + formatter = self.DEFAULT_LOGGER + if subsubmodule in self.FORMATS: + formatter = self.FORMATS[subsubmodule] + elif submodule in self.FORMATS: + formatter = self.FORMATS[submodule] + + name = record.name + if package == 'localsys': + name = f'localsys.{subsubmodule}' + + limit = 26 + name = name[:limit] + name = f'[{name}]{"-"*(limit-len(name))}' + record.name = name + + return formatter.format(record) + +class TqdmLoggingHandler(logging.StreamHandler): + def __init__(self, level=logging.NOTSET): + super().__init__(level) + formatter = ColorFormatter() + self.setFormatter(formatter) + + def emit(self, record): + try: + msg = self.format(record) + #tqdm.tqdm.write(msg) + tqdm.tqdm.write(msg, end=self.terminator) + self.flush() + except (KeyboardInterrupt, SystemExit): + raise + except: + self.handleError(record) + diff --git a/execlog/util/path.py b/execlog/util/path.py index 574a713..2403c8d 100644 --- a/execlog/util/path.py +++ b/execlog/util/path.py @@ -6,6 +6,10 @@ from pathlib import Path from wcmatch import glob as wc_glob +# hardcoded file names to ignore +# - "4913" is a temp file created by Vim before editing +IGNORE_PATTERNS = ['4913', '.sync*.db*'] + camel2snake_regex = re.compile(r'(?. 
at 0x7fee0e5f9800>, {1: defaultdict(, {(PosixPath('endpoint_proxy'), PosixPath('.')): 1986})})\n" + ] + }, + { + "name": "stderr", + "output_type": "stream", + "text": [ + "INFO:execlog.listeners.path:Starting listener for 1 paths\n", + "INFO:execlog.listeners.path:> Listening on path endpoint_proxy for flags [, , , , , ]\n" + ] + } + ], + "source": [ + "listener = chain_router.get_listener()\n", + "listener.start()\n", + "\n", + "print(listener.watchmap)" + ] + }, + { + "cell_type": "code", + "execution_count": 3, + "id": "e049fd73-227e-4574-bcad-3dbeff99804f", + "metadata": {}, + "outputs": [ + { + "name": "stderr", + "output_type": "stream", + "text": [ + "DEBUG:execlog.listeners.path:Watcher fired for [fileA]: []\n", + "INFO:execlog.router:Event [fileA] \u001b[1m\u001b[32mmatched [**/!(.*|*.tmp|*~)] under [endpoint_proxy] for [functools.partial(, 'R1-1 ::')]\n", + "DEBUG:execlog.listeners.path:Watcher fired for [fileA]: []\n", + "INFO:execlog.router:Event [fileA] \u001b[1m\u001b[32mmatched [**/!(.*|*.tmp|*~)] under [endpoint_proxy] for [functools.partial(, 'R2-1 ::')]\n", + "DEBUG:execlog.listeners.path:Watcher fired for [fileA]: []\n", + "INFO:execlog.router:Event [fileA] \u001b[1m\u001b[32mmatched [**/!(.*|*.tmp|*~)] under [endpoint_proxy] for [functools.partial(, 'R2-2 ::')]\n", + "INFO:execlog.router:Event [fileA] \u001b[1m\u001b[32mmatched [**/!(.*|*.tmp|*~)] under [endpoint_proxy] for [functools.partial(, 'R3-1 ::')]\n" + ] + } + ], + "source": [ + "file_a = Path('endpoint_proxy/fileA')\n", + "file_a.write_text('test text')\n", + "file_a.unlink()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "891a8bfd-465a-4c5d-a5d8-ab71f61cc624", + "metadata": {}, + "outputs": [], + "source": [] + } + ], + "metadata": { + "kernelspec": { + "display_name": "execlog", + "language": "python", + "name": "execlog" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.12.2" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +} diff --git a/notebooks/router.ipynb b/notebooks/router.ipynb index ce4b9d3..1d85c73 100644 --- a/notebooks/router.ipynb +++ b/notebooks/router.ipynb @@ -2,21 +2,12 @@ "cells": [ { "cell_type": "code", - "execution_count": 1, - "id": "cd0a1da5-da7a-4181-bea3-07d02991398f", + "execution_count": null, + "id": "718618b7-132f-44e0-8cad-6a912a623c82", "metadata": { "tags": [] }, - "outputs": [ - { - "name": "stderr", - "output_type": "stream", - "text": [ - "/home/smgr/.pyenv/versions/execlog/lib/python3.12/site-packages/tqdm/auto.py:21: TqdmWarning: IProgress not found. Please update jupyter and ipywidgets. See https://ipywidgets.readthedocs.io/en/stable/user_install.html\n", - " from .autonotebook import tqdm as notebook_tqdm\n" - ] - } - ], + "outputs": [], "source": [ "import logging\n", "from pathlib import Path\n", @@ -29,6 +20,17 @@ "logging.basicConfig(level=logging.DEBUG)" ] }, + { + "cell_type": "markdown", + "id": "f3fd536f-75d6-408d-88b2-1e4a1b62a51e", + "metadata": {}, + "source": [ + "# Router setup\n", + "Create individual \"frame\" routers, and attach them in a chain.\n", + "\n", + "A matching event will first be processed by matching callbacks in `router1` in parallel, blocking until all are completed, and then pass on to the next router (`router2`) to repeat the same process. This trajectory can occur in parallel for several events." 
+ ] + }, { "cell_type": "code", "execution_count": 2, @@ -43,6 +45,22 @@ "chain_router = ChainRouter([router1, router2, router3])" ] }, + { + "cell_type": "markdown", + "id": "5a4a1e7a-ee8b-4eec-97dd-ed9abac73989", + "metadata": {}, + "source": [ + "Register callbacks to each of the routers. The `Router` objects are of type `PathRouter`, so `.register` takes a path endpoint and a function that accepts `Event`s.\n", + "\n", + "Events are created with the registered endpoint path, and a `name` parameter with the filename at that path target. Here one callback is attached to Router 1, two to Router 2, and three to Router 3. A given matching event should have a trajectory that looks like:\n", + "\n", + "```\n", + "Event -> Router-1 -> C1-1 -- blocking --> Router-2 --> C2-1 -- blocking --> Router-3 --> C3-1 -->\n", + " \\-> C2-2 -/ \\-> C3-2 -/\n", + " \\-> C3-3 -/\n", + "```" + ] + }, { "cell_type": "code", "execution_count": 3, @@ -63,6 +81,14 @@ "]" ] }, + { + "cell_type": "markdown", + "id": "b3f98138-e381-4a98-af33-0e5310f305ce", + "metadata": {}, + "source": [ + "Submit the event list to an individual router. Each event will be handled in its own thread, until the thread limit is reached, at which point the events remain in a queue until they can be processed." + ] + }, { "cell_type": "code", "execution_count": 4, @@ -83,9 +109,9 @@ { "data": { "text/plain": [ - "[,\n", - " ,\n", - " ]" + "[,\n", + " ,\n", + " ]" ] }, "execution_count": 4, @@ -99,9 +125,6 @@ "R1 :: Event(endpoint='endpoint_proxy', name='file1', action=None)\n", "R1 :: Event(endpoint='endpoint_proxy', name='file2', action=None)\n", "R1 :: Event(endpoint='endpoint_proxy', name='file3', action=None)\n", - "R1 :: Event(endpoint='endpoint_proxy', name='file3', action=None)\n", - "R1 :: Event(endpoint='endpoint_proxy', name='fileA', action=[])\n", - "R1 :: Event(endpoint='endpoint_proxy', name='fileA', action=[])\n", "R1 :: Event(endpoint='endpoint_proxy', name='fileA', action=[])\n" ] } @@ -111,6 +134,14 @@ "router1.submit(events)" ] }, + { + "cell_type": "markdown", + "id": "e4de6182-aae6-43e0-9d14-04f1e291fe41", + "metadata": {}, + "source": [ + "Submit the event list to the chain router. Each event will be processed independently and in parallel, so long as there are threads available. Each event will make its way through the router chain, blocking until all matching callbacks for a given router are completed." 
+ ] + }, { "cell_type": "code", "execution_count": 5, @@ -123,7 +154,6 @@ "name": "stderr", "output_type": "stream", "text": [ - "INFO:execlog.router:Event [file1] \u001b[1m\u001b[32mmatched [**/!(.*|*.tmp|*~)] under [endpoint_proxy] for [functools.partial(, 'R1 ::')]\n", "INFO:execlog.router:Event [file1] \u001b[1m\u001b[32mmatched [**/!(.*|*.tmp|*~)] under [endpoint_proxy] for [functools.partial(, 'R2 ::')]\n", "INFO:execlog.router:Event [file1] \u001b[1m\u001b[32mmatched [**/!(.*|*.tmp|*~)] under [endpoint_proxy] for [functools.partial(, 'R3 ::')]\n" ] @@ -131,9 +161,9 @@ { "data": { "text/plain": [ - "[,\n", - " ,\n", - " ]" + "[,\n", + " ,\n", + " ]" ] }, "execution_count": 5, @@ -144,10 +174,8 @@ "name": "stderr", "output_type": "stream", "text": [ - "INFO:execlog.router:Event [file2] \u001b[1m\u001b[32mmatched [**/!(.*|*.tmp|*~)] under [endpoint_proxy] for [functools.partial(, 'R1 ::')]\n", "INFO:execlog.router:Event [file2] \u001b[1m\u001b[32mmatched [**/!(.*|*.tmp|*~)] under [endpoint_proxy] for [functools.partial(, 'R2 ::')]\n", "INFO:execlog.router:Event [file2] \u001b[1m\u001b[32mmatched [**/!(.*|*.tmp|*~)] under [endpoint_proxy] for [functools.partial(, 'R3 ::')]\n", - "INFO:execlog.router:Event [file3] \u001b[1m\u001b[32mmatched [**/!(.*|*.tmp|*~)] under [endpoint_proxy] for [functools.partial(, 'R1 ::')]\n", "INFO:execlog.router:Event [file3] \u001b[1m\u001b[32mmatched [**/!(.*|*.tmp|*~)] under [endpoint_proxy] for [functools.partial(, 'R2 ::')]\n", "INFO:execlog.router:Event [file3] \u001b[1m\u001b[32mmatched [**/!(.*|*.tmp|*~)] under [endpoint_proxy] for [functools.partial(, 'R3 ::')]\n" ] @@ -158,59 +186,10 @@ "text": [ "R2 :: Event(endpoint='endpoint_proxy', name='file1', action=None)\n", "R2 :: Event(endpoint='endpoint_proxy', name='file2', action=None)\n", + "R2 :: Event(endpoint='endpoint_proxy', name='file3', action=None)\n", "R3 :: Event(endpoint='endpoint_proxy', name='file1', action=None)\n", "R3 :: Event(endpoint='endpoint_proxy', name='file2', action=None)\n", - "R2 :: Event(endpoint='endpoint_proxy', name='file3', action=None)\n", - "R3 :: Event(endpoint='endpoint_proxy', name='file3', action=None)\n" - ] - }, - { - "name": "stderr", - "output_type": "stream", - "text": [ - "INFO:execlog.router:Event [fileA] \u001b[1m\u001b[32mmatched [**/!(.*|*.tmp|*~)] under [endpoint_proxy] for [functools.partial(, 'R1 ::')]\n", - "INFO:execlog.router:Event [fileA] \u001b[1m\u001b[32mmatched [**/!(.*|*.tmp|*~)] under [endpoint_proxy] for [functools.partial(, 'R2 ::')]\n", - "INFO:execlog.router:Event [fileA] \u001b[1m\u001b[32mmatched [**/!(.*|*.tmp|*~)] under [endpoint_proxy] for [functools.partial(, 'R3 ::')]\n" - ] - }, - { - "name": "stdout", - "output_type": "stream", - "text": [ - "R2 :: Event(endpoint='endpoint_proxy', name='fileA', action=[])\n", - "R3 :: Event(endpoint='endpoint_proxy', name='fileA', action=[])\n" - ] - }, - { - "name": "stderr", - "output_type": "stream", - "text": [ - "INFO:execlog.router:Event [fileA] \u001b[1m\u001b[32mmatched [**/!(.*|*.tmp|*~)] under [endpoint_proxy] for [functools.partial(, 'R1 ::')]\n", - "INFO:execlog.router:Event [fileA] \u001b[1m\u001b[32mmatched [**/!(.*|*.tmp|*~)] under [endpoint_proxy] for [functools.partial(, 'R2 ::')]\n", - "INFO:execlog.router:Event [fileA] \u001b[1m\u001b[32mmatched [**/!(.*|*.tmp|*~)] under [endpoint_proxy] for [functools.partial(, 'R3 ::')]\n" - ] - }, - { - "name": "stdout", - "output_type": "stream", - "text": [ - "R2 :: Event(endpoint='endpoint_proxy', name='fileA', action=[])\n", - "R3 :: 
Event(endpoint='endpoint_proxy', name='fileA', action=[])\n" - ] - }, - { - "name": "stderr", - "output_type": "stream", - "text": [ - "INFO:execlog.router:Event [fileA] \u001b[1m\u001b[32mmatched [**/!(.*|*.tmp|*~)] under [endpoint_proxy] for [functools.partial(, 'R1 ::')]\n", - "INFO:execlog.router:Event [fileA] \u001b[1m\u001b[32mmatched [**/!(.*|*.tmp|*~)] under [endpoint_proxy] for [functools.partial(, 'R2 ::')]\n", - "INFO:execlog.router:Event [fileA] \u001b[1m\u001b[32mmatched [**/!(.*|*.tmp|*~)] under [endpoint_proxy] for [functools.partial(, 'R3 ::')]\n" - ] - }, - { - "name": "stdout", - "output_type": "stream", - "text": [ + "R3 :: Event(endpoint='endpoint_proxy', name='file3', action=None)\n", "R2 :: Event(endpoint='endpoint_proxy', name='fileA', action=[])\n", "R3 :: Event(endpoint='endpoint_proxy', name='fileA', action=[])\n" ] @@ -232,15 +211,21 @@ "name": "stderr", "output_type": "stream", "text": [ - "INFO:execlog.listeners.path:Starting listener for 1 paths\n", - "INFO:execlog.listeners.path:> Listening on path endpoint_proxy for flags [, , , , , ]\n" + "INFO:execlog.listeners.path:Starting listener for 1 paths\n" ] }, { "name": "stdout", "output_type": "stream", "text": [ - "defaultdict(. at 0x78be3b5db100>, {1: defaultdict(, {(PosixPath('endpoint_proxy'), PosixPath('.')): 1986})})\n" + "defaultdict(. at 0x7802dbfdfba0>, {1: defaultdict(, {(PosixPath('endpoint_proxy'), PosixPath('.')): 1986})})\n" + ] + }, + { + "name": "stderr", + "output_type": "stream", + "text": [ + "INFO:execlog.listeners.path:> Listening on path endpoint_proxy for flags [, , , , , ]\n" ] } ], @@ -253,7 +238,7 @@ }, { "cell_type": "code", - "execution_count": 9, + "execution_count": 7, "id": "00bb2889-f266-4fb1-9a89-7d7539aba9cf", "metadata": {}, "outputs": [ @@ -262,7 +247,10 @@ "output_type": "stream", "text": [ "DEBUG:execlog.listeners.path:Watcher fired for [fileA]: []\n", + "INFO:execlog.router:Event [fileA] \u001b[1m\u001b[32mmatched [**/!(.*|*.tmp|*~)] under [endpoint_proxy] for [functools.partial(, 'R1 ::')]\n", "DEBUG:execlog.listeners.path:Watcher fired for [fileA]: []\n", + "INFO:execlog.router:Event [fileA] \u001b[1m\u001b[32mmatched [**/!(.*|*.tmp|*~)] under [endpoint_proxy] for [functools.partial(, 'R2 ::')]\n", + "INFO:execlog.router:Event [fileA] \u001b[1m\u001b[32mmatched [**/!(.*|*.tmp|*~)] under [endpoint_proxy] for [functools.partial(, 'R3 ::')]\n", "DEBUG:execlog.listeners.path:Watcher fired for [fileA]: []\n" ] } diff --git a/notebooks/router_env.py b/notebooks/router_env.py new file mode 100644 index 0000000..fded96b --- /dev/null +++ b/notebooks/router_env.py @@ -0,0 +1,49 @@ +import logging +from pathlib import Path +from functools import partial + +from execlog import util +from execlog import ChainRouter, Event +from execlog.routers import PathRouter +from execlog.listeners import PathListener + + +logger = logging.getLogger() +logger.setLevel(logging.DEBUG) +logger.addHandler(util.generic.TqdmLoggingHandler()) + +# router setup +router1 = PathRouter() +router2 = PathRouter() +router3 = PathRouter() + +chain_router = ChainRouter([router1, router2, router3]) + +# router-1 +router1.register('endpoint_proxy', partial(print, 'R1-1 ::')) + +# router-2 +router2.register('endpoint_proxy', partial(print, 'R2-1 ::')) +router2.register('endpoint_proxy', partial(print, 'R2-2 ::')) + +# router-3 +router3.register('endpoint_proxy', partial(print, 'R3-1 ::')) +router3.register('endpoint_proxy', partial(print, 'R3-2 ::')) +router3.register('endpoint_proxy', partial(print, 'R3-3 ::')) + 
+events = [ + Event(endpoint='endpoint_proxy', name='file1'), + Event(endpoint='endpoint_proxy', name='file2'), + Event(endpoint='endpoint_proxy', name='file3'), +] + +if __name__ == '__main__': + futures = chain_router.submit(events) + chain_router.wait_on_futures(futures) + + #listener = chain_router.get_listener() + #listener.start() + + #file_a = Path('endpoint_proxy/fileA') + #file_a.write_text('test text') + #file_a.unlink() diff --git a/tests/test_handler.py b/tests/test_handler.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/test_listener.py b/tests/test_listener.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/test_router.py b/tests/test_router.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/test_server.py b/tests/test_server.py new file mode 100644 index 0000000..e69de29
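
The tests/*.py modules above are created empty in this patch. As a possible seed for tests/test_router.py, here is a minimal sketch that uses only calls already demonstrated in notebooks/router_env.py and notebooks/router.ipynb (`PathRouter.register`, `Router.submit` returning futures, and the `Event` namedtuple). The test name, the `delay=0` override, and the use of `concurrent.futures.wait` are illustrative assumptions, not part of the patch.

```python
# Hypothetical starting point for tests/test_router.py (not part of the diff above).
from concurrent.futures import wait

from execlog import Event
from execlog.routers import PathRouter


def test_submit_reaches_registered_callback():
    received = []

    router = PathRouter()
    # Mirror the notebook setup: register a callback against the proxy endpoint.
    # delay=0 is assumed here so the test does not wait out the default delay.
    router.register('endpoint_proxy', received.append, delay=0)

    # Submit a synthetic event, as in router.ipynb, and wait on the returned futures.
    futures = router.submit([Event(endpoint='endpoint_proxy', name='file1')])
    wait(futures)

    assert [event.name for event in received] == ['file1']
```

Whether the returned futures resolve only after the matched callbacks finish is assumed from the notebook behavior and should be confirmed against the Router implementation before relying on this pattern.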