add callback timeouts, secondary thread pools, and frame directives to Router

Sam G. 2024-05-14 06:37:08 -07:00
parent 52ff2622f3
commit aab101bd1c
6 changed files with 552 additions and 112 deletions

.gitignore

@@ -1,7 +1,7 @@
# generic py
__pycache__/
.pytest_cache/
-localsys.egg-info/
*.egg-info/
.ipynb_checkpoints/
.python-version

execlog/router.py

@@ -5,8 +5,10 @@ import time
import asyncio
import logging
import inspect
import traceback
import threading
import concurrent
from enum import Enum
from pathlib import Path
from typing import Any, Callable
from collections import defaultdict
@@ -18,11 +20,24 @@ from tqdm.auto import tqdm
from execlog.event import Event
from execlog.listener import Listener
-from execlog.util.generic import color_text
from execlog.util.generic import color_text, get_func_name

logger = logging.getLogger(__name__)
class FrameDirective(Enum):
'''
Indicates frame-level behavior when a callback fails.
'''
CONTINUE_WITHOUT = 1
CANCEL_FRAME = 2
class CallbackTimeoutError(Exception):
...
class CancelledFrameError(Exception):
...
class Router[E: Event]:
'''
Route events to registered callbacks
@@ -78,6 +93,89 @@ class Router[E: Event]:
completely for an event to be fully handled, later jobs may complete before
earlier ones, or interact with intermediate disk states (raw file writes, DB
inserts, etc), before the earliest call has had a chance to clean up.
.. admonition:: Details behind the threaded model and future management
Routers kick off execution in response to *events*. These events are received via
``.submit``, and the following process is kick-started:
1. Each event is wrapped in its own ``.submit_event`` call and submitted as a task
to the *primary* thread pool. Let $E$ be the set of events, and $|E|$ be the
number of events. ``.submit`` exits as soon as these $|E|$ tasks are enqueued,
not waiting for the completion of the corresponding worker threads. There are
now $|E|$ tier-I tasks waiting to be started by the router's primary thread
pool, with states set to "pending."
2. The primary thread pool begins running the enqueued ``.submit_event`` calls
concurrently using allocated resources (e.g., four threads). Each
``.submit_event`` call matches the associated event $e$ to $c_e$ callbacks,
according to the registered routes. These callbacks are each individually
submitted to the *secondary* thread pool as tier-II tasks and waited upon
*within* the ``.submit_event`` call. This thread pool separation prevents
deadlocks which would otherwise be an issue if submitting both tier-I and
tier-II tasks to the same thread pool. Tier-I tasks that have begun this
process are in the "running" state, and the submitted tier-II futures are now
"pending."
3. Once the $c_e$ callbacks for event $e$ are completed (which are tier-II tasks
being waited upon within a tier-I task), their done-callbacks will be invoked
within "a thread belonging to the process that added them" (with
``wait_on_event_callbacks`` calling through to ``submit_event_callback``, which
attaches ``general_callback``). Where these done callbacks are executed
varies based on a few conditions. See
https://stackoverflow.com/a/26021772/4573915
for a great breakdown on this. The gist is the following:
a. If the callback is attached to a future that is already cancelled or
completed, it will be invoked immediately in the current thread doing the
attaching.
b. If the future is queued/pending and successfully cancelled, the thread
doing the cancelling will immediately invoke all of the future's callbacks.
c. Otherwise, the thread that executes the future's task (could produce either
a successful result or an exception) will invoke the callbacks.
So if a task completes (i.e., is not cancelled, producing either a result or an
exception), the thread that ran the task will also handle the associated
callbacks. If the task is successfully cancelled (which means it was never
running and never allocated a thread), the cancelling context will handle the
callbacks, and this happens here only in ``.shutdown()`` via the manual
``future.cancel()`` sweep over ``self._active_futures``.
The results of these futures are made available in this ``submit_event``
context. Note that these results are dictated by the logic in
``wait_on_event_callbacks``. If a future was successfully cancelled or raised an
exception during execution, it will not have a result to add to this list.
The *router-level* post-callbacks are then submitted to the secondary thread
pool and awaited in a similar fashion to the individual $c_e$ callbacks. The
results obtained from the event callbacks are passed through to these
"post-callbacks" for possible centralized processing.
4. Once all post-callbacks have completed (along with *their* attached
"done-callbacks," which are just ``.general_task_done`` checks handled in the
executor thread that ran the post-callback), finally the tier-I
``.submit_event`` future can be marked completed (either with a successfully
attached result or an exception), and its attached "done-callbacks" will be
run in the same tier-I thread that handled the task (which is
``clear_event``).
- General behaviors and additional remarks:
* Thread pool futures can only be cancelled prior to "running" or "done" states.
Either successful cancellation or completion triggers a future's done-callbacks,
which will be executed in one of a few possible contexts depending on several
conditions, as detailed above.
* Tier-I tasks have ``clear_event`` attached as a done-callback, which tracks
the task result and resubmits the event if valid (successfully debounced)
repeat requests were received while the event was being handled.
* Both tier-I and tier-II futures have done-callbacks attached, which
attempt to retrieve the future result if it wasn't cancelled (if it was, this
retrieval would raise a ``CancelledError``). If it wasn't cancelled but an
exception was raised during execution, this same exception will be re-raised
and re-caught, logged as an error, and the task exits "cleanly" (since job failures
shouldn't throw off the entire process). A successful result retrieval will
have no effect.
- On interrupts, exception handling, and future cancellation:
*
'''
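To make the two-tier model concrete: with a single N-worker pool, N concurrent
``submit_event`` tasks can occupy every worker while blocking on callback futures
that then have no free worker to run on. A minimal sketch of the two-pool pattern
(illustrative only, not part of this commit; the ``handle_event`` function is a
hypothetical stand-in for ``submit_event``):

from concurrent.futures import ThreadPoolExecutor, wait

primary = ThreadPoolExecutor(max_workers=4)    # tier-I: one task per event
secondary = ThreadPoolExecutor(max_workers=4)  # tier-II: the event's callbacks

def handle_event(event, callbacks):
    # fan out tier-II work, then block; waiting threads (primary) and working
    # threads (secondary) are disjoint, so the wait can always make progress
    futures = [secondary.submit(cb, event) for cb in callbacks]
    wait(futures)
    return [f.result() for f in futures if f.exception() is None]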
listener_cls = Listener[E]
@@ -107,6 +205,7 @@ class Router[E: Event]:
# store prepped (e.g., delayed) callbacks
self.callback_registry = {}
self.callback_start_times = {}

# track event history
self.event_log = []
@@ -115,14 +214,23 @@
self.should_exit = False
self._active_futures = set()
-self._thread_pool = None
self._thread_pool_1 = None
self._thread_pool_2 = None

self._route_lock = threading.Lock()

@property
-def thread_pool(self):
-if self._thread_pool is None:
-self._thread_pool = ThreadPoolExecutor(max_workers=self.workers)
-return self._thread_pool
def primary_thread_pool(self):
'''Handle tier-I futures.'''
if self._thread_pool_1 is None:
self._thread_pool_1 = ThreadPoolExecutor(max_workers=self.workers)
return self._thread_pool_1

@property
def secondary_thread_pool(self):
'''Handle tier-II futures.'''
if self._thread_pool_2 is None:
self._thread_pool_2 = ThreadPoolExecutor(max_workers=self.workers)
return self._thread_pool_2
def register(
self,
@@ -131,6 +239,8 @@
pattern,
debounce=200,
delay=10,
callback_timeout=None,
condition=FrameDirective.CONTINUE_WITHOUT,
**listener_kwargs,
):
'''
@@ -157,8 +267,17 @@
``filter(...)``)
debounce:
delay:
callback_timeout: seconds the callback may run before a
``CallbackTimeoutError`` is set as its future's exception
condition: ``FrameDirective`` indicating how a failed or timed-out
callback should affect the event frame
'''
-route_tuple = (callback, pattern, debounce, delay, listener_kwargs)
route_tuple = (
callback,
pattern,
debounce,
delay,
callback_timeout,
condition,
listener_kwargs
)
self.routemap[endpoint].append(route_tuple)
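A hedged usage sketch of the extended signature (the endpoint, callback, and
pattern values are hypothetical, and the leading positional parameters are assumed
from the tuple above):

router = Router()
router.register(
    'endpoint_proxy',              # hypothetical endpoint
    lambda event: event.name,      # hypothetical callback
    '**/*.md',                     # hypothetical match pattern
    debounce=200,
    delay=10,
    callback_timeout=5,            # seconds before CallbackTimeoutError is set
    condition=FrameDirective.CANCEL_FRAME,  # timeout/failure cancels the frame
)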
def submit(self, events: E | list[E], callbacks: list[Callable] | None = None):
@@ -178,7 +297,13 @@
return futures

-def submit_event(self, event: E, callbacks: list[Callable] | None = None):
def submit_event(
self,
event : E,
callbacks : list[Callable] | None = None,
timeouts : list[int|float] | None = None,
conditions : list[FrameDirective] | None = None,
) -> list:
'''
Group up and submit all matching callbacks for ``event``. All callbacks are run
concurrently in their own threads, and this method blocks until all are completed.
@@ -198,7 +323,7 @@
# this may be split across threads mid-check, preventing one thread from
# handling the blocking of the entire group
with self._route_lock:
-callbacks = self.matching_routes(event)
callbacks, timeouts, conditions = self.matching_routes(event)

# stop early if no work to do
if len(callbacks) == 0:
@@ -210,34 +335,32 @@
self.queue_callbacks(event_idx, callbacks)
return []

# TODO: Chesterton's fence
# callbacks now computed, flush the running event
# note: a separate thread could queue valid callbacks since the running check;
# o/w we know the running index is empty
-self.running_events[event_idx] = self.running_events[event_idx]
# self.running_events[event_idx] = self.running_events[event_idx]

# submit matching callbacks and wait for them to complete
-future_results = self.wait_on_callbacks(callbacks, event)
# *may* raise a CancelledFrameError, which we let propagate upward
completed_futures = self.wait_on_event_callbacks(
event,
callbacks,
timeouts=timeouts,
conditions=conditions,
)
# finally call post event-group callbacks (only if some event callbacks were
# submitted), wait for them to complete
-if future_results:
-self.wait_on_futures([
-self.submit_callback(post_callback, event, future_results)
-for post_callback in self.post_callbacks
-])
-return future_results
if completed_futures:
wait([
self.submit_event_callback(post_callback, event, completed_futures)[0]
for post_callback in self.post_callbacks
])

return completed_futures
-def submit_callback(self, callback: Callable, *args, **kwargs):
-'''
-Note: this method is expected to return a future. Perform any event-based
-filtering before submitting a callback with this method.
-'''
-# exit immediately if exit flag is set
-# if self.should_exit:
-# return
-callback = self.wrap_safe_callback(callback)

def _submit_with_thread_pool(self, thread_pool, callback: Callable, *args, **kwargs):
if inspect.iscoroutinefunction(callback):
if self.loop is None:
self.loop = asyncio.new_event_loop()
@@ -251,15 +374,48 @@
self.loop,
)
else:
-future = self.thread_pool.submit(
future = thread_pool.submit(
callback, *args, **kwargs
)
-self._active_futures.add(future)
-future.add_done_callback(self.general_task_done)
return future

-def matching_routes(self, event: E, event_time=None):
def submit_callback(self, callback: Callable, *args, **kwargs):
future = self._submit_with_thread_pool(
self.primary_thread_pool,
callback,
*args, **kwargs
)
self._active_futures.add(future)
return future
def submit_event_callback(self, callback: Callable, event: E, *args, **kwargs):
'''
Note: this method is expected to return a future. Perform any event-based
filtering before submitting a callback with this method.
'''
# exit immediately if exit flag is set
# if self.should_exit:
# return
submitted_time = time.time()
callback = self.wrap_timed_callback(callback, submitted_time)
future = self._submit_with_thread_pool(
self.secondary_thread_pool,
callback,
event,
*args, **kwargs
)
future.add_done_callback(self.general_callback)
return future, submitted_time
def matching_routes(
self,
event: E,
event_time = None
) -> tuple[list[Callable], list[int|float|None], list[FrameDirective]]:
'''
Return eligible matching routes for the provided event.
@@ -292,12 +448,14 @@
debounce on the first matching action.
'''
matches = []
timeouts = []
conditions = []
endpoint = event.endpoint
name = event.name
#action = tuple(event.action) # should be more general
event_time = time.time()*1000 if event_time is None else event_time

-for (callback, pattern, debounce, delay, listen_kwargs) in self.routemap[endpoint]:
for (callback, pattern, debounce, delay, ctimeout, condition, listen_kwargs) in self.routemap[endpoint]:
#index = (endpoint, name, action, callback, pattern, debounce, delay)
index = (endpoint, name, callback, pattern, debounce, delay)
@@ -305,9 +463,7 @@
# reject event
continue

-callback_name = str(callback)
-if hasattr(callback, '__name__'):
-callback_name = callback.__name__
callback_name = get_func_name(callback)

name_text = color_text(name, Fore.BLUE)
pattern_text = color_text(pattern, Fore.BLUE)
@@ -317,6 +473,8 @@
if self.filter(event, pattern, **listen_kwargs):
# note that delayed callbacks are added
matches.append(self.get_delayed_callback(callback, delay, index))
timeouts.append(ctimeout)
conditions.append(condition)

# set next debounce
self.next_allowed_time[index] = event_time + debounce
@@ -331,7 +489,7 @@
f'Event [{name_text}] {match_text} against [{pattern_text}] under [{endpoint_text}] for [{callback_text}]'
)

-return matches
return matches, timeouts, conditions
def get_delayed_callback(self, callback: Callable, delay: int|float, index):
'''
@@ -356,29 +514,195 @@
return self.callback_registry[index]

-def wait_on_futures(self, futures):
-'''
-Block until all futures in ``futures`` are complete. Return collected results as a
-list, and log warnings when a future fails.
-'''
-future_results = []
-for future in as_completed(futures):
-try:
-if not future.cancelled():
-future_results.append(future.result())
-except Exception as e:
-logger.warning(f"Router callback job failed with exception \"{e}\"")
-return future_results
-def wait_on_callbacks(self, callbacks: list[Callable], event: E, *args, **kwargs):
-'''
-Overridable by inheriting classes based on callback structure
-'''
-return self.wait_on_futures([
-self.submit_callback(callback, event, *args, **kwargs)
-for callback in callbacks
-])

def wait_on_event_callbacks(
self,
event : E,
callbacks : list[Callable],
timeouts : list[int | float | None] | None = None,
conditions : list[FrameDirective | None] | None = None,
): #, *args, **kwargs):
'''
Waits for event-associated callbacks to complete.

Submits each callback in ``callbacks`` to the thread pool (via
``submit_event_callback``), passing ``event`` as the only parameter to each. Once
started, the future for callback ``callbacks[i]`` will have ``timeouts[i]``
seconds to complete. If it has not completed in this time, the future's result is
set to a ``CallbackTimeoutError``.

Overridable by inheriting classes based on callback structure.
'''
if timeouts is None:
timeouts = [None]*len(callbacks)

if conditions is None:
conditions = [FrameDirective.CONTINUE_WITHOUT]*len(callbacks)
future_map = {}
timed_futures = set()
for callback, timeout, on_err in zip(callbacks, timeouts, conditions):
# "raw" callback here is the reference that will be indexed in `.callback_start_times`
future, submitted_time = self.submit_event_callback(callback, event) #*args, **kwargs)
future_map[future] = (
callback,
get_func_name(callback),
future,
submitted_time,
timeout,
on_err
)
if timeout is not None:
timed_futures.add(future)
completed_futures = []
# iterate while there are some futures not completed (cancelled, or finished w/
# exception or return value). When completed, a future is removed from `future_map`.
while future_map:
min_timeout = float('inf')
expired_futures = [] # actively running but expired
if timed_futures:
time_now = time.time()
for future in list(timed_futures):
callback, cback_name, future, submitted_time, timeout, on_err = future_map[future]
future_tuple = (future, cback_name, on_err)
callback_id = (callback, submitted_time)
if future.done():
timed_futures.remove(future)
continue
if future.running() and callback_id in self.callback_start_times:
time_running = time_now - self.callback_start_times[callback_id]
time_left = timeout - time_running
# track running futures that have timed out
if time_left < 0:
expired_futures.append(future_tuple)
continue
else:
# running w/o a start time, or queued. Set time-left to the timeout
time_left = timeout
# track running future w/ smallest non-zero time left before timeout
min_timeout = min(min_timeout, time_left)
done_futures = []
for future in list(future_map.keys()):
_, cback_name, future, _, _, on_err = future_map[future]
future_tuple = (future, cback_name, on_err)
if future.done():
# done futures have a set result or exception and now immutable
done_futures.append(future_tuple) # local
completed_futures.append(future) # global
future_map.pop(future)
done_with_exception = [
ftuple
for ftuple in done_futures
if ftuple[0].exception() is not None
]
frame_cancellable = expired_futures + done_with_exception
cancel_frame = False
# set a timeout exception for all expired futures, and determine
# if any of these timeouts should result in a frame cancellation
for _, _, on_err in frame_cancellable:
if on_err == FrameDirective.CANCEL_FRAME:
cancel_frame = True
break
# set exceptions on running expired futures
for expired_future, cback_name, on_err in expired_futures:
# results are immutable once set; double check doneness
if not expired_future.done():
expired_future.set_exception(
CallbackTimeoutError(
f'Event callback {cback_name} timed out with condition {on_err}'
)
)
if cancel_frame:
# cancel the active frame. Expired futures have already been handled, and
# we now need to handle running futures (not cancellable) and pending
# futures (cancellable)
# explicitly cancel queued futures before looping through active futures
for future in list(future_map.keys()):
# cancellable futures not yet started
# can't really be more specific here, sets a CancelledError
future.cancel()
for future in list(future_map.keys()):
_, cback_name, future, _, _, _ = future_map[future]
if future.done():
# includes the expired futures whose exceptions were just set &
# those cancelled, although managing `future_map` isn't needed at
# this stage
future_map.pop(future)
elif future.running():
# possible memory leak
future.set_exception(
CancelledFrameError(
f'Indirect frame cancellation for event callback {cback_name}'
)
)
# attempt to communicate with threaded processes for graceful shutdown
# -> set shutdown flags
...
# finally, raise an exception indicating the frame was cancelled
raise CancelledFrameError
# if no expired futures (or they can be ignored with CONTINUE_WITHOUT), carry
# on. Wait for the future with the least time remaining; we shouldn't need to
# do any of the above future parsing until at least min-timeout more seconds.
timeout = None
if min_timeout < float('inf'):
timeout = min_timeout
# if any remaining futures can produce CANCELLED FRAMEs, wait only on them.
# This lets us be maximally dynamic:
# -> We respond before the timeout only if a newly completed future might
# CANCEL the FRAME, which we want to do as soon as possible.
# -> If not responding early, we wait at most `timeout` seconds before coming
# back to check whether the *earliest possible timeout violation* has
# occurred. If so, we handle it immediately no matter what, so the desired
# timeout exception is set as *early as possible*; otherwise the future
# might finish and set its own result first, shutting us out of capturing
# the timeout violation.
# -> When CANCEL FRAME futures are available, we don't wait on all pending
# futures, as we *don't mind* being late to them. We only need to check
# immediately on CANCEL FRAMEs, and otherwise be back no later than
# `timeout`. If there's no `timeout`, we wait for the first CANCEL FRAME;
# if no CANCEL FRAME, we wait up to `timeout` on all futures; and if
# neither, we simply wait until all futures complete.
cancel_frame_futures = [
future
for future, ftuple in future_map.items()
if ftuple[-1] == FrameDirective.CANCEL_FRAME
]
return_when = concurrent.futures.ALL_COMPLETED
wait_futures = future_map.keys()
if cancel_frame_futures:
return_when = concurrent.futures.FIRST_COMPLETED
wait_futures = cancel_frame_futures
# (timeout remains None here when no active future has a per-callback timeout)
done, _ = wait(
wait_futures,
timeout=timeout,
return_when=return_when
)
# add any trailing completed futures
completed_futures.extend(done)
return completed_futures
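Caller-side sketch (illustrative): ``submit_event`` lets ``CancelledFrameError``
propagate when a callback registered with ``FrameDirective.CANCEL_FRAME`` fails or
times out, so a frame-aware caller might look like:

try:
    completed = router.submit_event(event)
    results = [f.result() for f in completed if f.exception() is None]
except CancelledFrameError:
    logger.warning('Frame cancelled; discarding partial results for this event')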
def queue_callbacks(self, event_idx, callbacks: list[Callable]):
'''
@@ -386,7 +710,7 @@
'''
self.running_events[event_idx].update(callbacks)

-def wrap_safe_callback(self, callback: Callable):
def wrap_timed_callback(self, callback: Callable, submitted_time):
'''
Check for shutdown flag and exit before running the callbacks.
@@ -394,9 +718,8 @@
an interrupt is received.
'''
def safe_callback(callback, *args, **kwargs):
-if self.should_exit:
-logger.debug('Exiting early from queued callback')
-return
# track when this task actually begins
self.callback_start_times[(callback, submitted_time)] = time.time()

return callback(*args, **kwargs)
@@ -448,31 +771,67 @@
def clear_event(self, event: E, future):
'''
-Clear an event. Pops the passed event out of ``running_events``, and the request
-counter is >0, the event is re-submitted.
Clear an event. Pops the passed event out of ``running_events``, and if the
request counter is >0, the event is re-submitted.

This method is attached as a "done" callback to the main event wrapping job
``submit_event``. The ``future`` given to this method is one to which it was
attached as this "done" callback. This method should only be called when that
``future`` is finished running (or failed). If any jobs were submitted in the
-wrapper task, the future results here should be non-empty. We use this fact to
-filter out non-work threads that call this method. Because even the
-``matching_routes`` check is threaded, we can't wait to see an event has no work to
-schedule, and thus can't prevent this method being attached as a "done" callback.
-The check for results from the passed future allows us to know when in fact a
-valid frame has finished, and a resubmission may be on the table.
wrapper task, the future results here should be non-empty (even if the methods
don't return anything; we'll at least have ``[None,...]`` if we scheduled at least
one callback). We use this fact to filter out non-work threads that call this
method. Because even the ``matching_routes`` check is threaded, we can't wait to
see an event has no work to schedule, and thus can't prevent this method being
attached as a "done" callback. The check for results from the passed future allows
us to know when in fact a valid frame has finished, and a resubmission may be on
the table.
.. admonition:: Why we don't need to worry about resubmission w/ empty results
Note that, even though we can't check for whether there will be any matching
routes prior to calling ``submit_event`` (we have to wait until we're in that
method, at which point this method will already be attached as a callback),
the event will never be marked as "running" and added to ``running_events``.
This means that we won't queue up any callbacks for the same event while
waiting on it, and then exit early here, never to re-submit. The worry was
that an event response might match 0 callbacks (e.g., due to debouncing) and
return ``[]`` from ``submit_event``, but *while waiting for this to complete*,
the same event is submitted and matches (e.g., debouncing timers now allowing
the event through). This could mean that the repeat event gets queued behind
the event in ``running_events`` and should be resubmitted here as a "queued
callback," but *won't do so* because we exit early if no results are obtained.
This is **not an issue** because the original event (that didn't match any
callbacks) will never be marked as running, and thus never prevent the second
event from itself running if in fact it matches a non-empty callback set. This
means that an empty future result set seen here indicates both 1) that no work
took place, and 2) no conflicting events were prevented from running, and we
can exit early here.
'''
-result = None
-if not future.cancelled():
-result = future.result()
-else:
-return None
self._active_futures.remove(future)

-# result should be *something* if work was scheduled
-if not result:
-return None
# result should be *something* if work was scheduled, since `submit_event` wraps
# up futures in a list. If no result, event was never marked active, and don't
# want to resubmit as any duplicates were allowed to start. Attempt to get result,
# returning if it's (None or []) or raised an Exception (possibly a
# CancelledFrameError if there's a frame issue, or a CancelledError if the tier-I
# task was successfully cancelled following a `.shutdown()` call)
try:
if not future.result(): return
except concurrent.futures.CancelledError as e:
#logger.error(f'Tier-I future cancelled')
# do not re-raise; outer context can handle cancellations
return
except CancelledFrameError as e:
# do not re-raise; outer context will manage frame cancellations
return
except Exception as e:
# print traceback for unexpected exception
logger.error(f'Unexpected exception in tier-I future: "{e}"')
traceback.print_exc()
return
-self.event_log.append((event, result))
self.event_log.append((event, future))

queued_callbacks = self.stop_event(event)

# resubmit event if some queued work remains
@@ -485,28 +844,38 @@
def event_index(self, event):
return event[:2]
def general_callback(self, future):
try:
future.result()
except concurrent.futures.CancelledError as e:
logger.error(f'Tier-II future cancelled; "{e}"')
except CancelledFrameError as e:
logger.error(f'Tier-II frame cancelled; "{e}"')
except Exception as e:
logger.warning(f'Tier-II job failed with unknown exception "{e}"')
def shutdown(self):
logger.info(color_text('Router shutdown received', Fore.BLACK, Back.RED))
self.should_exit = True
# manually track and cancel pending futures b/c `.shutdown(cancel_futures=True)`
# is misleading, and will cause an outer `as_completed` loop to hang
for future in tqdm(
list(self._active_futures),
-desc=color_text('Cancelling active futures...', Fore.BLACK, Back.RED),
desc=color_text(
f'Cancelling {len(self._active_futures)} pending futures...',
Fore.BLACK, Back.RED),
colour='red',
):
future.cancel()
-if self.thread_pool is not None:
-self.thread_pool.shutdown(wait=False)
if self._thread_pool_2 is not None:
# cancel pending futures (i.e., those not started)
self.secondary_thread_pool.shutdown(wait=False)

if self._thread_pool_1 is not None:
# cancel pending futures (i.e., those not started)
self.primary_thread_pool.shutdown(wait=False)

-def general_task_done(self, future):
-self._active_futures.remove(future)
-try:
-if not future.cancelled():
-future.result()
-except Exception as e:
-logger.error(f"Exception occurred in threaded task: '{e}'")
-#traceback.print_exc()
class ChainRouter[E: Event](Router[E]):
@@ -538,7 +907,11 @@ class ChainRouter[E: Event](Router[E]):
for endpoint, routelist in router.routemap.items():
self.routemap[endpoint].extend(routelist)

-def matching_routes(self, event: E, event_time=None):
def matching_routes(
self,
event: E,
event_time=None
):
'''
Colloquial ``callbacks`` now used as a dict of lists of callbacks, indexed by
router, and only having keys for routers with non-empty callback lists.
@@ -546,25 +919,51 @@
if event_time is None:
event_time = time.time()*1000

-route_map = {}
callback_map = {}
timeout_map = {}
condition_map = {}
for router in self.ordered_routers:
-router_matches = router.matching_routes(event, event_time)
-if router_matches:
-route_map[router] = router_matches
matches, timeouts, conditions = router.matching_routes(event, event_time)
if matches:
callback_map[router] = matches
timeout_map[router] = timeouts
condition_map[router] = conditions

-return route_map
return callback_map, timeout_map, condition_map

-def wait_on_callbacks(self, callbacks, event: E, *args, **kwargs):
def wait_on_event_callbacks(
self,
event : E,
callbacks : dict[Router, list[Callable]],
timeouts : dict[Router, list[int | float | None]] | None = None,
conditions : dict[Router, list[FrameDirective | None]] | None = None,
): #, *args, **kwargs):
'''
-Note: relies on order of callbacks dict matching that of ``ordered_routers``, which
-should happen in ``matching_routes``
Returns a dictionary mapping each router to the list of completed futures
returned by its ``submit_event`` call.

Note: relies on order of callback-associated dicts matching that of
``ordered_routers``, which should happen in ``matching_routes``.

This method blurs the OOP lines a bit, as we're actually passing dicts rather than
the lists expected by the super class. The parent ``submit_event`` is carefully
designed to be indifferent; use caution when making changes.
'''
-results = {}
if timeouts is None:
timeouts = {}

if conditions is None:
conditions = {}
futures = {}
for router, callback_list in callbacks.items():
-router_results = router.submit_event(event, callbacks=callback_list)
-results[router] = router_results
futures[router] = router.submit_event(
event,
callback_list,
timeouts=timeouts.get(router),
conditions=conditions.get(router),
)

-return results
return futures
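Shape sketch (hypothetical routers and callbacks) of the dict-over-list convention
described in the docstring above:

# base Router expects flat, index-aligned lists:
#   callbacks=[cb_a, cb_b], timeouts=[5, None], conditions=[...]
# ChainRouter passes the same data keyed by router:
callbacks = {router_1: [cb_a], router_2: [cb_b]}
timeouts = {router_1: [5], router_2: [None]}
conditions = {router_1: [FrameDirective.CANCEL_FRAME],
              router_2: [FrameDirective.CONTINUE_WITHOUT]}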
def queue_callbacks(self, event_idx, callbacks):
for router, callback_list in callbacks.items():
@@ -601,6 +1000,12 @@
router.extend_listener(listener)
return listener
def shutdown(self):
super().shutdown()
# for router in self.ordered_routers:
# router.shutdown()
# RouterBuilder
def route(router, route_group, **route_kwargs):


@@ -1,4 +1,5 @@
import logging
import concurrent
from pathlib import Path
from concurrent.futures import as_completed
@@ -10,6 +11,7 @@ from co3.resources import DiskResource
from co3 import Differ, Syncer, Database

from execlog.event import Event
from execlog.router import CancelledFrameError
from execlog.routers import PathRouter
from execlog.util.generic import color_text
@@ -146,20 +148,44 @@ class PathRouterSyncer(Syncer[Path]):
# note: we structure this future waiting like this for the TQDM view
results = []
-for future in tqdm(
-as_completed(event_futures),
-total=len(event_futures),
-desc=f'Awaiting chunk futures [submitted {len(event_futures)}]'
-):
-try:
-if not future.cancelled():
-results.append(future.result())
-except Exception as e:
-logger.warning(f"Sync job failed with exception {e}")
success = 0
cancelled = 0
errored = 0
submitted = len(event_futures)
progress_bar = tqdm(
total=len(event_futures),
desc=f'Awaiting chunk futures [submitted {len(event_futures)}]'
)
for future in as_completed(event_futures):
try:
callback_future_list = future.result()
results.append(callback_future_list)
success += 1
except concurrent.futures.CancelledError as e:
cancelled += 1
#logger.error(f'Event future cancelled; "{e}"')
except CancelledFrameError as e:
errored += 1
#logger.error(f'Event frame cancelled; "{e}"')
except Exception as e:
errored += 1
logger.warning(f'Sync job failed with unknown exception "{e}"')

suc_txt = color_text(f'{success}', Fore.GREEN)
can_txt = color_text(f'{cancelled}', Fore.YELLOW)
err_txt = color_text(f'{errored}', Fore.RED)
tot_txt = color_text(f'{success+cancelled+errored}', Style.BRIGHT)
progress_bar.set_description(
f'Awaiting chunk futures [{tot_txt} / {submitted} | {suc_txt} {can_txt} {err_txt}]'
)
progress_bar.update(n=1)
progress_bar.close()

return results
def shutdown(self):
super().shutdown()
self.router.shutdown()

execlog/util/generic.py

@@ -8,6 +8,13 @@ from colorama import Fore, Back, Style
def color_text(text, *colorama_args):
return f"{''.join(colorama_args)}{text}{Style.RESET_ALL}"
def get_func_name(func):
func_name = str(func)
if hasattr(func, '__name__'):
func_name = func.__name__
return func_name
class ColorFormatter(logging.Formatter):
_format = '%(levelname)-8s :: %(name)s %(message)s'


@@ -1,6 +1,7 @@
import logging
from pathlib import Path
from functools import partial
from concurrent.futures import wait

from execlog import util
from execlog import ChainRouter, Event
@@ -41,7 +42,7 @@ def test_single_router_submission():
Event(endpoint='endpoint_proxy', name='file3'),
]
futures = router2.submit(events)
-router2.wait_on_futures(futures)
wait(futures)

assert True
@@ -52,7 +53,7 @@ def test_chain_router_submission():
Event(endpoint='endpoint_proxy', name='file3'),
]
futures = chain_router.submit(events)
-chain_router.wait_on_futures(futures)
wait(futures)

assert True


@@ -94,7 +94,8 @@ def test_server_with_listeners():
server.shutdown()
thread.join()

-# finally check the router event logs
-assert router1.event_log[0][1] == ['router1 job success']
-assert router2.event_log[0][1] == ['router2 job success']
# finally check the router event logs: holds tier-I futures, which hold lists of
# tier-II futures
assert [r.result() for r in router1.event_log[0][1].result()] == ['router1 job success']
assert [r.result() for r in router2.event_log[0][1].result()] == ['router2 job success']