Compare commits

...

3 Commits

29 changed files with 230 additions and 198 deletions

View File

@ -1,8 +1,10 @@
# Overview
`execlog` is a lightweight multi-threaded job framework written in Python. It implements a
`execlib` is a lightweight multi-threaded job framework written in Python. It implements a
simple event-based model over core Python utilities like `ThreadPoolExecutor` to
facilitate reactivity and manage concurrent responses.
![High-level execution flow diagram](docs/_static/execlib.png)
There are a few top-level classes exposed by the package:
- **Router**: Central event routing object. Routers facilitate route registration,
@ -17,3 +19,20 @@ There are a few top-level classes exposed by the package:
iNotify to dynamically respond to file events.
- **Server**: Long-running process manager for listeners and optional live-reloading via
HTTP. Interfaces with listener `start()` and `shutdown()` for graceful interruption.
# Install
```sh
pip install execlib
```
# Development
## Documentation
```sh
pip install execlib[docs]
```
## Testing
```sh
pip install execlib[tests]
```

BIN
docs/_static/execlib.png vendored Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 331 KiB

View File

@ -6,7 +6,7 @@
# -- Project information -----------------------------------------------------
# https://www.sphinx-doc.org/en/master/usage/configuration.html#project-information
project = 'execlog'
project = 'execlib'
copyright = '2024, Sam Griesemer'
author = 'Sam Griesemer'

View File

@ -1,4 +1,4 @@
# `execlog` package docs
# `execlib` package docs
{ref}`genindex`
{ref}`modindex`
{ref}`search`
@ -10,18 +10,18 @@
:nosignatures:
:recursive:
execlog.Handler
execlog.Listener
execlog.Router
execlog.Server
execlog.listeners
execlib.Handler
execlib.Listener
execlib.Router
execlib.Server
execlib.listeners
```
## Auto-reference contents
```{toctree}
:maxdepth: 3
_autoref/execlog.rst
_autoref/execlib.rst
```
```{toctree}
@ -32,5 +32,6 @@ reference/documentation/index
```
```{include} ../README.md
:relative-docs: docs/
:relative-images:
```

10
execlib/__init__.py Normal file
View File

@ -0,0 +1,10 @@
from execlib import util
from execlib import routers
from execlib import syncers
from execlib import listeners
from execlib.server import Server
from execlib.handler import Handler
from execlib.listener import Listener
from execlib.event import Event, FileEvent
from execlib.router import Router, ChainRouter, Event, RouterBuilder, route

View File

@ -5,7 +5,7 @@ See also:
'''
import threading
from execlog.event import Event
from execlib.event import Event
class Listener[E: Event](threading.Thread):

View File

@ -0,0 +1 @@
from execlib.listeners.path import PathListener

View File

@ -8,10 +8,10 @@ from collections import defaultdict
from colorama import Fore, Back, Style
from inotify_simple import INotify, Event as iEvent, flags as iflags, masks as imasks
from execlog import util
from execlog.util.generic import color_text
from execlog.event import FileEvent
from execlog.listener import Listener
from execlib import util
from execlib.util.generic import color_text
from execlib.event import FileEvent
from execlib.listener import Listener
logger = logging.getLogger(__name__)

View File

@ -18,9 +18,9 @@ from concurrent.futures import ThreadPoolExecutor, wait, as_completed
from tqdm.auto import tqdm
from execlog.event import Event
from execlog.listener import Listener
from execlog.util.generic import color_text, get_func_name
from execlib.event import Event
from execlib.listener import Listener
from execlib.util.generic import color_text, get_func_name
logger = logging.getLogger(__name__)
@ -860,14 +860,15 @@ class Router[E: Event]:
# manually track and cancel pending futures b/c `.shutdown(cancel_futures=True)`
# is misleading, and will cause an outer `as_completed` loop to hang
for future in tqdm(
list(self._active_futures),
desc=color_text(
f'Cancelling {len(self._active_futures)} pending futures...',
Fore.BLACK, Back.RED),
colour='red',
):
future.cancel()
if self._active_futures:
for future in tqdm(
list(self._active_futures),
desc=color_text(
f'Cancelling {len(self._active_futures)} pending futures...',
Fore.BLACK, Back.RED),
colour='red',
):
future.cancel()
if self._thread_pool_2 is not None:
# cancel pending futures (i.e., those not started)
@ -989,6 +990,21 @@ class ChainRouter[E: Event](Router[E]):
return self.running_events.pop(event_idx, None)
def get_listener(self, listener_cls=None):
'''
Gets the listener for the entire router chain.
Builds a single listener from the registered endpoints of each router, allowing
a global iNotify instance to serve all routes. This provides a clean target for
outer server context managers.
Note that having multiple iNotify instances, even those watching the same
directories, in general shouldn't pose an issue. iNotify instances are
independent, and the kernel will send events to each listening instance when one
occurs. This is why we consolidate the listener for chained routers: we want to
respond to events sequentially according to router order, rather than have each
router expose a listener that can independently hear and respond to its own
events.
'''
if listener_cls is None:
for router in self.ordered_routers:
if router.listener_cls is not None:

View File

@ -0,0 +1 @@
from execlib.routers.path import PathRouter

View File

@ -2,10 +2,10 @@ import logging
from pathlib import Path
from typing import Callable
from execlog.router import Router
from execlog.event import FileEvent
from execlog.util.path import glob_match
from execlog.listeners.path import PathListener
from execlib.router import Router
from execlib.event import FileEvent
from execlib.util.path import glob_match
from execlib.listeners.path import PathListener
logger = logging.getLogger(__name__)

View File

@ -26,8 +26,8 @@ from inotify_simple import flags
from fastapi import FastAPI, WebSocket
from fastapi.staticfiles import StaticFiles
from execlog.routers.path import PathRouter
from execlog.handler import Handler as LREndpoint
from execlib.routers.path import PathRouter
from execlib.handler import Handler as LREndpoint
logger = logging.getLogger(__name__)
@ -36,59 +36,58 @@ class Server:
'''
Wraps up a development static file server and live reloader.
'''
def __init__(self):
self.server = None
# MT/MP server implementations can check this variable for graceful shutdowns
self.should_shutdown = False
self.started = False
# used to isolate server creation logic, when applicable
self._init_server()
def _init_server(self):
pass
def start(self):
self.started = True
def shutdown(self):
self.should_shutdown = True
logger.debug("Server shutdown request received")
class ProcessServer(Server):
'''
general subprocess start, shutdown, output catching logic
'''
pass
class HTTPServer(Server):
def __init__(
self,
host,
port,
root,
static : bool = False,
livereload : bool = False,
managed_listeners : list | None = None,
loop = None,
):
'''
Parameters:
host: host server address (either 0.0.0.0, 127.0.0.1, localhost)
port: port at which to start the server
root: base path for static files _and_ where router bases are attached (i.e.,
when files at this path change, a reload event will be
propagated to a corresponding client page)
static: whether or not to start a static file server
livereload: whether or not to start a livereload server
managed_listeners: auxiliary listeners to "attach" to the server process, and to
propagate the shutdown signal to when the server receives an
interrupt.
host: host server address (either 0.0.0.0, 127.0.0.1, localhost)
port: port at which to start the server
'''
self.host = host
self.port = port
self.root = root
self.static = static
self.livereload = livereload
self.host = host
self.port = port
self.loop = loop
if managed_listeners is None:
managed_listeners = []
self.managed_listeners = managed_listeners
self.userver = None
self.listener = None
self.userver = None
self.server = None
self.server_text = ''
self.server_args = {}
super().__init__()
self.started = False
self.loop = None
self._server_setup()
def _wrap_static(self):
self.server.mount("/", StaticFiles(directory=self.root), name="static")
def _wrap_livereload(self):
self.server.websocket_route('/livereload')(LREndpoint)
#self.server.add_api_websocket_route('/livereload', LREndpoint)
def _server_setup(self):
def _init_server(self):
'''
Set up the FastAPI server. Only a single server instance is used here, optionally
Set up the FastAPI server and Uvicorn hook.
Only a single server instance is used here, optionally
mounting the static route (if static serving enabled) and providing a websocket
endpoint (if livereload enabled).
@ -97,57 +96,34 @@ class Server:
behave appropriately, even when remounting the root if serving static files
(which, if done in the opposite order, would "eat up" the ``/livereload`` endpoint).
'''
if self.loop is None:
self.loop = asyncio.new_event_loop()
asyncio.set_event_loop(self.loop)
# enable propagation and clear handlers for uvicorn internal loggers;
# allows logging messages to propagate to my root logger
log_config = uvicorn.config.LOGGING_CONFIG
log_config['loggers']['uvicorn']['propagate'] = True
log_config['loggers']['uvicorn']['handlers'] = []
log_config['loggers']['uvicorn']['propagate'] = True
log_config['loggers']['uvicorn']['handlers'] = []
log_config['loggers']['uvicorn.access']['propagate'] = True
log_config['loggers']['uvicorn.access']['handlers'] = []
log_config['loggers']['uvicorn.error']['propagate'] = False
log_config['loggers']['uvicorn.error']['handlers'] = []
log_config['loggers']['uvicorn.access']['handlers'] = []
log_config['loggers']['uvicorn.error']['propagate'] = False
log_config['loggers']['uvicorn.error']['handlers'] = []
self.server_args['log_config'] = log_config
self.server_args['host'] = self.host
self.server_args['port'] = self.port
server_args = {}
server_args['log_config'] = log_config
server_args['host'] = self.host
server_args['port'] = self.port
@asynccontextmanager
async def lifespan(app: FastAPI):
yield
self.shutdown()
if self.static or self.livereload:
self.server = FastAPI(lifespan=lifespan)
#self.server.on_event('shutdown')(self.shutdown)
self.server = FastAPI(lifespan=lifespan)
if self.livereload:
self._wrap_livereload()
self._listener_setup()
self.server_text += '+reload'
if self.static:
self._wrap_static()
self.server_text += '+static'
def _listener_setup(self):
'''
flags.MODIFY okay since we don't need to reload non-existent pages
'''
if self.loop is None:
self.loop = asyncio.new_event_loop()
asyncio.set_event_loop(self.loop)
#self.listener = listener.WatchFS(loop=self.loop)
self.router = PathRouter(loop=self.loop)
self.router.register(
path=str(self.root),
func=LREndpoint.reload_clients,
delay=100,
flags=flags.MODIFY,
)
self.listener = self.router.get_listener()
self.managed_listeners.append(self.listener)
uconfig = uvicorn.Config(app=self.server, loop=self.loop, **self.server_args)
self.userver = uvicorn.Server(config=uconfig)
def start(self):
'''
@ -220,24 +196,12 @@ class Server:
and shut down their thread pools, gracefully close up the Uvicorn server and
allow the serve coroutine to complete, and finally close down the event loop.
'''
if self.loop is None:
self.loop = asyncio.new_event_loop()
asyncio.set_event_loop(self.loop)
super().start()
for listener in self.managed_listeners:
#loop.run_in_executor(None, partial(self.listener.start, loop=loop))
if not listener.started:
listener.start()
logger.info(f'Starting HTTP server @ http://{self.host}:{self.port}')
self.started = False
if self.server:
logger.info(f'Server{self.server_text} @ http://{self.host}:{self.port}')
uconfig = uvicorn.Config(app=self.server, loop=self.loop, **self.server_args)
self.userver = uvicorn.Server(config=uconfig)
self.loop.run_until_complete(self.userver.serve())
self.loop.close()
self.loop.run_until_complete(self.userver.serve())
self.loop.close()
def shutdown(self):
'''
@ -279,15 +243,6 @@ class Server:
task should be considered "completed," at which point the event loop can be closed
successfully.
'''
logger.info("Shutting down server...")
# stop attached auxiliary listeners, both internal & external
if self.managed_listeners:
logger.info(f"Stopping {len(self.managed_listeners)} listeners...")
for listener in self.managed_listeners:
listener.stop()
# stop FastAPI server if started
if self.userver is not None:
def set_should_exit():
@ -295,7 +250,39 @@ class Server:
self.loop.call_soon_threadsafe(set_should_exit)
class ListenerServer:
class StaticHTTPServer(Server):
def __init__(
self,
root,
*args,
**kwargs
):
'''
Parameters:
root: base path for static files _and_ where router bases are attached (i.e.,
when files at this path change, a reload event will be propagated to a
corresponding client page)
'''
self.root = root
super().__init__(*args, **kwargs)
def _init_server(self):
super()._init_server()
self.server.mount(
"/",
StaticFiles(directory=self.root),
name="static"
)
class LiveReloadHTTPServer(Server):
def _init_server(self):
super()._init_server()
self.server.websocket_route('/livereload')(LREndpoint)
class ListenerServer(Server):
'''
Server abstraction to handle disparate listeners.
'''
@ -303,24 +290,53 @@ class ListenerServer:
self,
managed_listeners : list | None = None,
):
'''
Parameters:
managed_listeners: auxiliary listeners to "attach" to the server process, and to
propagate the shutdown signal to when the server receives an
interrupt.
'''
super().__init__()
if managed_listeners is None:
managed_listeners = []
self.managed_listeners = managed_listeners
self.started = False
def _listener_setup(self):
'''
flags.MODIFY okay since we don't need to reload non-existent pages
'''
if self.loop is None:
self.loop = asyncio.new_event_loop()
asyncio.set_event_loop(self.loop)
#self.listener = listener.WatchFS(loop=self.loop)
self.router = PathRouter(loop=self.loop)
self.router.register(
path=str(self.root),
func=LREndpoint.reload_clients,
delay=100,
flags=flags.MODIFY,
)
self.listener = self.router.get_listener()
self.managed_listeners.append(self.listener)
def start(self):
super().start()
for listener in self.managed_listeners:
#loop.run_in_executor(None, partial(self.listener.start, loop=loop))
if not listener.started:
listener.start()
self.started = True
for listener in self.managed_listeners:
listener.join()
def shutdown(self):
super().shutdown()
# stop attached auxiliary listeners, both internal & external
if self.managed_listeners:
logger.info(f"Stopping {len(self.managed_listeners)} listeners...")

View File

@ -0,0 +1 @@
from execlib.syncers.router import PathDiffer, PathRouterSyncer

View File

@ -10,10 +10,10 @@ from inotify_simple import flags as iflags
from co3.resources import DiskResource
from co3 import Differ, Syncer, Database
from execlog.event import Event
from execlog.router import CancelledFrameError
from execlog.routers import PathRouter
from execlog.util.generic import color_text
from execlib.event import Event
from execlib.router import CancelledFrameError
from execlib.routers import PathRouter
from execlib.util.generic import color_text
logger = logging.getLogger(__name__)

2
execlib/util/__init__.py Normal file
View File

@ -0,0 +1,2 @@
from execlib.util import path
from execlib.util import generic

View File

@ -46,8 +46,8 @@ class ColorFormatter(logging.Formatter):
formatter = self.FORMATS[submodule]
name = record.name
if package == 'execlog':
name = f'execlog.{subsubmodule}'
if package == 'execlib':
name = f'execlib.{subsubmodule}'
limit = 26
name = name[:limit]

View File

@ -1,10 +0,0 @@
from execlog import util
from execlog import routers
from execlog import syncers
from execlog import listeners
from execlog.server import Server
from execlog.handler import Handler
from execlog.listener import Listener
from execlog.event import Event, FileEvent
from execlog.router import Router, ChainRouter, Event, RouterBuilder, route

View File

@ -1 +0,0 @@
from execlog.listeners.path import PathListener

View File

@ -1 +0,0 @@
from execlog.routers.path import PathRouter

View File

@ -1 +0,0 @@
from execlog.syncers.router import PathDiffer, PathRouterSyncer

View File

@ -1,2 +0,0 @@
from execlog.util import path
from execlog.util import generic

View File

@ -12,7 +12,7 @@ description = "Lightweight multi-threaded job framework"
readme = "README.md"
requires-python = ">=3.12"
dynamic = ["version"]
license = {file = "LICENSE"}
#license = {file = "LICENSE"}
authors = [
{ name="Sam Griesemer", email="samgriesemer+git@gmail.com" },
]
@ -43,14 +43,15 @@ docs = [
"furo",
"myst-parser",
]
jupyter = ["ipykernel"]
[project.urls]
Homepage = "https://doc.olog.io/execlog"
Documentation = "https://doc.olog.io/execlog"
Repository = "https://git.olog.io/olog/execlog"
Issues = "https://git.olog.io/olog/execlog/issues"
Homepage = "https://doc.olog.io/execlib"
Documentation = "https://doc.olog.io/execlib"
Repository = "https://git.olog.io/olog/execlib"
Issues = "https://git.olog.io/olog/execlib/issues"
[tool.setuptools.packages.find]
include = ["execlog*"] # pattern to match package names
include = ["execlib*"] # pattern to match package names

View File

@ -1,21 +0,0 @@
# -- package-specific --
fastapi
starlette
uvicorn
inotify_simple
tqdm
wcmatch
websockets
# -- logging --
colorama
# -- sphinx docs --
sphinx
sphinx-togglebutton
furo
myst-parser
sphinx-autodoc-typehints
# -- testing ---
pytest

View File

@ -3,10 +3,10 @@ import logging
from pathlib import Path
from functools import partial
from execlog import util
from execlog import ChainRouter, Event
from execlog.routers import PathRouter
from execlog.listeners import PathListener
from execlib import util
from execlib import ChainRouter, Event
from execlib.routers import PathRouter
from execlib.listeners import PathListener
logger = logging.getLogger()

View File

@ -3,10 +3,10 @@ from pathlib import Path
from functools import partial
from concurrent.futures import wait
from execlog import util
from execlog import ChainRouter, Event
from execlog.routers import PathRouter
from execlog.listeners import PathListener
from execlib import util
from execlib import ChainRouter, Event
from execlib.routers import PathRouter
from execlib.listeners import PathListener
logger = logging.getLogger()

View File

@ -4,8 +4,8 @@ import threading
import logging
from pathlib import Path
from execlog import Server
from execlog.routers import PathRouter
from execlib import Server
from execlib.routers import PathRouter
logger = logging.getLogger()