543 lines
23 KiB
Python
543 lines
23 KiB
Python
'''
|
|
Router
|
|
'''
|
|
import time
|
|
import asyncio
|
|
import logging
|
|
import inspect
|
|
import traceback
|
|
import threading
|
|
from pathlib import Path
|
|
from typing import Callable
|
|
from functools import partial
|
|
from colorama import Fore, Style
|
|
from collections import defaultdict
|
|
from concurrent.futures import ThreadPoolExecutor, wait, as_completed
|
|
|
|
from tqdm.auto import tqdm
|
|
|
|
from execlog.event import Event
|
|
from execlog.listener import Listener
|
|
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
class Router[E: Event]:
|
|
'''
|
|
Route events to registered callbacks
|
|
|
|
Generalized registration includes an endpoint (the origin of an event), a pattern (to
|
|
filter events at the endpoint), and a callback (to be executed if pattern is matched).
|
|
|
|
The Router _routes_ events to affiliated callbacks in a multi-threaded fashion. A
|
|
thread pool handles these jobs as events are submitted, typically by a composing
|
|
Listener. The Listener "hears" an event, and passes it on through to a Router to
|
|
further filter and delegate any matching follow-up jobs.
|
|
|
|
This base Router implements most of the registry and filter model. When events are
|
|
submitted for propagation, they are checked for matching routes. Events specify an
|
|
origin endpoint, which is used as the filter for attached routes. The event is then
|
|
subjected to the `filter` method, which checks if the event matches the registered
|
|
`pattern` under the originated `endpoint`. If so, the callback is scheduled for
|
|
execution, and the matching event is passed as its sole argument.
|
|
|
|
Subclasses are expected to implement (at least) the `filter` method. This function is
|
|
responsible for wrapping up the task-specific logic needed to determine if an event,
|
|
originating from a known endpoint, matches the callback-specific pattern. This method
|
|
needn't handle any other filter logic, like checking if the event originates from the
|
|
provided endpoint, as this is already handled by the outer look in `matching_routes`.
|
|
|
|
`get_listener` is a convenience method that instantiates and populates an affiliated
|
|
Listener over the register paths found in the Router. Listeners require a Router upon
|
|
instantiation so events can be propagated to available targets when they occur.
|
|
`get_listener()` is the recommended way to attain a Listener.
|
|
|
|
Note: on debouncing events
|
|
Previously, debouncing was handled by listeners. This logic has been generalized
|
|
and moved to this class, as it's general enough to be desired across various
|
|
Listener types. We also need unique, identifying info only available with a
|
|
`(endpoint, callback, pattern)` triple in order to debounce events in accordance
|
|
with their intended target.
|
|
|
|
Note: tracking events and serializing callback frames
|
|
Although not part of the original implementation, we now track which events have a
|
|
callback chain actively being executed, and prevent the same chain from being
|
|
started concurrently. If the callback chain is actively running for an event, and
|
|
that same event is submitted before this chain finishes, the request is simply
|
|
enqueued. The `clear_event` method is attached as a "done callback" to each job
|
|
future, and will re-submit the event once the active chain finishes.
|
|
|
|
While this could be interpreted as a harsh design choice, it helps prevent many
|
|
many thread conflicts (race conditions, writing to the same resources, etc) when
|
|
the same function is executed concurrently, many times over. Without waiting
|
|
completely for an event to be fully handled, later jobs may complete before
|
|
earlier ones, or interact with intermediate disk states (raw file writes, DB
|
|
inserts, etc), before the earliest call has had a chance to clean up.
|
|
'''
|
|
listener_cls = Listener[E]
|
|
|
|
def __init__(self, loop=None, workers=None):
|
|
'''
|
|
Parameters:
|
|
loop:
|
|
workers: number of workers to assign the thread pool when the event loop is
|
|
started. Defaults to `None`, which, when passed to
|
|
ThreadPoolExecutor, will by default use 5x the number of available
|
|
processors on the machine (which the docs claim is a reasonable
|
|
assumption given threads are more commonly leveraged for I/O work
|
|
rather than intense CPU operations). Given the intended context for
|
|
this class, this assumption aligns appropriately.
|
|
'''
|
|
self.loop = loop
|
|
self.workers = workers
|
|
|
|
self.routemap : dict[str, list[tuple]] = defaultdict(list)
|
|
self.post_callbacks = []
|
|
|
|
# track running jobs by event
|
|
self.running_events = defaultdict(set)
|
|
|
|
# debounce tracker
|
|
self.next_allowed_time = defaultdict(int)
|
|
|
|
# store prepped (e.g., delayed) callbacks
|
|
self.callback_registry = {}
|
|
|
|
# track event history
|
|
self.event_log = []
|
|
|
|
self._thread_pool = None
|
|
self._route_lock = threading.Lock()
|
|
|
|
@property
|
|
def thread_pool(self):
|
|
if self._thread_pool is None:
|
|
self._thread_pool = ThreadPoolExecutor(max_workers=self.workers)
|
|
return self._thread_pool
|
|
|
|
def register(
|
|
self,
|
|
endpoint,
|
|
callback: Callable,
|
|
pattern,
|
|
debounce=200,
|
|
delay=10,
|
|
**listener_kwargs,
|
|
):
|
|
'''
|
|
Register a route.
|
|
|
|
Note: Listener arguments
|
|
Notice how listener_kwargs are accumulated instead of uniquely assigned to an
|
|
endpoint. This is generally acceptable as some listeners may allow various
|
|
configurations for the same endpoint. Note, however, for something like the
|
|
PathListener, this will have no effect. Registering the same endpoint multiple
|
|
times won't cause any errors, but the configuration options will only remain
|
|
for the last registered group.
|
|
|
|
(Update) The above remark about PathListener's is no longer, and likely never
|
|
was. Varying flag sets under the same endpoint do in fact have a cumulative
|
|
effect, and we need to be able disentangle events accordingly through
|
|
submitted event's `action` value.
|
|
|
|
Parameters:
|
|
endpoint:
|
|
callback: callable accepting an event to be executed if when a matching event
|
|
is received
|
|
pattern: hashable object to be used when filtering event (passed to inherited
|
|
`filter(...)`)
|
|
debounce:
|
|
delay:
|
|
'''
|
|
route_tuple = (callback, pattern, debounce, delay, listener_kwargs)
|
|
self.routemap[endpoint].append(route_tuple)
|
|
|
|
def submit(self, events:E | list[E], callbacks:list[Callable]|None=None):
|
|
'''
|
|
Handle a list of events. Each event is matched against the registered callbacks,
|
|
and those callbacks are ran concurrently (be it via a thread pool or an asyncio
|
|
loop).
|
|
'''
|
|
if type(events) is not list:
|
|
events = [events]
|
|
|
|
futures = []
|
|
for event in events:
|
|
future = self.submit_callback(self.submit_event, event, callbacks=callbacks)
|
|
future.add_done_callback(lambda f: self.clear_event(event, f))
|
|
futures.append(future)
|
|
|
|
return futures
|
|
|
|
def submit_event(self, event: E, callbacks:list[Callable]|None=None):
|
|
'''
|
|
Group up and submit all matching callbacks for `event`. All callbacks are ran
|
|
concurrently in their own threads, and this method blocks until all are completed.
|
|
|
|
In the outer `submit` context, this blocking method is itself ran in its own
|
|
thread, and the registered post-callbacks are attached to the completion of this
|
|
function, i.e., the finishing of all callbacks matching provided event.
|
|
|
|
Note that an event may not match any routes, in which case the method exits early.
|
|
An empty list is returned, and this shows up as the outer future's result. In this
|
|
case, the event is never considered "running," and the non-result picked up in
|
|
`clear_event` will ensure it exits right away (not even attempting to pop the
|
|
event from the running list, and for now not tracking it in the event log).
|
|
'''
|
|
if callbacks is None:
|
|
# ensure same thread gets all matching routes & sets debounce updates; else
|
|
# this may be split across threads mid-check, preventing one thread from
|
|
# handling the blocking of the entire group
|
|
with self._route_lock:
|
|
callbacks = self.matching_routes(event)
|
|
|
|
# stop early if no work to do
|
|
if len(callbacks) == 0:
|
|
return []
|
|
|
|
# enqueue requested/matched callbacks and exit if running
|
|
event_idx = self.event_index(event)
|
|
if event_idx in self.running_events:
|
|
self.queue_callbacks(event_idx, callbacks)
|
|
return []
|
|
|
|
# callbacks now computed, flush the running event
|
|
# note: a separate thread could queue valid callbacks since the running check;
|
|
# o/w we know the running index is empty
|
|
self.running_events[event_idx] = self.running_events[event_idx]
|
|
|
|
# submit matching callbacks and wait for them to complete
|
|
future_results = self.wait_on_callbacks(callbacks, event)
|
|
|
|
# finally call post event-group callbacks (only if some event callbacks were
|
|
# submitted), wait for them to complete
|
|
if future_results:
|
|
self.wait_on_futures([
|
|
self.submit_callback(post_callback, event, future_results)
|
|
for post_callback in self.post_callbacks
|
|
])
|
|
|
|
return future_results
|
|
|
|
def submit_callback(self, callback: Callable, *args, **kwargs):
|
|
'''
|
|
Note: this method is expected to return a future. Perform any event-based
|
|
filtering before submitting a callback with this method.
|
|
'''
|
|
if inspect.iscoroutinefunction(callback):
|
|
if self.loop is None:
|
|
self.loop = asyncio.new_event_loop()
|
|
asyncio.set_event_loop(self.loop)
|
|
|
|
#loop.run_in_executor(executor, loop.create_task, callback(event))
|
|
#future = self.loop.call_soon_threadsafe(
|
|
# self.loop.create_task,
|
|
future = asyncio.run_coroutine_threadsafe(
|
|
callback(*args, **kwargs),
|
|
self.loop,
|
|
)
|
|
else:
|
|
future = self.thread_pool.submit(
|
|
callback, *args, **kwargs
|
|
)
|
|
future.add_done_callback(handle_exception)
|
|
|
|
return future
|
|
|
|
def matching_routes(self, event: E, event_time=None):
|
|
'''
|
|
Return eligible matching routes for the provided event.
|
|
|
|
Note that we wait as late as possible before enqueuing matches if the event is in
|
|
fact already active in a frame. If this method were start filtering results while
|
|
the frame is active, and the frame were to finish before all matching callbacks
|
|
were determined, we would be perfectly happy to return all matches, and allow the
|
|
outer `submit_event` context to run them right away in a newly constructed frame.
|
|
The _very_ next thing that gets done is adding this event to the active event
|
|
tracker. Otherwise, matching is performed as usual, and eligible callbacks are
|
|
simply enqueued for the next event frame, which will be checked in the "done"
|
|
callback of the active frame. The logic here should mostly "seal up" any real
|
|
opportunities for error, e.g., a frame ending and popping off elements from
|
|
`running_events` half-way through their inserting at the end of this method, or
|
|
multiple threads checking for matching routes for the same event, and both coming
|
|
away with a non-empty set of matches to run. That last example highlights
|
|
precisely how the single event-frame model works: many threads might be running
|
|
this method at the same time, for the same event (which has fired rapidly), but
|
|
only one should be able to "secure the frame" and begin running the matching
|
|
callbacks. Making the "active frame check" both as late as possible and as close
|
|
to the event blocking stage in the tracker (in `submit_event`), we make the
|
|
ambiguity gap as small as possible (and almost certainly smaller than any
|
|
realistic I/O-bound event duplication).
|
|
|
|
Note: on event actions
|
|
The debounce reset is now only set if the event is successfully filtered. This
|
|
allows some middle ground when trying to depend on event actions: if the
|
|
action passes through, we block the whole range of actions until the debounce
|
|
window completes. Otherwise, the event remains open, only to be blocked by the
|
|
debounce on the first matching action.
|
|
'''
|
|
matches = []
|
|
endpoint = event.endpoint
|
|
name = event.name
|
|
#action = tuple(event.action) # should be more general
|
|
event_time = time.time()*1000 if event_time is None else event_time
|
|
|
|
for (callback, pattern, debounce, delay, listen_kwargs) in self.routemap[endpoint]:
|
|
#index = (endpoint, name, action, callback, pattern, debounce, delay)
|
|
index = (endpoint, name, callback, pattern, debounce, delay)
|
|
|
|
if event_time < self.next_allowed_time[index]:
|
|
# reject event
|
|
continue
|
|
|
|
if self.filter(event, pattern, **listen_kwargs):
|
|
# note that delayed callbacks are added
|
|
matches.append(self.get_delayed_callback(callback, delay, index))
|
|
|
|
# set next debounce
|
|
self.next_allowed_time[index] = event_time + debounce
|
|
|
|
match_text = Style.BRIGHT + Fore.GREEN + 'matched' + Fore.RESET
|
|
|
|
callback_name = str(callback)
|
|
if hasattr(callback, '__name__'):
|
|
callback_name = callback.__name__
|
|
|
|
logger.info(
|
|
f'Event [{name}] {match_text} [{pattern}] under [{endpoint}] for [{callback_name}]'
|
|
)
|
|
else:
|
|
match_text = Style.BRIGHT + Fore.RED + 'rejected' + Fore.RESET
|
|
logger.debug(
|
|
f'Event [{name}] {match_text} against [{pattern}] under [{endpoint}] for [{callback.__name__}]'
|
|
)
|
|
|
|
return matches
|
|
|
|
def get_delayed_callback(self, callback: Callable, delay: int|float, index):
|
|
'''
|
|
Parameters:
|
|
callback: function to wrap
|
|
delay: delay in ms
|
|
'''
|
|
if index not in self.callback_registry:
|
|
async def async_wrap(callback, *args, **kwargs):
|
|
await asyncio.sleep(delay/1000)
|
|
return await callback(*args, **kwargs)
|
|
|
|
def sync_wrap(callback, *args, **kwargs):
|
|
time.sleep(delay/1000)
|
|
return callback(*args, **kwargs)
|
|
|
|
wrapper = None
|
|
if inspect.iscoroutinefunction(callback): wrapper = async_wrap
|
|
else: wrapper = sync_wrap
|
|
|
|
self.callback_registry[index] = partial(wrapper, callback)
|
|
|
|
return self.callback_registry[index]
|
|
|
|
def wait_on_futures(self, futures):
|
|
'''
|
|
Block until all futures in `futures` are complete. Return collected results as a
|
|
list, and log warnings when a future fails.
|
|
'''
|
|
future_results = []
|
|
for future in as_completed(futures):
|
|
try:
|
|
future_results.append(future.result())
|
|
except Exception as e:
|
|
logger.warning(f"Router callback job failed with exception {e}")
|
|
|
|
return future_results
|
|
|
|
def wait_on_callbacks(self, callbacks: list[Callable], event: E, *args, **kwargs):
|
|
'''
|
|
Overridable by inheriting classes based on callback structure
|
|
'''
|
|
return self.wait_on_futures([
|
|
self.submit_callback(callback, event, *args, **kwargs)
|
|
for callback in callbacks
|
|
])
|
|
|
|
def queue_callbacks(self, event_idx, callbacks: list[Callable]):
|
|
'''
|
|
Overridable by inheriting classes based on callback structure
|
|
'''
|
|
self.running_events[event_idx].update(callbacks)
|
|
|
|
def filter(self, event: E, pattern, **listen_kwargs) -> bool:
|
|
'''
|
|
Determine if a given event matches the providedpattern
|
|
|
|
Parameters:
|
|
event:
|
|
pattern:
|
|
listen_kwargs:
|
|
'''
|
|
raise NotImplementedError
|
|
|
|
def add_post_callback(self, callback: Callable):
|
|
self.post_callbacks.append(callback)
|
|
|
|
def get_listener(self, listener_cls=None):
|
|
'''
|
|
Create a new Listener to manage watched routes and their callbacks.
|
|
'''
|
|
if listener_cls is None:
|
|
listener_cls = self.listener_cls
|
|
|
|
if listener_cls is None:
|
|
raise ValueError('No Listener class provided')
|
|
|
|
listener = listener_cls(self)
|
|
return self.extend_listener(listener)
|
|
|
|
def extend_listener(self, listener):
|
|
'''
|
|
Extend a provided Listener object with the Router instance's `listener_kwargs`.
|
|
'''
|
|
for endpoint, route_tuples in self.routemap.items():
|
|
for route_tuple in route_tuples:
|
|
listen_kwargs = route_tuple[-1]
|
|
listener.listen(endpoint, **listen_kwargs)
|
|
return listener
|
|
|
|
def stop_event(self, event):
|
|
'''
|
|
Pop event out of the running events tracker and return it.
|
|
'''
|
|
event_idx = self.event_index(event)
|
|
return self.running_events.pop(event_idx, None)
|
|
|
|
def clear_event(self, event: E, future):
|
|
'''
|
|
Clear an event. Pops the passed event out of `running_events`, and the request
|
|
counter is >0, the event is re-submitted.
|
|
|
|
This method is attached as a "done" callback to the main event wrapping job
|
|
`submit_event`. The `future` given to this method is one to which it was
|
|
attached as this "done" callback. This method should only be called when that
|
|
`future` is finished running (or failed). If any jobs were submitted in the
|
|
wrapper task, the future results here should be non-empty. We use this fact to
|
|
filter out non-work threads that call this method. Because even the
|
|
`matching_routes` check is threaded, we can't wait to see an event has no work to
|
|
schedule, and thus can't prevent this method being attached as a "done" callback.
|
|
The check for results from the passed future allows us to know when in fact a
|
|
valid frame has finished, and a resubmission may be on the table.
|
|
'''
|
|
result = future.result()
|
|
if not result: return
|
|
|
|
self.event_log.append((event, result))
|
|
queued_callbacks = self.stop_event(event)
|
|
|
|
# resubmit event if some queued work remains
|
|
if queued_callbacks and len(queued_callbacks) > 0:
|
|
logger.debug(
|
|
f'Event [{event.name}] resubmitted with [{len(queued_callbacks)}] queued callbacks'
|
|
)
|
|
self.submit(event, callbacks=queued_callbacks)
|
|
|
|
def event_index(self, event):
|
|
return event[:2]
|
|
|
|
|
|
class ChainRouter[E: Event](Router[E]):
|
|
'''
|
|
Routes events to registered callbacks
|
|
'''
|
|
def __init__(self, ordered_routers):
|
|
super().__init__()
|
|
|
|
self.ordered_routers = []
|
|
for router in ordered_routers:
|
|
self.add_router(router)
|
|
|
|
self.running_events = defaultdict(lambda: defaultdict(set))
|
|
|
|
def add_router(self, router):
|
|
'''
|
|
TODO: allow positional insertion in ordered list
|
|
|
|
Note: the `routemap` extensions here shouldn't be necessary, since 1) route maps
|
|
show up only in `matching_routes`, and 2) `matching_routes` is only invoked in
|
|
`submit_event`, which is totally overwritten for the ChainRouter type. All events
|
|
are routed through to individual Routers, and which point their route maps are
|
|
used.
|
|
'''
|
|
self.ordered_routers.append(router)
|
|
for endpoint, routelist in router.routemap.items():
|
|
self.routemap[endpoint].extend(routelist)
|
|
|
|
def matching_routes(self, event: E, event_time=None):
|
|
'''
|
|
Colloquial `callbacks` now used as a dict of lists of callbacks, indexed by
|
|
router, and only having keys for routers with non-empty callback lists.
|
|
'''
|
|
if event_time is None:
|
|
event_time = time.time()*1000
|
|
|
|
route_map = {}
|
|
for router in self.ordered_routers:
|
|
router_matches = router.matching_routes(event, event_time)
|
|
if router_matches:
|
|
route_map[router] = router_matches
|
|
|
|
return route_map
|
|
|
|
def wait_on_callbacks(self, callbacks, event: E, *args, **kwargs):
|
|
'''
|
|
Note: relies on order of callbacks dict matching that of `ordered_routers`, which
|
|
should happen in `matching_routes`
|
|
'''
|
|
results = {}
|
|
for router, callback_list in callbacks.items():
|
|
router_results = router.submit_event(event, callbacks=callback_list)
|
|
results[router] = router_results
|
|
|
|
return results
|
|
|
|
def queue_callbacks(self, event_idx, callbacks):
|
|
for router, callback_list in callbacks.items():
|
|
self.running_events[event_idx][router].update(callback_list)
|
|
|
|
def stop_event(self, event):
|
|
'''
|
|
Sub-routers do not get a "done" callback for their `submit_event` jobs, as they
|
|
would if they handled their own event submissions. They will, however, set the
|
|
submitted event as "running." We can't rely on sub-routers' "done" callbacks to
|
|
"unset" the running event, because the disconnect between the thread completing
|
|
and execution of that callback may take too long.
|
|
|
|
Instead, we explicitly unset the running event for each of the constituent
|
|
sub-routers at the _same time_ we handle the ChainRouter's notion of event's
|
|
ending.
|
|
'''
|
|
event_idx = self.event_index(event)
|
|
for router in self.ordered_routers:
|
|
rq_callbacks = router.running_events.pop(event_idx, [])
|
|
assert len(rq_callbacks) == 0
|
|
|
|
return self.running_events.pop(event_idx, None)
|
|
|
|
def get_listener(self, listener_cls=None):
|
|
if listener_cls is None:
|
|
for router in self.ordered_routers:
|
|
if router.listener_cls is not None:
|
|
listener_cls = router.listener_cls
|
|
break
|
|
|
|
listener = super().get_listener(listener_cls)
|
|
for router in self.ordered_routers:
|
|
router.extend_listener(listener)
|
|
return listener
|
|
|
|
|
|
def handle_exception(future):
|
|
try:
|
|
future.result()
|
|
except Exception as e:
|
|
print(f"Exception occurred: {e}")
|
|
traceback.print_exc()
|